From 1fde00078b83e1251285533ad948f2e8eed2d0bf Mon Sep 17 00:00:00 2001 From: Frisk Date: Sun, 28 Jul 2024 15:57:01 +0200 Subject: [PATCH] Added Discord webhook removal on webhook returning 404 --- src/discord/queue.py | 34 +++++++++++++++++++++++++++++++--- src/queue_handler.py | 4 ++-- src/wiki.py | 5 +++-- 3 files changed, 36 insertions(+), 7 deletions(-) diff --git a/src/discord/queue.py b/src/discord/queue.py index c6ad203..c152c0d 100644 --- a/src/discord/queue.py +++ b/src/discord/queue.py @@ -17,6 +17,7 @@ import re import time import logging import asyncio +from collections.abc import ItemsView import aiohttp from aiohttp import ContentTypeError, ClientResponse @@ -64,6 +65,7 @@ class MessageQueue: """Message queue class for undelivered messages""" def __init__(self): self._queue: list[QueueEntry] = [] + self.webhook_suspensions: dict[str, asyncio.Task] = {} # Storing tasks counting one hour since last 404 def __repr__(self): return self._queue @@ -87,6 +89,17 @@ class MessageQueue: def cut_messages(self, item_num: int): self._queue = self._queue[item_num:] + async def suspension_check(self, webhook_url: str): + """Check after an hour if suspended webhook still returns ClientError""" + await asyncio.sleep(7200) # 2 hours + unsent_messages = await self.group_by_webhook() + unsent_messages = dict(unsent_messages) + webhook_messages = unsent_messages.get(webhook_url) + if webhook_messages is None: + logger.error("MessageQueue.suspension_check failed due to lack of messages belonging to a webhook ID {} in message queue".format(webhook_url.split("/")[0])) + return + await self.send_msg_set(webhook_messages) + @staticmethod def compare_message_to_dict(metadata: DiscordMessageMetadata, to_match: dict): """Compare DiscordMessageMetadata fields and match them against dictionary""" @@ -95,9 +108,12 @@ class MessageQueue: return False return True - async def group_by_webhook(self): + async def group_by_webhook(self) -> ItemsView[str, list[QueueEntry]]: """Group Discord messages in the queue by the dictionary, allowing to send multiple messages to different - webhooks at the same time avoiding ratelimits per Discord webhook route.""" + webhooks at the same time avoiding ratelimits per Discord webhook route. + + Takes messages from self._queue, for every webhook that is their target appends them to a list with webhook path as key + Returns that dictionary""" message_dict = defaultdict(list) for msg in self._queue: if not isinstance(msg.webhooks, list): @@ -141,6 +157,10 @@ class MessageQueue: yield current_pack, index, "POST" async def send_msg_set(self, msg_set: tuple[str, list[QueueEntry]]): + """Function that takes output of self.group_by_webhook, unpacks them and stacks them into + StackedDiscordMessages using self.pack_messages or returns a message if it's not POST request. + + If stacked message succeeds in changing, its status in _queue is changed to sent for given webhook.""" webhook_url, messages = msg_set # str("daosdkosakda/adkahfwegr34", list(DiscordMessage, DiscordMessage, DiscordMessage) async for msg, index, method in self.pack_massages(messages): if msg is None: # Msg can be None if last message was not POST @@ -172,13 +192,21 @@ class MessageQueue: message.metadata.domain.register_message_timing_report(message.metadata.time_of_change) if message and message.metadata.domain is not None: message.metadata.domain.discord_message_registration() - for queue_message in messages[max(index-len(msg.message_list), 0):index+1]: # mark messages as delivered + for queue_message in messages[max(index-len(msg.message_list), 0):index+1]: # mark messages as delivered TODO AAAA queue_message.confirm_sent_status(webhook_url) if client_error is False: msg.webhook = webhook_url msg.wiki.add_message(msg) + else: + webhook_id = webhook_url.split("/")[0] + if webhook_id in self.webhook_suspensions: + await msg.wiki.remove_webhook_from_db(webhook_url, "Attempts to send a message to a webhook result in client error.", send=False) + self.webhook_suspensions[webhook_id].cancel() + else: + self.webhook_suspensions[webhook_id] = asyncio.create_task(self.suspension_check(webhook_url), name="DC Sus Check for {}".format(webhook_id)) async def resend_msgs(self): + """Main function for orchestrating Discord message sending. It's a task that runs every half a second.""" self.global_rate_limit = False if self._queue: logger.info( diff --git a/src/queue_handler.py b/src/queue_handler.py index 88e6674..0bd5bd4 100644 --- a/src/queue_handler.py +++ b/src/queue_handler.py @@ -11,9 +11,9 @@ logger = logging.getLogger("rcgcdb.queue_handler") class UpdateDB: def __init__(self): - self.updated: list[tuple[str, tuple[Union[str, int]]]] = [] + self.updated: list[tuple[str, tuple[Union[str, int], ...]]] = [] - def add(self, sql_expression): + def add(self, sql_expression: tuple[str, tuple[Union[str, int], ...]]): self.updated.append(sql_expression) def clear_list(self): diff --git a/src/wiki.py b/src/wiki.py index cf05bc4..6778be6 100644 --- a/src/wiki.py +++ b/src/wiki.py @@ -416,8 +416,9 @@ class Wiki: dbmanager.add(("UPDATE rcgcdb SET rcid = $1 WHERE wiki = $2 AND ( rcid != -1 OR rcid IS NULL )", (highest_id, self.script_url))) # If this is not enough for the future, save rcid in message sending function to make sure we always send all of the changes return - async def remove_webhook_from_db(self, reason: str): - raise NotImplementedError + async def remove_webhook_from_db(self, webhook_url: str, reason: str, send_reason=False): + logger.info(f"Removing a webhook with ID of {webhook_url.split("/")[0]} from the database due to {reason}.") + dbmanager.add(("DELETE FROM rcgcdb WHERE webhook = $1", (webhook_url,))) async def remove_wiki_from_db(self, reason: str): raise NotImplementedError # TODO