diff --git a/src/bot.py b/src/bot.py
index 8538d95..233c530 100644
--- a/src/bot.py
+++ b/src/bot.py
@@ -9,6 +9,7 @@
 from collections import defaultdict
 import functools
 import requests
+from contextlib import asynccontextmanager
 from src.argparser import command_line_args
 from src.config import settings
 from src.database import db_cursor
@@ -38,15 +39,28 @@
 mw_msgs: dict = {} # will have the type of id: tuple
 for wiki in db_cursor.execute('SELECT DISTINCT wiki FROM rcgcdw'):
 	all_wikis[wiki] = Wiki()
 
+queue_limit = settings.get("queue_limit", 30) # max amount of wikis waiting in a single domain queue
+
+class LimitedList(list):
+	def __init__(self, *args):
+		list.__init__(self, *args)
+
+	def append(self, obj) -> None:
+		if len(self) < queue_limit:
+			list.append(self, obj) # call the parent method, calling self.append here would recurse forever
+			return
+		raise ListFull
+
+
 class RcQueue:
 	def __init__(self):
 		self.domain_list = {}
 		self.to_remove = []
 
-	async def start_group(self, group):
+	async def start_group(self, group, initial_wikis):
 		"""Starts a task for given domain group"""
 		if group not in self.domain_list:
-			self.domain_list[group] = {"task": asyncio.create_task(scan_group(group)), "last_rowid": 0, "query": []}
+			self.domain_list[group] = {"task": asyncio.create_task(scan_group(group)), "last_rowid": 0, "query": LimitedList(initial_wikis)}
 		else:
 			raise KeyError
@@ -54,6 +68,42 @@
 		"""Removes a wiki from query of given domain group"""
 		self[group]["query"] # there can be multiple webhooks with
 
+	@asynccontextmanager
+	async def retrieve_next_queued(self, group):
+		try:
+			yield self.domain_list[group]["query"][0]
+		except IndexError:
+			logger.warning("Queue for {} domain group is empty.".format(group))
+			yield None
+		finally: # TODO decide how to handle exceptions raised inside the managed block
+			if self.domain_list[group]["query"]: self.domain_list[group]["query"].pop(0) # pop only when there is something to pop
+
+	async def update_queues(self):
+		fetch_all = db_cursor.execute(
+			'SELECT ROWID, webhook, wiki, lang, display, wikiid, rcid FROM rcgcdw WHERE NOT rcid = -1 GROUP BY wiki ORDER BY ROWID')
+		self.to_remove = list(all_wikis.keys())
+		full = []
+		for db_wiki in fetch_all.fetchall():
+			domain = get_domain(db_wiki["wiki"])
+			if db_wiki["wiki"] not in all_wikis:
+				all_wikis[db_wiki["wiki"]] = Wiki() # register wikis added to the DB since the first scan
+			try:
+				current_domain = self[domain] # raises KeyError for a domain we have not seen yet
+				if db_wiki["ROWID"] >= current_domain["last_rowid"]:
+					current_domain["query"].append(db_wiki)
+				if db_wiki["wiki"] in self.to_remove:
+					self.to_remove.remove(db_wiki["wiki"]) # to_remove is keyed by wiki URL, this wiki still exists
+			except KeyError:
+				await self.start_group(domain, [db_wiki])
+				logger.info("A new domain group has been added since last time, adding it to the domain_list and starting a task...")
+			except ListFull:
+				full.append(domain)
+				current_domain["last_rowid"] = db_wiki["ROWID"]
+				continue
+
+		for group in self.domain_list:
+			if group not in full:
+				self[group]["last_rowid"] = 0 # the iteration reached the end without the queue getting full
 
 	def __getitem__(self, item):
 		"""Returns the query of given domain group"""
@@ -90,149 +137,153 @@ def generate_targets(wiki_url: str) -> defaultdict:
 	return combinations
 
 
-async def generate_domain_groups(): # oh boy, I cannot wait to learn about async generators
+async def generate_domain_groups():
 	"""Generate a list of wikis per domain (fandom.com, wikipedia.org etc.)"""
 	combinations = defaultdict(list)
-	fetch_all = db_cursor.execute('SELECT webhook, wiki, lang, display, wikiid, rcid FROM rcgcdw GROUP BY wiki')
+	fetch_all = db_cursor.execute('SELECT ROWID, webhook, wiki, lang, display, wikiid, rcid FROM rcgcdw WHERE NOT rcid = -1 GROUP BY wiki ORDER BY ROWID ASC')
 	for db_wiki in fetch_all.fetchall():
 		combinations[get_domain(db_wiki["wiki"])].append(db_wiki)
+		all_wikis[db_wiki["wiki"]] = Wiki() # populate all_wikis
 	for group, db_wikis in combinations.items():
 		yield group, db_wikis
 
 
-async def scan_group(group: list):
+async def scan_group(group: str):
 	while True:
-		calc_delay = calculate_delay_for_group(len(rcqueue[group]))
-		rcqueue[group]
-		logger.debug("Wiki {}".format(db_wiki["wiki"]))
-		if db_wiki["wiki"] not in all_wikis:
-			logger.info("Registering new wiki locally: {}".format(db_wiki["wiki"]))
-			all_wikis[db_wiki["wiki"]] = Wiki()
-		local_wiki = all_wikis[db_wiki["wiki"]] # set a reference to a wiki object from memory
-		if db_wiki["rcid"] != -1:
-			extended = False
-			if local_wiki.mw_messages is None:
-				extended = True
-			async with aiohttp.ClientSession(headers=settings["header"],
-				timeout=aiohttp.ClientTimeout(3.0)) as session:
-				try:
-					wiki_response = await local_wiki.fetch_wiki(extended, db_wiki["wiki"], session)
-					await local_wiki.check_status(db_wiki["wiki"], wiki_response.status)
-				except (WikiServerError, WikiError):
-					logger.error("Exeption when fetching the wiki")
-					continue # ignore this wiki if it throws errors
-				try:
-					recent_changes_resp = await wiki_response.json()
-					if "error" in recent_changes_resp or "errors" in recent_changes_resp:
-						error = recent_changes_resp.get("error", recent_changes_resp["errors"])
-						if error["code"] == "readapidenied":
-							await local_wiki.fail_add(db_wiki["wiki"], 410)
-							continue
-						raise WikiError
-					recent_changes = recent_changes_resp['query']['recentchanges']
-					recent_changes.reverse()
-				except aiohttp.ContentTypeError:
-					logger.exception("Wiki seems to be resulting in non-json content.")
-					await local_wiki.fail_add(db_wiki["wiki"], 410)
-					continue
-				except:
-					logger.exception("On loading json of response.")
-					continue
-			if extended:
-				await process_mwmsgs(recent_changes_resp, local_wiki, mw_msgs)
-			if db_wiki["rcid"] is None: # new wiki, just get the last rc to not spam the channel
-				if len(recent_changes) > 0:
-					DBHandler.add(db_wiki["wiki"], recent_changes[-1]["rcid"])
-				else:
-					DBHandler.add(db_wiki["wiki"], 0)
-				DBHandler.update_db()
-				continue
-			categorize_events = {}
-			targets = generate_targets(db_wiki["wiki"])
-			paths = get_paths(db_wiki["wiki"], recent_changes_resp)
-			for change in recent_changes:
-				await process_cats(change, local_wiki, mw_msgs, categorize_events)
-			for change in recent_changes: # Yeah, second loop since the categories require to be all loaded up
-				if change["rcid"] > db_wiki["rcid"]:
-					for target in targets.items():
-						try:
-							await essential_info(change, categorize_events, local_wiki, db_wiki,
-								target, paths, recent_changes_resp)
-						except:
-							if command_line_args.debug:
-								raise # reraise the issue
-							else:
-								logger.exception("Exception on RC formatter")
-								await formatter_exception_logger(db_wiki["wiki"], change, traceback.format_exc())
-			if recent_changes:
-				DBHandler.add(db_wiki["wiki"], change["rcid"])
-		await asyncio.sleep(delay=calc_delay)
-		return group
+		calc_delay = calculate_delay_for_group(len(rcqueue[group]["query"]))
+		try:
+			async with rcqueue.retrieve_next_queued(group) as db_wiki: # acquire next wiki in queue
+				if db_wiki is None:
+					raise QueueEmpty # nothing left to process in this group for now
+				logger.debug("Wiki {}".format(db_wiki["wiki"]))
+				local_wiki = all_wikis[db_wiki["wiki"]] # set a reference to a wiki object from memory
+				if db_wiki["rcid"] != -1:
+					extended = False
+					if local_wiki.mw_messages is None:
+						extended = True
+					async with aiohttp.ClientSession(headers=settings["header"],
+						timeout=aiohttp.ClientTimeout(3.0)) as session:
+						try:
+							wiki_response = await local_wiki.fetch_wiki(extended, db_wiki["wiki"], session)
+							await local_wiki.check_status(db_wiki["wiki"], wiki_response.status)
+						except (WikiServerError, WikiError):
+							logger.error("Exception when fetching the wiki")
+							continue # ignore this wiki if it throws errors
+						try:
+							recent_changes_resp = await wiki_response.json()
+							if "error" in recent_changes_resp or "errors" in recent_changes_resp:
+								error = recent_changes_resp.get("error", recent_changes_resp["errors"])
+								if error["code"] == "readapidenied":
+									await local_wiki.fail_add(db_wiki["wiki"], 410)
+									continue
+								raise WikiError
+							recent_changes = recent_changes_resp['query']['recentchanges']
+							recent_changes.reverse()
+						except aiohttp.ContentTypeError:
+							logger.exception("Wiki seems to be returning non-JSON content.")
+							await local_wiki.fail_add(db_wiki["wiki"], 410)
+							continue
+						except Exception:
+							logger.exception("On loading json of response.")
+							continue
+					if extended:
+						await process_mwmsgs(recent_changes_resp, local_wiki, mw_msgs)
+					if db_wiki["rcid"] is None: # new wiki, just get the last rc to not spam the channel
+						if len(recent_changes) > 0:
+							DBHandler.add(db_wiki["wiki"], recent_changes[-1]["rcid"])
+						else:
+							DBHandler.add(db_wiki["wiki"], 0)
+						DBHandler.update_db()
+						continue
+					categorize_events = {}
+					targets = generate_targets(db_wiki["wiki"])
+					paths = get_paths(db_wiki["wiki"], recent_changes_resp)
+					for change in recent_changes:
+						await process_cats(change, local_wiki, mw_msgs, categorize_events)
+					for change in recent_changes: # Yeah, second loop since the categories require to be all loaded up
+						if change["rcid"] > db_wiki["rcid"]:
+							for target in targets.items():
+								try:
+									await essential_info(change, categorize_events, local_wiki, db_wiki,
+										target, paths, recent_changes_resp)
+								except Exception:
+									if command_line_args.debug:
+										raise # reraise the issue
+									else:
+										logger.exception("Exception on RC formatter")
+										await formatter_exception_logger(db_wiki["wiki"], change, traceback.format_exc())
+					if recent_changes:
+						DBHandler.add(db_wiki["wiki"], change["rcid"])
+				await asyncio.sleep(delay=calc_delay)
+		except QueueEmpty: # empty queue would otherwise kill the task, wait for update_queues() to refill it
+			await asyncio.sleep(calc_delay)
+			continue
 
 
 async def wiki_scanner():
 	"""Wiki scanner is spawned as a task which purpose is to continuously run over wikis in the DB, fetching recent changes
 	to add messages based on the changes to message queue later handled by message_sender coroutine."""
 	try:
-		async for group, db_wikis in generate_domain_groups():
-			await rcqueue.start_group(group)
-			rcqueue[group]["query"] = db_wikis # __SETITEM__ MIGHT BE BAD FOR NESTED, SEE IF CRASHES
-		while True:
-			await asyncio.sleep(20.0)
+		async for group, db_wikis in generate_domain_groups(): # First scan
+			await rcqueue.start_group(group, db_wikis)
+		while True:
+			await asyncio.sleep(20.0)
+			await rcqueue.update_queues()
-
-			if db_wiki["wikiid"] is not None:
-				header = settings["header"]
-				header["Accept"] = "application/hal+json"
-				async with aiohttp.ClientSession(headers=header,
-					timeout=aiohttp.ClientTimeout(3.0)) as session:
-					try:
-						feeds_response = await local_wiki.fetch_feeds(db_wiki["wikiid"], session)
-					except (WikiServerError, WikiError):
-						logger.error("Exeption when fetching the wiki")
-						continue # ignore this wiki if it throws errors
-					try:
-						discussion_feed_resp = await feeds_response.json(encoding="UTF-8")
-						if "title" in discussion_feed_resp:
-							error = discussion_feed_resp["error"]
-							if error == "site doesn't exists":
-								db_cursor.execute("UPDATE rcgcdw SET wikiid = ? WHERE wiki = ?",
-									(None, db_wiki["wiki"],))
-								DBHandler.update_db()
-								continue
-							raise WikiError
-						discussion_feed = discussion_feed_resp["_embedded"]["doc:posts"]
-						discussion_feed.reverse()
-					except aiohttp.ContentTypeError:
-						logger.exception("Wiki seems to be resulting in non-json content.")
-						continue
-					except:
-						logger.exception("On loading json of response.")
-						continue
-				if db_wiki["postid"] is None: # new wiki, just get the last post to not spam the channel
-					if len(discussion_feed) > 0:
-						DBHandler.add(db_wiki["wiki"], discussion_feed[-1]["id"], True)
-					else:
-						DBHandler.add(db_wiki["wiki"], "0", True)
-					DBHandler.update_db()
-					continue
-				targets = generate_targets(db_wiki["wiki"])
-				for post in discussion_feed:
-					if post["id"] > db_wiki["postid"]:
-						for target in targets.items():
-							try:
-								await essential_feeds(post, db_wiki, target)
-							except:
-								if command_line_args.debug:
-									raise # reraise the issue
-								else:
-									logger.exception("Exception on Feeds formatter")
-									await formatter_exception_logger(db_wiki["wiki"], post, traceback.format_exc())
-				if discussion_feed:
-					DBHandler.add(db_wiki["wiki"], post["id"], True)
-			await asyncio.sleep(delay=calc_delay)
-		DBHandler.update_db()
WHERE wiki = ?", - (None, db_wiki["wiki"],)) - DBHandler.update_db() - continue - raise WikiError - discussion_feed = discussion_feed_resp["_embedded"]["doc:posts"] - discussion_feed.reverse() - except aiohttp.ContentTypeError: - logger.exception("Wiki seems to be resulting in non-json content.") - continue - except: - logger.exception("On loading json of response.") - continue - if db_wiki["postid"] is None: # new wiki, just get the last post to not spam the channel - if len(discussion_feed) > 0: - DBHandler.add(db_wiki["wiki"], discussion_feed[-1]["id"], True) - else: - DBHandler.add(db_wiki["wiki"], "0", True) - DBHandler.update_db() + if db_wiki["wikiid"] is not None: + header = settings["header"] + header["Accept"] = "application/hal+json" + async with aiohttp.ClientSession(headers=header, + timeout=aiohttp.ClientTimeout(3.0)) as session: + try: + feeds_response = await local_wiki.fetch_feeds(db_wiki["wikiid"], session) + except (WikiServerError, WikiError): + logger.error("Exeption when fetching the wiki") + continue # ignore this wiki if it throws errors + try: + discussion_feed_resp = await feeds_response.json(encoding="UTF-8") + if "title" in discussion_feed_resp: + error = discussion_feed_resp["error"] + if error == "site doesn't exists": + db_cursor.execute("UPDATE rcgcdw SET wikiid = ? WHERE wiki = ?", + (None, db_wiki["wiki"],)) + DBHandler.update_db() + continue + raise WikiError + discussion_feed = discussion_feed_resp["_embedded"]["doc:posts"] + discussion_feed.reverse() + except aiohttp.ContentTypeError: + logger.exception("Wiki seems to be resulting in non-json content.") continue - targets = generate_targets(db_wiki["wiki"]) - for post in discussion_feed: - if post["id"] > db_wiki["postid"]: - for target in targets.items(): - try: - await essential_feeds(post, db_wiki, target) - except: - if command_line_args.debug: - raise # reraise the issue - else: - logger.exception("Exception on Feeds formatter") - await formatter_exception_logger(db_wiki["wiki"], post, traceback.format_exc()) - if discussion_feed: - DBHandler.add(db_wiki["wiki"], post["id"], True) - await asyncio.sleep(delay=calc_delay) - DBHandler.update_db() + except: + logger.exception("On loading json of response.") + continue + if db_wiki["postid"] is None: # new wiki, just get the last post to not spam the channel + if len(discussion_feed) > 0: + DBHandler.add(db_wiki["wiki"], discussion_feed[-1]["id"], True) + else: + DBHandler.add(db_wiki["wiki"], "0", True) + DBHandler.update_db() + continue + targets = generate_targets(db_wiki["wiki"]) + for post in discussion_feed: + if post["id"] > db_wiki["postid"]: + for target in targets.items(): + try: + await essential_feeds(post, db_wiki, target) + except: + if command_line_args.debug: + raise # reraise the issue + else: + logger.exception("Exception on Feeds formatter") + await formatter_exception_logger(db_wiki["wiki"], post, traceback.format_exc()) + if discussion_feed: + DBHandler.add(db_wiki["wiki"], post["id"], True) + await asyncio.sleep(delay=calc_delay) + DBHandler.update_db() except asyncio.CancelledError: raise diff --git a/src/exceptions.py b/src/exceptions.py index 6fde0c1..0d54bb6 100644 --- a/src/exceptions.py +++ b/src/exceptions.py @@ -14,4 +14,10 @@ class WikiUnauthorizedError(Exception): pass class OtherWikiError(Exception): + pass + +class QueueEmpty(Exception): + pass + +class ListFull(Exception): pass \ No newline at end of file