From 88ff0a3552315a5d290d2a9397d873d109ee7a44 Mon Sep 17 00:00:00 2001 From: Frisk Date: Sun, 30 May 2021 13:23:48 +0200 Subject: [PATCH] Created a skeleton for rewrite --- requirements.txt | 3 +- src/bot.py | 941 +++++++++++++++++++++--------------------- src/domain.py | 20 +- src/domain_manager.py | 38 ++ src/wiki.py | 11 +- 5 files changed, 539 insertions(+), 474 deletions(-) create mode 100644 src/domain_manager.py diff --git a/requirements.txt b/requirements.txt index 079cf8e..ce2d6a5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,5 @@ lxml >= 4.2.1 nest-asyncio >= 1.4.0 irc >= 19.0.1 beautifulsoup4>=4.9.3 -asyncpg>=0.22.0 \ No newline at end of file +asyncpg>=0.22.0 +aioredis >= 1.3.1 \ No newline at end of file diff --git a/src/bot.py b/src/bot.py index be89926..9e1fa6e 100644 --- a/src/bot.py +++ b/src/bot.py @@ -20,6 +20,7 @@ from src.wiki import Wiki, process_cats, process_mwmsgs, essential_info, essenti from src.discord import DiscordMessage, generic_msg_sender_exception_logger, stack_message_list from src.wiki_ratelimiter import RateLimiter from src.irc_feed import AioIRCCat +from src.domain_manager import domains logging.config.dictConfig(settings["logging"]) @@ -28,7 +29,7 @@ logger.debug("Current settings: {settings}".format(settings=settings)) logger.info("RcGcDb v{} is starting up.".format("1.1")) if command_line_args.debug: - logger.info("Debug mode is active!") + logger.info("Debug mode is active!") # Log Fail states with structure wiki_url: number of fail states all_wikis: dict = {} @@ -41,177 +42,176 @@ main_tasks: dict = {} # 2. Easier to code async def populate_allwikis(): - async with db.pool().acquire() as connection: - async with connection.transaction(): - async for db_wiki in connection.cursor('SELECT DISTINCT wiki, rcid FROM rcgcdw'): - all_wikis[db_wiki["wiki"]] = Wiki() # populate all_wikis - all_wikis[db_wiki["wiki"]].rc_active = db_wiki["rcid"] + async with db.pool().acquire() as connection: + async with connection.transaction(): + async for db_wiki in connection.cursor('SELECT DISTINCT wiki, rcid, postid FROM rcgcdw'): + domains.new_wiki(Wiki(db_wiki["wiki"], db_wiki["rcid"], db_wiki["postid"])) queue_limit = settings.get("queue_limit", 30) QueuedWiki = namedtuple("QueuedWiki", ['url', 'amount']) class LimitedList(list): - def __init__(self, *args): - list.__init__(self, *args) + def __init__(self, *args): + list.__init__(self, *args) - def append(self, obj: QueuedWiki, forced: bool = False) -> None: - if len(self) < queue_limit or forced: - self.insert(len(self), obj) - return - raise ListFull + def append(self, obj: QueuedWiki, forced: bool = False) -> None: + if len(self) < queue_limit or forced: + self.insert(len(self), obj) + return + raise ListFull class RcQueue: - def __init__(self): - self.domain_list = {} - self.to_remove = [] - self.irc_mapping = {} + def __init__(self): + self.domain_list = {} + self.to_remove = [] + self.irc_mapping = {} - async def start_group(self, group, initial_wikis): - """Starts a task for given domain group""" - if group not in self.domain_list: - if group in self.irc_mapping: # Hopefully there are no race conditions.... 
- irc_connection = self.irc_mapping[group] - else: - for irc_server in settings["irc_servers"].keys(): - if group in settings["irc_servers"][irc_server]["domains"]: - irc_connection = AioIRCCat(settings["irc_servers"][irc_server]["irc_channel_mapping"], all_wikis) - for domain in settings["irc_servers"][irc_server]["domains"]: - self.irc_mapping[domain] = irc_connection - irc_connection.connect(settings["irc_servers"][irc_server]["irc_host"], settings["irc_servers"][irc_server]["irc_port"], settings["irc_servers"][irc_server]["irc_name"]) - break - else: - irc_connection = None - self.domain_list[group] = {"task": asyncio.create_task(scan_group(group)), "last_rowid": 0, "query": LimitedList(initial_wikis), "rate_limiter": RateLimiter(), "irc": irc_connection} - logger.debug(self.domain_list[group]) - else: - raise KeyError + async def start_group(self, group, initial_wikis): + """Starts a task for given domain group""" + if group not in self.domain_list: + if group in self.irc_mapping: # Hopefully there are no race conditions.... + irc_connection = self.irc_mapping[group] + else: + for irc_server in settings["irc_servers"].keys(): + if group in settings["irc_servers"][irc_server]["domains"]: + irc_connection = AioIRCCat(settings["irc_servers"][irc_server]["irc_channel_mapping"], all_wikis) + for domain in settings["irc_servers"][irc_server]["domains"]: + self.irc_mapping[domain] = irc_connection + irc_connection.connect(settings["irc_servers"][irc_server]["irc_host"], settings["irc_servers"][irc_server]["irc_port"], settings["irc_servers"][irc_server]["irc_name"]) + break + else: + irc_connection = None + self.domain_list[group] = {"task": asyncio.create_task(scan_group(group)), "last_rowid": 0, "query": LimitedList(initial_wikis), "rate_limiter": RateLimiter(), "irc": irc_connection} + logger.debug(self.domain_list[group]) + else: + raise KeyError - async def remove_wiki_from_group(self, wiki): - """Removes a wiki from query of given domain group""" - logger.debug(f"Removing {wiki} from group queue.") - group = get_domain(wiki) - self[group]["query"] = LimitedList([x for x in self[group]["query"] if x.url != wiki]) - all_wikis[wiki].rc_active = -1 - if not self[group]["query"]: # if there is no wiki left in the queue, get rid of the task - logger.debug(f"{group} no longer has any wikis queued!") - if not await self.check_if_domain_in_db(group): - await self.stop_task_group(group) - else: - logger.debug(f"But there are still wikis for it in DB!") + async def remove_wiki_from_group(self, wiki): + """Removes a wiki from query of given domain group""" + logger.debug(f"Removing {wiki} from group queue.") + group = get_domain(wiki) + self[group]["query"] = LimitedList([x for x in self[group]["query"] if x.url != wiki]) + all_wikis[wiki].rc_active = -1 + if not self[group]["query"]: # if there is no wiki left in the queue, get rid of the task + logger.debug(f"{group} no longer has any wikis queued!") + if not await self.check_if_domain_in_db(group): + await self.stop_task_group(group) + else: + logger.debug(f"But there are still wikis for it in DB!") - async def stop_task_group(self, group): - self[group]["task"].cancel() - del self.domain_list[group] + async def stop_task_group(self, group): + self[group]["task"].cancel() + del self.domain_list[group] - async def check_if_domain_in_db(self, domain): - async with db.pool().acquire() as connection: - async with connection.transaction(): - async for wiki in connection.cursor('SELECT DISTINCT wiki FROM rcgcdw WHERE rcid != -1;'): - if 
get_domain(wiki["wiki"]) == domain: - return True - return False + async def check_if_domain_in_db(self, domain): + async with db.pool().acquire() as connection: + async with connection.transaction(): + async for wiki in connection.cursor('SELECT DISTINCT wiki FROM rcgcdw WHERE rcid != -1;'): + if get_domain(wiki["wiki"]) == domain: + return True + return False - @asynccontextmanager - async def retrieve_next_queued(self, group) -> Generator[QueuedWiki, None, None]: - """Retrives next wiki in the queue for given domain""" - if len(self.domain_list[group]["query"]) == 0: - # make sure we are not removing the group because entire domain group went down, it's expensive - yes, but could theoretically cause issues - raise QueueEmpty - # if self.check_if_domain_in_db(group): - # #logger.warning("Domain group {} has 0 elements yet there are still wikis in the db of the same domain group! This may indicate we ran over the list too fast. We are waiting...".format(group)) - # raise QueueEmpty - # else: - # await self.stop_task_group(group) - # return - try: - yield self.domain_list[group]["query"][0] - except asyncio.CancelledError: - raise - except: - if command_line_args.debug: - logger.exception("RC Group exception") - shutdown(asyncio.get_event_loop()) - else: - logger.exception("Group task returned error") - await generic_msg_sender_exception_logger(traceback.format_exc(), "Group task error logger", Group=group) - else: - self.domain_list[group]["query"].pop(0) + @asynccontextmanager + async def retrieve_next_queued(self, group) -> Generator[QueuedWiki, None, None]: + """Retrives next wiki in the queue for given domain""" + if len(self.domain_list[group]["query"]) == 0: + # make sure we are not removing the group because entire domain group went down, it's expensive - yes, but could theoretically cause issues + raise QueueEmpty + # if self.check_if_domain_in_db(group): + # #logger.warning("Domain group {} has 0 elements yet there are still wikis in the db of the same domain group! This may indicate we ran over the list too fast. 
We are waiting...".format(group)) + # raise QueueEmpty + # else: + # await self.stop_task_group(group) + # return + try: + yield self.domain_list[group]["query"][0] + except asyncio.CancelledError: + raise + except: + if command_line_args.debug: + logger.exception("RC Group exception") + shutdown(asyncio.get_event_loop()) + else: + logger.exception("Group task returned error") + await generic_msg_sender_exception_logger(traceback.format_exc(), "Group task error logger", Group=group) + else: + self.domain_list[group]["query"].pop(0) - @staticmethod - def filter_rc_active(wiki_obj): - return wiki_obj[1].rc_active is None or wiki_obj[1].rc_active > -1 + @staticmethod + def filter_rc_active(wiki_obj): + return wiki_obj[1].rc_active is None or wiki_obj[1].rc_active > -1 - async def update_queues(self): - """Makes a round on rcgcdb DB and looks for updates to the queues in self.domain_list""" - try: - self.to_remove = [x[0] for x in filter(self.filter_rc_active, all_wikis.items())] # first populate this list and remove wikis that are still in the db, clean up the rest - full = set() - async with db.pool().acquire() as connection: - async with connection.transaction(): - async for db_wiki in connection.cursor('SELECT DISTINCT wiki, row_number() OVER (ORDER BY webhook) AS rowid, webhook, lang, display, rcid FROM rcgcdw WHERE rcid != -1 OR rcid IS NULL ORDER BY webhook'): - domain = get_domain(db_wiki["wiki"]) - try: - if db_wiki["wiki"] not in all_wikis: - raise AssertionError - self.to_remove.remove(db_wiki["wiki"]) - except AssertionError: - all_wikis[db_wiki["wiki"]] = Wiki() - all_wikis[db_wiki["wiki"]].rc_active = db_wiki["rcid"] - except ValueError: - pass - if domain in full: - continue - try: - current_domain: dict = self[domain] - if current_domain["irc"]: - logger.debug("DOMAIN LIST FOR IRC: {}".format(current_domain["irc"].updated)) - logger.debug("CURRENT DOMAIN INFO: {}".format(domain)) - logger.debug("IS WIKI IN A LIST?: {}".format(db_wiki["wiki"] in current_domain["irc"].updated)) - logger.debug("LAST CHECK FOR THE WIKI {} IS {}".format(db_wiki["wiki"], all_wikis[db_wiki["wiki"]].last_check)) - if db_wiki["wiki"] in current_domain["irc"].updated: # Priority wikis are the ones with IRC, if they get updated forcefully add them to queue - current_domain["irc"].updated.remove(db_wiki["wiki"]) - current_domain["query"].append(QueuedWiki(db_wiki["wiki"], 20), forced=True) - logger.debug("Updated in IRC so adding to queue.") - continue - elif all_wikis[db_wiki["wiki"]].last_check+settings["irc_overtime"] < time.time(): # if time went by and wiki should be updated now use default mechanics - logger.debug("Overtime so adding to queue.") - pass - else: # Continue without adding - logger.debug("No condition fulfilled so skipping.") - continue - if not db_wiki["rowid"] < current_domain["last_rowid"]: - current_domain["query"].append(QueuedWiki(db_wiki["wiki"], 20)) - except KeyError: - await self.start_group(domain, [QueuedWiki(db_wiki["wiki"], 20)]) - logger.info("A new domain group ({}) has been added since last time, adding it to the domain_list and starting a task...".format(domain)) - except ListFull: - full.add(domain) - current_domain["last_rowid"] = db_wiki["rowid"] - continue - for wiki in self.to_remove: - await self.remove_wiki_from_group(wiki) - for group, data in self.domain_list.items(): - if group not in full: - self[group]["last_rowid"] = 0 # iter reached the end without being stuck on full list - logger.debug("Current domain_list structure: {}".format(self.domain_list)) - 
except: - if command_line_args.debug: - logger.exception("Queue error!") - shutdown(asyncio.get_event_loop()) - else: - logger.exception("Exception on queue updater") - await generic_msg_sender_exception_logger(traceback.format_exc(), "Queue updator") + async def update_queues(self): + """Makes a round on rcgcdb DB and looks for updates to the queues in self.domain_list""" + try: + self.to_remove = [x[0] for x in filter(self.filter_rc_active, all_wikis.items())] # first populate this list and remove wikis that are still in the db, clean up the rest + full = set() + async with db.pool().acquire() as connection: + async with connection.transaction(): + async for db_wiki in connection.cursor('SELECT DISTINCT wiki, row_number() OVER (ORDER BY webhook) AS rowid, webhook, lang, display, rcid FROM rcgcdw WHERE rcid != -1 OR rcid IS NULL ORDER BY webhook'): + domain = get_domain(db_wiki["wiki"]) + try: + if db_wiki["wiki"] not in all_wikis: + raise AssertionError + self.to_remove.remove(db_wiki["wiki"]) + except AssertionError: + all_wikis[db_wiki["wiki"]] = Wiki() + all_wikis[db_wiki["wiki"]].rc_active = db_wiki["rcid"] + except ValueError: + pass + if domain in full: + continue + try: + current_domain: dict = self[domain] + if current_domain["irc"]: + logger.debug("DOMAIN LIST FOR IRC: {}".format(current_domain["irc"].updated)) + logger.debug("CURRENT DOMAIN INFO: {}".format(domain)) + logger.debug("IS WIKI IN A LIST?: {}".format(db_wiki["wiki"] in current_domain["irc"].updated)) + logger.debug("LAST CHECK FOR THE WIKI {} IS {}".format(db_wiki["wiki"], all_wikis[db_wiki["wiki"]].last_check)) + if db_wiki["wiki"] in current_domain["irc"].updated: # Priority wikis are the ones with IRC, if they get updated forcefully add them to queue + current_domain["irc"].updated.remove(db_wiki["wiki"]) + current_domain["query"].append(QueuedWiki(db_wiki["wiki"], 20), forced=True) + logger.debug("Updated in IRC so adding to queue.") + continue + elif all_wikis[db_wiki["wiki"]].last_check+settings["irc_overtime"] < time.time(): # if time went by and wiki should be updated now use default mechanics + logger.debug("Overtime so adding to queue.") + pass + else: # Continue without adding + logger.debug("No condition fulfilled so skipping.") + continue + if not db_wiki["rowid"] < current_domain["last_rowid"]: + current_domain["query"].append(QueuedWiki(db_wiki["wiki"], 20)) + except KeyError: + await self.start_group(domain, [QueuedWiki(db_wiki["wiki"], 20)]) + logger.info("A new domain group ({}) has been added since last time, adding it to the domain_list and starting a task...".format(domain)) + except ListFull: + full.add(domain) + current_domain["last_rowid"] = db_wiki["rowid"] + continue + for wiki in self.to_remove: + await self.remove_wiki_from_group(wiki) + for group, data in self.domain_list.items(): + if group not in full: + self[group]["last_rowid"] = 0 # iter reached the end without being stuck on full list + logger.debug("Current domain_list structure: {}".format(self.domain_list)) + except: + if command_line_args.debug: + logger.exception("Queue error!") + shutdown(asyncio.get_event_loop()) + else: + logger.exception("Exception on queue updater") + await generic_msg_sender_exception_logger(traceback.format_exc(), "Queue updator") - def __getitem__(self, item): - """Returns the query of given domain group""" - return self.domain_list[item] + def __getitem__(self, item): + """Returns the query of given domain group""" + return self.domain_list[item] - def __setitem__(self, key, value): - 
self.domain_list[key] = value + def __setitem__(self, key, value): + self.domain_list[key] = value rcqueue = RcQueue() @@ -220,321 +220,322 @@ rcqueue = RcQueue() # Start queueing logic def calculate_delay_for_group(group_length: int) -> float: - """Calculate the delay between fetching each wiki to avoid rate limits""" - min_delay = 60 / settings["max_requests_per_minute"] - if group_length == 0: - group_length = 1 - if (group_length * min_delay) < settings["minimal_cooldown_per_wiki_in_sec"]: - return settings["minimal_cooldown_per_wiki_in_sec"] / group_length - else: - return 0.0 + """Calculate the delay between fetching each wiki to avoid rate limits""" + min_delay = 60 / settings["max_requests_per_minute"] + if group_length == 0: + group_length = 1 + if (group_length * min_delay) < settings["minimal_cooldown_per_wiki_in_sec"]: + return settings["minimal_cooldown_per_wiki_in_sec"] / group_length + else: + return 0.0 async def generate_targets(wiki_url: str, additional_requirements: str) -> defaultdict: - """To minimize the amount of requests, we generate a list of language/display mode combinations to create messages for - this way we can send the same message to multiple webhooks which have the same wiki and settings without doing another - request to the wiki just to duplicate the message. - """ - combinations = defaultdict(list) - async with db.pool().acquire() as connection: - async with connection.transaction(): - async for webhook in connection.cursor('SELECT webhook, lang, display FROM rcgcdw WHERE wiki = $1 {}'.format(additional_requirements), wiki_url): - combination = (webhook["lang"], webhook["display"]) - combinations[combination].append(webhook["webhook"]) - return combinations + """To minimize the amount of requests, we generate a list of language/display mode combinations to create messages for + this way we can send the same message to multiple webhooks which have the same wiki and settings without doing another + request to the wiki just to duplicate the message. + """ + combinations = defaultdict(list) + async with db.pool().acquire() as connection: + async with connection.transaction(): + async for webhook in connection.cursor('SELECT webhook, lang, display FROM rcgcdw WHERE wiki = $1 {}'.format(additional_requirements), wiki_url): + combination = (webhook["lang"], webhook["display"]) + combinations[combination].append(webhook["webhook"]) + return combinations async def generate_domain_groups(): - """Generate a list of wikis per domain (fandom.com, wikipedia.org etc.) + """Generate a list of wikis per domain (fandom.com, wikipedia.org etc.) 
- :returns tuple[str, list]""" - domain_wikis = defaultdict(list) - async with db.pool().acquire() as connection: - async with connection.transaction(): - async for db_wiki in connection.cursor('SELECT DISTINCT wiki, webhook, lang, display, rcid FROM rcgcdw WHERE rcid != -1 OR rcid IS NULL'): - domain_wikis[get_domain(db_wiki["wiki"])].append(QueuedWiki(db_wiki["wiki"], 20)) - for group, db_wikis in domain_wikis.items(): - yield group, db_wikis + :returns tuple[str, list]""" + domain_wikis = defaultdict(list) + async with db.pool().acquire() as connection: + async with connection.transaction(): + async for db_wiki in connection.cursor('SELECT DISTINCT wiki, webhook, lang, display, rcid FROM rcgcdw WHERE rcid != -1 OR rcid IS NULL'): + domain_wikis[get_domain(db_wiki["wiki"])].append(QueuedWiki(db_wiki["wiki"], 20)) + for group, db_wikis in domain_wikis.items(): + yield group, db_wikis async def scan_group(group: str): - rate_limiter = rcqueue[group]["rate_limiter"] - while True: - try: - async with rcqueue.retrieve_next_queued(group) as queued_wiki: # acquire next wiki in queue - if "irc" not in rcqueue[group]: - await asyncio.sleep(calculate_delay_for_group(len(rcqueue[group]["query"]))) - logger.debug("Wiki {}".format(queued_wiki.url)) - local_wiki = all_wikis[queued_wiki.url] # set a reference to a wiki object from memory - extended = False - if local_wiki.mw_messages is None: - extended = True - async with aiohttp.ClientSession(headers=settings["header"], - timeout=aiohttp.ClientTimeout(6.0)) as session: - try: - wiki_response = await local_wiki.fetch_wiki(extended, queued_wiki.url, session, rate_limiter, amount=queued_wiki.amount) - await local_wiki.check_status(queued_wiki.url, wiki_response.status) - except (WikiServerError, WikiError): - logger.error("Exeption when fetching the wiki") - continue # ignore this wiki if it throws errors - try: - recent_changes_resp = await wiki_response.json() - if not isinstance(recent_changes_resp, dict): - logger.error(f"recent_changes_resp has a bad type, found {type(recent_changes_resp)}, __repr__ here: {recent_changes_resp}.") - raise TypeError - if "errors" in recent_changes_resp: - error = recent_changes_resp.get('errors') - if error["code"] == "readapidenied": - await local_wiki.fail_add(queued_wiki.url, 410) - continue - raise WikiError - except aiohttp.ContentTypeError: - logger.exception("Wiki seems to be resulting in non-json content.") - await local_wiki.fail_add(queued_wiki.url, 410) - continue - except: - logger.exception("On loading json of response.") - continue - try: - recent_changes = recent_changes_resp['query']['recentchanges'] - recent_changes.reverse() - except KeyError: - logger.error("recent_changes_resp returned KeyError. skipping this check. 
(usually this happens when the wiki doesn't respond properly, it's pretty normal)") - continue - if extended: - await process_mwmsgs(recent_changes_resp, local_wiki, mw_msgs) - if local_wiki.rc_active in (0, None, -1): # new wiki, just get the last rc to not spam the channel, -1 for -1 to NULL changes - if len(recent_changes) > 0: - local_wiki.rc_active = recent_changes[-1]["rcid"] - DBHandler.add(queued_wiki.url, recent_changes[-1]["rcid"]) - else: - local_wiki.rc_active = 0 - DBHandler.add(queued_wiki.url, 0) - await DBHandler.update_db() - continue - categorize_events = {} - targets = await generate_targets(queued_wiki.url, "AND (rcid != -1 OR rcid IS NULL)") - paths = get_paths(queued_wiki.url, recent_changes_resp) - new_events = 0 - local_wiki.last_check = time.time() # on successful check, save new last check time - for change in recent_changes: - if change["rcid"] > local_wiki.rc_active and queued_wiki.amount != 450: - new_events += 1 - if new_events == 20: - # call the function again with max limit for more results, ignore the ones in this request - logger.debug("There were too many new events, queuing wiki with 450 limit.") - rcqueue[group]["query"].insert(1, QueuedWiki(queued_wiki.url, 450)) - break - await process_cats(change, local_wiki, mw_msgs, categorize_events) - else: # If we broke from previous loop (too many changes) don't execute sending messages here - highest_rc = local_wiki.rc_active # setup var for later use - message_list = defaultdict(list) - for change in recent_changes: # Yeah, second loop since the categories require to be all loaded up - if change["rcid"] > local_wiki.rc_active: - if highest_rc is None or change["rcid"] > highest_rc: # make sure that the highest_rc is really highest rcid but do allow other entries with potentially lesser rcids come after without breaking the cycle - highest_rc = change["rcid"] - for target in targets.items(): - try: - message = await essential_info(change, categorize_events, local_wiki, target, paths, - recent_changes_resp, rate_limiter) - if message is not None: - message_list[target[0]].append(message) - except asyncio.CancelledError: - raise - except: - if command_line_args.debug: - logger.exception("Exception on RC formatter") - raise - else: - logger.exception("Exception on RC formatter") - await generic_msg_sender_exception_logger(traceback.format_exc(), "Exception in RC formatter", Wiki=queued_wiki.url, Change=str(change)[0:1000]) - # Lets stack the messages - for messages in message_list.values(): - messages = stack_message_list(messages) - for message in messages: - await send_to_discord(message) - if recent_changes: # we don't have to test for highest_rc being null, because if there are no RC entries recent_changes will be an empty list which will result in false in here and DO NOT save the value - local_wiki.rc_active = highest_rc - DBHandler.add(queued_wiki.url, highest_rc) - await DBHandler.update_db() - except asyncio.CancelledError: - return - except QueueEmpty: - await asyncio.sleep(10.0) - continue + rate_limiter = rcqueue[group]["rate_limiter"] + while True: + try: + async with rcqueue.retrieve_next_queued(group) as queued_wiki: # acquire next wiki in queue + if "irc" not in rcqueue[group]: + await asyncio.sleep(calculate_delay_for_group(len(rcqueue[group]["query"]))) + logger.debug("Wiki {}".format(queued_wiki.url)) + local_wiki = all_wikis[queued_wiki.url] # set a reference to a wiki object from memory + extended = False + if local_wiki.mw_messages is None: + extended = True + async with 
aiohttp.ClientSession(headers=settings["header"], + timeout=aiohttp.ClientTimeout(6.0)) as session: + try: + wiki_response = await local_wiki.fetch_wiki(extended, queued_wiki.url, session, rate_limiter, amount=queued_wiki.amount) + await local_wiki.check_status(queued_wiki.url, wiki_response.status) + except (WikiServerError, WikiError): + logger.error("Exeption when fetching the wiki") + continue # ignore this wiki if it throws errors + try: + recent_changes_resp = await wiki_response.json() + if not isinstance(recent_changes_resp, dict): + logger.error(f"recent_changes_resp has a bad type, found {type(recent_changes_resp)}, __repr__ here: {recent_changes_resp}.") + raise TypeError + if "errors" in recent_changes_resp: + error = recent_changes_resp.get('errors') + if error["code"] == "readapidenied": + await local_wiki.fail_add(queued_wiki.url, 410) + continue + raise WikiError + except aiohttp.ContentTypeError: + logger.exception("Wiki seems to be resulting in non-json content.") + await local_wiki.fail_add(queued_wiki.url, 410) + continue + except: + logger.exception("On loading json of response.") + continue + try: + recent_changes = recent_changes_resp['query']['recentchanges'] + recent_changes.reverse() + except KeyError: + logger.error("recent_changes_resp returned KeyError. skipping this check. (usually this happens when the wiki doesn't respond properly, it's pretty normal)") + continue + if extended: + await process_mwmsgs(recent_changes_resp, local_wiki, mw_msgs) + if local_wiki.rc_active in (0, None, -1): # new wiki, just get the last rc to not spam the channel, -1 for -1 to NULL changes + if len(recent_changes) > 0: + local_wiki.rc_active = recent_changes[-1]["rcid"] + DBHandler.add(queued_wiki.url, recent_changes[-1]["rcid"]) + else: + local_wiki.rc_active = 0 + DBHandler.add(queued_wiki.url, 0) + await DBHandler.update_db() + continue + categorize_events = {} + targets = await generate_targets(queued_wiki.url, "AND (rcid != -1 OR rcid IS NULL)") + paths = get_paths(queued_wiki.url, recent_changes_resp) + new_events = 0 + local_wiki.last_check = time.time() # on successful check, save new last check time + for change in recent_changes: + if change["rcid"] > local_wiki.rc_active and queued_wiki.amount != 450: + new_events += 1 + if new_events == 20: + # call the function again with max limit for more results, ignore the ones in this request + logger.debug("There were too many new events, queuing wiki with 450 limit.") + rcqueue[group]["query"].insert(1, QueuedWiki(queued_wiki.url, 450)) + break + await process_cats(change, local_wiki, mw_msgs, categorize_events) + else: # If we broke from previous loop (too many changes) don't execute sending messages here + highest_rc = local_wiki.rc_active # setup var for later use + message_list = defaultdict(list) + for change in recent_changes: # Yeah, second loop since the categories require to be all loaded up + if change["rcid"] > local_wiki.rc_active: + if highest_rc is None or change["rcid"] > highest_rc: # make sure that the highest_rc is really highest rcid but do allow other entries with potentially lesser rcids come after without breaking the cycle + highest_rc = change["rcid"] + for target in targets.items(): + try: + message = await essential_info(change, categorize_events, local_wiki, target, paths, + recent_changes_resp, rate_limiter) + if message is not None: + message_list[target[0]].append(message) + except asyncio.CancelledError: + raise + except: + if command_line_args.debug: + logger.exception("Exception on RC 
formatter") + raise + else: + logger.exception("Exception on RC formatter") + await generic_msg_sender_exception_logger(traceback.format_exc(), "Exception in RC formatter", Wiki=queued_wiki.url, Change=str(change)[0:1000]) + # Lets stack the messages + for messages in message_list.values(): + messages = stack_message_list(messages) + for message in messages: + await send_to_discord(message) + if recent_changes: # we don't have to test for highest_rc being null, because if there are no RC entries recent_changes will be an empty list which will result in false in here and DO NOT save the value + local_wiki.rc_active = highest_rc + DBHandler.add(queued_wiki.url, highest_rc) + await DBHandler.update_db() + except asyncio.CancelledError: + return + except QueueEmpty: + await asyncio.sleep(10.0) + continue async def wiki_scanner(): - """Wiki scanner is spawned as a task which purpose is to continuously run over wikis in the DB, fetching recent changes - to add messages based on the changes to message queue later handled by message_sender coroutine.""" - try: - async for group, db_wikis in generate_domain_groups(): # First scan - await rcqueue.start_group(group, db_wikis) - while True: - await asyncio.sleep(20.0) - await rcqueue.update_queues() - except asyncio.CancelledError: - for item in rcqueue.domain_list.values(): # cancel running tasks - item["task"].cancel() - raise + """Wiki scanner is spawned as a task which purpose is to continuously run over wikis in the DB, fetching recent changes + to add messages based on the changes to message queue later handled by message_sender coroutine.""" + try: + async for group, db_wikis in generate_domain_groups(): # First scan + await rcqueue.start_group(group, db_wikis) + while True: + await asyncio.sleep(20.0) + await rcqueue.update_queues() + except asyncio.CancelledError: + for item in rcqueue.domain_list.values(): # cancel running tasks + item["task"].cancel() + raise async def message_sender(): - """message_sender is a coroutine responsible for handling Discord messages and their sending to Discord""" - try: - while True: - await messagequeue.resend_msgs() - if main_tasks["msg_queue_shield"].cancelled(): - raise asyncio.CancelledError - except asyncio.CancelledError: - while len(messagequeue): - logger.info("Shutting down after sending {} more Discord messages...".format(len(messagequeue))) - await messagequeue.resend_msgs() - pass - except: - if command_line_args.debug: - logger.exception("Exception on DC message sender") - shutdown(loop=asyncio.get_event_loop()) - else: - logger.exception("Exception on DC message sender") - await generic_msg_sender_exception_logger(traceback.format_exc(), "Message sender exception") + """message_sender is a coroutine responsible for handling Discord messages and their sending to Discord""" + try: + while True: + await messagequeue.resend_msgs() + if main_tasks["msg_queue_shield"].cancelled(): + raise asyncio.CancelledError + except asyncio.CancelledError: + while len(messagequeue): + logger.info("Shutting down after sending {} more Discord messages...".format(len(messagequeue))) + await messagequeue.resend_msgs() + pass + except: + if command_line_args.debug: + logger.exception("Exception on DC message sender") + shutdown(loop=asyncio.get_event_loop()) + else: + logger.exception("Exception on DC message sender") + await generic_msg_sender_exception_logger(traceback.format_exc(), "Message sender exception") + async def discussion_handler(): - await asyncio.sleep(3.0) # Make some time before IRC code is executed, 
happens only once and saves if inside - try: - while True: - async with db.pool().acquire() as connection: - async with connection.transaction(): - async for db_wiki in connection.cursor("SELECT DISTINCT wiki, rcid, postid FROM rcgcdw WHERE postid != '-1' OR postid IS NULL"): - try: - local_wiki = all_wikis[db_wiki["wiki"]] # set a reference to a wiki object from memory - except KeyError: - local_wiki = all_wikis[db_wiki["wiki"]] = Wiki() - local_wiki.rc_active = db_wiki["rcid"] - if db_wiki["wiki"] not in rcqueue.irc_mapping["fandom.com"].updated_discussions and \ - local_wiki.last_discussion_check+settings["irc_overtime"] > time.time(): # I swear if another wiki farm ever starts using Fandom discussions I'm gonna use explosion magic - continue - else: - try: - rcqueue.irc_mapping["fandom.com"].updated_discussions.remove(db_wiki["wiki"]) - except KeyError: - pass # to be expected - header = settings["header"] - header["Accept"] = "application/hal+json" - async with aiohttp.ClientSession(headers=header, - timeout=aiohttp.ClientTimeout(6.0)) as session: - try: - feeds_response = await local_wiki.fetch_feeds(db_wiki["wiki"], session) - except (WikiServerError, WikiError): - continue # ignore this wiki if it throws errors - try: - discussion_feed_resp = await feeds_response.json(encoding="UTF-8") - if "error" in discussion_feed_resp: - error = discussion_feed_resp["error"] - if error == "NotFoundException": # Discussions disabled - if db_wiki["rcid"] != -1: # RC feed is disabled - await connection.execute("UPDATE rcgcdw SET postid = $1 WHERE wiki = $2", "-1", db_wiki["wiki"]) - else: - await local_wiki.remove(db_wiki["wiki"], 1000) - await DBHandler.update_db() - continue - raise WikiError - discussion_feed = discussion_feed_resp["_embedded"]["doc:posts"] - discussion_feed.reverse() - except aiohttp.ContentTypeError: - logger.exception("Wiki seems to be resulting in non-json content.") - continue - except asyncio.TimeoutError: - logger.debug("Timeout on reading JSON of discussion post feeed.") - continue - except: - logger.exception("On loading json of response.") - continue - if db_wiki["postid"] is None: # new wiki, just get the last post to not spam the channel - if len(discussion_feed) > 0: - DBHandler.add(db_wiki["wiki"], discussion_feed[-1]["id"], True) - else: - DBHandler.add(db_wiki["wiki"], "0", True) - await DBHandler.update_db() - continue - comment_events = [] - targets = await generate_targets(db_wiki["wiki"], "AND NOT postid = '-1'") - for post in discussion_feed: - if post["_embedded"]["thread"][0]["containerType"] == "ARTICLE_COMMENT" and post["id"] > db_wiki["postid"]: - comment_events.append(post["forumId"]) - comment_pages: dict = {} - if comment_events: - try: - comment_pages = await local_wiki.safe_request( - "{wiki}wikia.php?controller=FeedsAndPosts&method=getArticleNamesAndUsernames&stablePageIds={pages}&format=json".format( - wiki=db_wiki["wiki"], pages=",".join(comment_events) - ), RateLimiter(), "articleNames") - except aiohttp.ClientResponseError: # Fandom can be funny sometimes... 
See #30 - comment_pages = None - except: - if command_line_args.debug: - logger.exception("Exception on Feeds article comment request") - shutdown(loop=asyncio.get_event_loop()) - else: - logger.exception("Exception on Feeds article comment request") - await generic_msg_sender_exception_logger(traceback.format_exc(), - "Exception on Feeds article comment request", - Post=str(post)[0:1000], Wiki=db_wiki["wiki"]) - message_list = defaultdict(list) - for post in discussion_feed: # Yeah, second loop since the comments require an extra request - if post["id"] > db_wiki["postid"]: - for target in targets.items(): - try: - message = await essential_feeds(post, comment_pages, db_wiki, target) - if message is not None: - message_list[target[0]].append(message) - except asyncio.CancelledError: - raise - except: - if command_line_args.debug: - logger.exception("Exception on Feeds formatter") - shutdown(loop=asyncio.get_event_loop()) - else: - logger.exception("Exception on Feeds formatter") - await generic_msg_sender_exception_logger(traceback.format_exc(), "Exception in feed formatter", Post=str(post)[0:1000], Wiki=db_wiki["wiki"]) - # Lets stack the messages - for messages in message_list.values(): - messages = stack_message_list(messages) - for message in messages: - await send_to_discord(message) - if discussion_feed: - DBHandler.add(db_wiki["wiki"], post["id"], True) - await asyncio.sleep(delay=2.0) # hardcoded really doesn't need much more - await asyncio.sleep(delay=1.0) # Avoid lock on no wikis - await DBHandler.update_db() - except asyncio.CancelledError: - pass - except: - if command_line_args.debug: - raise # reraise the issue - else: - logger.exception("Exception on Feeds formatter") - await generic_msg_sender_exception_logger(traceback.format_exc(), "Discussion handler task exception", Wiki=db_wiki["wiki"]) + await asyncio.sleep(3.0) # Make some time before IRC code is executed, happens only once and saves if inside + try: + while True: + async with db.pool().acquire() as connection: + async with connection.transaction(): + async for db_wiki in connection.cursor("SELECT DISTINCT wiki, rcid, postid FROM rcgcdw WHERE postid != '-1' OR postid IS NULL"): + try: + local_wiki = all_wikis[db_wiki["wiki"]] # set a reference to a wiki object from memory + except KeyError: + local_wiki = all_wikis[db_wiki["wiki"]] = Wiki() + local_wiki.rc_active = db_wiki["rcid"] + if db_wiki["wiki"] not in rcqueue.irc_mapping["fandom.com"].updated_discussions and \ + local_wiki.last_discussion_check+settings["irc_overtime"] > time.time(): # I swear if another wiki farm ever starts using Fandom discussions I'm gonna use explosion magic + continue + else: + try: + rcqueue.irc_mapping["fandom.com"].updated_discussions.remove(db_wiki["wiki"]) + except KeyError: + pass # to be expected + header = settings["header"] + header["Accept"] = "application/hal+json" + async with aiohttp.ClientSession(headers=header, + timeout=aiohttp.ClientTimeout(6.0)) as session: + try: + feeds_response = await local_wiki.fetch_feeds(db_wiki["wiki"], session) + except (WikiServerError, WikiError): + continue # ignore this wiki if it throws errors + try: + discussion_feed_resp = await feeds_response.json(encoding="UTF-8") + if "error" in discussion_feed_resp: + error = discussion_feed_resp["error"] + if error == "NotFoundException": # Discussions disabled + if db_wiki["rcid"] != -1: # RC feed is disabled + await connection.execute("UPDATE rcgcdw SET postid = $1 WHERE wiki = $2", "-1", db_wiki["wiki"]) + else: + await 
local_wiki.remove(db_wiki["wiki"], 1000) + await DBHandler.update_db() + continue + raise WikiError + discussion_feed = discussion_feed_resp["_embedded"]["doc:posts"] + discussion_feed.reverse() + except aiohttp.ContentTypeError: + logger.exception("Wiki seems to be resulting in non-json content.") + continue + except asyncio.TimeoutError: + logger.debug("Timeout on reading JSON of discussion post feeed.") + continue + except: + logger.exception("On loading json of response.") + continue + if db_wiki["postid"] is None: # new wiki, just get the last post to not spam the channel + if len(discussion_feed) > 0: + DBHandler.add(db_wiki["wiki"], discussion_feed[-1]["id"], True) + else: + DBHandler.add(db_wiki["wiki"], "0", True) + await DBHandler.update_db() + continue + comment_events = [] + targets = await generate_targets(db_wiki["wiki"], "AND NOT postid = '-1'") + for post in discussion_feed: + if post["_embedded"]["thread"][0]["containerType"] == "ARTICLE_COMMENT" and post["id"] > db_wiki["postid"]: + comment_events.append(post["forumId"]) + comment_pages: dict = {} + if comment_events: + try: + comment_pages = await local_wiki.safe_request( + "{wiki}wikia.php?controller=FeedsAndPosts&method=getArticleNamesAndUsernames&stablePageIds={pages}&format=json".format( + wiki=db_wiki["wiki"], pages=",".join(comment_events) + ), RateLimiter(), "articleNames") + except aiohttp.ClientResponseError: # Fandom can be funny sometimes... See #30 + comment_pages = None + except: + if command_line_args.debug: + logger.exception("Exception on Feeds article comment request") + shutdown(loop=asyncio.get_event_loop()) + else: + logger.exception("Exception on Feeds article comment request") + await generic_msg_sender_exception_logger(traceback.format_exc(), + "Exception on Feeds article comment request", + Post=str(post)[0:1000], Wiki=db_wiki["wiki"]) + message_list = defaultdict(list) + for post in discussion_feed: # Yeah, second loop since the comments require an extra request + if post["id"] > db_wiki["postid"]: + for target in targets.items(): + try: + message = await essential_feeds(post, comment_pages, db_wiki, target) + if message is not None: + message_list[target[0]].append(message) + except asyncio.CancelledError: + raise + except: + if command_line_args.debug: + logger.exception("Exception on Feeds formatter") + shutdown(loop=asyncio.get_event_loop()) + else: + logger.exception("Exception on Feeds formatter") + await generic_msg_sender_exception_logger(traceback.format_exc(), "Exception in feed formatter", Post=str(post)[0:1000], Wiki=db_wiki["wiki"]) + # Lets stack the messages + for messages in message_list.values(): + messages = stack_message_list(messages) + for message in messages: + await send_to_discord(message) + if discussion_feed: + DBHandler.add(db_wiki["wiki"], post["id"], True) + await asyncio.sleep(delay=2.0) # hardcoded really doesn't need much more + await asyncio.sleep(delay=1.0) # Avoid lock on no wikis + await DBHandler.update_db() + except asyncio.CancelledError: + pass + except: + if command_line_args.debug: + raise # reraise the issue + else: + logger.exception("Exception on Feeds formatter") + await generic_msg_sender_exception_logger(traceback.format_exc(), "Discussion handler task exception", Wiki=db_wiki["wiki"]) def shutdown(loop, signal=None): - global main_tasks - loop.remove_signal_handler(signal) - if len(messagequeue) > 0: - logger.warning("Some messages are still queued!") - for task in (main_tasks["wiki_scanner"], main_tasks["discussion_handler"], 
main_tasks["msg_queue_shield"]): - task.cancel() - loop.run_until_complete(main_tasks["message_sender"]) - for task in asyncio.all_tasks(loop): - logger.debug("Killing task") - task.cancel() - try: - loop.run_until_complete(asyncio.gather(*asyncio.all_tasks(loop))) - except asyncio.CancelledError: - loop.stop() - logger.info("Script has shut down due to signal {}.".format(signal)) - # sys.exit(0) + global main_tasks + loop.remove_signal_handler(signal) + if len(messagequeue) > 0: + logger.warning("Some messages are still queued!") + for task in (main_tasks["wiki_scanner"], main_tasks["discussion_handler"], main_tasks["msg_queue_shield"]): + task.cancel() + loop.run_until_complete(main_tasks["message_sender"]) + for task in asyncio.all_tasks(loop): + logger.debug("Killing task") + task.cancel() + try: + loop.run_until_complete(asyncio.gather(*asyncio.all_tasks(loop))) + except asyncio.CancelledError: + loop.stop() + logger.info("Script has shut down due to signal {}.".format(signal)) + # sys.exit(0) # def global_exception_handler(loop, context): @@ -548,31 +549,31 @@ def shutdown(loop, signal=None): async def main_loop(): - global main_tasks - loop = asyncio.get_event_loop() - nest_asyncio.apply(loop) - await db.setup_connection() - logger.debug("Connection type: {}".format(db.connection)) - await populate_allwikis() - try: - signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT) - for s in signals: - loop.add_signal_handler( - s, lambda s=s: shutdown(loop, signal=s)) - except AttributeError: - logger.info("Running on Windows, some things may not work as they should.") - signals = (signal.SIGBREAK, signal.SIGTERM, signal.SIGINT) - # loop.set_exception_handler(global_exception_handler) - try: - main_tasks = {"wiki_scanner": asyncio.create_task(wiki_scanner()), "message_sender": asyncio.create_task(message_sender()), - "discussion_handler": asyncio.create_task(discussion_handler())} - main_tasks["msg_queue_shield"] = asyncio.shield(main_tasks["message_sender"]) - await asyncio.gather(main_tasks["wiki_scanner"], main_tasks["discussion_handler"], main_tasks["message_sender"]) - except KeyboardInterrupt: - await DBHandler.update_db() - await db.shutdown_connection() - shutdown(loop) - except asyncio.CancelledError: - return + global main_tasks + loop = asyncio.get_event_loop() + nest_asyncio.apply(loop) + await db.setup_connection() + logger.debug("Connection type: {}".format(db.connection)) + await populate_allwikis() + try: + signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT) + for s in signals: + loop.add_signal_handler( + s, lambda s=s: shutdown(loop, signal=s)) + except AttributeError: + logger.info("Running on Windows, some things may not work as they should.") + signals = (signal.SIGBREAK, signal.SIGTERM, signal.SIGINT) + # loop.set_exception_handler(global_exception_handler) + try: + main_tasks = {"wiki_scanner": asyncio.create_task(wiki_scanner()), "message_sender": asyncio.create_task(message_sender()), + "discussion_handler": asyncio.create_task(discussion_handler())} + main_tasks["msg_queue_shield"] = asyncio.shield(main_tasks["message_sender"]) + await asyncio.gather(main_tasks["wiki_scanner"], main_tasks["discussion_handler"], main_tasks["message_sender"]) + except KeyboardInterrupt: + await DBHandler.update_db() + await db.shutdown_connection() + shutdown(loop) + except asyncio.CancelledError: + return asyncio.run(main_loop(), debug=False) diff --git a/src/domain.py b/src/domain.py index 439b4e5..0078a8b 100644 --- a/src/domain.py +++ b/src/domain.py @@ -10,9 +10,25 @@ if 
TYPE_CHECKING: class Domain: - def __init__(self, task: asyncio.Task, irc_client: Optional[irc.client_aio.AioSimpleIRCClient] = None): - self.task = task + def __init__(self, name: str, irc_client: Optional[irc.client_aio.AioSimpleIRCClient] = None): + self.name = name # This should be always in format of topname.extension for example fandom.com + self.task: asyncio.Task = self.create_task() self.wikis: list[src.wiki.Wiki] = list() self.rate_limiter: src.wiki_ratelimiter = src.wiki_ratelimiter.RateLimiter() self.irc = irc_client + def add_wiki(self, wiki: src.wiki.Wiki, index: int = None): + """Adds a wiki to domain list. + + :parameter wiki - Wiki object + :parameter index (optional) - index at which the wiki should be added, if not specified it's the end of the list""" + if index: + self.wikis.insert(index, wiki) + else: + self.wikis.append(wiki) + + def create_task(self) -> asyncio.Task: + return asyncio.create_task(self.run_wiki_check()) + + async def run_wiki_check(self): + raise NotImplementedError diff --git a/src/domain_manager.py b/src/domain_manager.py new file mode 100644 index 0000000..af7f6fd --- /dev/null +++ b/src/domain_manager.py @@ -0,0 +1,38 @@ +from __future__ import annotations +from typing import TYPE_CHECKING, Optional +from urllib.parse import urlparse, urlunparse +from src.config import settings +from src.domain import Domain + + +if TYPE_CHECKING: + from src.wiki import Wiki + + +class DomainManager: + def __init__(self): + self.domains: dict[str, Domain] = {} + + def new_wiki(self, wiki: Wiki): + """Finds a domain for the wiki and adds a wiki to the domain object. + + :parameter wiki - Wiki object to be added""" + wiki_domain = self.get_domain(wiki.script_url) + try: + self.domains[wiki_domain].add_wiki(wiki) + except KeyError: + self.new_domain(wiki_domain).add_wiki(wiki) + + @staticmethod + def get_domain(url: str) -> str: + """Returns a domain for given URL (for example fandom.com, wikipedia.org)""" + parsed_url = urlparse(url) + return ".".join(urlunparse((*parsed_url[0:2], "", "", "", "")).split(".")[-2:]) + + def new_domain(self, name: str) -> Domain: + # TODO IRC Part + self.domains[name] = Domain(name, irc) + return self.domains[name] + + +domains = DomainManager() diff --git a/src/wiki.py b/src/wiki.py index 02c9d19..49d0e69 100644 --- a/src/wiki.py +++ b/src/wiki.py @@ -17,9 +17,18 @@ from bs4 import BeautifulSoup logger = logging.getLogger("rcgcdb.wiki") +class Wiki: + def __init__(self, script_url: str, rc_id: int, discussion_id: int): + self.script_url = script_url + self.session = aiohttp.ClientSession(headers=settings["header"], timeout=aiohttp.ClientTimeout(6.0)) + self.statistics = Statistics() + + @property + def rc_id(self): + return self.statistics.rc_id @dataclass -class Wiki: +class Wiki_old: mw_messages: int = None fail_times: int = 0 # corresponding to amount of times connection with wiki failed for client reasons (400-499) session: aiohttp.ClientSession = None
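
For context on the grouping behaviour this skeleton introduces, below is a minimal, standalone sketch (not part of the patch) of how DomainManager.get_domain derives a parent domain from a wiki script URL and how wikis end up bucketed per domain before each bucket gets its own fetch task. The sample URLs are hypothetical, and the real manager stores Domain objects rather than plain URL lists.

# Standalone sketch of the per-domain bucketing that DomainManager performs in the patch.
# Sample URLs are hypothetical; in the bot the list comes from the rcgcdw table.
from collections import defaultdict
from urllib.parse import urlparse, urlunparse


def get_domain(url: str) -> str:
    # Same approach as DomainManager.get_domain in the patch: drop path/query/fragment,
    # then keep only the last two labels of the host ("minecraft.fandom.com" -> "fandom.com").
    parsed_url = urlparse(url)
    return ".".join(urlunparse((*parsed_url[0:2], "", "", "", "")).split(".")[-2:])


def group_by_domain(script_urls: list[str]) -> dict[str, list[str]]:
    # DomainManager.new_wiki creates a Domain object per key; this sketch keeps bare URLs
    # just to show the resulting shape of the grouping.
    buckets: dict[str, list[str]] = defaultdict(list)
    for url in script_urls:
        buckets[get_domain(url)].append(url)
    return dict(buckets)


if __name__ == "__main__":
    sample = [
        "https://minecraft.fandom.com/",  # hypothetical
        "https://terraria.fandom.com/",   # hypothetical
        "https://en.wikipedia.org/w/",    # hypothetical
    ]
    print(group_by_domain(sample))
    # {'fandom.com': ['https://minecraft.fandom.com/', 'https://terraria.fandom.com/'],
    #  'wikipedia.org': ['https://en.wikipedia.org/w/']}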