Back to Documentation

Python SDK Examples

Real-world examples showing how to use the OddSockets Python SDK, with async/await support, in production applications.

Download Example Code
Basic Usage
Available
Simple example showing how to connect, subscribe to a channel, and publish messages using async/await with the Python SDK.
import asyncio
from oddsockets import OddSockets

async def message_handler(data):
    """Print a payload handed to this callback, prefixed with a marker."""
    line = "📨 Received: {}".format(data)
    print(line)

async def main():
    """Create a client, subscribe to 'my-channel', publish one message, disconnect.

    The client is disconnected in a ``finally`` block so that a failure while
    subscribing or publishing does not skip the cleanup step.
    """
    # Create client
    client = OddSockets({
        'api_key': 'ak_live_1234567890abcdef'
    })

    try:
        # Get channel
        channel = client.channel('my-channel')

        # Subscribe to messages
        await channel.subscribe(message_handler)

        # Publish a message
        await channel.publish('Hello, World!')
    finally:
        # Clean up, whether or not the calls above succeeded
        await client.disconnect()


# Run the example only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())
FastAPI Integration
Available
Real-time WebSocket integration with FastAPI, showing how to bridge WebSocket connections with OddSockets channels.
from fastapi import FastAPI, WebSocket
from oddsockets import OddSockets

# Shared OddSockets client: built once at import time and reused by the
# handlers in this example.
client = OddSockets({'api_key': 'ak_live_1234567890abcdef'})

# ASGI application that the decorators in this example register against.
app = FastAPI()

# NOTE(review): recent FastAPI releases deprecate on_event in favour of a
# lifespan handler; on_event is kept here so the example needs no extra
# imports — consider migrating.
@app.on_event("startup")
async def startup_event():
    """Connect the shared OddSockets client when the application starts."""
    await client.connect()


@app.on_event("shutdown")
async def shutdown_event():
    """Disconnect the shared OddSockets client when the application stops."""
    await client.disconnect()

@app.websocket("/ws/{channel_name}")
async def websocket_endpoint(websocket: WebSocket, channel_name: str):
    """Bridge one browser WebSocket to the OddSockets channel named in the URL.

    Messages arriving on the channel are forwarded to the WebSocket as JSON,
    and JSON frames received from the WebSocket are published to the channel.

    Args:
        websocket: The incoming WebSocket connection.
        channel_name: Path parameter selecting the OddSockets channel.
    """
    await websocket.accept()
    channel = client.channel(channel_name)

    async def on_message(data):
        # Forward each channel message to this WebSocket client.
        await websocket.send_json(data)

    await channel.subscribe(on_message)

    try:
        while True:
            data = await websocket.receive_json()
            await channel.publish(data)
    except Exception:
        # Any failure in the receive/publish loop (including the client
        # disconnecting) ends the session quietly, as before.
        pass
    finally:
        # Unsubscribe on every exit path. Task cancellation raises
        # CancelledError, which is not an Exception subclass, so cleanup
        # placed in the except branch would be skipped in that case.
        await channel.unsubscribe()
Django Channels
Available
Integration with Django Channels for real-time web applications, showing how to use OddSockets with Django's async consumers.
from channels.generic.websocket import AsyncWebsocketConsumer
from oddsockets import OddSockets
import json

class ChatConsumer(AsyncWebsocketConsumer):
    """WebSocket consumer that relays chat messages through an OddSockets channel.

    Each consumer instance owns its own OddSockets client. On connect it
    subscribes to ``chat_<room_name>``; text frames received from the browser
    are published to that channel, and channel messages are sent back to the
    browser as JSON text.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.client = OddSockets({
            'api_key': 'ak_live_1234567890abcdef'
        })
        # Assigned in connect(); stays None if the socket closes before then.
        self.channel = None

    async def connect(self):
        """Accept the WebSocket and subscribe to the room's OddSockets channel."""
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        await self.accept()

        self.channel = self.client.channel(f'chat_{self.room_name}')
        await self.channel.subscribe(self.on_oddsockets_message)

    async def disconnect(self, close_code):
        """Unsubscribe from the channel and disconnect the client on close."""
        try:
            if self.channel is not None:
                await self.channel.unsubscribe()
        finally:
            # Disconnect the client even if unsubscribing failed.
            await self.client.disconnect()

    async def receive(self, text_data=None, bytes_data=None):
        """Publish an incoming text frame to the room's channel.

        Args:
            text_data: JSON text containing a ``message`` key.
            bytes_data: Binary frame payload; accepted so binary frames do
                not raise a TypeError, but otherwise ignored.
        """
        if text_data is None:
            # Only text frames carry chat messages in this example.
            return
        data = json.loads(text_data)
        await self.channel.publish({
            'message': data['message'],
            'user': self.scope['user'].username
        })

    async def on_oddsockets_message(self, data):
        """Forward a message received from the OddSockets channel to the browser.

        Assumes the delivered payload is JSON-serializable — TODO confirm
        against the SDK's message format.
        """
        await self.send(text_data=json.dumps(data))
Type Hints & Safety
Available
Comprehensive type hints example showing how to use the Python SDK with full type safety and IDE support.
import asyncio
from typing import Dict, Any, Optional
from oddsockets import OddSockets, Channel

async def typed_message_handler(data: Dict[str, Any]) -> None:
    """Print the ``user`` and ``message`` fields of a received payload.

    A missing ``message`` falls back to an empty string; a missing ``user``
    is printed as ``None``.
    """
    text: str = data.get('message', '')
    sender: Optional[str] = data.get('user')
    print("📨 {}: {}".format(sender, text))

async def main() -> None:
    """Subscribe to 'typed-channel' and publish one message, fully annotated.

    The client is disconnected in a ``finally`` block so that a failure while
    subscribing or publishing does not skip the cleanup step.
    """
    client: OddSockets = OddSockets({
        'api_key': 'ak_live_1234567890abcdef',
        'user_id': 'python-user',
        'auto_connect': True
    })

    try:
        channel: Channel = client.channel('typed-channel')

        await channel.subscribe(
            typed_message_handler,
            {'enable_presence': True}
        )

        # Kept to show the annotated return value of publish(); not used further.
        result: Dict[str, Any] = await channel.publish({
            'message': 'Hello with types!',
            'timestamp': '2024-01-01T00:00:00Z'
        })
    finally:
        await client.disconnect()


# Run the example only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())
Bulk Operations
Available
High-performance bulk message publishing example showing how to send multiple messages efficiently.
import asyncio
from oddsockets import OddSockets

async def main():
    """Publish several messages in one bulk call and report each outcome.

    The client is disconnected in a ``finally`` block so that a failure in
    the bulk call or while reading its results does not skip cleanup.
    """
    client = OddSockets({
        'api_key': 'ak_live_1234567890abcdef'
    })

    # Bulk publish multiple messages
    bulk_messages = [
        {'channel': 'notifications', 'message': 'System alert 1'},
        {'channel': 'notifications', 'message': 'System alert 2'},
        {'channel': 'chat', 'message': 'Bulk message to chat'},
        {'channel': 'updates', 'message': {'type': 'status', 'value': 'online'}}
    ]

    try:
        results = await client.publish_bulk(bulk_messages)

        # Number the messages from 1 for display.
        for number, result in enumerate(results, start=1):
            if result['success']:
                print(f"✅ Message {number} published successfully")
            else:
                print(f"❌ Message {number} failed: {result['error']}")
    finally:
        await client.disconnect()


# Run the example only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())
Error Handling
Available
Comprehensive error handling and reconnection example showing how to build resilient real-time applications.
import asyncio
from oddsockets import OddSockets, ConnectionError, OddSocketsError

async def main():
    """Register connection event handlers, then subscribe and publish once.

    Connection and SDK errors raised while subscribing or publishing are
    reported on stdout; the client is always disconnected afterwards.
    """
    client = OddSockets({
        'api_key': 'ak_live_1234567890abcdef',
        'options': {
            'reconnect_attempts': 5,
            'heartbeat_interval': 30.0
        }
    })

    # Set up event handlers
    def on_connecting():
        print("🔄 Connecting...")

    def on_connected():
        print("✅ Connected!")

    def on_error(error):
        print(f"💥 Error: {error}")

    def on_reconnecting(data):
        print(f"🔄 Reconnecting... Attempt {data['attempt']}")

    client.on('connecting', on_connecting)
    client.on('connected', on_connected)
    client.on('error', on_error)
    client.on('reconnecting', on_reconnecting)

    # Coroutine subscriber, matching the async handlers passed to
    # channel.subscribe() in the other examples.
    async def on_message(data):
        print(f"📨 {data}")

    try:
        channel = client.channel('error-handling-demo')
        await channel.subscribe(on_message)

        # Simulate operations with error handling
        await channel.publish('Test message')

    # NOTE: ConnectionError here is the name imported from oddsockets, which
    # shadows Python's built-in ConnectionError within this module.
    except ConnectionError as e:
        print(f"Connection failed: {e}")
    except OddSocketsError as e:
        print(f"OddSockets error: {e}")
    finally:
        await client.disconnect()


# Run the example only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())