From 5e2a0f30284d4c3da3094e229e9969befa08591e Mon Sep 17 00:00:00 2001 From: Frisk Date: Sun, 18 Aug 2024 12:11:48 +0200 Subject: [PATCH] Added "request price" to stop rapidly requesting wiki data and add some timeout, fixed Discord sending --- extensions/base/approvedrevs.py | 4 ++-- src/discord/queue.py | 39 ++++++++++++++++++--------------- src/exceptions.py | 2 +- src/wiki.py | 7 ++++++ 4 files changed, 31 insertions(+), 21 deletions(-) diff --git a/extensions/base/approvedrevs.py b/extensions/base/approvedrevs.py index 6462da1..e3b76c9 100644 --- a/extensions/base/approvedrevs.py +++ b/extensions/base/approvedrevs.py @@ -29,7 +29,7 @@ def embed_approval_approvefile(ctx: Context, change: dict): embed_helper(ctx, embed, change) embed["url"] = ctx.client.create_article_path(sanitize_to_url(change["title"])) embed["title"] = ctx._("Approved a file revision of {file}").format(file=sanitize_to_markdown(change["title"])) - link_parser_object = LinkParser(ctx.client.WIKI_JUST_DOMAIN) + link_parser_object = ctx.client.LinkParser(ctx.client.WIKI_JUST_DOMAIN) link_parser_object.feed(change.get("logparams", {}).get("0", "")) embed["description"] = ctx._("File version from {time} got approved").format(name=change["title"], time=link_parser_object.new_string) # TODO Make timestamp more user friendly? Getting user who uploaded will be a pain though, same with approval/approve @@ -55,7 +55,7 @@ def embed_approval_approve(ctx: Context, change: dict): embed_helper(ctx, embed, change) embed["url"] = ctx.client.create_article_path(sanitize_to_url(change["title"])) embed["title"] = ctx._("Approved a revision of {article}").format(article=sanitize_to_markdown(change["title"])) - link_parser_object = LinkParser(ctx.client.WIKI_JUST_DOMAIN) + link_parser_object = ctx.client.LinkParser(ctx.client.WIKI_JUST_DOMAIN) link_parser_object.feed(change.get("logparams", {}).get("0", "")) embed["description"] = ctx._("Revision number {revision_id} got approved").format(name=change["title"], time=link_parser_object.new_string) return embed diff --git a/src/discord/queue.py b/src/discord/queue.py index a021f22..34cc4d9 100644 --- a/src/discord/queue.py +++ b/src/discord/queue.py @@ -66,6 +66,7 @@ class MessageQueue: def __init__(self): self._queue: list[QueueEntry] = [] self.webhook_suspensions: dict[str, asyncio.Task] = {} # Storing tasks counting one hour since last 404 + self.global_rate_limit = False def __repr__(self): return self._queue @@ -91,6 +92,7 @@ class MessageQueue: async def suspension_check(self, webhook_url: str): """Check after an hour if suspended webhook still returns ClientError""" + logger.debug(f"Putting webhook {webhook_url} into suspension. Checking status in 2 hours.") await asyncio.sleep(7200) # 2 hours unsent_messages = await self.group_by_webhook() unsent_messages = dict(unsent_messages) @@ -174,17 +176,18 @@ class MessageQueue: # noinspection PyTypeChecker try: status = await send_to_discord_webhook(msg, webhook_url, method) - except aiohttp.ClientError: - client_error = True + except ExhaustedDiscordBucket as e: + if e.is_global: + self.global_rate_limit = True + await asyncio.sleep(e.remaining) + return except (aiohttp.ServerConnectionError, aiohttp.ServerTimeoutError, asyncio.TimeoutError): # Retry on next Discord message sent attempt logger.debug(f"Received timeout or connection error when sending a Discord message for {msg.wiki.script_url if hasattr(msg, "wiki") else "PATCH OR DELETE MESSAGE"}.") return - except ExhaustedDiscordBucket as e: - if e.is_global: - self.global_rate_limit = True - await asyncio.sleep(e.remaining / 1000) - return + except aiohttp.ClientError as e: + client_error = True + logger.exception("Client error has been reported on Discord message sending send_msg_set function.") else: if status == 0 and method == "POST": message = None @@ -216,14 +219,13 @@ class MessageQueue: if self._queue: logger.info( "{} messages waiting to be delivered to Discord.".format(len(self._queue))) - tasks_to_run = [] - for set_msgs in await self.group_by_webhook(): - # logger.debug(set_msgs) - tasks_to_run.append(self.send_msg_set(set_msgs)) - await asyncio.gather(*tasks_to_run) # we wait for all send_msg_set functions to finish - self._queue = [x for x in self._queue if x.complete() is False] # get rid of sent messages - else: - await asyncio.sleep(0.5) + tasks_to_run = [] + for set_msgs in await self.group_by_webhook(): + # logger.debug(set_msgs) + tasks_to_run.append(self.send_msg_set(set_msgs)) + await asyncio.gather(*tasks_to_run) # we wait for all send_msg_set functions to finish + self._queue = [x for x in self._queue if x.complete() is False] # get rid of sent messages + await asyncio.sleep(1) messagequeue = MessageQueue() @@ -250,8 +252,8 @@ async def handle_discord_http(code: int, formatted_embed: str, result: ClientRes elif code == 429: logger.error("We are sending too many requests to the Discord, slowing down...") if "x-ratelimit-global" in result.headers.keys(): - raise ExhaustedDiscordBucket(remaining=int(result.headers.get("x-ratelimit-reset-after")), is_global=True) - raise ExhaustedDiscordBucket(remaining=int(result.headers.get("x-ratelimit-reset-after")), is_global=False) + raise ExhaustedDiscordBucket(remaining=result.headers.get("x-ratelimit-reset-after", 1), is_global=True) + raise ExhaustedDiscordBucket(remaining=result.headers.get("x-ratelimit-reset-after"), is_global=False) elif 499 < code < 600: logger.error( "Discord have trouble processing the event, and because the HTTP code returned is {} it means we blame them.".format( @@ -271,11 +273,12 @@ async def send_to_discord_webhook(message: [StackedDiscordMessage, DiscordMessag if method == "POST": async with session.post(f"https://discord.com/api/webhooks/{webhook_path}?wait=true", data=repr(message)) as resp: # TODO Detect Invalid Webhook Token try: + logger.debug(f"repr_json reached") resp_json = await resp.json() # Add Discord Message ID which we can later use to delete/redact messages if we want message.discord_callback_message_id = resp_json["id"] except KeyError: - raise aiohttp.ServerConnectionError(f"Could not get the ID from POST request with message data. Data: {await resp.text()}") + logger.exception(f"Could not get the ID from POST request with message data. Data: {await resp.text()}]") except ContentTypeError: logger.exception("Could not receive message ID from Discord due to invalid MIME type of response.") except ValueError: diff --git a/src/exceptions.py b/src/exceptions.py index c5f6d2f..df239cb 100644 --- a/src/exceptions.py +++ b/src/exceptions.py @@ -71,6 +71,6 @@ class WikiExists(Exception): class ExhaustedDiscordBucket(BaseException): - def __init__(self, remaining: int, is_global: bool): + def __init__(self, remaining: int | float, is_global: bool): self.remaining = remaining self.is_global = is_global diff --git a/src/wiki.py b/src/wiki.py index 4234504..1070906 100644 --- a/src/wiki.py +++ b/src/wiki.py @@ -130,6 +130,7 @@ class Wiki: self.recache_requested: bool = False self.session_requests = requests.Session() self.session_requests.headers.update(settings["header"]) + self.request_cost = 0 # For tracking amount of times wiki has been requested in given context logger.debug("Creating new wiki object for {}".format(script_url)) def __str__(self): @@ -140,6 +141,9 @@ class Wiki: f"") + def request_made(self): + self.request_cost += 1 + def json(self) -> dict: dict_obj = { "wiki_url": self.script_url, @@ -364,6 +368,7 @@ class Wiki: def sync_api_request(self, params: Union[str, OrderedDict], *json_path: str, timeout: int = 10, allow_redirects: bool = False) -> dict: """Synchronous function based on api_request created for compatibility reasons with RcGcDw API""" + self.request_made() try: if isinstance(params, str): request = self.session_requests.get(self.script_url + "api.php" + params + "&errorformat=raw", timeout=10, allow_redirects=allow_redirects) @@ -620,6 +625,8 @@ async def rc_processor(wiki: Wiki, change: dict, changed_categories: dict, displ await wiki.redact_messages(context, logparams.get("ids", []), "rev_id", logparams.get("new", {}), page_id=change.get("pageid", -1)) await wiki.delete_messages(dict(rev_id=[int(x) for x in logparams.get("ids", [])], message_display=0)) run_hooks(post_hooks, discord_message, metadata, context, change) + await asyncio.sleep(wiki.request_cost*0.5) # Await amount of time due to additional requests made in the formatters + wiki.request_cost = 0 if discord_message: # TODO How to react when none? (crash in formatter), probably bad handling atm discord_message.finish_embed() discord_message.metadata = metadata