From 1b6be292d9e724de5d64c49572131029458ffb65 Mon Sep 17 00:00:00 2001 From: Frisk Date: Thu, 6 Aug 2020 15:26:06 +0200 Subject: [PATCH] Wrapping up the work on rate-limiting --- src/bot.py | 84 +++++++++++++++++++++++++++-------------- src/discord.py | 38 +++---------------- src/request_tracking.py | 23 ----------- src/wiki.py | 2 +- 4 files changed, 61 insertions(+), 86 deletions(-) delete mode 100644 src/request_tracking.py diff --git a/src/bot.py b/src/bot.py index 8a5ad62..baaa9cd 100644 --- a/src/bot.py +++ b/src/bot.py @@ -6,7 +6,6 @@ import sys import traceback from collections import defaultdict -import functools import requests from contextlib import asynccontextmanager @@ -18,8 +17,7 @@ from src.misc import get_paths, get_domain from src.msgqueue import messagequeue from src.queue_handler import DBHandler from src.wiki import Wiki, process_cats, process_mwmsgs, essential_info, essential_feeds -from src.discord import DiscordMessage, formatter_exception_logger, msg_sender_exception_logger, \ - group_task_exception_logger, discussion_task_exception_logger +from src.discord import DiscordMessage, generic_msg_sender_exception_logger from src.wiki_ratelimiter import RateLimiter logging.config.dictConfig(settings["logging"]) @@ -38,6 +36,9 @@ mw_msgs: dict = {} # will have the type of id: tuple # Reasons for this: 1. we require amount of wikis to calculate the cooldown between requests # 2. Easier to code +for db_wiki in db_cursor.execute('SELECT wiki FROM rcgcdw GROUP BY wiki ORDER BY ROWID'): + all_wikis[db_wiki["wiki"]] = Wiki() # populate all_wikis + queue_limit = settings.get("queue_limit", 30) class LimitedList(list): @@ -65,9 +66,11 @@ class RcQueue: async def remove_wiki_from_group(self, wiki): """Removes a wiki from query of given domain group""" + logger.debug(f"Removing {wiki} from group queue.") group = get_domain(wiki) self[group]["query"] = [x for x in self[group]["query"] if x["wiki"] == wiki] if not self[group]["query"]: # if there is no wiki left in the queue, get rid of the task + all_wikis[wiki].rc_active = False self[group]["task"].cancel() del self.domain_list[group] @@ -76,23 +79,29 @@ class RcQueue: """Retrives next wiki in the queue for given domain""" try: yield self.domain_list[group]["query"][0] + except asyncio.CancelledError: + raise except: if command_line_args.debug: logger.exception("RC Group exception") - raise # reraise the issue + shutdown(asyncio.get_event_loop()) else: logger.exception("Group task returned error") - await group_task_exception_logger(group, traceback.format_exc()) + await generic_msg_sender_exception_logger(traceback.format_exc(), "Group task error logger", Group=group) else: self.domain_list[group]["query"].pop(0) + @staticmethod + def filter_rc_active(wiki_obj): + return wiki_obj[1].rc_active + async def update_queues(self): """Makes a round on rcgcdw DB and looks for updates to the queues in self.domain_list""" try: fetch_all = db_cursor.execute( 'SELECT ROWID, webhook, wiki, lang, display, wikiid, rcid FROM rcgcdw WHERE NOT rcid = -1 GROUP BY wiki ORDER BY ROWID') - self.to_remove = list(all_wikis.keys()) # first populate this list and remove wikis that are still in the db, clean up the rest + self.to_remove = [x[0] for x in filter(self.filter_rc_active, all_wikis.items())] # first populate this list and remove wikis that are still in the db, clean up the rest full = [] for db_wiki in fetch_all.fetchall(): domain = get_domain(db_wiki["wiki"]) @@ -109,13 +118,19 @@ class RcQueue: current_domain["last_rowid"] = db_wiki["ROWID"] continue for wiki in self.to_remove: - del all_wikis[wiki] await self.remove_wiki_from_group(wiki) for group, data in self.domain_list.items(): if group not in full: self[group]["last_rowid"] = 0 # iter reached the end without being stuck on full list + logger.debug("Current domain_list structure: {}".format(self.domain_list)) except: - logger.exception("Queue error!") + if command_line_args.debug: + logger.exception("Queue error!") + shutdown(asyncio.get_event_loop()) + else: + logger.exception("Exception on queue updater") + await generic_msg_sender_exception_logger(traceback.format_exc(), "Queue updator") + def __getitem__(self, item): """Returns the query of given domain group""" @@ -133,6 +148,8 @@ rcqueue = RcQueue() def calculate_delay_for_group(group_length: int) -> float: """Calculate the delay between fetching each wiki to avoid rate limits""" min_delay = 60 / settings["max_requests_per_minute"] + if group_length == 0: + group_length = 1 if (group_length * min_delay) < settings["minimal_cooldown_per_wiki_in_sec"]: return settings["minimal_cooldown_per_wiki_in_sec"] / group_length else: @@ -156,8 +173,8 @@ async def generate_domain_groups(): domain_wikis = defaultdict(list) fetch_all = db_cursor.execute('SELECT ROWID, webhook, wiki, lang, display, wikiid, rcid FROM rcgcdw WHERE NOT rcid = -1 GROUP BY wiki ORDER BY ROWID ASC') for db_wiki in fetch_all.fetchall(): + all_wikis[db_wiki["wiki"]].rc_active = True domain_wikis[get_domain(db_wiki["wiki"])].append(db_wiki) - all_wikis[db_wiki["wiki"]] = Wiki() # populate all_wikis for group, db_wikis in domain_wikis.items(): yield group, db_wikis @@ -216,13 +233,15 @@ async def scan_group(group: str): try: await essential_info(change, categorize_events, local_wiki, db_wiki, target, paths, recent_changes_resp, rate_limiter) + except asyncio.CancelledError: + raise except: if command_line_args.debug: logger.exception("Exception on RC formatter") raise else: logger.exception("Exception on RC formatter") - await formatter_exception_logger(db_wiki["wiki"], change, traceback.format_exc()) + await generic_msg_sender_exception_logger(traceback.format_exc(), "Exception in RC formatter", Wiki=db_wiki["wiki"], Change=str(change)[0:1000]) if recent_changes: DBHandler.add(db_wiki["wiki"], change["rcid"]) delay_between_wikis = calculate_delay_for_group(len(rcqueue[group]["query"])) @@ -248,13 +267,15 @@ async def message_sender(): try: while True: await messagequeue.resend_msgs() + except asyncio.CancelledError: + pass except: if command_line_args.debug: logger.exception("Exception on DC message sender") - raise # reraise the issue + shutdown(loop=asyncio.get_event_loop()) else: logger.exception("Exception on DC message sender") - await msg_sender_exception_logger(traceback.format_exc()) + await generic_msg_sender_exception_logger(traceback.format_exc(), "Message sender exception") async def discussion_handler(): try: @@ -304,22 +325,27 @@ async def discussion_handler(): for target in targets.items(): try: await essential_feeds(post, db_wiki, target) + except asyncio.CancelledError: + raise except: if command_line_args.debug: - raise # reraise the issue + logger.exception("Exception on Feeds formatter") + shutdown(loop=asyncio.get_event_loop()) else: logger.exception("Exception on Feeds formatter") - await formatter_exception_logger(db_wiki["wiki"], post, traceback.format_exc()) + await generic_msg_sender_exception_logger(traceback.format_exc(), "Exception in feed formatter", Post=str(post)[0:1000], Wiki=db_wiki["wiki"]) if discussion_feed: DBHandler.add(db_wiki["wiki"], post["id"], True) await asyncio.sleep(delay=2.0) # hardcoded really doesn't need much more DBHandler.update_db() + except asyncio.CancelledError: + pass except: if command_line_args.debug: raise # reraise the issue else: logger.exception("Exception on Feeds formatter") - await discussion_task_exception_logger(db_wiki["wiki"], traceback.format_exc()) + await generic_msg_sender_exception_logger(traceback.format_exc(), "Discussion handler task exception", Wiki=db_wiki["wiki"]) @@ -327,22 +353,23 @@ def shutdown(loop, signal=None): DBHandler.update_db() if len(messagequeue) > 0: logger.warning("Some messages are still queued!") - loop.stop() - logger.info("Script has shut down due to signal {}.".format(signal)) for task in asyncio.all_tasks(loop): logger.debug("Killing task {}".format(task.get_name())) task.cancel() - sys.exit(0) + loop.run_until_complete(asyncio.gather(*asyncio.all_tasks(loop))) + loop.stop() + logger.info("Script has shut down due to signal {}.".format(signal)) + # sys.exit(0) -def global_exception_handler(loop, context): - """Global exception handler for asyncio, lets us know when something crashes""" - msg = context.get("exception", context["message"]) - logger.error("Global exception handler: {}".format(msg)) - if command_line_args.debug is False: - requests.post("https://discord.com/api/webhooks/"+settings["monitoring_webhook"], data=repr(DiscordMessage("compact", "monitoring", [settings["monitoring_webhook"]], wiki=None, content="[RcGcDb] Global exception handler: {}".format(msg))), headers={'Content-Type': 'application/json'}) - else: - shutdown(loop) +# def global_exception_handler(loop, context): +# """Global exception handler for asyncio, lets us know when something crashes""" +# msg = context.get("exception", context["message"]) +# logger.error("Global exception handler: {}".format(msg)) +# if command_line_args.debug is False: +# requests.post("https://discord.com/api/webhooks/"+settings["monitoring_webhook"], data=repr(DiscordMessage("compact", "monitoring", [settings["monitoring_webhook"]], wiki=None, content="[RcGcDb] Global exception handler: {}".format(msg))), headers={'Content-Type': 'application/json'}) +# else: +# shutdown(loop) async def main_loop(): @@ -355,7 +382,7 @@ async def main_loop(): except AttributeError: logger.info("Running on Windows, some things may not work as they should.") signals = (signal.SIGBREAK, signal.SIGTERM, signal.SIGINT) - loop.set_exception_handler(global_exception_handler) + # loop.set_exception_handler(global_exception_handler) try: task1 = asyncio.create_task(wiki_scanner()) task2 = asyncio.create_task(message_sender()) @@ -366,5 +393,4 @@ async def main_loop(): except KeyboardInterrupt: shutdown(loop) - -asyncio.run(main_loop(), debug=command_line_args.debug) +asyncio.run(main_loop(), debug=command_line_args.debug) \ No newline at end of file diff --git a/src/discord.py b/src/discord.py index 659bbf9..d1ebae9 100644 --- a/src/discord.py +++ b/src/discord.py @@ -110,41 +110,13 @@ async def wiki_removal_monitor(wiki_url, status): await send_to_discord_webhook_monitoring(DiscordMessage("compact", "webhook/remove", content="Removing {} because {}.".format(wiki_url, status), webhook_url=[None], wiki=None)) -async def discussion_task_exception_logger(wiki, exception): +async def generic_msg_sender_exception_logger(exception: str, title: str, **kwargs): + """Creates a Discord message reporting a crash""" message = DiscordMessage("embed", "bot/exception", [None], wiki=None) message["description"] = exception - message["title"] = "Discussion task exception logger" - message.add_field("Wiki", wiki) - message.finish_embed() - await send_to_discord_webhook_monitoring(message) - - -async def group_task_exception_logger(group, exception): - message = DiscordMessage("embed", "bot/exception", [None], wiki=None) - message["description"] = exception - message["title"] = "Group task exception logger" - message.add_field("Group", group) - message.finish_embed() - await send_to_discord_webhook_monitoring(message) - - -async def formatter_exception_logger(wiki_url, change, exception): - """Creates a Discord message reporting a crash in RC formatter area""" - message = DiscordMessage("embed", "bot/exception", [None], wiki=None) - message["description"] = exception - message["title"] = "RC Exception Report" - change = str(change)[0:1000] - message.add_field("Wiki URL", wiki_url) - message.add_field("Change", change) - message.finish_embed() - await send_to_discord_webhook_monitoring(message) - - -async def msg_sender_exception_logger(exception): - """Creates a Discord message reporting a crash in RC formatter area""" - message = DiscordMessage("embed", "bot/exception", [None], wiki=None) - message["description"] = exception - message["title"] = "MSGSENDER Exception Report" + message["title"] = title + for key, value in kwargs: + message.add_field(key, value) message.finish_embed() await send_to_discord_webhook_monitoring(message) diff --git a/src/request_tracking.py b/src/request_tracking.py deleted file mode 100644 index 13843ad..0000000 --- a/src/request_tracking.py +++ /dev/null @@ -1,23 +0,0 @@ -import aiohttp -import logging -from src.config import settings - -logger = logging.getLogger("rcgcdb.request_tracking") - -class WikiRequestTracking: - def __init__(self): - self.current_timeout = 0 - - async def add_timeout(self, time: float): - self.current_timeout += time - - def is_fandom(self, url): - if any(x in url for x in ("fandom.com", "gamepedia.com", "wikia.org")): - return True - return False - -async def on_request_start(session, trace_config_ctx, params): - if - -trace_config = aiohttp.TraceConfig() -trace_config.on_request_start.append(on_request_start) \ No newline at end of file diff --git a/src/wiki.py b/src/wiki.py index 727cf00..c3cabdf 100644 --- a/src/wiki.py +++ b/src/wiki.py @@ -24,7 +24,7 @@ class Wiki: mw_messages: int = None fail_times: int = 0 # corresponding to amount of times connection with wiki failed for client reasons (400-499) session: aiohttp.ClientSession = None - + rc_active: bool = False @staticmethod async def fetch_wiki(extended, script_path, session: aiohttp.ClientSession, ratelimiter: RateLimiter) -> aiohttp.ClientResponse: