From fd54c084b2c856184eb329bc07a40506d351ad98 Mon Sep 17 00:00:00 2001 From: Frisk Date: Fri, 13 Dec 2024 15:54:29 +0100 Subject: [PATCH] Crash in case message_queue raises exception, delete lingering messages from queue --- src/bot.py | 11 +++++------ src/discord/queue.py | 25 +++++++++++++++++++++++-- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/src/bot.py b/src/bot.py index da4e90d..d698644 100644 --- a/src/bot.py +++ b/src/bot.py @@ -74,12 +74,11 @@ async def message_sender(): await messagequeue.resend_msgs() pass except Exception as ex: - if command_line_args.debug: - logger.exception("Exception on DC message sender") - shutdown(loop=asyncio.get_event_loop()) - else: - logger.exception("Exception on DC message sender") - await send_exception_to_monitoring(ex) + # Treat the issue as critical since otherwise it results in loss of data + logger.exception("Exception on DC message sender") + await send_exception_to_monitoring(ex) + shutdown(loop=asyncio.get_event_loop()) + def shutdown(loop, signal=None): diff --git a/src/discord/queue.py b/src/discord/queue.py index 2d94a2f..b00aec0 100644 --- a/src/discord/queue.py +++ b/src/discord/queue.py @@ -45,15 +45,27 @@ class QueueEntry: self._sent_webhooks: set[str] = set() self.wiki: Wiki = wiki self.method = method + self.linger_counter = 0 def check_sent_status(self, webhook: str) -> bool: """Checks sent status for given message, if True it means that the message has been sent before to given webhook, otherwise False.""" return webhook in self._sent_webhooks def confirm_sent_status(self, webhook: str): - """Confirms sent status for a webhook. Returns True if sending to all webhooks has been completed, otherwise False.""" + """Confirms sent status for a webhook.""" self._sent_webhooks.add(webhook) + def self_destruct(self): + """Clean all of the webhooks from the list so the message is cleaned on next iteration of resend_messages, used for lingering messages""" + self.webhooks.clear() + + def add_linger(self): + """Increase linger counter for a message in case none of the webhooks the message hasn't been sent to are in + suspended list""" + if not any([x in messagequeue.webhook_suspensions for x in set(self.webhooks) - self._sent_webhooks]): + self.linger_counter += 1 + return self.linger_counter + def clear_webhook_send_requirement_for(self, webhook: str): """In case webhook gets removed, this function is called to remove the webhook from the list or required recipients.""" try: @@ -62,7 +74,7 @@ class QueueEntry: pass def complete(self) -> bool: - return len(self._sent_webhooks) == len(self.webhooks) + return len(self._sent_webhooks) >= len(self.webhooks) def __iter__(self): return iter(self.webhooks) @@ -240,6 +252,14 @@ class MessageQueue: self.webhook_suspensions[webhook_id] = asyncio.create_task(self.suspension_check(webhook_url), name="DC Sus Check for {}".format(webhook_id)) break + def report_lingering_messages(self): + """Report and delete items marked as lingering for longer than 100 cycles""" + for item in self._queue: + if item.add_linger() > 100: + item.self_destruct() + logger.error(f"Could not deliver a message! Method: {item.method}, DM: {item.discord_message.__repr__()}") + return # In case there are multiple messages that linger, remove them one by one every cycle + async def resend_msgs(self): """Main function for orchestrating Discord message sending. It's a task that runs every second+.""" self.global_rate_limit = False @@ -254,6 +274,7 @@ class MessageQueue: tasks_to_run.append(self.send_msg_set(set_msgs)) await asyncio.gather(*tasks_to_run) # we wait for all send_msg_set functions to finish self._queue = [x for x in self._queue if x.complete() is False] # get rid of sent messages + self.report_lingering_messages() await asyncio.sleep(1+self.discord_error_rate_tracker)