Added removal handling; should now be mostly OK, with the exception of exception handling

Frisk 2020-08-05 19:20:38 +02:00
parent 1ab0eaa24f
commit 293947c510


@@ -36,9 +36,6 @@ mw_msgs: dict = {} # will have the type of id: tuple
# Reasons for this: 1. we require amount of wikis to calculate the cooldown between requests
# 2. Easier to code
for wiki in db_cursor.execute('SELECT DISTINCT wiki FROM rcgcdw'):
all_wikis[wiki] = Wiki()
queue_limit = settings.get("queue_limit", 30)
class LimitedList(list):
@@ -64,12 +61,17 @@ class RcQueue:
else:
raise KeyError
async def remove_wiki_from_group(self, group, wiki):
async def remove_wiki_from_group(self, wiki):
"""Removes a wiki from query of given domain group"""
self[group]["query"] # there can be multiple webhooks with
group = get_domain(wiki)
self[group]["query"] = [x for x in self[group]["query"] if x["wiki"] == wiki]
if not self[group]["query"]: # if there is no wiki left in the queue, get rid of the task
self[group]["task"].cancel()
del self.domain_list[group]
@asynccontextmanager
async def retrieve_next_queued(self, group):
"""Retrives next wiki in the queue for given domain"""
try:
yield self.domain_list[group]["query"][0]
except IndexError:
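For context, a minimal self-contained sketch of the removal path the hunk above introduces. RcQueueSketch, the rough get_domain() stand-in (reducing the host to its last two labels) and the filter condition (keeping only rows that belong to other wikis, per the docstring's intent) are illustrative assumptions rather than the committed code:

import asyncio
from urllib.parse import urlparse

def get_domain(url: str) -> str:
    """Rough stand-in for the project's get_domain(): last two labels of the host."""
    return ".".join(urlparse(url).netloc.split(".")[-2:])

class RcQueueSketch:
    def __init__(self):
        self.domain_list = {}  # domain -> {"query": [db rows], "task": asyncio.Task}

    def __getitem__(self, item):
        return self.domain_list[item]

    async def remove_wiki_from_group(self, wiki: str):
        """Drop a wiki from its domain group; tear the group down once it is empty."""
        group = get_domain(wiki)
        # keep every queued row that does NOT belong to the removed wiki
        self[group]["query"] = [x for x in self[group]["query"] if x["wiki"] != wiki]
        if not self[group]["query"]:  # no wiki left in this domain group
            self[group]["task"].cancel()  # stop the per-domain scanner task
            del self.domain_list[group]

async def main():
    rcq = RcQueueSketch()
    scanner = asyncio.create_task(asyncio.sleep(3600))  # stand-in for scan_group()
    rcq.domain_list["fandom.com"] = {"query": [{"wiki": "https://test.fandom.com/"}], "task": scanner}
    await rcq.remove_wiki_from_group("https://test.fandom.com/")
    print("fandom.com" in rcq.domain_list)  # False: the emptied group was torn down

asyncio.run(main())

Cancelling the group task once its query empties is what lets a whole domain group disappear when its last wiki is removed from the DB.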
@@ -79,9 +81,10 @@ class RcQueue:
self.domain_list[group]["query"].pop(0)
async def update_queues(self):
"""Makes a round on rcgcdw DB and looks for updates to the queues in self.domain_list"""
fetch_all = db_cursor.execute(
'SELECT ROWID, webhook, wiki, lang, display, wikiid, rcid FROM rcgcdw WHERE NOT rcid = -1 GROUP BY wiki ORDER BY ROWID')
self.to_remove = list(all_wikis.keys())
self.to_remove = list(all_wikis.keys()) # first populate this list and remove wikis that are still in the db, clean up the rest
full = []
for db_wiki in fetch_all.fetchall():
domain = get_domain(db_wiki["wiki"])
@@ -89,7 +92,7 @@ class RcQueue:
try:
if not db_wiki["ROWID"] < current_domain["last_rowid"]:
current_domain["query"].append(db_wiki)
self.to_remove.remove(domain)
self.to_remove.remove(db_wiki["wiki"])
except KeyError:
await self.start_group(domain, db_wiki)
logger.info("A new domain group has been added since last time, adding it to the domain_list and starting a task...")
@@ -97,11 +100,13 @@ class RcQueue:
full.append(domain)
current_domain["last_rowid"] = db_wiki["ROWID"]
continue
for wiki in self.to_remove:
del all_wikis[wiki]
await self.remove_wiki_from_group(wiki)
for group, data in self.domain_list:
if group not in full:
self["domain"]["last_rowid"] = 0 # iter reached the end without being stuck on full list
def __getitem__(self, item):
"""Returns the query of given domain group"""
return self.domain_list[item]
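Taken together, the to_remove bookkeeping in the hunks above is a mark-and-sweep pass over all_wikis: mark everything for removal, unmark whatever the DB still contains, then clean up the rest. A condensed sketch (find_removed_wikis and the sample data are hypothetical; the "wiki" field name follows the rcgcdw rows in the diff):

def find_removed_wikis(all_wikis: dict, db_rows: list) -> list:
    """Return wikis that are known locally but no longer present in the DB rows."""
    to_remove = list(all_wikis.keys())  # assume every known wiki is gone...
    for db_wiki in db_rows:
        if db_wiki["wiki"] in to_remove:
            to_remove.remove(db_wiki["wiki"])  # ...until a DB row proves otherwise
    return to_remove  # the caller drops these from all_wikis and their domain groups

known = {"https://a.fandom.com/": object(), "https://b.fandom.com/": object()}
rows = [{"wiki": "https://a.fandom.com/"}]
print(find_removed_wikis(known, rows))  # ['https://b.fandom.com/']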
@@ -139,18 +144,17 @@ def generate_targets(wiki_url: str) -> defaultdict:
async def generate_domain_groups():
"""Generate a list of wikis per domain (fandom.com, wikipedia.org etc.)"""
combinations = defaultdict(list)
domain_wikis = defaultdict(list)
fetch_all = db_cursor.execute('SELECT ROWID, webhook, wiki, lang, display, wikiid, rcid FROM rcgcdw WHERE NOT rcid = -1 GROUP BY wiki ORDER BY ROWID ASC')
for db_wiki in fetch_all.fetchall():
combinations[get_domain(db_wiki["wiki"])].append(db_wiki)
domain_wikis[get_domain(db_wiki["wiki"])].append(db_wiki)
all_wikis[db_wiki["wiki"]] = Wiki() # populate all_wikis
for group, db_wikis in combinations.items():
for group, db_wikis in domain_wikis.items():
yield group, db_wikis
async def scan_group(group: str):
while True:
calc_delay = calculate_delay_for_group(len(rcqueue[group]))
async with rcqueue.retrieve_next_queued(group) as db_wiki: # acquire next wiki in queue
if db_wiki is None:
raise QueueEmpty
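Grouping DB rows per domain, as generate_domain_groups() above does so that each domain gets its own scanner task, can be sketched as follows; the async generator, the sample rows and the same rough get_domain() stand-in as in the earlier sketch are assumptions:

import asyncio
from collections import defaultdict
from urllib.parse import urlparse

def get_domain(url: str) -> str:
    """Rough stand-in: reduce the host to its last two labels (a.fandom.com -> fandom.com)."""
    return ".".join(urlparse(url).netloc.split(".")[-2:])

async def generate_domain_groups_sketch(rows):
    """Yield (domain, [db rows]) pairs, one per domain."""
    domain_wikis = defaultdict(list)
    for db_wiki in rows:
        domain_wikis[get_domain(db_wiki["wiki"])].append(db_wiki)
    for group, db_wikis in domain_wikis.items():
        yield group, db_wikis

async def main():
    rows = [{"wiki": "https://a.fandom.com/"}, {"wiki": "https://b.fandom.com/"},
            {"wiki": "https://en.wikipedia.org/"}]
    async for group, wikis in generate_domain_groups_sketch(rows):
        print(group, len(wikis))  # fandom.com 2, then wikipedia.org 1

asyncio.run(main())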
@@ -226,59 +230,59 @@ async def wiki_scanner():
while True:
await asyncio.sleep(20.0)
await rcqueue.update_queues()
if db_wiki["wikiid"] is not None:
header = settings["header"]
header["Accept"] = "application/hal+json"
async with aiohttp.ClientSession(headers=header,
timeout=aiohttp.ClientTimeout(3.0)) as session:
try:
feeds_response = await local_wiki.fetch_feeds(db_wiki["wikiid"], session)
except (WikiServerError, WikiError):
logger.error("Exeption when fetching the wiki")
continue # ignore this wiki if it throws errors
try:
discussion_feed_resp = await feeds_response.json(encoding="UTF-8")
if "title" in discussion_feed_resp:
error = discussion_feed_resp["error"]
if error == "site doesn't exists":
db_cursor.execute("UPDATE rcgcdw SET wikiid = ? WHERE wiki = ?",
(None, db_wiki["wiki"],))
DBHandler.update_db()
continue
raise WikiError
discussion_feed = discussion_feed_resp["_embedded"]["doc:posts"]
discussion_feed.reverse()
except aiohttp.ContentTypeError:
logger.exception("Wiki seems to be resulting in non-json content.")
continue
except:
logger.exception("On loading json of response.")
continue
if db_wiki["postid"] is None: # new wiki, just get the last post to not spam the channel
if len(discussion_feed) > 0:
DBHandler.add(db_wiki["wiki"], discussion_feed[-1]["id"], True)
else:
DBHandler.add(db_wiki["wiki"], "0", True)
DBHandler.update_db()
continue
targets = generate_targets(db_wiki["wiki"])
for post in discussion_feed:
if post["id"] > db_wiki["postid"]:
for target in targets.items():
try:
await essential_feeds(post, db_wiki, target)
except:
if command_line_args.debug:
raise # reraise the issue
else:
logger.exception("Exception on Feeds formatter")
await formatter_exception_logger(db_wiki["wiki"], post, traceback.format_exc())
if discussion_feed:
DBHandler.add(db_wiki["wiki"], post["id"], True)
await asyncio.sleep(delay=calc_delay)
DBHandler.update_db()
#
#
# if db_wiki["wikiid"] is not None:
# header = settings["header"]
# header["Accept"] = "application/hal+json"
# async with aiohttp.ClientSession(headers=header,
# timeout=aiohttp.ClientTimeout(3.0)) as session:
# try:
# feeds_response = await local_wiki.fetch_feeds(db_wiki["wikiid"], session)
# except (WikiServerError, WikiError):
# logger.error("Exeption when fetching the wiki")
# continue # ignore this wiki if it throws errors
# try:
# discussion_feed_resp = await feeds_response.json(encoding="UTF-8")
# if "title" in discussion_feed_resp:
# error = discussion_feed_resp["error"]
# if error == "site doesn't exists":
# db_cursor.execute("UPDATE rcgcdw SET wikiid = ? WHERE wiki = ?",
# (None, db_wiki["wiki"],))
# DBHandler.update_db()
# continue
# raise WikiError
# discussion_feed = discussion_feed_resp["_embedded"]["doc:posts"]
# discussion_feed.reverse()
# except aiohttp.ContentTypeError:
# logger.exception("Wiki seems to be resulting in non-json content.")
# continue
# except:
# logger.exception("On loading json of response.")
# continue
# if db_wiki["postid"] is None: # new wiki, just get the last post to not spam the channel
# if len(discussion_feed) > 0:
# DBHandler.add(db_wiki["wiki"], discussion_feed[-1]["id"], True)
# else:
# DBHandler.add(db_wiki["wiki"], "0", True)
# DBHandler.update_db()
# continue
# targets = generate_targets(db_wiki["wiki"])
# for post in discussion_feed:
# if post["id"] > db_wiki["postid"]:
# for target in targets.items():
# try:
# await essential_feeds(post, db_wiki, target)
# except:
# if command_line_args.debug:
# raise # reraise the issue
# else:
# logger.exception("Exception on Feeds formatter")
# await formatter_exception_logger(db_wiki["wiki"], post, traceback.format_exc())
# if discussion_feed:
# DBHandler.add(db_wiki["wiki"], post["id"], True)
# await asyncio.sleep(delay=calc_delay)
# DBHandler.update_db()
except asyncio.CancelledError:
raise