From 6658c399dc7e5052b0abccdad59f5d8041d837f1 Mon Sep 17 00:00:00 2001
From: Markus-Rost
Date: Sat, 8 Aug 2020 19:25:32 +0200
Subject: [PATCH] fix queue handler

---
 src/bot.py           | 115 +++++++++++++++++++++----------------
 src/queue_handler.py |   7 ++-
 2 files changed, 62 insertions(+), 60 deletions(-)

diff --git a/src/bot.py b/src/bot.py
index 10358ad..d1f0111 100644
--- a/src/bot.py
+++ b/src/bot.py
@@ -100,7 +100,7 @@ class RcQueue:
 		"""Makes a round on rcgcdw DB and looks for updates to the queues in self.domain_list"""
 		try:
 			fetch_all = db_cursor.execute(
-				'SELECT ROWID, webhook, wiki, lang, display, wikiid, rcid FROM rcgcdw WHERE NOT rcid = -1 GROUP BY wiki ORDER BY ROWID')
+				'SELECT ROWID, webhook, wiki, lang, display, wikiid, rcid FROM rcgcdw WHERE rcid != -1 GROUP BY wiki ORDER BY ROWID')
 			self.to_remove = [x[0] for x in filter(self.filter_rc_active, all_wikis.items())]  # first populate this list and remove wikis that are still in the db, clean up the rest
 			full = []
 			for db_wiki in fetch_all.fetchall():
@@ -176,7 +176,7 @@ def generate_targets(wiki_url: str) -> defaultdict:
 async def generate_domain_groups():
 	"""Generate a list of wikis per domain (fandom.com, wikipedia.org etc.)"""
 	domain_wikis = defaultdict(list)
-	fetch_all = db_cursor.execute('SELECT ROWID, webhook, wiki, lang, display, wikiid, rcid FROM rcgcdw WHERE NOT rcid = -1 GROUP BY wiki ORDER BY ROWID ASC')
+	fetch_all = db_cursor.execute('SELECT ROWID, webhook, wiki, lang, display, wikiid, rcid FROM rcgcdw WHERE rcid != -1 GROUP BY wiki ORDER BY ROWID ASC')
 	for db_wiki in fetch_all.fetchall():
 		all_wikis[db_wiki["wiki"]].rc_active = db_wiki["rcid"]
 		domain_wikis[get_domain(db_wiki["wiki"])].append(db_wiki)
@@ -289,64 +289,63 @@ async def discussion_handler():
 	try:
 		while True:
 			fetch_all = db_cursor.execute(
-				'SELECT ROWID, webhook, wiki, lang, display, wikiid, rcid, postid FROM rcgcdw WHERE NOT wikiid = ""')
+				'SELECT wiki, wikiid, postid FROM rcgcdw WHERE wikiid IS NOT NULL')
 			for db_wiki in fetch_all.fetchall():
-				if db_wiki["wikiid"] is not None:
-					header = settings["header"]
-					header["Accept"] = "application/hal+json"
-					async with aiohttp.ClientSession(headers=header,
-							timeout=aiohttp.ClientTimeout(3.0)) as session:
-						try:
-							local_wiki = all_wikis[db_wiki["wiki"]] # set a reference to a wiki object from memory
-						except KeyError:
-							local_wiki = all_wikis[db_wiki["wiki"]] = Wiki()
-						try:
-							feeds_response = await local_wiki.fetch_feeds(db_wiki["wikiid"], session)
-						except (WikiServerError, WikiError):
-							logger.error("Exeption when fetching the wiki")
-							continue # ignore this wiki if it throws errors
-						try:
-							discussion_feed_resp = await feeds_response.json(encoding="UTF-8")
-							if "title" in discussion_feed_resp:
-								error = discussion_feed_resp["error"]
-								if error == "site doesn't exists":
-									db_cursor.execute("UPDATE rcgcdw SET wikiid = ? WHERE wiki = ?",
WHERE wiki = ?", - (None, db_wiki["wiki"],)) - DBHandler.update_db() - continue - raise WikiError - discussion_feed = discussion_feed_resp["_embedded"]["doc:posts"] - discussion_feed.reverse() - except aiohttp.ContentTypeError: - logger.exception("Wiki seems to be resulting in non-json content.") - continue - except: - logger.exception("On loading json of response.") - continue - if db_wiki["postid"] is None: # new wiki, just get the last post to not spam the channel - if len(discussion_feed) > 0: - DBHandler.add(db_wiki["wiki"], discussion_feed[-1]["id"], True) - else: - DBHandler.add(db_wiki["wiki"], "0", True) - DBHandler.update_db() + header = settings["header"] + header["Accept"] = "application/hal+json" + async with aiohttp.ClientSession(headers=header, + timeout=aiohttp.ClientTimeout(3.0)) as session: + try: + local_wiki = all_wikis[db_wiki["wiki"]] # set a reference to a wiki object from memory + except KeyError: + local_wiki = all_wikis[db_wiki["wiki"]] = Wiki() + try: + feeds_response = await local_wiki.fetch_feeds(db_wiki["wikiid"], session) + except (WikiServerError, WikiError): + logger.error("Exeption when fetching the wiki") + continue # ignore this wiki if it throws errors + try: + discussion_feed_resp = await feeds_response.json(encoding="UTF-8") + if "title" in discussion_feed_resp: + error = discussion_feed_resp["error"] + if error == "site doesn't exists": + db_cursor.execute("UPDATE rcgcdw SET wikiid = ? WHERE wiki = ?", + (None, db_wiki["wiki"],)) + DBHandler.update_db() + continue + raise WikiError + discussion_feed = discussion_feed_resp["_embedded"]["doc:posts"] + discussion_feed.reverse() + except aiohttp.ContentTypeError: + logger.exception("Wiki seems to be resulting in non-json content.") continue - targets = generate_targets(db_wiki["wiki"]) - for post in discussion_feed: - if post["id"] > db_wiki["postid"]: - for target in targets.items(): - try: - await essential_feeds(post, db_wiki, target) - except asyncio.CancelledError: - raise - except: - if command_line_args.debug: - logger.exception("Exception on Feeds formatter") - shutdown(loop=asyncio.get_event_loop()) - else: - logger.exception("Exception on Feeds formatter") - await generic_msg_sender_exception_logger(traceback.format_exc(), "Exception in feed formatter", Post=str(post)[0:1000], Wiki=db_wiki["wiki"]) - if discussion_feed: - DBHandler.add(db_wiki["wiki"], post["id"], True) + except: + logger.exception("On loading json of response.") + continue + if db_wiki["postid"] is None: # new wiki, just get the last post to not spam the channel + if len(discussion_feed) > 0: + DBHandler.add(db_wiki["wikiid"], discussion_feed[-1]["id"], True) + else: + DBHandler.add(db_wiki["wikiid"], "0", True) + DBHandler.update_db() + continue + targets = generate_targets(db_wiki["wiki"]) + for post in discussion_feed: + if post["id"] > db_wiki["postid"]: + for target in targets.items(): + try: + await essential_feeds(post, db_wiki, target) + except asyncio.CancelledError: + raise + except: + if command_line_args.debug: + logger.exception("Exception on Feeds formatter") + shutdown(loop=asyncio.get_event_loop()) + else: + logger.exception("Exception on Feeds formatter") + await generic_msg_sender_exception_logger(traceback.format_exc(), "Exception in feed formatter", Post=str(post)[0:1000], Wiki=db_wiki["wiki"]) + if discussion_feed: + DBHandler.add(db_wiki["wikiid"], post["id"], True) await asyncio.sleep(delay=2.0) # hardcoded really doesn't need much more DBHandler.update_db() except asyncio.CancelledError: diff --git 
index 4bb9232..31ec2fe 100644
--- a/src/queue_handler.py
+++ b/src/queue_handler.py
@@ -16,8 +16,11 @@ class UpdateDB:
 
 	def update_db(self):
 		for update in self.updated:
-			update_type = "postid" if update[2] is not None else "rcid"
-			db_cursor.execute("UPDATE rcgcdw SET {} = ? WHERE wiki = ? AND NOT ? = -1".format(update_type), (update[1], update[0], update_type))
+			if update[2] is None:
+				sql = "UPDATE rcgcdw SET rcid = ? WHERE wiki = ? AND rcid != -1"
+			else:
+				sql = "UPDATE rcgcdw SET postid = ? WHERE wikiid = ?"
+			db_cursor.execute(sql, (update[1], update[0]))
 		db_connection.commit()
 		self.clear_list()
 
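
Reviewer note on the queue_handler.py change: the old update_db() formatted
the column name into the SQL text but then also bound it as a parameter, so
the guard "AND NOT ? = -1" compared the literal string 'rcid'/'postid' with
-1, which is always true, and rows whose feed had been disabled (rcid = -1)
were overwritten anyway. A minimal sketch reproducing this, assuming a
stripped-down two-column table and made-up values (not the real schema):

    import sqlite3

    db = sqlite3.connect(":memory:")
    db.execute("CREATE TABLE rcgcdw (wiki TEXT, rcid INTEGER)")
    db.execute("INSERT INTO rcgcdw VALUES ('wiki.example.com', -1)")  # -1 = RC feed disabled

    # Old form: the column name is bound as a value, so the guard tests
    # "NOT 'rcid' = -1" -- always true -- and the disabled row is updated.
    db.execute("UPDATE rcgcdw SET {} = ? WHERE wiki = ? AND NOT ? = -1".format("rcid"),
               (42, "wiki.example.com", "rcid"))
    print(db.execute("SELECT rcid FROM rcgcdw").fetchone())  # (42,) -- overwritten

    db.execute("UPDATE rcgcdw SET rcid = -1 WHERE wiki = 'wiki.example.com'")  # reset

    # Fixed form: the column is named in the SQL text, so the guard holds.
    db.execute("UPDATE rcgcdw SET rcid = ? WHERE wiki = ? AND rcid != -1",
               (42, "wiki.example.com"))
    print(db.execute("SELECT rcid FROM rcgcdw").fetchone())  # (-1,) -- left alone

The fix also keys postid updates on wikiid instead of wiki, which is why the
DBHandler.add() calls in discussion_handler() now pass db_wiki["wikiid"].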
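
A second note, on the reworked discussion_handler() query: 'NOT wikiid = ""'
and 'wikiid IS NOT NULL' are not equivalent under SQL's three-valued logic,
so this is a behaviour change rather than only a style fix. The handler sets
wikiid to NULL when a site no longer exists, and the new predicate matches
that convention directly. A small sketch with made-up rows showing the
difference:

    import sqlite3

    db = sqlite3.connect(":memory:")
    db.execute("CREATE TABLE rcgcdw (wiki TEXT, wikiid TEXT)")
    db.executemany("INSERT INTO rcgcdw VALUES (?, ?)",
                   [("a.example.com", "123"), ("b.example.com", None), ("c.example.com", "")])

    # Old predicate: NULL rows drop out only as a side effect -- 'NULL = ""'
    # is NULL, and NOT NULL is still NULL (not true); "" rows drop out too.
    print(db.execute('SELECT wiki FROM rcgcdw WHERE NOT wikiid = ""').fetchall())
    # [('a.example.com',)]

    # New predicate: states the intent and keeps empty-string rows visible.
    print(db.execute("SELECT wiki FROM rcgcdw WHERE wikiid IS NOT NULL").fetchall())
    # [('a.example.com',), ('c.example.com',)]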