From e26258edb7e045ef3634d38fc29551858c137b66 Mon Sep 17 00:00:00 2001 From: Frisk Date: Sat, 1 Aug 2020 12:45:41 +0200 Subject: [PATCH] Many changes to logic of sending messages "better" exception handling --- src/bot.py | 16 +++++++++++++--- src/discord.py | 23 +++++++++++++++++++---- src/msgqueue.py | 17 +++++++++++++---- 3 files changed, 45 insertions(+), 11 deletions(-) diff --git a/src/bot.py b/src/bot.py index 836b595..07297e1 100644 --- a/src/bot.py +++ b/src/bot.py @@ -16,7 +16,7 @@ from src.misc import get_paths from src.msgqueue import messagequeue from src.queue_handler import DBHandler from src.wiki import Wiki, process_cats, process_mwmsgs, essential_info -from src.discord import DiscordMessage, formatter_exception_logger +from src.discord import DiscordMessage, formatter_exception_logger, msg_sender_exception_logger logging.config.dictConfig(settings["logging"]) logger = logging.getLogger("rcgcdb.bot") @@ -140,12 +140,22 @@ async def wiki_scanner(): async def message_sender(): """message_sender is a coroutine responsible for handling Discord messages and their sending to Discord""" - while True: - await messagequeue.resend_msgs() + try: + while True: + await messagequeue.resend_msgs() + except: + if command_line_args.debug: + logger.exception("Exception on DC message sender") + raise # reraise the issue + else: + logger.exception("Exception on DC message sender") + await msg_sender_exception_logger(traceback.format_exc()) def shutdown(loop, signal=None): DBHandler.update_db() + if len(messagequeue) > 0: + logger.warning("Some messages are still queued!") loop.stop() logger.info("Script has shut down due to signal {}.".format(signal)) for task in asyncio.all_tasks(loop): diff --git a/src/discord.py b/src/discord.py index 5d46d6a..9ac489b 100644 --- a/src/discord.py +++ b/src/discord.py @@ -118,6 +118,15 @@ async def formatter_exception_logger(wiki_url, change, exception): await send_to_discord_webhook_monitoring(message) +async def msg_sender_exception_logger(exception): + """Creates a Discord message reporting a crash in RC formatter area""" + message = DiscordMessage("embed", "bot/exception", [None], wiki=None) + message["description"] = exception + message["title"] = "MSGSENDER Exception Report" + message.finish_embed() + await send_to_discord_webhook_monitoring(message) + + async def send_to_discord_webhook_monitoring(data: DiscordMessage): header = settings["header"] header['Content-Type'] = 'application/json' @@ -135,24 +144,30 @@ async def send_to_discord_webhook(data: DiscordMessage, webhook_url: str) -> tup :return tuple(status code for request, rate limit info (None for can send more, string for amount of seconds to wait)""" header = settings["header"] header['Content-Type'] = 'application/json' + header["X-RateLimit-Precision"] = "millisecond" async with aiohttp.ClientSession(headers=header, timeout=aiohttp.ClientTimeout(5.0)) as session: try: result = await session.post("https://discord.com/api/webhooks/"+webhook_url, data=repr(data)) + logger.debug(result.headers) rate_limit = None if int(result.headers.get('x-ratelimit-remaining')) > 0 else result.headers.get('x-ratelimit-reset-after') except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError, TimeoutError): logger.exception("Could not send the message to Discord") return 3, None - return await handle_discord_http(result.status, repr(data), await result.text(), data), rate_limit + status = await handle_discord_http(result.status, repr(data), result, data) + if status == 5: + return 5, await result.json() + else: + return status, rate_limit -async def handle_discord_http(code, formatted_embed, result, dmsg): +async def handle_discord_http(code: int, formatted_embed: str, result: aiohttp.ClientResponse, dmsg: DiscordMessage): if 300 > code > 199: # message went through return 0 elif code == 400: # HTTP BAD REQUEST result.status_code, data, result, header logger.error( "Following message has been rejected by Discord, please submit a bug on our bugtracker adding it:") logger.error(formatted_embed) - logger.error(result.text) + logger.error(await result.text()) return 1 elif code == 401 or code == 404: # HTTP UNAUTHORIZED AND NOT FOUND logger.error("Webhook URL is invalid or no longer in use, please replace it with proper one.") @@ -161,7 +176,7 @@ async def handle_discord_http(code, formatted_embed, result, dmsg): return 1 elif code == 429: logger.error("We are sending too many requests to the Discord, slowing down...") - return 2 + return 5 elif 499 < code < 600: logger.error( "Discord have trouble processing the event, and because the HTTP code returned is {} it means we blame them.".format( diff --git a/src/msgqueue.py b/src/msgqueue.py index df48613..81de3af 100644 --- a/src/msgqueue.py +++ b/src/msgqueue.py @@ -8,7 +8,7 @@ class MessageQueue: """Message queue class for undelivered messages""" def __init__(self): self._queue = [] - self.session = None + self.global_rate_limit = False def __repr__(self): return self._queue @@ -28,8 +28,6 @@ class MessageQueue: def cut_messages(self, item_num): self._queue = self._queue[item_num:] - async def create_session(self): - self.session = aiohttp.ClientSession(headers=settings["header"], timeout=aiohttp.ClientTimeout(5.0)) async def group_by_webhook(self): # TODO Change into iterable """Group Discord messages in the queue by the dictionary, allowing to send multiple messages to different @@ -43,6 +41,8 @@ class MessageQueue: async def send_msg_set(self, msg_set: tuple): webhook_url, messages = msg_set for msg in messages: + if self.global_rate_limit: + return # if we are globally rate limited just wait for first gblocked request to finish status = await send_to_discord_webhook(msg, webhook_url) if status[0] < 2: logger.debug("Sending message succeeded") @@ -50,11 +50,19 @@ class MessageQueue: logger.debug("Current rate limit time: {}".format(status[1])) if status[1] is not None: await asyncio.sleep(float(status[1])) # note, the timer on the last request won't matter that much since it's separate task and for the time of sleep it will give control to other tasks + break + elif status[0] == 5: + if status[1]["global"] is True: + logger.debug("Global rate limit has been detected. Setting global_rate_limit to true and awaiting punishment.") + self.global_rate_limit = True + await asyncio.sleep(status[1]["retry_after"]/1000) + break else: logger.debug("Sending message failed") break async def resend_msgs(self): + self.global_rate_limit = False if self._queue: logger.info( "{} messages waiting to be delivered to Discord.".format(len(self._queue))) @@ -64,7 +72,8 @@ class MessageQueue: tasks_to_run.append(self.send_msg_set(set_msgs)) await asyncio.gather(*tasks_to_run) logger.debug(self._queue) - await asyncio.sleep(0.1) + else: + await asyncio.sleep(0.5) messagequeue = MessageQueue()