diff --git a/requirements.txt b/requirements.txt
index 1c45373..c752fe3 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,5 @@
 beautifulsoup4 >= 4.6.0; python_version >= '3.6'
 aiohttp >= 3.6.2
 lxml >= 4.2.1
-nest-asyncio >= 1.4.0
\ No newline at end of file
+nest-asyncio >= 1.4.0
+irc >= 19.0.1
\ No newline at end of file
diff --git a/settings.json.example b/settings.json.example
index 3b5ca77..67dd2f2 100644
--- a/settings.json.example
+++ b/settings.json.example
@@ -7,6 +7,17 @@
 	"database_path": "rcgcdb.db",
 	"monitoring_webhook": "111111111111111111/AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
 	"support": "https://discord.gg/v77RTk5",
+	"irc_overtime": 3600,
+	"irc_servers": {
+		"your custom name for the farm": {
+			"domains": ["wikipedia.org", "otherwikipedia.org"],
+			"irc_host": "randomIRC.domain.com",
+			"irc_port": "6667",
+			"irc_nickname": "BotIRCNickname",
+			"irc_name": "BotIRCName",
+			"irc_channel_mapping": {"rc": "#rcchannel", "discussion": "#discussionchannel"}
+		}
+	},
 	"logging": {
 		"version": 1,
 		"disable_existing_loggers": false,
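For context, each `irc_servers` entry is keyed by a free-form farm name and matched to wikis through its `domains` list. A minimal sketch of that lookup, mirroring the loop `start_group` performs in `src/bot.py` below (the helper name `farm_config_for_domain` is hypothetical and not part of this patch):

```python
import json
from typing import Optional

def farm_config_for_domain(settings: dict, domain: str) -> Optional[dict]:
    """Return the irc_servers entry whose "domains" list contains the given domain."""
    for farm in settings.get("irc_servers", {}).values():
        if domain in farm["domains"]:
            return farm
    return None  # no IRC feed configured for this domain

with open("settings.json") as f:
    settings = json.load(f)

farm = farm_config_for_domain(settings, "wikipedia.org")
if farm is not None:
    print(farm["irc_host"], farm["irc_channel_mapping"]["rc"])
```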
diff --git a/src/bot.py b/src/bot.py
index 18577d9..a4a70a6 100644
--- a/src/bot.py
+++ b/src/bot.py
@@ -4,6 +4,7 @@ import logging.config
 import signal
 import traceback
 import nest_asyncio
+import time
 from collections import defaultdict, namedtuple
 from typing import Generator
 
@@ -18,6 +19,7 @@ from src.queue_handler import DBHandler
 from src.wiki import Wiki, process_cats, process_mwmsgs, essential_info, essential_feeds
 from src.discord import DiscordMessage, generic_msg_sender_exception_logger, stack_message_list
 from src.wiki_ratelimiter import RateLimiter
+from src.irc_feed import AioIRCCat
 
 logging.config.dictConfig(settings["logging"])
 
@@ -60,11 +62,24 @@ class RcQueue:
 	def __init__(self):
 		self.domain_list = {}
 		self.to_remove = []
+		self.irc_mapping = {}
 
 	async def start_group(self, group, initial_wikis):
 		"""Starts a task for given domain group"""
 		if group not in self.domain_list:
-			self.domain_list[group] = {"task": asyncio.create_task(scan_group(group)), "last_rowid": 0, "query": LimitedList(initial_wikis), "rate_limiter": RateLimiter()}
+			if group in self.irc_mapping:  # Hopefully there are no race conditions....
+				irc_connection = self.irc_mapping[group]
+			else:
+				for irc_server in settings["irc_servers"].keys():
+					if group in settings["irc_servers"][irc_server]["domains"]:
+						irc_connection = AioIRCCat(settings["irc_servers"][irc_server]["irc_channel_mapping"], all_wikis)
+						for domain in settings["irc_servers"][irc_server]["domains"]:
+							self.irc_mapping[domain] = irc_connection
+						irc_connection.connect(settings["irc_servers"][irc_server]["irc_host"], settings["irc_servers"][irc_server]["irc_port"], settings["irc_servers"][irc_server]["irc_nickname"], ircname=settings["irc_servers"][irc_server]["irc_name"])
+						break
+				else:
+					irc_connection = None
+			self.domain_list[group] = {"task": asyncio.create_task(scan_group(group)), "last_rowid": 0, "query": LimitedList(initial_wikis), "rate_limiter": RateLimiter(), "irc": irc_connection}
 			logger.debug(self.domain_list[group])
 		else:
 			raise KeyError
@@ -77,7 +92,10 @@ class RcQueue:
 			all_wikis[wiki].rc_active = -1
 		if not self[group]["query"]:  # if there is no wiki left in the queue, get rid of the task
 			logger.debug(f"{group} no longer has any wikis queued!")
-			await self.stop_task_group(group)
+			if not self.check_if_domain_in_db(group):
+				await self.stop_task_group(group)
+			else:
+				logger.debug("But there are still wikis for it in the DB!")
 
 	async def stop_task_group(self, group):
 		self[group]["task"].cancel()
@@ -87,7 +105,7 @@ class RcQueue:
 		fetch_all = db_cursor.execute(
 			'SELECT ROWID, webhook, wiki, lang, display, rcid FROM rcgcdw WHERE rcid != -1 GROUP BY wiki ORDER BY ROWID ASC')
 		for wiki in fetch_all.fetchall():
-			if get_domain(db_wiki["wiki"]) == domain:
+			if get_domain(wiki["wiki"]) == domain:
 				return True
 		return False
 
@@ -143,6 +161,19 @@ class RcQueue:
 					continue
 				try:
 					current_domain: dict = self[domain]
+					if current_domain["irc"]:
+						logger.debug('CURRENT STATUS:')
+						logger.debug("DOMAIN LIST FOR IRC: {}".format(current_domain["irc"].updated))
+						logger.debug("CURRENT DOMAIN INFO: {}".format(domain))
+						logger.debug("IS WIKI IN A LIST?: {}".format(db_wiki["wiki"] in current_domain["irc"].updated))
+						logger.debug("LAST CHECK FOR THE WIKI {} IS {}".format(db_wiki["wiki"], all_wikis[db_wiki["wiki"]].last_check))
+						if db_wiki["wiki"] not in current_domain["irc"].updated and all_wikis[db_wiki["wiki"]].last_check + settings["irc_overtime"] > time.time():
+							continue  # domain has IRC, the wiki was not reported as edited, and it was checked less than irc_overtime seconds ago; skip it
+						else:  # otherwise remove the wiki from the updated set before queueing it
+							try:
+								current_domain["irc"].updated.remove(db_wiki["wiki"])
+							except KeyError:
+								pass  # expected when the wiki is queued because of irc_overtime rather than an IRC report
 					if not db_wiki["ROWID"] < current_domain["last_rowid"]:
 						current_domain["query"].append(QueuedWiki(db_wiki["wiki"], 20))
 				except KeyError:
@@ -218,7 +249,8 @@ async def scan_group(group: str):
 	while True:
 		try:
 			async with rcqueue.retrieve_next_queued(group) as queued_wiki:  # acquire next wiki in queue
-				await asyncio.sleep(calculate_delay_for_group(len(rcqueue[group]["query"])))
+				if rcqueue[group]["irc"] is None:  # groups with an IRC feed don't need an artificial delay between wikis
+					await asyncio.sleep(calculate_delay_for_group(len(rcqueue[group]["query"])))
 				logger.debug("Wiki {}".format(queued_wiki.url))
 				local_wiki = all_wikis[queued_wiki.url]  # set a reference to a wiki object from memory
 				extended = False
@@ -271,6 +303,7 @@ async def scan_group(group: str):
 					targets = generate_targets(queued_wiki.url, "AND (rcid != -1 OR rcid IS NULL)")
 					paths = get_paths(queued_wiki.url, recent_changes_resp)
 					new_events = 0
+					local_wiki.last_check = time.time()  # on successful check, save the new last check time
 					for change in recent_changes:
 						if change["rcid"] > local_wiki.rc_active and queued_wiki.amount != 450:
 							new_events += 1
@@ -314,7 +347,7 @@ async def scan_group(group: str):
 		except asyncio.CancelledError:
 			return
 		except QueueEmpty:
-			await asyncio.sleep(21.0)
+			await asyncio.sleep(10.0)
 			continue
 
@@ -359,15 +392,22 @@ async def discussion_handler():
 		fetch_all = db_cursor.execute(
 			"SELECT wiki, rcid, postid FROM rcgcdw WHERE postid != '-1' OR postid IS NULL GROUP BY wiki")
 		for db_wiki in fetch_all.fetchall():
+			try:
+				local_wiki = all_wikis[db_wiki["wiki"]]  # set a reference to a wiki object from memory
+			except KeyError:
+				local_wiki = all_wikis[db_wiki["wiki"]] = Wiki()
+				local_wiki.rc_active = db_wiki["rcid"]
+			if db_wiki["wiki"] not in rcqueue.irc_mapping["fandom.com"].updated_discussions and local_wiki.last_discussion_check + settings["irc_overtime"] > time.time():  # I swear if another wiki farm ever starts using Fandom discussions I'm gonna use explosion magic
+				continue
+			else:
+				try:
+					rcqueue.irc_mapping["fandom.com"].updated_discussions.remove(db_wiki["wiki"])
+				except KeyError:
+					pass  # expected when the wiki is processed due to irc_overtime rather than an IRC report
 			header = settings["header"]
 			header["Accept"] = "application/hal+json"
 			async with aiohttp.ClientSession(headers=header, timeout=aiohttp.ClientTimeout(6.0)) as session:
-				try:
-					local_wiki = all_wikis[db_wiki["wiki"]]  # set a reference to a wiki object from memory
-				except KeyError:
-					local_wiki = all_wikis[db_wiki["wiki"]] = Wiki()
-					local_wiki.rc_active = db_wiki["rcid"]
 				try:
 					feeds_response = await local_wiki.fetch_feeds(db_wiki["wiki"], session)
 				except (WikiServerError, WikiError):
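The queueing rule introduced above boils down to: poll a wiki when IRC reported an edit on it, or when more than `irc_overtime` seconds have passed since its last successful check. A standalone restatement of that predicate (a sketch; `should_poll` is an illustrative name, not a function in this patch):

```python
import time

def should_poll(wiki_url: str, updated: set, last_check: float, irc_overtime: float = 3600.0) -> bool:
    """Mirror of the gate in the RcQueue queue-filling loop: IRC-reported wikis
    are polled immediately, everything else only after irc_overtime seconds."""
    if wiki_url in updated:
        return True  # IRC saw an edit since the last poll
    return last_check + irc_overtime <= time.time()  # stale enough to re-check anyway
```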
diff --git a/src/irc_feed.py b/src/irc_feed.py
new file mode 100644
index 0000000..6c19597
--- /dev/null
+++ b/src/irc_feed.py
@@ -0,0 +1,58 @@
+import irc.client_aio
+import json
+import logging
+from urllib.parse import urlparse, quote
+
+logger = logging.getLogger("rcgcdw.irc_feed")
+
+
+class AioIRCCat(irc.client_aio.AioSimpleIRCClient):
+	def __init__(self, targets, all_wikis):
+		irc.client_aio.SimpleIRCClient.__init__(self)
+		self.targets = targets
+		self.updated = set()  # storage for edited wikis
+		self.updated_discussions = set()
+		self.wikis = all_wikis
+
+	def on_welcome(self, connection, event):  # join IRC channels
+		for channel in self.targets.values():
+			connection.join(channel)
+
+	def on_pubmsg(self, connection, event):
+		if event.target == self.targets["rc"]:
+			self.parse_fandom_message(' '.join(event.arguments))
+		elif event.target == self.targets["discussion"]:
+			self.parse_fandom_discussion(' '.join(event.arguments))
+
+	def on_nicknameinuse(self, c, e):
+		c.nick(c.get_nickname() + "_")
+
+	def parse_fandom_message(self, message):
+		message = message.split("\x035*\x03")
+		half = message[0].find("\x0302http")
+		if half == -1:
+			return
+		message = message[0][half + 3:].strip()
+		url = urlparse(message)
+		full_url = "https://" + url.netloc + recognize_langs(url.path)
+		if full_url in self.wikis and self.wikis[full_url].rc_active != -1:
+			self.updated.add(full_url)
+			logger.debug("New website appended to the list! {}".format(full_url))
+
+	def parse_fandom_discussion(self, message):
+		post = json.loads(message)
+		if post.get('action', 'unknown') != "deleted":  # ignore deletion events
+			url = urlparse(post.get('url'))
+			full_url = "https://" + url.netloc + recognize_langs(url.path)
+			if full_url in self.wikis:  # POSSIBLE MEMORY LEAK AS WE DON'T HAVE A WAY TO CHECK IF WIKI IS LOOKING FOR DISCUSSIONS OR NOT
+				self.updated_discussions.add(full_url)  # full_url already carries the https:// prefix
+				logger.debug("New website appended to the list! {}".format(full_url))
+
+
+def recognize_langs(path):
+	lang = ""
+	new_path = path.split("/")
+	if len(new_path) > 2:
+		if new_path[1] not in ("wiki", "f"):
+			lang = "/" + new_path[1]
+	return lang + "/"
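`recognize_langs` extracts the optional language prefix from a Fandom URL path so it can be appended to the scheme and host when rebuilding the wiki's base URL. A few illustrative calls (expected values derived from the function body above, not from any tests in this patch):

```python
from src.irc_feed import recognize_langs

recognize_langs("/wiki/Some_Page")       # "/"    - English wiki, no language prefix
recognize_langs("/de/wiki/Irgendwas")    # "/de/" - language-prefixed wiki
recognize_langs("/f/p/4400000000000")    # "/"    - discussions path, no prefix
```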
{}".format(full_url)) + + +def recognize_langs(path): + lang = "" + new_path = path.split("/") + if len(new_path)>2: + if new_path[1] not in ("wiki", "f"): + lang = "/"+new_path[1] + return lang+"/" + + diff --git a/src/wiki.py b/src/wiki.py index e89fd3e..4366efa 100644 --- a/src/wiki.py +++ b/src/wiki.py @@ -24,6 +24,8 @@ class Wiki: fail_times: int = 0 # corresponding to amount of times connection with wiki failed for client reasons (400-499) session: aiohttp.ClientSession = None rc_active: int = 0 + last_check: float = 0.0 + last_discussion_check: float = 0.0 @staticmethod async def fetch_wiki(extended, script_path, session: aiohttp.ClientSession, ratelimiter: RateLimiter, amount=20) -> aiohttp.ClientResponse: @@ -188,6 +190,7 @@ async def process_mwmsgs(wiki_response: dict, local_wiki: Wiki, mw_msgs: dict): mw_msgs[key] = msgs # it may be a little bit messy for sure, however I don't expect any reason to remove mw_msgs entries by one local_wiki.mw_messages = key + # db_wiki: webhook, wiki, lang, display, rcid, postid async def essential_info(change: dict, changed_categories, local_wiki: Wiki, target: tuple, paths: tuple, request: dict, rate_limiter: RateLimiter) -> src.discord.DiscordMessage: