Fixed anotter oopsie

Frisk 2020-08-09 15:31:21 +02:00
parent d6df680e92
commit e2077a7ca1


```diff
@@ -70,6 +70,7 @@ class RcQueue:
         group = get_domain(wiki)
         self[group]["query"] = [x for x in self[group]["query"] if x.url == wiki]
         if not self[group]["query"]: # if there is no wiki left in the queue, get rid of the task
+            logger.debug(f"{group} no longer has any wikis queued!")
             all_wikis[wiki].rc_active = -1
             self[group]["task"].cancel()
             del self.domain_list[group]
```
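
The hunk above covers the cleanup path in `RcQueue`: once the last wiki is filtered out of a domain group's queue, the group's polling task is cancelled and the group itself is deleted, and the added `logger.debug` call makes that transition visible in the logs. A minimal sketch of the same pattern, with a hypothetical `Group` container and logger name standing in for the bot's internals:

```python
import logging

logger = logging.getLogger("rcgcdb")  # logger name is illustrative

class Group:
    """Hypothetical stand-in for one domain group: queued wiki URLs
    plus the asyncio task that polls them."""
    def __init__(self, task):
        self.query = []   # wikis waiting to be scanned
        self.task = task  # asyncio.Task running the scan loop

def remove_wiki_from_group(domain_list, group_name, wiki):
    group = domain_list[group_name]
    group.query = [url for url in group.query if url != wiki]
    if not group.query:  # no wiki left in the queue, get rid of the task
        logger.debug(f"{group_name} no longer has any wikis queued!")
        group.task.cancel()          # stop the group's scan loop...
        del domain_list[group_name]  # ...and forget the empty group
```

The cancelled task is exactly what the third hunk below teaches to exit gracefully.
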
```diff
@@ -106,12 +107,15 @@ class RcQueue:
         for db_wiki in fetch_all.fetchall():
             domain = get_domain(db_wiki["wiki"])
             try:
-                all_wikis[db_wiki["wiki"]]
-            except KeyError:
+                if db_wiki["wiki"] not in all_wikis:
+                    raise AssertionError
+                self.to_remove.remove(db_wiki["wiki"])
+            except AssertionError:
                 all_wikis[db_wiki["wiki"]] = Wiki()
                 all_wikis[db_wiki["wiki"]].rc_active = db_wiki["rcid"]
+            except ValueError:
+                pass
             try:
-                self.to_remove.remove(db_wiki["wiki"])
                 current_domain = self[domain]
                 if not db_wiki["ROWID"] < current_domain["last_rowid"]:
                     current_domain["query"].append(QueuedWiki(db_wiki["wiki"], 20))
```
```diff
@@ -186,83 +190,86 @@ async def generate_domain_groups():
 async def scan_group(group: str):
     rate_limiter = rcqueue[group]["rate_limiter"]
     while True:
-        async with rcqueue.retrieve_next_queued(group) as queued_wiki: # acquire next wiki in queue
-            logger.debug("Wiki {}".format(queued_wiki.url))
-            local_wiki = all_wikis[queued_wiki.url] # set a reference to a wiki object from memory
-            extended = False
-            if local_wiki.mw_messages is None:
-                extended = True
-            async with aiohttp.ClientSession(headers=settings["header"],
-                                             timeout=aiohttp.ClientTimeout(3.0)) as session:
-                try:
-                    wiki_response = await local_wiki.fetch_wiki(extended, queued_wiki.url, session, rate_limiter, amount=queued_wiki.amount)
-                    await local_wiki.check_status(queued_wiki.url, wiki_response.status)
-                except (WikiServerError, WikiError):
-                    logger.error("Exeption when fetching the wiki")
-                    continue # ignore this wiki if it throws errors
-                try:
-                    recent_changes_resp = await wiki_response.json()
-                    if "error" in recent_changes_resp or "errors" in recent_changes_resp:
-                        error = recent_changes_resp.get("error", recent_changes_resp["errors"])
-                        if error["code"] == "readapidenied":
-                            await local_wiki.fail_add(queued_wiki.url, 410)
-                            continue
-                        raise WikiError
-                    recent_changes = recent_changes_resp['query']['recentchanges']
-                    recent_changes.reverse()
-                except aiohttp.ContentTypeError:
-                    logger.exception("Wiki seems to be resulting in non-json content.")
-                    await local_wiki.fail_add(queued_wiki.url, 410)
-                    continue
-                except:
-                    logger.exception("On loading json of response.")
-                    continue
-            if extended:
-                await process_mwmsgs(recent_changes_resp, local_wiki, mw_msgs)
-            if local_wiki.rc_active == 0: # new wiki, just get the last rc to not spam the channel
-                if len(recent_changes) > 0:
-                    local_wiki.rc_active = recent_changes[-1]["rcid"]
-                    DBHandler.add(queued_wiki.url, recent_changes[-1]["rcid"])
-                else:
-                    local_wiki.rc_active = 0
-                    DBHandler.add(queued_wiki.url, 0)
-                DBHandler.update_db()
-                continue
-            categorize_events = {}
-            targets = generate_targets(queued_wiki.url)
-            paths = get_paths(queued_wiki.url, recent_changes_resp)
-            new_events = 0
-            for change in recent_changes:
-                if change["rcid"] > local_wiki.rc_active and queued_wiki.amount != 450:
-                    new_events += 1
-                    if new_events == 20:
-                        # call the function again with max limit for more results, ignore the ones in this request
-                        logger.debug("There were too many new events, queuing wiki with 450 limit.")
-                        rcqueue[group]["query"].insert(1, QueuedWiki(queued_wiki.url, 450))
-                        break
-                await process_cats(change, local_wiki, mw_msgs, categorize_events)
-            else: # If we broke from previous loop (too many changes) don't execute sending messages here
-                for change in recent_changes: # Yeah, second loop since the categories require to be all loaded up
-                    if change["rcid"] > local_wiki.rc_active:
-                        for target in targets.items():
-                            try:
-                                await essential_info(change, categorize_events, local_wiki, target, paths,
-                                                     recent_changes_resp, rate_limiter)
-                            except asyncio.CancelledError:
-                                raise
-                            except:
-                                if command_line_args.debug:
-                                    logger.exception("Exception on RC formatter")
-                                    raise
-                                else:
-                                    logger.exception("Exception on RC formatter")
-                                    await generic_msg_sender_exception_logger(traceback.format_exc(), "Exception in RC formatter", Wiki=queued_wiki.url, Change=str(change)[0:1000])
-                if recent_changes:
-                    local_wiki.rc_active = change["rcid"]
-                    DBHandler.add(queued_wiki.url, change["rcid"])
-            delay_between_wikis = calculate_delay_for_group(len(rcqueue[group]["query"])) # TODO Find a way to not execute it every wiki
-            await asyncio.sleep(delay_between_wikis)
-            DBHandler.update_db()
+        try:
+            async with rcqueue.retrieve_next_queued(group) as queued_wiki: # acquire next wiki in queue
+                logger.debug("Wiki {}".format(queued_wiki.url))
+                local_wiki = all_wikis[queued_wiki.url] # set a reference to a wiki object from memory
+                extended = False
+                if local_wiki.mw_messages is None:
+                    extended = True
+                async with aiohttp.ClientSession(headers=settings["header"],
+                                                 timeout=aiohttp.ClientTimeout(3.0)) as session:
+                    try:
+                        wiki_response = await local_wiki.fetch_wiki(extended, queued_wiki.url, session, rate_limiter, amount=queued_wiki.amount)
+                        await local_wiki.check_status(queued_wiki.url, wiki_response.status)
+                    except (WikiServerError, WikiError):
+                        logger.error("Exeption when fetching the wiki")
+                        continue # ignore this wiki if it throws errors
+                    try:
+                        recent_changes_resp = await wiki_response.json()
+                        if "error" in recent_changes_resp or "errors" in recent_changes_resp:
+                            error = recent_changes_resp.get("error", recent_changes_resp["errors"])
+                            if error["code"] == "readapidenied":
+                                await local_wiki.fail_add(queued_wiki.url, 410)
+                                continue
+                            raise WikiError
+                        recent_changes = recent_changes_resp['query']['recentchanges']
+                        recent_changes.reverse()
+                    except aiohttp.ContentTypeError:
+                        logger.exception("Wiki seems to be resulting in non-json content.")
+                        await local_wiki.fail_add(queued_wiki.url, 410)
+                        continue
+                    except:
+                        logger.exception("On loading json of response.")
+                        continue
+                if extended:
+                    await process_mwmsgs(recent_changes_resp, local_wiki, mw_msgs)
+                if local_wiki.rc_active == 0: # new wiki, just get the last rc to not spam the channel
+                    if len(recent_changes) > 0:
+                        local_wiki.rc_active = recent_changes[-1]["rcid"]
+                        DBHandler.add(queued_wiki.url, recent_changes[-1]["rcid"])
+                    else:
+                        local_wiki.rc_active = 0
+                        DBHandler.add(queued_wiki.url, 0)
+                    DBHandler.update_db()
+                    continue
+                categorize_events = {}
+                targets = generate_targets(queued_wiki.url)
+                paths = get_paths(queued_wiki.url, recent_changes_resp)
+                new_events = 0
+                for change in recent_changes:
+                    if change["rcid"] > local_wiki.rc_active and queued_wiki.amount != 450:
+                        new_events += 1
+                        if new_events == 20:
+                            # call the function again with max limit for more results, ignore the ones in this request
+                            logger.debug("There were too many new events, queuing wiki with 450 limit.")
+                            rcqueue[group]["query"].insert(1, QueuedWiki(queued_wiki.url, 450))
+                            break
+                    await process_cats(change, local_wiki, mw_msgs, categorize_events)
+                else: # If we broke from previous loop (too many changes) don't execute sending messages here
+                    for change in recent_changes: # Yeah, second loop since the categories require to be all loaded up
+                        if change["rcid"] > local_wiki.rc_active:
+                            for target in targets.items():
+                                try:
+                                    await essential_info(change, categorize_events, local_wiki, target, paths,
+                                                         recent_changes_resp, rate_limiter)
+                                except asyncio.CancelledError:
+                                    raise
+                                except:
+                                    if command_line_args.debug:
+                                        logger.exception("Exception on RC formatter")
+                                        raise
+                                    else:
+                                        logger.exception("Exception on RC formatter")
+                                        await generic_msg_sender_exception_logger(traceback.format_exc(), "Exception in RC formatter", Wiki=queued_wiki.url, Change=str(change)[0:1000])
+                    if recent_changes:
+                        local_wiki.rc_active = change["rcid"]
+                        DBHandler.add(queued_wiki.url, change["rcid"])
+                delay_between_wikis = calculate_delay_for_group(len(rcqueue[group]["query"])) # TODO Find a way to not execute it every wiki
+                await asyncio.sleep(delay_between_wikis)
+                DBHandler.update_db()
+        except asyncio.CancelledError:
+            return
 
 
 async def wiki_scanner():
```
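
The largest hunk changes no per-wiki logic: it re-indents the entire body of `scan_group` into a `try` block so that when `RcQueue` cancels a group's task (first hunk), the coroutine returns quietly instead of dying with an unhandled `CancelledError`. A runnable sketch of that shutdown handshake, independent of the bot's own classes:

```python
import asyncio

async def scan_group(group):
    """Poll a domain group forever, until the owning queue cancels us."""
    try:
        while True:
            # Stand-in for: acquire the next queued wiki, fetch its
            # recent changes, and dispatch messages.
            print(f"{group}: scanning next queued wiki")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        # Cancelled by the queue (e.g. the group ran out of wikis):
        # return instead of re-raising, so no traceback is logged.
        return

async def main():
    task = asyncio.create_task(scan_group("fandom.com"))
    await asyncio.sleep(2.5)
    task.cancel()  # what RcQueue does when a group's queue empties
    await task     # finishes cleanly; the CancelledError was absorbed
    print("group task exited without a traceback")

asyncio.run(main())
```

Absorbing `CancelledError` hides the cancellation from anything awaiting the task, which is the intent here; when something does await the worker, re-raising after cleanup is the safer default.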