Source code for MeowerBot.cl

import asyncio
import json

import websockets


[docs] class Client: """ MeowerBot.py's async websocket wrapper. """ ws: websockets.WebSocketClientProtocol message_condition: asyncio.Condition _packets: list def __init__(self): self.ws = None self._packets = [] self.message_condition = asyncio.Condition() pass async def _connect(self): pass async def _disconnect(self): pass async def _message(self, message): pass async def _error(self, error): pass
[docs] async def sendPacket(self, message): await self.ws.send(json.dumps(message))
[docs] async def close(self, reason=None): await self.ws.close(reason=reason)
[docs] async def connect(self, server): loop = asyncio.get_event_loop() async for websocket in websockets.connect(server, ping_interval=None): # Meower uses its own implementation, crashes the connection if left on. try: self.ws = websocket try: _task = loop.create_task(self._connect()) # noqa except Exception as e: await self._error(e) async for message in websocket: try: data = json.loads(message) async with self.message_condition: self._packets.append(data) self.message_condition.notify_all() self._packets = self._packets[:50] await self._message(data) except websockets.ConnectionClosed: await self._disconnect() except Exception as e: await self._error(e) except websockets.ConnectionClosed: await self._disconnect() pass except Exception as e: await self._error(e) await self._disconnect()