diff --git a/src/bot.py b/src/bot.py
index 59839a8..b16acdb 100644
--- a/src/bot.py
+++ b/src/bot.py
@@ -4,6 +4,7 @@ import logging.config
 import signal
 import traceback
 import nest_asyncio
+import time
 from collections import defaultdict, namedtuple
 from typing import Generator
 
@@ -18,6 +19,7 @@ from src.queue_handler import DBHandler
 from src.wiki import Wiki, process_cats, process_mwmsgs, essential_info, essential_feeds
 from src.discord import DiscordMessage, generic_msg_sender_exception_logger, stack_message_list
 from src.wiki_ratelimiter import RateLimiter
+from src.irc_feed import AioIRCCat
 
 logging.config.dictConfig(settings["logging"])
 
@@ -60,11 +62,24 @@ class RcQueue:
 	def __init__(self):
 		self.domain_list = {}
 		self.to_remove = []
+		self.irc_mapping = {}
 
 	async def start_group(self, group, initial_wikis):
 		"""Starts a task for given domain group"""
 		if group not in self.domain_list:
-			self.domain_list[group] = {"task": asyncio.create_task(scan_group(group)), "last_rowid": 0, "query": LimitedList(initial_wikis), "rate_limiter": RateLimiter()}
+			if group in self.irc_mapping:  # Hopefully there are no race conditions....
+				irc_connection = self.irc_mapping[group]
+			else:
+				for irc_server in settings["irc_servers"].keys():
+					if group in settings["irc_servers"][irc_server]["domains"]:
+						irc_connection = AioIRCCat(settings["irc_servers"][irc_server]["irc_channel_mapping"], all_wikis)
+						for domain in settings["irc_servers"][irc_server]["domains"]:
+							self.irc_mapping[domain] = irc_connection
+						irc_connection.connect(settings["irc_servers"][irc_server]["irc_host"], settings["irc_servers"][irc_server]["irc_port"], settings["irc_servers"][irc_server]["irc_name"])
+						break
+				else:
+					irc_connection = None
+			self.domain_list[group] = {"task": asyncio.create_task(scan_group(group)), "last_rowid": 0, "query": LimitedList(initial_wikis), "rate_limiter": RateLimiter(), "irc": irc_connection}
 			logger.debug(self.domain_list[group])
 		else:
 			raise KeyError
@@ -77,7 +92,10 @@ class RcQueue:
 		all_wikis[wiki].rc_active = -1
 		if not self[group]["query"]:  # if there is no wiki left in the queue, get rid of the task
 			logger.debug(f"{group} no longer has any wikis queued!")
-			await self.stop_task_group(group)
+			if not self.check_if_domain_in_db(group):
+				await self.stop_task_group(group)
+			else:
+				logger.debug("But there are still wikis for it in the DB!")
 
 	async def stop_task_group(self, group):
 		self[group]["task"].cancel()
@@ -87,7 +105,7 @@
 		fetch_all = db_cursor.execute(
 			'SELECT ROWID, webhook, wiki, lang, display, rcid FROM rcgcdw WHERE rcid != -1 GROUP BY wiki ORDER BY ROWID ASC')
 		for wiki in fetch_all.fetchall():
-			if get_domain(db_wiki["wiki"]) == domain:
+			if get_domain(wiki["wiki"]) == domain:
 				return True
 		return False
 
@@ -143,9 +161,22 @@
 					continue
 				try:
 					current_domain: dict = self[domain]
+					if current_domain["irc"]:
+						logger.info('CURRENT STATUS:')
+						logger.info("DOMAIN LIST FOR IRC: {}".format(current_domain["irc"].updated))
+						logger.info("CURRENT DOMAIN INFO: {}".format(domain))
+						logger.info("IS WIKI IN A LIST?: {}".format(db_wiki["wiki"] in current_domain["irc"].updated))
+						logger.info("LAST CHECK FOR THE WIKI {} IS {}".format(db_wiki["wiki"], all_wikis[db_wiki["wiki"]].last_check))
+						if db_wiki["wiki"] not in current_domain["irc"].updated and all_wikis[db_wiki["wiki"]].last_check+settings["irc_overtime"] > time.time():
+							continue  # the domain has IRC, the wiki has not shown up on the feed, and it was checked less than irc_overtime seconds ago
+						else:  # otherwise remove the wiki from the updated set
+							try:
+								current_domain["irc"].updated.remove(db_wiki["wiki"])
+							except KeyError:
+								pass  # expected when we land here because of the overtime check rather than an IRC update
 					if not db_wiki["ROWID"] < current_domain["last_rowid"]:
 						current_domain["query"].append(QueuedWiki(db_wiki["wiki"], 20))
 				except KeyError:
 					await self.start_group(domain, [QueuedWiki(db_wiki["wiki"], 20)])
 					logger.info("A new domain group ({}) has been added since last time, adding it to the domain_list and starting a task...".format(domain))
 				except ListFull:
@@ -271,6 +303,7 @@ async def scan_group(group: str):
 				targets = generate_targets(queued_wiki.url, "AND (rcid != -1 OR rcid IS NULL)")
 				paths = get_paths(queued_wiki.url, recent_changes_resp)
 				new_events = 0
+				local_wiki.last_check = time.time()  # on successful check, save new last check time
 				for change in recent_changes:
 					if change["rcid"] > local_wiki.rc_active and queued_wiki.amount != 450:
 						new_events += 1
@@ -359,6 +392,13 @@ async def discussion_handler():
 			fetch_all = db_cursor.execute(
 				"SELECT wiki, rcid, postid FROM rcgcdw WHERE postid != '-1' OR postid IS NULL GROUP BY wiki")
 			for db_wiki in fetch_all.fetchall():
+				if db_wiki["wiki"] not in rcqueue.irc_mapping["fandom.com"].updated_discussions and all_wikis[db_wiki["wiki"]].last_discussion_check+settings["irc_overtime"] > time.time():  # I swear if another wiki farm ever starts using Fandom discussions I'm gonna use explosion magic
+					continue
+				else:
+					try:
+						rcqueue.irc_mapping["fandom.com"].updated_discussions.remove(db_wiki["wiki"])
+					except KeyError:
+						pass  # to be expected when the wiki got here via the overtime check
 				header = settings["header"]
 				header["Accept"] = "application/hal+json"
 				async with aiohttp.ClientSession(headers=header,
diff --git a/src/irc_feed.py b/src/irc_feed.py
new file mode 100644
index 0000000..6c19597
--- /dev/null
+++ b/src/irc_feed.py
@@ -0,0 +1,62 @@
+import irc.client_aio
+import json
+import logging
+from urllib.parse import urlparse, quote
+
+logger = logging.getLogger("rcgcdw.irc_feed")
+
+
+class AioIRCCat(irc.client_aio.AioSimpleIRCClient):
+	def __init__(self, targets, all_wikis):
+		irc.client_aio.SimpleIRCClient.__init__(self)
+		self.targets = targets
+		self.updated = set()  # Storage for edited wikis
+		self.updated_discussions = set()
+		self.wikis = all_wikis
+
+	def on_welcome(self, connection, event):  # Join IRC channels
+		for channel in self.targets.values():
+			connection.join(channel)
+
+	def on_pubmsg(self, connection, event):
+		if event.target == self.targets["rc"]:
+			self.parse_fandom_message(' '.join(event.arguments))
+		elif event.target == self.targets["discussion"]:
+			self.parse_fandom_discussion(' '.join(event.arguments))
+
+	def on_nicknameinuse(self, c, e):
+		c.nick(c.get_nickname() + "_")
+
+	def parse_fandom_message(self, message):
+		message = message.split("\x035*\x03")
+		# print(asyncio.all_tasks())
+		half = message[0].find("\x0302http")
+		if half == -1:
+			return
+		message = message[0][half + 3:].strip()
+		# print(message)
+		url = urlparse(message)
+		full_url = "https://" + url.netloc + recognize_langs(url.path)
+		if full_url in self.wikis and self.wikis[full_url].rc_active != -1:
+			self.updated.add(full_url)
+			logger.debug("New website appended to the list! {}".format(full_url))
+
+	def parse_fandom_discussion(self, message):
+		post = json.loads(message)
+		if post.get('action', 'unknown') != "deleted":  # ignore deletion events
+			url = urlparse(post.get('url'))
+			full_url = "https://" + url.netloc + recognize_langs(url.path)
+			if full_url in self.wikis:  # POSSIBLE MEMORY LEAK AS WE DON'T HAVE A WAY TO CHECK IF WIKI IS LOOKING FOR DISCUSSIONS OR NOT
+				self.updated_discussions.add(full_url)
+				logger.debug("New website appended to the list! {}".format(full_url))
+
+
+def recognize_langs(path):
+	lang = ""
+	new_path = path.split("/")
+	if len(new_path) > 2:
+		if new_path[1] not in ("wiki", "f"):
+			lang = "/" + new_path[1]
+	return lang + "/"
+
+
diff --git a/src/wiki.py b/src/wiki.py
index 148a7a1..1bf3a89 100644
--- a/src/wiki.py
+++ b/src/wiki.py
@@ -24,6 +24,8 @@ class Wiki:
 	fail_times: int = 0  # corresponding to amount of times connection with wiki failed for client reasons (400-499)
 	session: aiohttp.ClientSession = None
 	rc_active: int = 0
+	last_check: float = 0.0
+	last_discussion_check: float = 0.0
 
 	@staticmethod
 	async def fetch_wiki(extended, script_path, session: aiohttp.ClientSession, ratelimiter: RateLimiter, amount=20) -> aiohttp.ClientResponse:
@@ -188,6 +190,7 @@ async def process_mwmsgs(wiki_response: dict, local_wiki: Wiki, mw_msgs: dict):
 	mw_msgs[key] = msgs  # it may be a little bit messy for sure, however I don't expect any reason to remove mw_msgs entries by one
 	local_wiki.mw_messages = key
 
+
 # db_wiki: webhook, wiki, lang, display, rcid, postid
 async def essential_info(change: dict, changed_categories, local_wiki: Wiki, target: tuple, paths: tuple, request: dict, rate_limiter: RateLimiter) -> src.discord.DiscordMessage:
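
For illustration only (not part of the patch): a minimal sketch of how bot.py wires the new AioIRCCat class, mirroring the calls start_group() makes above. The settings shape matches what the diff reads; the host, channel names, and wiki URL are made-up placeholders.

# Hypothetical wiring sketch; mirrors the calls start_group() makes above.
# All concrete values (host, channels, wiki URL) are placeholders.
from src.irc_feed import AioIRCCat

all_wikis = {}  # url -> Wiki, as maintained in bot.py

irc_server = {  # one entry of settings["irc_servers"], in the shape read above
	"domains": ["fandom.com"],  # domain groups sharing this IRC connection
	"irc_host": "irc.example.org",  # placeholder
	"irc_port": 6667,
	"irc_name": "RcGcDb",
	"irc_channel_mapping": {"rc": "#rc-feed", "discussion": "#discussions-feed"},  # placeholders
}

connection = AioIRCCat(irc_server["irc_channel_mapping"], all_wikis)
connection.connect(irc_server["irc_host"], irc_server["irc_port"], irc_server["irc_name"])

# A scan loop may then skip wikis IRC has not flagged since their last check:
wiki_url = "https://example.fandom.com/"
if wiki_url in connection.updated:
	connection.updated.remove(wiki_url)
	# ...queue wiki_url for a recent changes fetch...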