Crash in case message_queue raises exception, delete lingering messages from queue

This commit is contained in:
Frisk 2024-12-13 15:54:29 +01:00
parent 351e034e47
commit fd54c084b2
2 changed files with 28 additions and 8 deletions

View file

@ -74,12 +74,11 @@ async def message_sender():
await messagequeue.resend_msgs() await messagequeue.resend_msgs()
pass pass
except Exception as ex: except Exception as ex:
if command_line_args.debug: # Treat the issue as critical since otherwise it results in loss of data
logger.exception("Exception on DC message sender") logger.exception("Exception on DC message sender")
shutdown(loop=asyncio.get_event_loop()) await send_exception_to_monitoring(ex)
else: shutdown(loop=asyncio.get_event_loop())
logger.exception("Exception on DC message sender")
await send_exception_to_monitoring(ex)
def shutdown(loop, signal=None): def shutdown(loop, signal=None):

View file

@ -45,15 +45,27 @@ class QueueEntry:
self._sent_webhooks: set[str] = set() self._sent_webhooks: set[str] = set()
self.wiki: Wiki = wiki self.wiki: Wiki = wiki
self.method = method self.method = method
self.linger_counter = 0
def check_sent_status(self, webhook: str) -> bool: def check_sent_status(self, webhook: str) -> bool:
"""Checks sent status for given message, if True it means that the message has been sent before to given webhook, otherwise False.""" """Checks sent status for given message, if True it means that the message has been sent before to given webhook, otherwise False."""
return webhook in self._sent_webhooks return webhook in self._sent_webhooks
def confirm_sent_status(self, webhook: str): def confirm_sent_status(self, webhook: str):
"""Confirms sent status for a webhook. Returns True if sending to all webhooks has been completed, otherwise False.""" """Confirms sent status for a webhook."""
self._sent_webhooks.add(webhook) self._sent_webhooks.add(webhook)
def self_destruct(self):
"""Clean all of the webhooks from the list so the message is cleaned on next iteration of resend_messages, used for lingering messages"""
self.webhooks.clear()
def add_linger(self):
"""Increase linger counter for a message in case none of the webhooks the message hasn't been sent to are in
suspended list"""
if not any([x in messagequeue.webhook_suspensions for x in set(self.webhooks) - self._sent_webhooks]):
self.linger_counter += 1
return self.linger_counter
def clear_webhook_send_requirement_for(self, webhook: str): def clear_webhook_send_requirement_for(self, webhook: str):
"""In case webhook gets removed, this function is called to remove the webhook from the list or required recipients.""" """In case webhook gets removed, this function is called to remove the webhook from the list or required recipients."""
try: try:
@ -62,7 +74,7 @@ class QueueEntry:
pass pass
def complete(self) -> bool: def complete(self) -> bool:
return len(self._sent_webhooks) == len(self.webhooks) return len(self._sent_webhooks) >= len(self.webhooks)
def __iter__(self): def __iter__(self):
return iter(self.webhooks) return iter(self.webhooks)
@ -240,6 +252,14 @@ class MessageQueue:
self.webhook_suspensions[webhook_id] = asyncio.create_task(self.suspension_check(webhook_url), name="DC Sus Check for {}".format(webhook_id)) self.webhook_suspensions[webhook_id] = asyncio.create_task(self.suspension_check(webhook_url), name="DC Sus Check for {}".format(webhook_id))
break break
def report_lingering_messages(self):
"""Report and delete items marked as lingering for longer than 100 cycles"""
for item in self._queue:
if item.add_linger() > 100:
item.self_destruct()
logger.error(f"Could not deliver a message! Method: {item.method}, DM: {item.discord_message.__repr__()}")
return # In case there are multiple messages that linger, remove them one by one every cycle
async def resend_msgs(self): async def resend_msgs(self):
"""Main function for orchestrating Discord message sending. It's a task that runs every second+.""" """Main function for orchestrating Discord message sending. It's a task that runs every second+."""
self.global_rate_limit = False self.global_rate_limit = False
@ -254,6 +274,7 @@ class MessageQueue:
tasks_to_run.append(self.send_msg_set(set_msgs)) tasks_to_run.append(self.send_msg_set(set_msgs))
await asyncio.gather(*tasks_to_run) # we wait for all send_msg_set functions to finish await asyncio.gather(*tasks_to_run) # we wait for all send_msg_set functions to finish
self._queue = [x for x in self._queue if x.complete() is False] # get rid of sent messages self._queue = [x for x in self._queue if x.complete() is False] # get rid of sent messages
self.report_lingering_messages()
await asyncio.sleep(1+self.discord_error_rate_tracker) await asyncio.sleep(1+self.discord_error_rate_tracker)