From 391c897367308ccb557ccbb19fa7f6c47ada7e22 Mon Sep 17 00:00:00 2001 From: Frisk Date: Mon, 27 Jul 2020 18:32:30 +0200 Subject: [PATCH] Major rework of how the message sending is handled --- src/bot.py | 1 - src/discord.py | 17 ++++++++--------- src/formatters/rc.py | 2 ++ src/misc.py | 3 ++- src/msgqueue.py | 42 ++++++++++++++++++++++++++---------------- src/wiki.py | 1 + 6 files changed, 39 insertions(+), 27 deletions(-) diff --git a/src/bot.py b/src/bot.py index e44bd8c..924bfe8 100644 --- a/src/bot.py +++ b/src/bot.py @@ -54,7 +54,6 @@ async def wiki_scanner(): while True: calc_delay = calculate_delay() fetch_all = db_cursor.execute('SELECT webhook, wiki, lang, display, wikiid, rcid, postid FROM rcgcdw GROUP BY wiki') - # webhook, wiki, lang, display, wikiid, rcid, postid for db_wiki in fetch_all.fetchall(): logger.debug("Wiki {}".format(db_wiki[1])) extended = False diff --git a/src/discord.py b/src/discord.py index 101a605..0cefadb 100644 --- a/src/discord.py +++ b/src/discord.py @@ -23,7 +23,7 @@ async def wiki_removal(wiki_url, status): reasons = {410: _("wiki deletion"), 404: _("wiki deletion"), 401: _("wiki becoming inaccessible"), 402: _("wiki becoming inaccessible"), 403: _("wiki becoming inaccessible"), 410: _("wiki becoming inaccessible")} reason = reasons.get(status, _("unknown error")) - await send_to_discord_webhook(DiscordMessage("compact", "webhook/remove", webhook_url=[observer[0]], content=_("The webhook for {} has been removed due to {}.".format(wiki_url, reason)), wiki=None)) + await send_to_discord_webhook(DiscordMessage("compact", "webhook/remove", webhook_url=[], content=_("The webhook for {} has been removed due to {}.".format(wiki_url, reason)), wiki=None), webhook_url=observer[0]) header = settings["header"] header['Content-Type'] = 'application/json' header['X-Audit-Log-Reason'] = "Wiki becoming unavailable" @@ -117,17 +117,16 @@ async def send_to_discord_webhook_monitoring(data: DiscordMessage): return 3 -async def send_to_discord_webhook(data: DiscordMessage): +async def send_to_discord_webhook(data: DiscordMessage, webhook_url: str): header = settings["header"] header['Content-Type'] = 'application/json' async with aiohttp.ClientSession(headers=header, timeout=aiohttp.ClientTimeout(5.0)) as session: - for webhook in data.webhook_url: - try: - result = await session.post("https://discord.com/api/webhooks/"+webhook, data=repr(data)) - except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError): - logger.exception("Could not send the message to Discord") - return 3 - return await handle_discord_http(result.status, repr(data), await result.text(), data) + try: + result = await session.post("https://discord.com/api/webhooks/"+webhook_url, data=repr(data)) + except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError): + logger.exception("Could not send the message to Discord") + return 3 + return await handle_discord_http(result.status, repr(data), await result.text(), data) async def handle_discord_http(code, formatted_embed, result, dmsg): diff --git a/src/formatters/rc.py b/src/formatters/rc.py index ecde89f..526ef6c 100644 --- a/src/formatters/rc.py +++ b/src/formatters/rc.py @@ -24,6 +24,7 @@ logger = logging.getLogger("rcgcdw.rc_formatters") async def compact_formatter(action, change, parsed_comment, categories, recent_changes, target, _, ngettext, paths, additional_data=None): + """Recent Changes compact formatter, part of RcGcDw""" if additional_data is None: additional_data = {"namespaces": {}, "tags": {}} WIKI_API_PATH = paths[0] @@ -315,6 +316,7 @@ async def compact_formatter(action, change, parsed_comment, categories, recent_c async def embed_formatter(action, change, parsed_comment, categories, recent_changes, target, _, ngettext, paths, additional_data=None): + """Recent Changes embed formatter, part of RcGcDw""" if additional_data is None: additional_data = {"namespaces": {}, "tags": {}} WIKI_API_PATH = paths[0] diff --git a/src/misc.py b/src/misc.py index f96b7e0..4dfa3f8 100644 --- a/src/misc.py +++ b/src/misc.py @@ -9,8 +9,9 @@ logger = logging.getLogger("rcgcdw.misc") def get_paths(wiki: str, request) -> tuple: + """Prepares wiki paths for the functions""" parsed_url = urlparse(wiki) - WIKI_API_PATH = wiki + request["query"]["general"]["scriptpath"] + "api.php" + WIKI_API_PATH = wiki + "api.php" WIKI_SCRIPT_PATH = wiki WIKI_ARTICLE_PATH = urlunparse((*parsed_url[0:2], "", "", "", "")) + request["query"]["general"]["articlepath"] WIKI_JUST_DOMAIN = urlunparse((*parsed_url[0:2], "", "", "", "")) diff --git a/src/msgqueue.py b/src/msgqueue.py index b892e73..a15aefc 100644 --- a/src/msgqueue.py +++ b/src/msgqueue.py @@ -1,6 +1,7 @@ import asyncio, logging, aiohttp from src.discord import send_to_discord_webhook from src.config import settings +from collections import defaultdict, ItemsView logger = logging.getLogger("rcgcdw.msgqueue") class MessageQueue: @@ -30,26 +31,35 @@ class MessageQueue: async def create_session(self): self.session = aiohttp.ClientSession(headers=settings["header"], timeout=aiohttp.ClientTimeout(5.0)) + async def group_by_webhook(self): # TODO Change into iterable + """Group Discord messages in the queue by the dictionary, allowing to send multiple messages to different + webhooks at the same time avoiding ratelimits per Discord webhook route.""" + message_dict = defaultdict(list) + for msg in self._queue: + for webhook in msg.webhook_url: + message_dict[webhook].append(msg) + return message_dict.items() + + async def send_msg_set(self, msg_set: tuple): + webhook_url, messages = msg_set + for msg in messages: + if await send_to_discord_webhook(msg, webhook_url) < 2: + logger.debug("Sending message succeeded") + self._queue.remove(msg) + await asyncio.sleep(1.9) + else: + logger.debug("Sending message failed") + break + async def resend_msgs(self): - if self.session is None: - await self.create_session() if self._queue: logger.info( "{} messages waiting to be delivered to Discord.".format(len(self._queue))) - for num, item in enumerate(self._queue): - logger.debug( - "Trying to send a message to Discord from the queue with id of {} and content {}".format(str(num), - str(item))) - if await send_to_discord_webhook(item) < 2: - logger.debug("Sending message succeeded") - await asyncio.sleep(1.9) - else: - logger.debug("Sending message failed") - break - else: - self.clear() - logger.debug("Queue emptied, all messages delivered") - self.cut_messages(num) + tasks_to_run = [] + for set_msgs in await self.group_by_webhook(): + logger.debug(set_msgs) + tasks_to_run.append(self.send_msg_set(set_msgs)) + await asyncio.gather(*tasks_to_run) logger.debug(self._queue) await asyncio.sleep(4.0) diff --git a/src/wiki.py b/src/wiki.py index bff6d03..e37db64 100644 --- a/src/wiki.py +++ b/src/wiki.py @@ -102,6 +102,7 @@ class Wiki: async def process_cats(event: dict, local_wiki: Wiki, category_msgs: dict, categorize_events: dict): + """Process categories based on local MW messages. """ if event["type"] == "categorize": if "commenthidden" not in event: if local_wiki.mw_messages: