From af3cf6440e47217f38d1d55779aca169a0bd57df Mon Sep 17 00:00:00 2001 From: Frisk Date: Thu, 18 Aug 2022 12:42:48 +0200 Subject: [PATCH] Further work on Discord message sending code --- src/discord/queue.py | 130 ++++++++++++++++++++++++++++++++----------- src/msgqueue.py | 125 ----------------------------------------- src/wiki.py | 2 +- 3 files changed, 97 insertions(+), 160 deletions(-) delete mode 100644 src/msgqueue.py diff --git a/src/discord/queue.py b/src/discord/queue.py index 771507c..00faf84 100644 --- a/src/discord/queue.py +++ b/src/discord/queue.py @@ -17,11 +17,12 @@ import re import sys import time import logging -from typing import Optional, Union, Tuple +import asyncio +from src.config import settings +from src.discord.message import StackedDiscordMessage, MessageTooBig +from typing import Optional, Union, Tuple, AsyncGenerator +from collections import defaultdict -import requests - -from src.configloader import settings from src.discord.message import DiscordMessage, DiscordMessageMetadata, DiscordMessageRaw AUTO_SUPPRESSION_ENABLED = settings.get("auto_suppression", {"enabled": False}).get("enabled") @@ -32,10 +33,31 @@ rate_limit = 0 logger = logging.getLogger("rcgcdw.discord.queue") +class QueueEntry: + def __init__(self, discord_message, webhooks): + self.discord_message: DiscordMessage = discord_message + self.webhooks: list[str] = webhooks + self._sent_webhooks: set[str] = set() + + def check_sent_status(self, webhook: str) -> bool: + """Checks sent status for given message, if True it means that the message has been sent before to given webhook, otherwise False.""" + return webhook in self._sent_webhooks + + def confirm_sent_status(self, webhook: str): + """Confirms sent status for a webhook. Returns True if sending to all webhooks has been completed, otherwise False.""" + self._sent_webhooks.add(webhook) + + def complete(self) -> bool: + return len(self._sent_webhooks) == len(self.webhooks) + + def __iter__(self): + return iter(self.webhooks) + + class MessageQueue: """Message queue class for undelivered messages""" def __init__(self): - self._queue: list[Tuple[Union[DiscordMessage, DiscordMessageRaw], DiscordMessageMetadata]] = [] + self._queue: list[QueueEntry] = [] def __repr__(self): return self._queue @@ -49,7 +71,7 @@ class MessageQueue: def clear(self): self._queue.clear() - def add_message(self, message: Tuple[Union[DiscordMessage, DiscordMessageRaw], DiscordMessageMetadata]): + def add_message(self, message: QueueEntry): self._queue.append(message) def cut_messages(self, item_num: int): @@ -63,31 +85,76 @@ class MessageQueue: return False return True + async def group_by_webhook(self): + """Group Discord messages in the queue by the dictionary, allowing to send multiple messages to different + webhooks at the same time avoiding ratelimits per Discord webhook route.""" + message_dict = defaultdict(list) + for msg in self._queue: + if not isinstance(msg.webhooks, list): + raise TypeError('msg.webhook_url in _queue is not a list') + for webhook in msg.webhooks: + message_dict[webhook].append(msg) # defaultdict{"dadibadyvbdmadgqueh23/dihjd8agdandashd": [DiscordMessage, DiscordMessage]} + return message_dict.items() + def delete_all_with_matching_metadata(self, **properties): """Deletes all of the messages that have matching metadata properties (useful for message redaction)""" for index, item in reversed(list(enumerate(self._queue))): if self.compare_message_to_dict(item[1], properties): self._queue.pop(index) - def resend_msgs(self): + async def pack_massages(self, messages: list[QueueEntry]) -> AsyncGenerator[tuple[StackedDiscordMessage, int]]: + """Pack messages into StackedDiscordMessage. It's an async generator""" + current_pack = StackedDiscordMessage(0 if messages[0].discord_message.message_type == "compact" else 1) # first message + index = -1 + for index, message in enumerate(messages): + message = message.discord_message + try: + current_pack.add_message(message) + except MessageTooBig: + yield current_pack + current_pack = StackedDiscordMessage(0 if message.message_type == "compact" else 1) # next messages + current_pack.add_message(message) + yield current_pack, index + + async def send_msg_set(self, msg_set: tuple[str, list[QueueEntry]]): + webhook_url, messages = msg_set # str("daosdkosakda/adkahfwegr34", list(DiscordMessage, DiscordMessage, DiscordMessage) + async for msg, index in self.pack_massages(messages): + if self.global_rate_limit: + return # if we are globally rate limited just wait for first gblocked request to finish + # Verify that message hasn't been sent before + status = await send_to_discord_webhook(msg, webhook_url) + if status[0] < 2: + logger.debug("Sending message succeeded") + for queue_message in messages[max(index-10, 0):index]: # mark messages as delivered + queue_message.confirm_sent_status(webhook_url) + logger.debug("Current rate limit time: {}".format(status[1])) + if status[1] is not None: + await asyncio.sleep(float(status[1])) # note, the timer on the last request won't matter that much since it's separate task and for the time of sleep it will give control to other tasks + break + elif status[0] == 5: + if status[1]["global"] is True: + logger.debug( + "Global rate limit has been detected. Setting global_rate_limit to true and awaiting punishment.") + self.global_rate_limit = True + await asyncio.sleep(status[1]["retry_after"] / 1000) + break + else: + logger.debug("Sending message failed") + break + + async def resend_msgs(self): + self.global_rate_limit = False if self._queue: logger.info( - "{} messages waiting to be delivered to Discord due to Discord throwing errors/no connection to Discord servers.".format( - len(self._queue))) - for num, item in enumerate(self._queue): - logger.debug( - "Trying to send a message to Discord from the queue with id of {} and content {}".format(str(num), - str(item))) - if send_to_discord_webhook(item[0], metadata=item[1]) < 2: - logger.debug("Sending message succeeded") - else: - logger.debug("Sending message failed") - break - else: - self.clear() - logger.debug("Queue emptied, all messages delivered") - self.cut_messages(num) - logger.debug(self._queue) + "{} messages waiting to be delivered to Discord.".format(len(self._queue))) + tasks_to_run = [] + for set_msgs in await self.group_by_webhook(): + # logger.debug(set_msgs) + tasks_to_run.append(self.send_msg_set(set_msgs)) + await asyncio.gather(*tasks_to_run) # we wait for all send_msg_set functions to finish + self._queue = [x for x in self._queue if x.complete() is False] # get rid of sent messages + else: + await asyncio.sleep(0.5) messagequeue = MessageQueue() @@ -105,7 +172,7 @@ def handle_discord_http(code, formatted_embed, result): elif code == 401 or code == 404: # HTTP UNAUTHORIZED AND NOT FOUND if result.request.method == "POST": # Ignore not found for DELETE and PATCH requests since the message could already be removed by admin logger.error("Webhook URL is invalid or no longer in use, please replace it with proper one.") - sys.exit(1) + remove_webhook_maybe() else: return 0 elif code == 429: @@ -121,19 +188,14 @@ def handle_discord_http(code, formatted_embed, result): return 1 -def update_ratelimit(request): - """Updates rate limit time""" - global rate_limit - rate_limit = 0 if int(request.headers.get('x-ratelimit-remaining', "-1")) > 0 else int(request.headers.get( - 'x-ratelimit-reset-after', 0)) - rate_limit += settings.get("discord_message_cooldown", 0) - - -def send_to_discord_webhook(data: Optional[DiscordMessage], metadata: DiscordMessageMetadata): - global rate_limit +def send_to_discord_webhook(message: [StackedDiscordMessage, DiscordMessage]): header = settings["header"] header['Content-Type'] = 'application/json' standard_args = dict(headers=header) + if isinstance(message, StackedDiscordMessage): + req = + else: + message.metadata.method if metadata.method == "POST": req = requests.Request("POST", data.webhook_url+"?wait=" + ("true" if AUTO_SUPPRESSION_ENABLED else "false"), data=repr(data), **standard_args) elif metadata.method == "DELETE": diff --git a/src/msgqueue.py b/src/msgqueue.py deleted file mode 100644 index b856f90..0000000 --- a/src/msgqueue.py +++ /dev/null @@ -1,125 +0,0 @@ -import asyncio, logging, aiohttp -import typing - -from src.discord.message import DiscordMessage, StackedDiscordMessage, MessageTooBig -from src.config import settings -from src.exceptions import EmbedListFull -from collections import defaultdict -logger = logging.getLogger("rcgcdw.msgqueue") - -class QueueEntry: - def __init__(self, discord_message, webhooks): - self.discord_message: DiscordMessage = discord_message - self.webhooks: list[str] = webhooks - - def __iter__(self): - return iter(self.webhooks) - - -class MessageQueue: - """Message queue class for undelivered messages""" - def __init__(self): - self._queue = [] - self.global_rate_limit = False - - def __repr__(self): - return self._queue - - def __len__(self): - return len(self._queue) - - def __iter__(self): - return iter(self._queue) - - def clear(self): - self._queue.clear() - - def add_messages(self, messages: list[QueueEntry]): - self._queue.extend(messages) - logger.debug("Adding new messages") - # - # def replace_message(self, to_replace: DiscordMessage, with_replace: StackedDiscordMessage): - # try: - # self._queue[self._queue.index(to_replace)] = with_replace - # except ValueError: - # raise - - def cut_messages(self, item_num): - self._queue = self._queue[item_num:] - - async def group_by_webhook(self): # TODO Change into iterable - """Group Discord messages in the queue by the dictionary, allowing to send multiple messages to different - webhooks at the same time avoiding ratelimits per Discord webhook route.""" - message_dict = defaultdict(list) - for msg in self._queue: - if not isinstance(msg.webhook_url, list): - raise TypeError('msg.webhook_url in _queue is not a list') - for webhook in msg.webhook_url: - message_dict[webhook].append(msg) # defaultdict{"dadibadyvbdmadgqueh23/dihjd8agdandashd": [DiscordMessage, DiscordMessage]} - return message_dict.items() # dict_items([('daosdkosakda/adkahfwegr34', [DiscordMessage]), ('daosdkosakda/adkahfwegr33', [DiscordMessage, DiscordMessage])]) - - async def pack_massages(self, messages: list[DiscordMessage]) -> typing.AsyncGenerator: - """Pack messages into StackedDiscordMessage. It's an async generator""" - current_pack = StackedDiscordMessage(0 if messages[0].message_type == "compact" else 1) # first message - for message in messages: - try: - current_pack.add_message(message) - except MessageTooBig: - yield current_pack - current_pack = StackedDiscordMessage(0 if message.message_type == "compact" else 1) # next messages - current_pack.add_message(message) - yield current_pack - - async def send_msg_set(self, msg_set: tuple): - webhook_url, messages = msg_set # str("daosdkosakda/adkahfwegr34", list(DiscordMessage, DiscordMessage, DiscordMessage) - async for msg in self.pack_massages(messages): - if self.global_rate_limit: - return # if we are globally rate limited just wait for first gblocked request to finish - status = await send_to_discord_webhook(msg, webhook_url) - if status[0] < 2: - logger.debug("Sending message succeeded") - try: - if len(msg.webhook_url) > 1: - msg.webhook_url.remove(webhook_url) - else: - self._queue.remove(msg) - except ValueError: - # For the love of god I cannot figure why can it return ValueError: list.remove(x): x not in list, however considering it's not in the list, somehow, anymore we can just not care about it I guess - pass - logger.debug("Current rate limit time: {}".format(status[1])) - if status[1] is not None: - await asyncio.sleep(float(status[1])) # note, the timer on the last request won't matter that much since it's separate task and for the time of sleep it will give control to other tasks - break - elif status[0] == 5: - if status[1]["global"] is True: - logger.debug("Global rate limit has been detected. Setting global_rate_limit to true and awaiting punishment.") - self.global_rate_limit = True - await asyncio.sleep(status[1]["retry_after"]/1000) - break - else: - logger.debug("Sending message failed") - break - - async def resend_msgs(self): - self.global_rate_limit = False - if self._queue: - logger.info( - "{} messages waiting to be delivered to Discord.".format(len(self._queue))) - tasks_to_run = [] - for set_msgs in await self.group_by_webhook(): - # logger.debug(set_msgs) - tasks_to_run.append(self.send_msg_set(set_msgs)) - await asyncio.gather(*tasks_to_run) # we wait for all send_msg_set functions to finish - else: - await asyncio.sleep(0.5) - - -messagequeue = MessageQueue() - - -async def send_to_discord(msg): - messagequeue.add_message(msg) - # webhooks = msg.webhook_url.copy() - # for webhook in webhooks: - # msg.webhook_url = [webhook] # Doing it just so it doesn't store reference but value - # messagequeue.add_message(msg.copy()) diff --git a/src/wiki.py b/src/wiki.py index 4a7cd4f..b80d887 100644 --- a/src/wiki.py +++ b/src/wiki.py @@ -10,7 +10,7 @@ import logging, aiohttp from functools import cache from api.util import default_message -from msgqueue import QueueEntry, messagequeue +from src.discord.queue import messagequeue, QueueEntry from mw_messages import MWMessages from src.exceptions import * from src.database import db