From e2077a7ca1638f69898c159215bf8039039e5cae Mon Sep 17 00:00:00 2001
From: Frisk
Date: Sun, 9 Aug 2020 15:31:21 +0200
Subject: [PATCH] Fixed another oopsie

---
 src/bot.py | 163 ++++++++++++++++++++++++++++-------------------------
 1 file changed, 85 insertions(+), 78 deletions(-)

diff --git a/src/bot.py b/src/bot.py
index 0602cb6..aa71c6d 100644
--- a/src/bot.py
+++ b/src/bot.py
@@ -70,6 +70,7 @@ class RcQueue:
 			group = get_domain(wiki)
 			self[group]["query"] = [x for x in self[group]["query"] if x.url == wiki]
 			if not self[group]["query"]:  # if there is no wiki left in the queue, get rid of the task
+				logger.debug(f"{group} no longer has any wikis queued!")
 				all_wikis[wiki].rc_active = -1
 				self[group]["task"].cancel()
 				del self.domain_list[group]
@@ -106,12 +107,15 @@ class RcQueue:
 		for db_wiki in fetch_all.fetchall():
 			domain = get_domain(db_wiki["wiki"])
 			try:
-				all_wikis[db_wiki["wiki"]]
-			except KeyError:
+				if db_wiki["wiki"] not in all_wikis:
+					raise AssertionError
+				self.to_remove.remove(db_wiki["wiki"])
+			except AssertionError:
 				all_wikis[db_wiki["wiki"]] = Wiki()
 				all_wikis[db_wiki["wiki"]].rc_active = db_wiki["rcid"]
+			except ValueError:
+				pass
 			try:
-				self.to_remove.remove(db_wiki["wiki"])
 				current_domain = self[domain]
 				if not db_wiki["ROWID"] < current_domain["last_rowid"]:
 					current_domain["query"].append(QueuedWiki(db_wiki["wiki"], 20))
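
Annotation (not part of the patch): the hunk above replaces two fragile patterns:
probing all_wikis[...] solely to trigger a KeyError, and letting a wiki absent from
self.to_remove raise ValueError inside the second try block, skipping the queue
update. The rewrite routes the two "not found" cases to separate handlers:
AssertionError creates the missing Wiki, ValueError is silently ignored. A minimal
sketch of that control flow, using a plain dict and list as stand-ins for all_wikis
and to_remove (the track() helper and all names here are illustrative, not from
bot.py):

    all_wikis = {}
    to_remove = ["removed.wiki.example"]

    def track(wiki, rcid):
        try:
            if wiki not in all_wikis:
                raise AssertionError  # unknown wiki: jump to the creation handler
            to_remove.remove(wiki)  # known wiki: unschedule any pending removal
        except AssertionError:
            all_wikis[wiki] = {"rc_active": rcid}  # stand-in for Wiki()
        except ValueError:
            pass  # the wiki was not scheduled for removal; nothing to do

    track("new.wiki.example", 5)  # unknown: creates the entry
    track("new.wiki.example", 7)  # known, not pending removal: ValueError ignored
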
wiki") + continue # ignore this wiki if it throws errors + try: + recent_changes_resp = await wiki_response.json() + if "error" in recent_changes_resp or "errors" in recent_changes_resp: + error = recent_changes_resp.get("error", recent_changes_resp["errors"]) + if error["code"] == "readapidenied": + await local_wiki.fail_add(queued_wiki.url, 410) + continue + raise WikiError + recent_changes = recent_changes_resp['query']['recentchanges'] + recent_changes.reverse() + except aiohttp.ContentTypeError: + logger.exception("Wiki seems to be resulting in non-json content.") + await local_wiki.fail_add(queued_wiki.url, 410) + continue + except: + logger.exception("On loading json of response.") + continue + if extended: + await process_mwmsgs(recent_changes_resp, local_wiki, mw_msgs) + if local_wiki.rc_active == 0: # new wiki, just get the last rc to not spam the channel + if len(recent_changes) > 0: + local_wiki.rc_active = recent_changes[-1]["rcid"] + DBHandler.add(queued_wiki.url, recent_changes[-1]["rcid"]) + else: + local_wiki.rc_active = 0 + DBHandler.add(queued_wiki.url, 0) + DBHandler.update_db() continue - except: - logger.exception("On loading json of response.") - continue - if extended: - await process_mwmsgs(recent_changes_resp, local_wiki, mw_msgs) - if local_wiki.rc_active == 0: # new wiki, just get the last rc to not spam the channel - if len(recent_changes) > 0: - local_wiki.rc_active = recent_changes[-1]["rcid"] - DBHandler.add(queued_wiki.url, recent_changes[-1]["rcid"]) - else: - local_wiki.rc_active = 0 - DBHandler.add(queued_wiki.url, 0) - DBHandler.update_db() - continue - categorize_events = {} - targets = generate_targets(queued_wiki.url) - paths = get_paths(queued_wiki.url, recent_changes_resp) - new_events = 0 - for change in recent_changes: - if change["rcid"] > local_wiki.rc_active and queued_wiki.amount != 450: - new_events += 1 - if new_events == 20: - # call the function again with max limit for more results, ignore the ones in this request - logger.debug("There were too many new events, queuing wiki with 450 limit.") - rcqueue[group]["query"].insert(1, QueuedWiki(queued_wiki.url, 450)) - break - await process_cats(change, local_wiki, mw_msgs, categorize_events) - else: # If we broke from previous loop (too many changes) don't execute sending messages here - for change in recent_changes: # Yeah, second loop since the categories require to be all loaded up - if change["rcid"] > local_wiki.rc_active: - for target in targets.items(): - try: - await essential_info(change, categorize_events, local_wiki, target, paths, - recent_changes_resp, rate_limiter) - except asyncio.CancelledError: - raise - except: - if command_line_args.debug: - logger.exception("Exception on RC formatter") + categorize_events = {} + targets = generate_targets(queued_wiki.url) + paths = get_paths(queued_wiki.url, recent_changes_resp) + new_events = 0 + for change in recent_changes: + if change["rcid"] > local_wiki.rc_active and queued_wiki.amount != 450: + new_events += 1 + if new_events == 20: + # call the function again with max limit for more results, ignore the ones in this request + logger.debug("There were too many new events, queuing wiki with 450 limit.") + rcqueue[group]["query"].insert(1, QueuedWiki(queued_wiki.url, 450)) + break + await process_cats(change, local_wiki, mw_msgs, categorize_events) + else: # If we broke from previous loop (too many changes) don't execute sending messages here + for change in recent_changes: # Yeah, second loop since the categories require to be all 
+						if change["rcid"] > local_wiki.rc_active:
+							for target in targets.items():
+								try:
+									await essential_info(change, categorize_events, local_wiki, target, paths,
+									                     recent_changes_resp, rate_limiter)
+								except asyncio.CancelledError:
+									raise
-					else:
-						logger.exception("Exception on RC formatter")
-						await generic_msg_sender_exception_logger(traceback.format_exc(), "Exception in RC formatter", Wiki=queued_wiki.url, Change=str(change)[0:1000])
-				if recent_changes:
-					local_wiki.rc_active = change["rcid"]
-					DBHandler.add(queued_wiki.url, change["rcid"])
-				delay_between_wikis = calculate_delay_for_group(len(rcqueue[group]["query"]))  # TODO Find a way to not execute it every wiki
-				await asyncio.sleep(delay_between_wikis)
-				DBHandler.update_db()
+								except Exception:
+									if command_line_args.debug:
+										logger.exception("Exception on RC formatter")
+										raise
+									else:
+										logger.exception("Exception on RC formatter")
+										await generic_msg_sender_exception_logger(traceback.format_exc(), "Exception in RC formatter", Wiki=queued_wiki.url, Change=str(change)[0:1000])
+				if recent_changes:
+					local_wiki.rc_active = change["rcid"]
+					DBHandler.add(queued_wiki.url, change["rcid"])
+				delay_between_wikis = calculate_delay_for_group(len(rcqueue[group]["query"]))  # TODO Find a way to not execute it every wiki
+				await asyncio.sleep(delay_between_wikis)
+				DBHandler.update_db()
+		except asyncio.CancelledError:
+			return
 
 
 async def wiki_scanner():
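
Annotation (not part of the patch): the bulk of the diff re-indents the body of
scan_group()'s while-loop into a try block so that, when RcQueue drops an empty
domain group and calls self[group]["task"].cancel() (first hunk), the group's
scanner task catches asyncio.CancelledError and returns cleanly instead of dying
mid-iteration with an unhandled exception. A minimal, self-contained sketch of
the same pattern, assuming only the standard library (consume(), main() and the
queue are illustrative names, not from bot.py):

    import asyncio

    async def consume(queue: asyncio.Queue) -> None:
        while True:
            try:
                item = await queue.get()  # cancellation can land on any await
                print("processing", item)
            except asyncio.CancelledError:
                return  # the owner cancelled this task: exit quietly, like scan_group()

    async def main() -> None:
        queue: asyncio.Queue = asyncio.Queue()
        task = asyncio.create_task(consume(queue))
        await queue.put("wiki-1")
        await asyncio.sleep(0.1)  # let the consumer drain the queue
        task.cancel()  # what RcQueue does via self[group]["task"].cancel()
        await asyncio.gather(task, return_exceptions=True)

    asyncio.run(main())

Catching the cancellation inside the task, rather than in whoever awaits it, is
what lets the while-loop own its shutdown: the except clause added at the very
bottom of scan_group() is the only new control flow, and nearly everything above
it is the old body shifted one indentation level to the right.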