From 293947c510116efe6f93fe58165ae7d82fe3585f Mon Sep 17 00:00:00 2001
From: Frisk
Date: Wed, 5 Aug 2020 19:20:38 +0200
Subject: [PATCH] Added removal handling, should now be mostly OK, with the exception of exception handling

---
 src/bot.py | 134 +++++++++++++++++++++++++++-------------------------
 1 file changed, 69 insertions(+), 65 deletions(-)

diff --git a/src/bot.py b/src/bot.py
index 233c530..4abf06f 100644
--- a/src/bot.py
+++ b/src/bot.py
@@ -36,9 +36,6 @@ mw_msgs: dict = {}  # will have the type of id: tuple
 # Reasons for this: 1. we require amount of wikis to calculate the cooldown between requests
 # 2. Easier to code
 
-for wiki in db_cursor.execute('SELECT DISTINCT wiki FROM rcgcdw'):
-    all_wikis[wiki] = Wiki()
-
 queue_limit = settings.get("queue_limit", 30)
 
 class LimitedList(list):
@@ -64,12 +61,17 @@
         else:
             raise KeyError
 
-    async def remove_wiki_from_group(self, group, wiki):
+    async def remove_wiki_from_group(self, wiki):
         """Removes a wiki from query of given domain group"""
-        self[group]["query"]  # there can be multiple webhooks with
+        group = get_domain(wiki)
+        self[group]["query"] = [x for x in self[group]["query"] if x["wiki"] != wiki]  # keep every entry except the removed wiki
+        if not self[group]["query"]:  # if there is no wiki left in the queue, get rid of the task
+            self[group]["task"].cancel()
+            del self.domain_list[group]
 
     @asynccontextmanager
     async def retrieve_next_queued(self, group):
+        """Retrieves the next wiki in the queue for the given domain"""
         try:
             yield self.domain_list[group]["query"][0]
         except IndexError:
@@ -79,9 +81,10 @@
             self.domain_list[group]["query"].pop(0)
 
     async def update_queues(self):
+        """Makes a pass over the rcgcdw DB and looks for updates to the queues in self.domain_list"""
         fetch_all = db_cursor.execute(
             'SELECT ROWID, webhook, wiki, lang, display, wikiid, rcid FROM rcgcdw WHERE NOT rcid = -1 GROUP BY wiki ORDER BY ROWID')
-        self.to_remove = list(all_wikis.keys())
+        self.to_remove = list(all_wikis.keys())  # start with every known wiki, un-mark the ones still in the DB, then clean up the rest
         full = []
         for db_wiki in fetch_all.fetchall():
             domain = get_domain(db_wiki["wiki"])
@@ -89,7 +92,7 @@
             try:
                 if not db_wiki["ROWID"] < current_domain["last_rowid"]:
                     current_domain["query"].append(db_wiki)
-                self.to_remove.remove(domain)
+                self.to_remove.remove(db_wiki["wiki"])
             except KeyError:
                 await self.start_group(domain, db_wiki)
                 logger.info("A new domain group has been added since last time, adding it to the domain_list and starting a task...")
@@ -97,11 +100,13 @@
                 full.append(domain)
                 current_domain["last_rowid"] = db_wiki["ROWID"]
             continue
+        for wiki in self.to_remove:
+            del all_wikis[wiki]
+            await self.remove_wiki_from_group(wiki)
         for group, data in self.domain_list:
             if group not in full:
                 self["domain"]["last_rowid"] = 0  # iter reached the end without being stuck on full list
-
 
     def __getitem__(self, item):
         """Returns the query of given domain group"""
         return self.domain_list[item]
@@ -139,18 +144,17 @@ def generate_targets(wiki_url: str) -> defaultdict:
 
 async def generate_domain_groups():
     """Generate a list of wikis per domain (fandom.com, wikipedia.org etc.)"""
-    combinations = defaultdict(list)
+    domain_wikis = defaultdict(list)
     fetch_all = db_cursor.execute('SELECT ROWID, webhook, wiki, lang, display, wikiid, rcid FROM rcgcdw WHERE NOT rcid = -1 GROUP BY wiki ORDER BY ROWID ASC')
     for db_wiki in fetch_all.fetchall():
-        combinations[get_domain(db_wiki["wiki"])].append(db_wiki)
+        domain_wikis[get_domain(db_wiki["wiki"])].append(db_wiki)
         all_wikis[db_wiki["wiki"]] = Wiki()  # populate all_wikis
-    for group, db_wikis in combinations.items():
+    for group, db_wikis in domain_wikis.items():
         yield group, db_wikis
 
 async def scan_group(group: str):
     while True:
-        calc_delay = calculate_delay_for_group(len(rcqueue[group]))
         async with rcqueue.retrieve_next_queued(group) as db_wiki:  # acquire next wiki in queue
             if db_wiki is None:
                 raise QueueEmpty
@@ -226,59 +230,59 @@ async def wiki_scanner():
     while True:
         await asyncio.sleep(20.0)
         await rcqueue.update_queues()
-
-
-        if db_wiki["wikiid"] is not None:
-            header = settings["header"]
-            header["Accept"] = "application/hal+json"
-            async with aiohttp.ClientSession(headers=header,
-                                             timeout=aiohttp.ClientTimeout(3.0)) as session:
-                try:
-                    feeds_response = await local_wiki.fetch_feeds(db_wiki["wikiid"], session)
-                except (WikiServerError, WikiError):
-                    logger.error("Exeption when fetching the wiki")
-                    continue  # ignore this wiki if it throws errors
-                try:
-                    discussion_feed_resp = await feeds_response.json(encoding="UTF-8")
-                    if "title" in discussion_feed_resp:
-                        error = discussion_feed_resp["error"]
-                        if error == "site doesn't exists":
-                            db_cursor.execute("UPDATE rcgcdw SET wikiid = ? WHERE wiki = ?",
-                                              (None, db_wiki["wiki"],))
-                            DBHandler.update_db()
-                            continue
-                        raise WikiError
-                    discussion_feed = discussion_feed_resp["_embedded"]["doc:posts"]
-                    discussion_feed.reverse()
-                except aiohttp.ContentTypeError:
-                    logger.exception("Wiki seems to be resulting in non-json content.")
-                    continue
-                except:
-                    logger.exception("On loading json of response.")
-                    continue
-                if db_wiki["postid"] is None:  # new wiki, just get the last post to not spam the channel
-                    if len(discussion_feed) > 0:
-                        DBHandler.add(db_wiki["wiki"], discussion_feed[-1]["id"], True)
-                    else:
-                        DBHandler.add(db_wiki["wiki"], "0", True)
-                    DBHandler.update_db()
-                    continue
-                targets = generate_targets(db_wiki["wiki"])
-                for post in discussion_feed:
-                    if post["id"] > db_wiki["postid"]:
-                        for target in targets.items():
-                            try:
-                                await essential_feeds(post, db_wiki, target)
-                            except:
-                                if command_line_args.debug:
-                                    raise  # reraise the issue
-                                else:
-                                    logger.exception("Exception on Feeds formatter")
-                                    await formatter_exception_logger(db_wiki["wiki"], post, traceback.format_exc())
-                if discussion_feed:
-                    DBHandler.add(db_wiki["wiki"], post["id"], True)
-            await asyncio.sleep(delay=calc_delay)
-            DBHandler.update_db()
+        #
+        #
+        # if db_wiki["wikiid"] is not None:
+        #     header = settings["header"]
+        #     header["Accept"] = "application/hal+json"
+        #     async with aiohttp.ClientSession(headers=header,
+        #                                      timeout=aiohttp.ClientTimeout(3.0)) as session:
+        #         try:
+        #             feeds_response = await local_wiki.fetch_feeds(db_wiki["wikiid"], session)
+        #         except (WikiServerError, WikiError):
+        #             logger.error("Exeption when fetching the wiki")
+        #             continue  # ignore this wiki if it throws errors
+        #         try:
+        #             discussion_feed_resp = await feeds_response.json(encoding="UTF-8")
+        #             if "title" in discussion_feed_resp:
+        #                 error = discussion_feed_resp["error"]
+        #                 if error == "site doesn't exists":
+        #                     db_cursor.execute("UPDATE rcgcdw SET wikiid = ? WHERE wiki = ?",
WHERE wiki = ?", + # (None, db_wiki["wiki"],)) + # DBHandler.update_db() + # continue + # raise WikiError + # discussion_feed = discussion_feed_resp["_embedded"]["doc:posts"] + # discussion_feed.reverse() + # except aiohttp.ContentTypeError: + # logger.exception("Wiki seems to be resulting in non-json content.") + # continue + # except: + # logger.exception("On loading json of response.") + # continue + # if db_wiki["postid"] is None: # new wiki, just get the last post to not spam the channel + # if len(discussion_feed) > 0: + # DBHandler.add(db_wiki["wiki"], discussion_feed[-1]["id"], True) + # else: + # DBHandler.add(db_wiki["wiki"], "0", True) + # DBHandler.update_db() + # continue + # targets = generate_targets(db_wiki["wiki"]) + # for post in discussion_feed: + # if post["id"] > db_wiki["postid"]: + # for target in targets.items(): + # try: + # await essential_feeds(post, db_wiki, target) + # except: + # if command_line_args.debug: + # raise # reraise the issue + # else: + # logger.exception("Exception on Feeds formatter") + # await formatter_exception_logger(db_wiki["wiki"], post, traceback.format_exc()) + # if discussion_feed: + # DBHandler.add(db_wiki["wiki"], post["id"], True) + # await asyncio.sleep(delay=calc_delay) + # DBHandler.update_db() except asyncio.CancelledError: raise