Added Discord webhook removal on webhook returning 404

This commit is contained in:
Frisk 2024-07-28 15:57:01 +02:00
parent 347cf92f6f
commit 1fde00078b
3 changed files with 36 additions and 7 deletions

View file

@ -17,6 +17,7 @@ import re
import time import time
import logging import logging
import asyncio import asyncio
from collections.abc import ItemsView
import aiohttp import aiohttp
from aiohttp import ContentTypeError, ClientResponse from aiohttp import ContentTypeError, ClientResponse
@ -64,6 +65,7 @@ class MessageQueue:
"""Message queue class for undelivered messages""" """Message queue class for undelivered messages"""
def __init__(self): def __init__(self):
self._queue: list[QueueEntry] = [] self._queue: list[QueueEntry] = []
self.webhook_suspensions: dict[str, asyncio.Task] = {} # Storing tasks counting one hour since last 404
def __repr__(self): def __repr__(self):
return self._queue return self._queue
@ -87,6 +89,17 @@ class MessageQueue:
def cut_messages(self, item_num: int): def cut_messages(self, item_num: int):
self._queue = self._queue[item_num:] self._queue = self._queue[item_num:]
async def suspension_check(self, webhook_url: str):
"""Check after an hour if suspended webhook still returns ClientError"""
await asyncio.sleep(7200) # 2 hours
unsent_messages = await self.group_by_webhook()
unsent_messages = dict(unsent_messages)
webhook_messages = unsent_messages.get(webhook_url)
if webhook_messages is None:
logger.error("MessageQueue.suspension_check failed due to lack of messages belonging to a webhook ID {} in message queue".format(webhook_url.split("/")[0]))
return
await self.send_msg_set(webhook_messages)
@staticmethod @staticmethod
def compare_message_to_dict(metadata: DiscordMessageMetadata, to_match: dict): def compare_message_to_dict(metadata: DiscordMessageMetadata, to_match: dict):
"""Compare DiscordMessageMetadata fields and match them against dictionary""" """Compare DiscordMessageMetadata fields and match them against dictionary"""
@ -95,9 +108,12 @@ class MessageQueue:
return False return False
return True return True
async def group_by_webhook(self): async def group_by_webhook(self) -> ItemsView[str, list[QueueEntry]]:
"""Group Discord messages in the queue by the dictionary, allowing to send multiple messages to different """Group Discord messages in the queue by the dictionary, allowing to send multiple messages to different
webhooks at the same time avoiding ratelimits per Discord webhook route.""" webhooks at the same time avoiding ratelimits per Discord webhook route.
Takes messages from self._queue, for every webhook that is their target appends them to a list with webhook path as key
Returns that dictionary"""
message_dict = defaultdict(list) message_dict = defaultdict(list)
for msg in self._queue: for msg in self._queue:
if not isinstance(msg.webhooks, list): if not isinstance(msg.webhooks, list):
@ -141,6 +157,10 @@ class MessageQueue:
yield current_pack, index, "POST" yield current_pack, index, "POST"
async def send_msg_set(self, msg_set: tuple[str, list[QueueEntry]]): async def send_msg_set(self, msg_set: tuple[str, list[QueueEntry]]):
"""Function that takes output of self.group_by_webhook, unpacks them and stacks them into
StackedDiscordMessages using self.pack_messages or returns a message if it's not POST request.
If stacked message succeeds in changing, its status in _queue is changed to sent for given webhook."""
webhook_url, messages = msg_set # str("daosdkosakda/adkahfwegr34", list(DiscordMessage, DiscordMessage, DiscordMessage) webhook_url, messages = msg_set # str("daosdkosakda/adkahfwegr34", list(DiscordMessage, DiscordMessage, DiscordMessage)
async for msg, index, method in self.pack_massages(messages): async for msg, index, method in self.pack_massages(messages):
if msg is None: # Msg can be None if last message was not POST if msg is None: # Msg can be None if last message was not POST
@ -172,13 +192,21 @@ class MessageQueue:
message.metadata.domain.register_message_timing_report(message.metadata.time_of_change) message.metadata.domain.register_message_timing_report(message.metadata.time_of_change)
if message and message.metadata.domain is not None: if message and message.metadata.domain is not None:
message.metadata.domain.discord_message_registration() message.metadata.domain.discord_message_registration()
for queue_message in messages[max(index-len(msg.message_list), 0):index+1]: # mark messages as delivered for queue_message in messages[max(index-len(msg.message_list), 0):index+1]: # mark messages as delivered TODO AAAA
queue_message.confirm_sent_status(webhook_url) queue_message.confirm_sent_status(webhook_url)
if client_error is False: if client_error is False:
msg.webhook = webhook_url msg.webhook = webhook_url
msg.wiki.add_message(msg) msg.wiki.add_message(msg)
else:
webhook_id = webhook_url.split("/")[0]
if webhook_id in self.webhook_suspensions:
await msg.wiki.remove_webhook_from_db(webhook_url, "Attempts to send a message to a webhook result in client error.", send=False)
self.webhook_suspensions[webhook_id].cancel()
else:
self.webhook_suspensions[webhook_id] = asyncio.create_task(self.suspension_check(webhook_url), name="DC Sus Check for {}".format(webhook_id))
async def resend_msgs(self): async def resend_msgs(self):
"""Main function for orchestrating Discord message sending. It's a task that runs every half a second."""
self.global_rate_limit = False self.global_rate_limit = False
if self._queue: if self._queue:
logger.info( logger.info(

View file

@ -11,9 +11,9 @@ logger = logging.getLogger("rcgcdb.queue_handler")
class UpdateDB: class UpdateDB:
def __init__(self): def __init__(self):
self.updated: list[tuple[str, tuple[Union[str, int]]]] = [] self.updated: list[tuple[str, tuple[Union[str, int], ...]]] = []
def add(self, sql_expression): def add(self, sql_expression: tuple[str, tuple[Union[str, int], ...]]):
self.updated.append(sql_expression) self.updated.append(sql_expression)
def clear_list(self): def clear_list(self):

View file

@ -416,8 +416,9 @@ class Wiki:
dbmanager.add(("UPDATE rcgcdb SET rcid = $1 WHERE wiki = $2 AND ( rcid != -1 OR rcid IS NULL )", (highest_id, self.script_url))) # If this is not enough for the future, save rcid in message sending function to make sure we always send all of the changes dbmanager.add(("UPDATE rcgcdb SET rcid = $1 WHERE wiki = $2 AND ( rcid != -1 OR rcid IS NULL )", (highest_id, self.script_url))) # If this is not enough for the future, save rcid in message sending function to make sure we always send all of the changes
return return
async def remove_webhook_from_db(self, reason: str): async def remove_webhook_from_db(self, webhook_url: str, reason: str, send_reason=False):
raise NotImplementedError logger.info(f"Removing a webhook with ID of {webhook_url.split("/")[0]} from the database due to {reason}.")
dbmanager.add(("DELETE FROM rcgcdb WHERE webhook = $1", (webhook_url,)))
async def remove_wiki_from_db(self, reason: str): async def remove_wiki_from_db(self, reason: str):
raise NotImplementedError # TODO raise NotImplementedError # TODO