From a4462369bb6ba6afe13a5577bc731571ebe682c8 Mon Sep 17 00:00:00 2001
From: Frisk
Date: Mon, 3 Aug 2020 16:44:42 +0200
Subject: [PATCH] Additional work done on the ratelimiting

---
 src/bot.py  | 153 +++++++++++++++++++++++++++++-----------------------
 src/misc.py |   6 +++
 2 files changed, 91 insertions(+), 68 deletions(-)

diff --git a/src/bot.py b/src/bot.py
index 6f0b563..4d226ba 100644
--- a/src/bot.py
+++ b/src/bot.py
@@ -12,7 +12,7 @@ from src.argparser import command_line_args
 from src.config import settings
 from src.database import db_cursor
 from src.exceptions import *
-from src.misc import get_paths
+from src.misc import get_paths, get_domain
 from src.msgqueue import messagequeue
 from src.queue_handler import DBHandler
 from src.wiki import Wiki, process_cats, process_mwmsgs, essential_info, essential_feeds
@@ -41,11 +41,11 @@ for wiki in db_cursor.execute('SELECT DISTINCT wiki FROM rcgcdw'):
 
 # Start queueing logic
 
-def calculate_delay() -> float:
+def calculate_delay_for_group(group_length: int) -> float:
 	"""Calculate the delay between fetching each wiki to avoid rate limits"""
 	min_delay = 60 / settings["max_requests_per_minute"]
-	if (len(all_wikis) * min_delay) < settings["minimal_cooldown_per_wiki_in_sec"]:
-		return settings["minimal_cooldown_per_wiki_in_sec"] / len(all_wikis)
+	if (group_length * min_delay) < settings["minimal_cooldown_per_wiki_in_sec"]:
+		return settings["minimal_cooldown_per_wiki_in_sec"] / group_length
 	else:
 		return min_delay
 
@@ -62,78 +62,95 @@ def generate_targets(wiki_url: str) -> defaultdict:
 	return combinations
 
 
+async def generate_domain_groups():  # oh boy, I cannot wait to learn about async generators
+	combinations = defaultdict(list)
+	fetch_all = db_cursor.execute('SELECT webhook, wiki, lang, display, wikiid, rcid FROM rcgcdw GROUP BY wiki')
+	for db_wiki in fetch_all.fetchall():
+		combinations[get_domain(db_wiki["wiki"])].append(db_wiki)
+	for item in combinations.values():
+		yield item
+
+
+async def scan_group(group: list):
+	calc_delay = calculate_delay_for_group(len(group))
+	for db_wiki in group:
+		logger.debug("Wiki {}".format(db_wiki["wiki"]))
+		if db_wiki["wiki"] not in all_wikis:
+			logger.info("Registering new wiki locally: {}".format(db_wiki["wiki"]))
+			all_wikis[db_wiki["wiki"]] = Wiki()
+		local_wiki = all_wikis[db_wiki["wiki"]]  # set a reference to a wiki object from memory
+		if db_wiki["rcid"] != -1:
+			extended = False
+			if local_wiki.mw_messages is None:
+				extended = True
+			async with aiohttp.ClientSession(headers=settings["header"],
+					timeout=aiohttp.ClientTimeout(3.0)) as session:
+				try:
+					wiki_response = await local_wiki.fetch_wiki(extended, db_wiki["wiki"], session)
+					await local_wiki.check_status(db_wiki["wiki"], wiki_response.status)
+				except (WikiServerError, WikiError):
+					logger.error("Exeption when fetching the wiki")
+					continue  # ignore this wiki if it throws errors
+				try:
+					recent_changes_resp = await wiki_response.json()
+					if "error" in recent_changes_resp or "errors" in recent_changes_resp:
+						error = recent_changes_resp.get("error", recent_changes_resp["errors"])
+						if error["code"] == "readapidenied":
+							await local_wiki.fail_add(db_wiki["wiki"], 410)
+							continue
+						raise WikiError
+					recent_changes = recent_changes_resp['query']['recentchanges']
+					recent_changes.reverse()
+				except aiohttp.ContentTypeError:
+					logger.exception("Wiki seems to be resulting in non-json content.")
+					await local_wiki.fail_add(db_wiki["wiki"], 410)
+					continue
+				except:
+					logger.exception("On loading json of response.")
+					continue
+			if extended:
+				await process_mwmsgs(recent_changes_resp, local_wiki, mw_msgs)
+			if db_wiki["rcid"] is None:  # new wiki, just get the last rc to not spam the channel
+				if len(recent_changes) > 0:
+					DBHandler.add(db_wiki["wiki"], recent_changes[-1]["rcid"])
+				else:
+					DBHandler.add(db_wiki["wiki"], 0)
+				DBHandler.update_db()
+				continue
+			categorize_events = {}
+			targets = generate_targets(db_wiki["wiki"])
+			paths = get_paths(db_wiki["wiki"], recent_changes_resp)
+			for change in recent_changes:
+				await process_cats(change, local_wiki, mw_msgs, categorize_events)
+			for change in recent_changes:  # Yeah, second loop since the categories require to be all loaded up
+				if change["rcid"] > db_wiki["rcid"]:
+					for target in targets.items():
+						try:
+							await essential_info(change, categorize_events, local_wiki, db_wiki,
+									target, paths, recent_changes_resp)
+						except:
+							if command_line_args.debug:
+								raise  # reraise the issue
+							else:
+								logger.exception("Exception on RC formatter")
+								await formatter_exception_logger(db_wiki["wiki"], change, traceback.format_exc())
+			if recent_changes:
+				DBHandler.add(db_wiki["wiki"], change["rcid"])
+		await asyncio.sleep(delay=calc_delay)
+
+
 async def wiki_scanner():
 	"""Wiki scanner is spawned as a task which purpose is to continuously run over wikis in the DB, fetching recent changes
 	to add messages based on the changes to message queue later handled by message_sender coroutine."""
 	try:
 		while True:
-			calc_delay = calculate_delay()
+			async for group in generate_domain_groups():
+				asyncio.create_task(scan_group(group))
+
 			fetch_all = db_cursor.execute(
 				'SELECT webhook, wiki, lang, display, wikiid, rcid, postid FROM rcgcdw GROUP BY wiki')
 			for db_wiki in fetch_all.fetchall():
-				logger.debug("Wiki {}".format(db_wiki["wiki"]))
-				if db_wiki["wiki"] not in all_wikis:
-					logger.info("Registering new wiki locally: {}".format(db_wiki["wiki"]))
-					all_wikis[db_wiki["wiki"]] = Wiki()
-				local_wiki = all_wikis[db_wiki["wiki"]]  # set a reference to a wiki object from memory
-				if db_wiki["rcid"] != -1:
-					extended = False
-					if local_wiki.mw_messages is None:
-						extended = True
-					async with aiohttp.ClientSession(headers=settings["header"],
-							timeout=aiohttp.ClientTimeout(3.0)) as session:
-						try:
-							wiki_response = await local_wiki.fetch_wiki(extended, db_wiki["wiki"], session)
-							await local_wiki.check_status(db_wiki["wiki"], wiki_response.status)
-						except (WikiServerError, WikiError):
-							logger.error("Exeption when fetching the wiki")
-							continue  # ignore this wiki if it throws errors
-						try:
-							recent_changes_resp = await wiki_response.json()
-							if "error" in recent_changes_resp or "errors" in recent_changes_resp:
-								error = recent_changes_resp.get("error", recent_changes_resp["errors"])
-								if error["code"] == "readapidenied":
-									await local_wiki.fail_add(db_wiki["wiki"], 410)
-									continue
-								raise WikiError
-							recent_changes = recent_changes_resp['query']['recentchanges']
-							recent_changes.reverse()
-						except aiohttp.ContentTypeError:
-							logger.exception("Wiki seems to be resulting in non-json content.")
-							await local_wiki.fail_add(db_wiki["wiki"], 410)
-							continue
-						except:
-							logger.exception("On loading json of response.")
-							continue
-					if extended:
-						await process_mwmsgs(recent_changes_resp, local_wiki, mw_msgs)
-					if db_wiki["rcid"] is None:  # new wiki, just get the last rc to not spam the channel
-						if len(recent_changes) > 0:
-							DBHandler.add(db_wiki["wiki"], recent_changes[-1]["rcid"])
-						else:
-							DBHandler.add(db_wiki["wiki"], 0)
-						DBHandler.update_db()
-						continue
-					categorize_events = {}
-					targets = generate_targets(db_wiki["wiki"])
-					paths = get_paths(db_wiki["wiki"], recent_changes_resp)
-					for change in recent_changes:
-						await process_cats(change, local_wiki, mw_msgs, categorize_events)
-					for change in recent_changes:  # Yeah, second loop since the categories require to be all loaded up
-						if change["rcid"] > db_wiki["rcid"]:
-							for target in targets.items():
-								try:
-									await essential_info(change, categorize_events, local_wiki, db_wiki,
-											target, paths, recent_changes_resp)
-								except:
-									if command_line_args.debug:
-										raise  # reraise the issue
-									else:
-										logger.exception("Exception on RC formatter")
-										await formatter_exception_logger(db_wiki["wiki"], change, traceback.format_exc())
-					if recent_changes:
-						DBHandler.add(db_wiki["wiki"], change["rcid"])
-				await asyncio.sleep(delay=2.0)  # temporary measure until rate limiting is not implemented
+
 				if db_wiki["wikiid"] is not None:
 					header = settings["header"]
 					header["Accept"] = "application/hal+json"
diff --git a/src/misc.py b/src/misc.py
index c8fcc3a..a041066 100644
--- a/src/misc.py
+++ b/src/misc.py
@@ -17,6 +17,12 @@ def get_paths(wiki: str, request) -> tuple:
 	return WIKI_API_PATH, WIKI_SCRIPT_PATH, WIKI_ARTICLE_PATH, WIKI_JUST_DOMAIN
 
 
+def get_domain(url: str) -> str:
+	"""Get domain of given URL"""
+	parsed_url = urlparse(url)
+	return ".".join(urlunparse((*parsed_url[0:2], "", "", "", "")).split(".")[-2:])  # something like gamepedia.com, fandom.com
+
+
 class LinkParser(HTMLParser):
 
 	new_string = ""
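
For reviewers, a minimal standalone sketch of how the new per-domain pacing behaves. The settings values and wiki URLs below are examples only, not taken from the patch; get_domain and calculate_delay_for_group mirror the definitions in the diff above, and the grouping step mimics what generate_domain_groups does with the database rows.

from collections import defaultdict
from urllib.parse import urlparse, urlunparse

# Example values only; the real bot reads these from its settings file.
settings = {"max_requests_per_minute": 30, "minimal_cooldown_per_wiki_in_sec": 60}


def get_domain(url: str) -> str:
	"""Get domain of given URL (mirrors src/misc.py above)"""
	parsed_url = urlparse(url)
	return ".".join(urlunparse((*parsed_url[0:2], "", "", "", "")).split(".")[-2:])


def calculate_delay_for_group(group_length: int) -> float:
	"""Delay between wikis of one domain group (mirrors src/bot.py above)"""
	min_delay = 60 / settings["max_requests_per_minute"]
	if (group_length * min_delay) < settings["minimal_cooldown_per_wiki_in_sec"]:
		return settings["minimal_cooldown_per_wiki_in_sec"] / group_length
	else:
		return min_delay


# Hypothetical wiki URLs, grouped by domain the same way generate_domain_groups
# groups database rows; each group would be scanned by its own scan_group task.
wikis = [
	"https://minecraft.gamepedia.com/",
	"https://terraria.gamepedia.com/",
	"https://leagueoflegends.fandom.com/",
]
groups = defaultdict(list)
for url in wikis:
	groups[get_domain(url)].append(url)

for domain, group in groups.items():
	# gamepedia.com has 2 wikis here -> 60 / 2 = 30.0 s between requests;
	# fandom.com has 1 wiki -> 60 / 1 = 60.0 s.
	print(domain, calculate_delay_for_group(len(group)))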