Skip to content

Consumers

The consumers module for chat app.

ChatConsumer (AsyncWebsocketConsumer)

Chat consumer.

Source code in chat/consumers.py
class ChatConsumer(AsyncWebsocketConsumer):
    """Chat consumer."""

    async def connect(self: "ChatConsumer") -> None:
        """Join room group."""
        self.room_id = self.scope["url_route"]["kwargs"]["room_id"]
        self.room_group_name = f"chat_{self.room_id}"
        await self.accept()

    async def disconnect(self: "ChatConsumer", close_code=None) -> None:  # noqa
        """Leave room group."""
        await self.channel_layer.group_discard(self.room_group_name, self.channel_name)
        await self.close()

    async def receive(self: "ChatConsumer", **kwargs: dict) -> None:
        """Receive message from WebSocket."""
        data = json.loads(kwargs.get("text_data"))

        token = data.get("token")

        if not token:
            await self.send(text_data=json.dumps({"error": "Invalid token"}))

        jwt = JWTBaseAuthentication()

        try:
            validated_token = jwt.get_validated_token(token)
            user = await self.get_user(validated_token)

            room = await self.get_room(user, self.room_id)

            if not room:
                await self.send(text_data=json.dumps({"error": "Room not found"}))

            await self.channel_layer.group_add(self.room_group_name, self.channel_name)

        except InvalidToken as e:
            await self.send(text_data=json.dumps({"error": e.detail}))

        except ValidationError as e:
            await self.send(text_data=json.dumps({"error": e.messages}))
        except Room.DoesNotExist:
            await self.send(text_data=json.dumps({"error": "Room does not exist"}))

    async def chat_message(self: "ChatConsumer", event: dict) -> None:
        """It sends message to the room group."""
        message = event["message"]
        await self.send(text_data=message)

    @database_sync_to_async
    def get_user(self: "ChatConsumer", token: str) -> CustomUser:
        """It returns user object if token is valid."""
        jwt = JWTBaseAuthentication()

        return jwt.get_user(token)

    @database_sync_to_async
    def get_room(self: "ChatConsumer", user: CustomUser, room_id: str) -> Room:
        """It returns room object if user is participant of the room."""
        return Room.objects.filter(
            Q(participants__user__id=user.id) & Q(id=room_id),
        ).first()

chat_message(self, event) async

It sends message to the room group.

Source code in chat/consumers.py
async def chat_message(self: "ChatConsumer", event: dict) -> None:
    """It sends message to the room group."""
    message = event["message"]
    await self.send(text_data=message)

connect(self) async

Join room group.

Source code in chat/consumers.py
async def connect(self: "ChatConsumer") -> None:
    """Join room group."""
    self.room_id = self.scope["url_route"]["kwargs"]["room_id"]
    self.room_group_name = f"chat_{self.room_id}"
    await self.accept()

disconnect(self, close_code=None) async

Leave room group.

Source code in chat/consumers.py
async def disconnect(self: "ChatConsumer", close_code=None) -> None:  # noqa
    """Leave room group."""
    await self.channel_layer.group_discard(self.room_group_name, self.channel_name)
    await self.close()

get_room(self, user, room_id)

It returns room object if user is participant of the room.

Source code in chat/consumers.py
@database_sync_to_async
def get_room(self: "ChatConsumer", user: CustomUser, room_id: str) -> Room:
    """It returns room object if user is participant of the room."""
    return Room.objects.filter(
        Q(participants__user__id=user.id) & Q(id=room_id),
    ).first()

get_user(self, token)

It returns user object if token is valid.

Source code in chat/consumers.py
@database_sync_to_async
def get_user(self: "ChatConsumer", token: str) -> CustomUser:
    """It returns user object if token is valid."""
    jwt = JWTBaseAuthentication()

    return jwt.get_user(token)

receive(self, **kwargs) async

Receive message from WebSocket.

Source code in chat/consumers.py
async def receive(self: "ChatConsumer", **kwargs: dict) -> None:
    """Receive message from WebSocket."""
    data = json.loads(kwargs.get("text_data"))

    token = data.get("token")

    if not token:
        await self.send(text_data=json.dumps({"error": "Invalid token"}))

    jwt = JWTBaseAuthentication()

    try:
        validated_token = jwt.get_validated_token(token)
        user = await self.get_user(validated_token)

        room = await self.get_room(user, self.room_id)

        if not room:
            await self.send(text_data=json.dumps({"error": "Room not found"}))

        await self.channel_layer.group_add(self.room_group_name, self.channel_name)

    except InvalidToken as e:
        await self.send(text_data=json.dumps({"error": e.detail}))

    except ValidationError as e:
        await self.send(text_data=json.dumps({"error": e.messages}))
    except Room.DoesNotExist:
        await self.send(text_data=json.dumps({"error": "Room does not exist"}))

Last update: 2023-06-09
Authors: Mohamed-Kaizen