diff --git a/src/api/client.py b/src/api/client.py index a68ee4c..29ecf49 100644 --- a/src/api/client.py +++ b/src/api/client.py @@ -27,8 +27,7 @@ class Client: """ A client for interacting with RcGcDw when creating formatters or hooks. """ - def __init__(self, hooks, wiki): - self._formatters = hooks + def __init__(self, wiki): self.__recent_changes: Wiki = wiki self.WIKI_API_PATH: str = src.misc.WIKI_API_PATH self.WIKI_ARTICLE_PATH: str = src.misc.WIKI_ARTICLE_PATH @@ -39,10 +38,6 @@ class Client: self.LinkParser: type(src.misc.LinkParser) = src.misc.LinkParser #self.make_api_request: src.rc.wiki.__recent_changes.api_request = self.__recent_changes.api_request - def refresh_internal_data(self): - """Refreshes internal storage data for wiki tags and MediaWiki messages.""" - self.__recent_changes.init_info() - @property def namespaces(self) -> dict: """Return a dict of namespaces, if None return empty dict""" @@ -88,9 +83,6 @@ class Client: """ return self.__recent_changes.api_request(params, *json_path, timeout=timeout, allow_redirects=allow_redirects) - def get_formatters(self): - return self._formatters - def get_ipmapper(self) -> dict: """Returns a dict mapping IPs with amount of their edits""" return self.__recent_changes.map_ips diff --git a/src/api/util.py b/src/api/util.py index 2560ad5..ec3bd8f 100644 --- a/src/api/util.py +++ b/src/api/util.py @@ -32,9 +32,9 @@ if TYPE_CHECKING: logger = logging.getLogger("src.api.util") -def default_message(event: str, formatter_hooks: dict) -> Callable: +def default_message(event: str, display: str, formatter_hooks: dict) -> Callable: """Returns a method of a formatter responsible for the event or None if such does not exist.""" - return formatter_hooks.get(event, formatter_hooks.get("generic", formatter_hooks["no_formatter"])) + return formatter_hooks[display].get(event, formatter_hooks.get("generic", formatter_hooks["no_formatter"])) def clean_link(link: str) -> str: diff --git a/src/bot.py b/src/bot.py index 8d4e800..235a2c1 100644 --- a/src/bot.py +++ b/src/bot.py @@ -10,7 +10,7 @@ from typing import Generator from contextlib import asynccontextmanager -from redis_connector import redis +from redis_connector import Redis from src.argparser import command_line_args from src.config import settings from src.database import db @@ -47,7 +47,7 @@ async def populate_wikis(): async with db.pool().acquire() as connection: async with connection.transaction(): async for db_wiki in connection.cursor('SELECT DISTINCT wiki, rcid, postid FROM rcgcdw'): - domains.new_wiki(Wiki(db_wiki["wiki"], db_wiki["rcid"], db_wiki["postid"])) + await domains.new_wiki(Wiki(db_wiki["wiki"], db_wiki["rcid"], db_wiki["postid"])) queue_limit = settings.get("queue_limit", 30) QueuedWiki = namedtuple("QueuedWiki", ['url', 'amount']) @@ -548,6 +548,7 @@ async def main_loop(): await db.setup_connection() logger.debug("Connection type: {}".format(db.connection)) await populate_wikis() + redis = Redis(domains) await redis.connect() await redis.pubsub() domains.run_all_domains() @@ -562,7 +563,8 @@ async def main_loop(): # loop.set_exception_handler(global_exception_handler) try: main_tasks = {"wiki_scanner": asyncio.create_task(wiki_scanner()), "message_sender": asyncio.create_task(message_sender()), - "discussion_handler": asyncio.create_task(discussion_handler()), "database_updates": asyncio.create_task(DBHandler.update_db())} + "discussion_handler": asyncio.create_task(discussion_handler()), "database_updates": asyncio.create_task(DBHandler.update_db()), + "redis_updates": asyncio.create_task(redis.reader())} main_tasks["msg_queue_shield"] = asyncio.shield(main_tasks["message_sender"]) main_tasks["database_updates_shield"] = asyncio.shield(main_tasks["database_updates"]) await asyncio.gather(main_tasks["wiki_scanner"], main_tasks["discussion_handler"], main_tasks["message_sender"], main_tasks["database_updates"]) diff --git a/src/domain.py b/src/domain.py index 9077ffa..bb3b40d 100644 --- a/src/domain.py +++ b/src/domain.py @@ -47,7 +47,7 @@ class Domain: logger.error(f"Tried to start a task for domain {self.name} however the task already exists!") def remove_wiki(self, script_url: str): - + self.wikis.pop(script_url) def add_wiki(self, wiki: src.wiki.Wiki, first=False): """Adds a wiki to domain list. diff --git a/src/domain_manager.py b/src/domain_manager.py index 050ae66..b954fd7 100644 --- a/src/domain_manager.py +++ b/src/domain_manager.py @@ -14,7 +14,7 @@ class DomainManager: def __init__(self): self.domains: dict[str, Domain] = {} - def new_wiki(self, wiki: Wiki): + async def new_wiki(self, wiki: Wiki): """Finds a domain for the wiki and adds a wiki to the domain object. :parameter wiki - Wiki object to be added""" @@ -22,7 +22,17 @@ class DomainManager: try: self.domains[wiki_domain].add_wiki(wiki) except KeyError: - self.new_domain(wiki_domain).add_wiki(wiki) + new_domain = await self.new_domain(wiki_domain) + new_domain.add_wiki(wiki) + + def remove_wiki(self, script_url: str): + wiki_domain = self.get_domain(script_url) + try: + domain = self.domains[wiki_domain] + except KeyError: + raise NoDomain + else: + domain.remove_wiki(script_url) @staticmethod def get_domain(url: str) -> str: diff --git a/src/redis_connector.py b/src/redis_connector.py index 80872bd..4e24fb4 100644 --- a/src/redis_connector.py +++ b/src/redis_connector.py @@ -2,15 +2,20 @@ import asyncio import aioredis import async_timeout import logging -from typing import Optional +from typing import Optional, TYPE_CHECKING from src.config import settings +from src.wiki import Wiki logger = logging.getLogger("rcgcdb.redisconnector") +if TYPE_CHECKING: + from src.domain_manager import DomainManager + class Redis: - def __init__(self): + def __init__(self, domain_manager): self.pub_connection: Optional[aioredis.connection] = None self.stat_connection: Optional[aioredis.connection] = None + self.domain_manager: DomainManager = domain_manager async def reader(self): """Based on code from https://aioredis.readthedocs.io/en/latest/getting-started/#pubsub-mode""" @@ -21,11 +26,24 @@ class Redis: if message is not None: print(f"(Reader) Message Received: {message}") logger.debug(f"(Reader) Message Received: {message}") + await self.process_changes(message["data"]) await asyncio.sleep(1.0) except asyncio.TimeoutError: # TODO Better handler pass except aioredis.exceptions.ConnectionError: pass + except asyncio.CancelledError: + # TODO Send a message about shutdown + raise NotImplementedError + + async def process_changes(self, data: str): + data = data.split(" ") + if data[0] == "REMOVE": + self.domain_manager.remove_wiki(data[1]) # TODO Add response to source + elif data[0] == "ADD": # ADD https://new_wiki.somedamain.com 43 1 where 43 stands for rc_id and 1 for discussion_id + wiki = Wiki(data[1], int(data[2]), int(data[3])) # TODO This might raise an issue if non-int value + await self.domain_manager.new_wiki(wiki) + async def connect(self): self.stat_connection = await aioredis.from_url("redis://" + settings["redis_host"], encoding="UTF-8") @@ -33,7 +51,3 @@ class Redis: async def pubsub(self): self.pub_connection = self.stat_connection.pubsub() await self.pub_connection.subscribe("rcgcdb_updates") - asyncio.create_task(self.reader()) - - -redis = Redis() diff --git a/src/wiki.py b/src/wiki.py index 093da2f..4c14f2d 100644 --- a/src/wiki.py +++ b/src/wiki.py @@ -12,6 +12,8 @@ from src.database import db from src.queue_handler import DBHandler from src.formatters.rc import embed_formatter, compact_formatter from src.formatters.discussions import feeds_embed_formatter, feeds_compact_formatter +from src.api.hooks import formatter_hooks +from src.api.client import Client from src.misc import parse_link from src.i18n import langs from src.wiki_ratelimiter import RateLimiter @@ -41,6 +43,7 @@ class Wiki: self.mw_messages: Optional[MWMessages] = None self.first_fetch_done: bool = False self.domain: Optional[Domain] = None + self.client: Client = Client(self) @property def rc_id(self): @@ -224,6 +227,11 @@ class Wiki: break await process_cats(change, self, categorize_events) else: # adequate amount of changes + for tag in request["query"]["tags"]: + try: + self.tags[tag["name"]] = (BeautifulSoup(tag["displayname"], "lxml")).get_text() + except KeyError: + self.tags[tag["name"]] = None targets = await self.generate_targets() message_list = defaultdict(list) for change in recent_changes: # Yeah, second loop since the categories require to be all loaded up @@ -231,7 +239,7 @@ class Wiki: if highest_id is None or change["rcid"] > highest_id: # make sure that the highest_rc is really highest rcid but do allow other entries with potentially lesser rcids come after without breaking the cycle highest_id = change["rcid"] for combination, webhooks in targets.items(): - message = await rc_processor(self, change, categorize_events, ) + message = await rc_processor(self, change, categorize_events, combination, webhooks) async def rc_processor(wiki: Wiki, change: dict, changed_categories: dict, display_options: namedtuple("Settings", ["lang", "display"]), webhooks: list): @@ -239,12 +247,12 @@ async def rc_processor(wiki: Wiki, change: dict, changed_categories: dict, displ LinkParser = LinkParser() metadata = src.discord.DiscordMessageMetadata("POST", rev_id=change.get("revid", None), log_id=change.get("logid", None), page_id=change.get("pageid", None)) - context = Context(display_options, webhooks, client) + context = Context(display_options, webhooks, wiki.client) if ("actionhidden" in change or "suppressed" in change) and "suppressed" not in settings[ "ignored"]: # if event is hidden using suppression context.event = "suppressed" try: - discord_message: Optional[src.discord.DiscordMessage] = default_message("suppressed", formatter_hooks)(context, change) + discord_message: Optional[src.discord.DiscordMessage] = default_message("suppressed", display_options.display, formatter_hooks)(context, change) except NoFormatter: return except: @@ -290,16 +298,16 @@ async def rc_processor(wiki: Wiki, change: dict, changed_categories: dict, displ else: raise if identification_string in ( - "delete/delete", "delete/delete_redir") and AUTO_SUPPRESSION_ENABLED: # TODO Move it into a hook? + "delete/delete", "delete/delete_redir"): # TODO Move it into a hook? delete_messages(dict(pageid=change.get("pageid"))) - elif identification_string == "delete/event" and AUTO_SUPPRESSION_ENABLED: + elif identification_string == "delete/event": logparams = change.get('logparams', {"ids": []}) if settings["appearance"]["mode"] == "embed": redact_messages(logparams.get("ids", []), 1, logparams.get("new", {})) else: for logid in logparams.get("ids", []): delete_messages(dict(logid=logid)) - elif identification_string == "delete/revision" and AUTO_SUPPRESSION_ENABLED: + elif identification_string == "delete/revision": logparams = change.get('logparams', {"ids": []}) if settings["appearance"]["mode"] == "embed": redact_messages(logparams.get("ids", []), 0, logparams.get("new", {}))