fix queue handler

This commit is contained in:
Markus-Rost 2020-08-08 19:25:32 +02:00
parent fca9eb72b9
commit 6658c399dc
2 changed files with 62 additions and 60 deletions

View file

@ -100,7 +100,7 @@ class RcQueue:
"""Makes a round on rcgcdw DB and looks for updates to the queues in self.domain_list""" """Makes a round on rcgcdw DB and looks for updates to the queues in self.domain_list"""
try: try:
fetch_all = db_cursor.execute( fetch_all = db_cursor.execute(
'SELECT ROWID, webhook, wiki, lang, display, wikiid, rcid FROM rcgcdw WHERE NOT rcid = -1 GROUP BY wiki ORDER BY ROWID') 'SELECT ROWID, webhook, wiki, lang, display, wikiid, rcid FROM rcgcdw WHERE rcid != -1 GROUP BY wiki ORDER BY ROWID')
self.to_remove = [x[0] for x in filter(self.filter_rc_active, all_wikis.items())] # first populate this list and remove wikis that are still in the db, clean up the rest self.to_remove = [x[0] for x in filter(self.filter_rc_active, all_wikis.items())] # first populate this list and remove wikis that are still in the db, clean up the rest
full = [] full = []
for db_wiki in fetch_all.fetchall(): for db_wiki in fetch_all.fetchall():
@ -176,7 +176,7 @@ def generate_targets(wiki_url: str) -> defaultdict:
async def generate_domain_groups(): async def generate_domain_groups():
"""Generate a list of wikis per domain (fandom.com, wikipedia.org etc.)""" """Generate a list of wikis per domain (fandom.com, wikipedia.org etc.)"""
domain_wikis = defaultdict(list) domain_wikis = defaultdict(list)
fetch_all = db_cursor.execute('SELECT ROWID, webhook, wiki, lang, display, wikiid, rcid FROM rcgcdw WHERE NOT rcid = -1 GROUP BY wiki ORDER BY ROWID ASC') fetch_all = db_cursor.execute('SELECT ROWID, webhook, wiki, lang, display, wikiid, rcid FROM rcgcdw WHERE rcid != -1 GROUP BY wiki ORDER BY ROWID ASC')
for db_wiki in fetch_all.fetchall(): for db_wiki in fetch_all.fetchall():
all_wikis[db_wiki["wiki"]].rc_active = db_wiki["rcid"] all_wikis[db_wiki["wiki"]].rc_active = db_wiki["rcid"]
domain_wikis[get_domain(db_wiki["wiki"])].append(db_wiki) domain_wikis[get_domain(db_wiki["wiki"])].append(db_wiki)
@ -289,64 +289,63 @@ async def discussion_handler():
try: try:
while True: while True:
fetch_all = db_cursor.execute( fetch_all = db_cursor.execute(
'SELECT ROWID, webhook, wiki, lang, display, wikiid, rcid, postid FROM rcgcdw WHERE NOT wikiid = ""') 'SELECT wiki, wikiid, postid FROM rcgcdw WHERE wikiid IS NOT NULL')
for db_wiki in fetch_all.fetchall(): for db_wiki in fetch_all.fetchall():
if db_wiki["wikiid"] is not None: header = settings["header"]
header = settings["header"] header["Accept"] = "application/hal+json"
header["Accept"] = "application/hal+json" async with aiohttp.ClientSession(headers=header,
async with aiohttp.ClientSession(headers=header, timeout=aiohttp.ClientTimeout(3.0)) as session:
timeout=aiohttp.ClientTimeout(3.0)) as session: try:
try: local_wiki = all_wikis[db_wiki["wiki"]] # set a reference to a wiki object from memory
local_wiki = all_wikis[db_wiki["wiki"]] # set a reference to a wiki object from memory except KeyError:
except KeyError: local_wiki = all_wikis[db_wiki["wiki"]] = Wiki()
local_wiki = all_wikis[db_wiki["wiki"]] = Wiki() try:
try: feeds_response = await local_wiki.fetch_feeds(db_wiki["wikiid"], session)
feeds_response = await local_wiki.fetch_feeds(db_wiki["wikiid"], session) except (WikiServerError, WikiError):
except (WikiServerError, WikiError): logger.error("Exeption when fetching the wiki")
logger.error("Exeption when fetching the wiki") continue # ignore this wiki if it throws errors
continue # ignore this wiki if it throws errors try:
try: discussion_feed_resp = await feeds_response.json(encoding="UTF-8")
discussion_feed_resp = await feeds_response.json(encoding="UTF-8") if "title" in discussion_feed_resp:
if "title" in discussion_feed_resp: error = discussion_feed_resp["error"]
error = discussion_feed_resp["error"] if error == "site doesn't exists":
if error == "site doesn't exists": db_cursor.execute("UPDATE rcgcdw SET wikiid = ? WHERE wiki = ?",
db_cursor.execute("UPDATE rcgcdw SET wikiid = ? WHERE wiki = ?", (None, db_wiki["wiki"],))
(None, db_wiki["wiki"],)) DBHandler.update_db()
DBHandler.update_db() continue
continue raise WikiError
raise WikiError discussion_feed = discussion_feed_resp["_embedded"]["doc:posts"]
discussion_feed = discussion_feed_resp["_embedded"]["doc:posts"] discussion_feed.reverse()
discussion_feed.reverse() except aiohttp.ContentTypeError:
except aiohttp.ContentTypeError: logger.exception("Wiki seems to be resulting in non-json content.")
logger.exception("Wiki seems to be resulting in non-json content.")
continue
except:
logger.exception("On loading json of response.")
continue
if db_wiki["postid"] is None: # new wiki, just get the last post to not spam the channel
if len(discussion_feed) > 0:
DBHandler.add(db_wiki["wiki"], discussion_feed[-1]["id"], True)
else:
DBHandler.add(db_wiki["wiki"], "0", True)
DBHandler.update_db()
continue continue
targets = generate_targets(db_wiki["wiki"]) except:
for post in discussion_feed: logger.exception("On loading json of response.")
if post["id"] > db_wiki["postid"]: continue
for target in targets.items(): if db_wiki["postid"] is None: # new wiki, just get the last post to not spam the channel
try: if len(discussion_feed) > 0:
await essential_feeds(post, db_wiki, target) DBHandler.add(db_wiki["wikiid"], discussion_feed[-1]["id"], True)
except asyncio.CancelledError: else:
raise DBHandler.add(db_wiki["wikiid"], "0", True)
except: DBHandler.update_db()
if command_line_args.debug: continue
logger.exception("Exception on Feeds formatter") targets = generate_targets(db_wiki["wiki"])
shutdown(loop=asyncio.get_event_loop()) for post in discussion_feed:
else: if post["id"] > db_wiki["postid"]:
logger.exception("Exception on Feeds formatter") for target in targets.items():
await generic_msg_sender_exception_logger(traceback.format_exc(), "Exception in feed formatter", Post=str(post)[0:1000], Wiki=db_wiki["wiki"]) try:
if discussion_feed: await essential_feeds(post, db_wiki, target)
DBHandler.add(db_wiki["wiki"], post["id"], True) except asyncio.CancelledError:
raise
except:
if command_line_args.debug:
logger.exception("Exception on Feeds formatter")
shutdown(loop=asyncio.get_event_loop())
else:
logger.exception("Exception on Feeds formatter")
await generic_msg_sender_exception_logger(traceback.format_exc(), "Exception in feed formatter", Post=str(post)[0:1000], Wiki=db_wiki["wiki"])
if discussion_feed:
DBHandler.add(db_wiki["wikiid"], post["id"], True)
await asyncio.sleep(delay=2.0) # hardcoded really doesn't need much more await asyncio.sleep(delay=2.0) # hardcoded really doesn't need much more
DBHandler.update_db() DBHandler.update_db()
except asyncio.CancelledError: except asyncio.CancelledError:

View file

@ -16,8 +16,11 @@ class UpdateDB:
def update_db(self): def update_db(self):
for update in self.updated: for update in self.updated:
update_type = "postid" if update[2] is not None else "rcid" if update[2] is None:
db_cursor.execute("UPDATE rcgcdw SET {} = ? WHERE wiki = ? AND NOT ? = -1".format(update_type), (update[1], update[0], update_type)) sql = "UPDATE rcgcdw SET rcid = ? WHERE wiki = ? AND rcid != -1"
else:
sql = "UPDATE rcgcdw SET postid = ? WHERE wikiid = ?"
db_cursor.execute(sql, (update[1], update[0]))
db_connection.commit() db_connection.commit()
self.clear_list() self.clear_list()