Source code for MeowerBot.bot

import asyncio
import copy
import json
import logging
import shlex
import traceback
from enum import StrEnum
from typing import Optional, Callable, Dict, List

from ._cache import Cache
from .api import MeowerAPI
from .cl import Client
from .cog import Cog
from .command import AppCommand
from .context import Context, PartialChat, PartialUser, Post, User
from .data.generic import UUID
from types import CoroutineType


[docs] class CallBackIds(StrEnum): """Callbacks that the bot calls. You can find more documentation in :class:`MeowerBot.bot.Bot`""" error = "error" __raw__ = "__raw__" login = "login" disconnect = "disconnect" ulist = "ulist" message = "message" raw_message = "raw_message" direct = "direct" statuscode = "statuscode"
cbids = CallBackIds callbacks = [i for i in CallBackIds] # type: ignore
[docs] class Bot(Client): """A class that holds all the networking for a Meower bot to function and run""" messages: List[Post] = [] #: :meta private: :meta hide-value: message_condition = asyncio.Condition() #: :meta private: :meta hide-value: user: PartialUser | User # Parcial user when bot is not logged in cache: Cache __bridges__ = [ #: :meta public: :meta hide-value: "Discord", "Revower", "revolt" ] BOT_NO_PMSG_RESPONSE = [ #: :meta private: :meta hide-value: "I:500 | Bot", "I: 500 | Bot", "I: 100 | Bot", "I: 100 | Bot" ] userlist: List[str] = None #: :meta hide-value: #type: ignore @property def latency(self) -> float: """Gets the latency of the bot :return: Bot latency :rtype: int """ return self.ws.latency async def _t_ping(self): while True: try: await asyncio.sleep(5) await self.sendPacket({"cmd": "ping", "val": ""}) except Exception as e: await self._error(e) break def __init__(self, prefix=None): # type: ignore super().__init__() self.api: MeowerAPI = None # type: ignore self.callbacks: Dict[str, List[CoroutineType]] = {str(i): [] for i in callbacks} self.callbacks["__raw__"] = [] self.userlist = [] # to be used in start self.username: str = None # type: ignore #: :meta hide-value: self.password: str = None # type: ignore #: :meta hide-value: self.commands = {} self.prefix = prefix self.logger = logging.getLogger("MeowerBot") self.server: str = None # type: ignore self.cache = Cache() self.cogs: Dict[str, Cog] = {} # Interface
[docs] def event(self, func: Callable): """Creates a callback that takes over the original functionality of the bot. Valid callbacks are defined in :class:`CallBackIds` :param func: The callback function :type func: Callable :raises TypeError: The func provided does not have a valid callback name """ if func.__name__ not in callbacks: raise TypeError(f"{func.__name__} is not a valid callback") setattr(self, func.__name__, func)
[docs] def listen(self, callback: Optional[str] = None): """ Does the same thing as :meth MeowerBot.bot.Bot.event:but does not replace the bot's original functionality Valid callbacks are defined in :class:`CallBackIds` :raises TypeError: The listener provided is not valid """ def inner(func): nonlocal callback callback = callback if callback is not None else func.__name__ if callback not in callbacks: raise TypeError(f"{callback} is not a valid listener") self.callbacks[callback].append(func) return func return inner
[docs] def update_commands(self): for cog in self.cogs.values(): cog.update_commands() self.commands.update(cog.commands) for i in cog.callbacks.keys(): self.callbacks[str(i)].append(cog.callbacks[str(i)])
[docs] async def error(self, err: Exception): """Handles errors for the bot. This is a callback for :meth:`MeowerBot.bot.Bot.event` """ self.logger.error(traceback.print_exception(err))
async def __raw__(self, packet: dict): """Callback for raw packets. Gets called before the bot does any processing. This is a callback for :meth:`MeowerBot.bot.Bot.event` """ pass
[docs] async def login(self, token: str): """Gets called when the bot is fully ready and logged into meower This is a callback for :meth:`MeowerBot.bot.Bot.event` """ pass
[docs] async def disconnect(self): """Gets called when the bot gets disconnected from meower This is a callback for :meth:`MeowerBot.bot.Bot.event` """ pass
[docs] async def ulist(self, ulist: List[str]): """Gets called when a user connects to meower. This is a callback for :meth:`MeowerBot.bot.Bot.event` """ pass
[docs] async def message(self, message: Post): """Method for overiding how the bot handles messages. This is a callback for :meth:`MeowerBot.bot.Bot.event` """ message = await self.handle_bridges(message) if not message.data.startswith(self.prefix): return message.data = message.data.removeprefix(self.prefix) await self.run_commands(message)
[docs] async def statuscode(self, status, listerner): pass
[docs] async def raw_message(self, data: dict): pass
[docs] async def direct(self, data: dict): pass
async def _run_event(self, event: cbids, *args, **kwargs): events: List[Callable] = [getattr(self, str(event))] for i in self.callbacks[str(event)]: if type(i) is list: events.extend(i) # type: ignore elif callable(i): # Check if the element is Callable events.append(i) err = await asyncio.gather(*[i(*args, **kwargs) for i in events if callable(i)], return_exceptions=True) for i in err: if i is not None: if isinstance(i, Exception) and event != cbids.error: await self._error(i) # websocket
[docs] async def sendPacket(self, message: dict): if message.get("listener") != "mb.py_login": self.logger.debug("Sending Packet: " + json.dumps(message)) else: message_sensitive = copy.deepcopy(message) message_sensitive["val"]["val"]["pswd"] = "<PASSWORD>" self.logger.debug("Sending Packet: " + json.dumps(message_sensitive)) await super().sendPacket(message)
[docs] async def handle_bridges(self, message: Post): fetch = False if isinstance(message.user, User): fetch = True if message.user.username in self.__bridges__ and ":" in message.data: split = message.data.split(":", 1) message.data = split[1].strip() message.user = PartialUser(split[0].strip(), self) if fetch: data = self.cache.get_user() if not isinstance(data, User): data = await message.user.fetch() if data: message.user = data if message.data.startswith(self.prefix + "#0000"): message.data = message.data.replace("#0000", "") return message
[docs] def get_context(self, message: Post): return Context(message, self)
[docs] async def run_commands(self, message: Post): args = shlex.split(str(message)) if (err := await self.commands[args[0]].run_cmd( self.get_context(message), *args[1:] )) is not None: await self._run_event(CallBackIds.error, err)
[docs] def command(self, name=None, args=0, aliases: Optional[List[str]] = None): # type: ignore def inner(func): cmd = AppCommand(func, name=name, args=args, alias=aliases) self.commands = AppCommand.add_command(self.commands, cmd) return cmd return inner
async def _connect(self): await self._send_initial_commands() packet = await self._authenticate() await self._process_login_response(packet) async def _send_initial_commands(self): await self.sendPacket({"cmd": "direct", "val": "meower", "listener": "send_tkey"}) await self.sendPacket({"cmd": "direct", "val": {"cmd": "type", "val": "py"}}) async def _authenticate(self): async with self.message_condition: await self.sendPacket({ "cmd": "direct", "val": { "cmd": "authpswd", "val": { "username": str(self.username).strip(), "pswd": str(self.password).strip() } }, "listener": "mb.py_login" }) while True: await self.message_condition.wait() if self._packets[-1].get("listener") != "mb.py_login": continue print(self._packets[-1]) if self._packets[-1]["cmd"] == "statuscode" and self._packets[-1]["val"] != "I: 100 | OK": raise Exception(f"Wrong Username or Password!\n {self._packets[-1]['val']}") elif self._packets[-1]["cmd"] == "statuscode": self._packets.pop(-1) continue if not (self._packets[-1]["cmd"] == "direct" and "payload" in self._packets[-1]["val"].keys()): continue return self._packets.pop(-1) async def _process_login_response(self, packet): await self.api.login(packet['val']['payload']['token']) await self._run_event(CallBackIds.login, packet['val']['payload']['token']) self.user = await self.user.fetch()
[docs] def register_cog(self, cog: Cog): self.cogs[cog.__class__.__name__] = cog self.update_commands()
async def _disconnect(self): await self._run_event(CallBackIds.disconnect)
[docs] def get_chat(self, chat_id: str): chat = self.cache.get_chat(UUID(chat_id)) if chat is None: return PartialChat(chat_id, self) return chat
async def _message(self, message: dict): # noinspection PyBroadException try: if message.get("cmd") == "direct" and message.get("listener") == 'mb.py_login': message_sensitive = copy.deepcopy(message) message_sensitive['val']['payload']['token'] = "<TOKEN>" self.logger.debug(f"Recived message: {message_sensitive}") else: self.logger.debug(f"Recived message: {message}") except Exception: pass match message["cmd"]: case "statuscode": return await self._run_event(CallBackIds.statuscode, message["val"], message.get("listener")) case "ulist": self.userlist = message["val"].split(";") await self._check_bot_users(self.userlist) return await self._run_event(CallBackIds.ulist, self.userlist) case "direct": await self._handle_direct(message) if message["cmd"] == "pmsg": if message["val"] not in self.BOT_NO_PMSG_RESPONSE: await self.sendPacket({ "cmd": "pmsg", "val": "I:500 | Bot", "id": message["origin"] }) else: await self.handle_bot_pmsg(message["origin"]) async def _handle_direct(self, message): if "post_origin" not in message["val"]: # post return await self._run_event(CallBackIds.direct, message) await self._run_event(CallBackIds.__raw__, message["val"]) # type: ignore[call-arg] post = Post(self, message["val"], chat=message["val"]["post_origin"]) async with self.message_condition: self.messages.append(post) self.message_condition.notify_all() self.messages = self.messages[0: 50] await self._run_event(CallBackIds.message, post) async def _check_user(self, user): if user in self.__bridges__ or "bot" in user.lower(): self.cache.add_bot(user) return await self.sendPacket({ "cmd": "pmsg", "val": { "library": "MeowerBot.py", "known_bots": self.cache.bots }, "id": user })
[docs] async def handle_bot_pmsg(self, origin): if origin == self.username: return self.cache.add_bot(origin)
async def _check_bot_users(self, userlist): assert self.api is not None if not self.api.headers.get("token"): return # noinspection PyUnusedLocal loop = asyncio.get_event_loop() self.cache.try_clear_bots() # NO ONE CARES FOR CREATE_TASK DAMMIT PYCHARM # noinspection PyAsyncCall asyncio.gather(*[self._check_user(username) for username in userlist]) async def _error(self, error): await self._run_event(CallBackIds.error, error)
[docs] async def start(self, username, password, server="wss://server.meower.org", ): """ Runs The bot (Blocking) """ self.username = username self.password = password self.user = PartialUser(self.username, self) self.update_commands() # noinspection PyAsyncCall asyncio.create_task(self._t_ping()) if self.prefix is None: self.prefix = "@" + self.username self.logger = logging.getLogger(f"MeowerBot {self.username}") self.server = server self.api = MeowerAPI(username=username) await self.connect(server)
[docs] def run(self, username, password, server="wss://server.meower.org", ): """ Runs the bot (Blocking) """ loop = asyncio.get_event_loop() fut = loop.create_task(self.start(username, password, server=server)) loop.run_forever() return fut
__all__ = ["Bot", "CallBackIds", 'cbids']