from __future__ import annotations import inspect import json import time from collections import OrderedDict from typing import TYPE_CHECKING, Callable, Optional from urllib.parse import urlparse, urlunparse import logging import asyncpg import asyncio from src.discord.queue import messagequeue from src.exceptions import NoDomain from src.config import settings from src.domain import Domain from src.irc_feed import AioIRCCat from src.queue_handler import dbmanager from io import StringIO from contextlib import redirect_stdout from src.wiki import Wiki import tldextract from src.statistics import Log, LogType logger = logging.getLogger("rcgcdb.domain_manager") def safe_type_for_id(unsafe_id: str, target: Callable): if unsafe_id == "null" or unsafe_id == "": # TODO Verify if correct return None return target(unsafe_id) class DomainManager: def __init__(self): self.domains: dict[str, Domain] = {} self.start_time: float = time.time() self.task_store: dict[str, asyncio.Task] = {} async def task_tracker(self, one_update=False): """Task tracer is supposed to keep track of all tasks spawned for /debug endpoint to show them. It replaces asyncio.all_tasks() in order to show even tasks that have finished running.""" while 1: for task in asyncio.all_tasks(): self.task_store[task.get_name()] = task if one_update: return await asyncio.sleep(3600.0) @staticmethod def chunkstring(payload, length): return (payload[0 + i:length + i] for i in range(0, len(payload), length)) async def webhook_update(self, connection: asyncpg.Connection, pid: int, channel: str, payload: str): def result_handler(result: asyncio.Task): if result.cancelled(): return "cancelled" try: result = result.result() except Exception as e: logger.exception("Re-raised exception from task on debug for task") result = e if result is None: return None if isinstance(result, Exception): return str(result) if inspect.iscoroutinefunction(result): return result.__name__ return str(type(result)) """Callback for database listener. Used to update our domain cache on changes such as new wikis or removed wikis""" split_payload = payload.split(" ") logger.debug("Received pub/sub message: {}".format(payload)) if len(split_payload) < 2: raise ValueError("Improper pub/sub message! Pub/sub payload: {}".format(payload)) if split_payload[0] == "ADD": await self.new_wiki(Wiki(split_payload[1], safe_type_for_id(split_payload[2], int), safe_type_for_id(split_payload[3], str))) elif split_payload[0] == "REMOVE": try: results = await connection.fetch("SELECT * FROM rcgcdb WHERE wiki = $1;", split_payload[1]) if len(results) > 0: # If there are still webhooks for this wiki - just update its targets await self.return_domain(self.get_domain(split_payload[1])).get_wiki(split_payload[1]).update_targets() else: self.remove_wiki(split_payload[1]) except asyncpg.IdleSessionTimeoutError: logger.error("Couldn't check amount of webhooks with {} wiki!".format(split_payload[1])) return elif split_payload[0] == "UPDATE": await self.return_domain(self.get_domain(split_payload[1])).get_wiki(split_payload[1]).update_targets() logger.info("Successfully force updated information about {}".format(split_payload[1])) elif split_payload[0] == "ERASE" and len(split_payload) > 2: logger.info(f"Received {' '.join(split_payload)} on pub/sub.") domain = self.return_domain(self.get_domain(split_payload[1])) wiki = domain.get_wiki(split_payload[1]) reason = " ".join(split_payload[2:]) if wiki is not None: logger.debug("Wiki specified in pub/sub message has been found. Erasing the wiki from DB.") await wiki.remove_wiki_from_db(reason, send_reason=True if reason else False) elif split_payload[0] == "DEBUG": asyncio.current_task().set_name("webhook_update") if split_payload[1] == "INFO": logger.info(self.domains) for name, domain in self.domains.items(): logger.info("RCGCDBDEBUG {name} - Status: {status}, exception: {exception}, irc: {irc}".format(name=name, status=domain.task.done(), exception=domain.task.print_stack(), irc=str(domain.irc))) for item in asyncio.all_tasks(): # Get discussions task if item.get_name() == "discussions": logger.info(item) if self.check_for_domain(self.get_domain(split_payload[1])): logger.info(str(self.return_domain(self.get_domain(split_payload[1])).get_wiki(split_payload[1]))) elif split_payload[1] == "EXEC": f = StringIO() with redirect_stdout(f): exec(" ".join(split_payload[2:])) logger.info(f.getvalue()) elif split_payload[1] == "WIKI" and len(split_payload) > 2: domain = self.return_domain(self.get_domain(split_payload[2])) logger.info("RCGCDBDEBUG Domain information for {}: {}".format(domain.name, str(domain))) logger.info("RCGCDBDEBUG Wiki information for {}: {}".format(split_payload[2], domain.get_wiki(split_payload[2]))) elif split_payload[1] == "DUMP" and len(split_payload) > 2: # Dump debug info JSON object into postgres pubsub channel try: logger.info(f"Received {' '.join(split_payload)} on pub/sub. Preparing JSON with data...") json_object = {"uptime": time.time() - self.start_time, "domain_count": len(self.domains), "wiki_count": sum([len(x.wikis) for x in self.domains.values()]), "tasks": {}, "domains": {}, "queued_messages": [], "awaiting_DB_queries": dbmanager.json(), "total_discord_messages_sent": sum([x.total_discord_messages_sent for x in self.domains.values()]) } await self.task_tracker(one_update=True) for task_name, task in self.task_store.items(): json_object["tasks"][task_name] = {"done": task.done(), "result": result_handler(task) if task.done() else None} for name, domain in self.domains.items(): json_object["domains"][name] = domain.json() for message in messagequeue._queue: json_object["queued_messages"].append({"metadata": str(message.discord_message.metadata), "url": message.wiki.script_url if hasattr(message, "wiki") else "#######"}) req_id: str = split_payload[2] json_string: str = json.dumps(json_object) for json_part in self.chunkstring(json_string, 7950): await connection.execute("select pg_notify('debugresponse', 'DUMP CHUNK ' || $1 || ' ' || $2);", req_id, json_part) await connection.execute("select pg_notify('debugresponse', 'DUMP END ' || $1);", req_id) except: logger.exception("DEBUG DUMP error") elif split_payload[1] == "SITE" and len(split_payload) > 3: logger.info(f"Received {' '.join(split_payload)} on pub/sub. Preparing JSON with data...") req_id: str = split_payload[2] domain = self.return_domain(self.get_domain(split_payload[3])) wiki = domain.get_wiki(split_payload[3]) if wiki is not None: logger.debug("Wiki specified in pub/sub message has been found. Preparing and sending dump.") wiki_json = wiki.json() try: wiki.statistics.update(Log(type=LogType.SCAN_REASON, title="Debug request for the wiki")) params = OrderedDict({"action": "query", "format": "json", "uselang": "content", "list": "tags|recentchanges", "meta": "siteinfo", "utf8": 1, "rcshow": "!bot", "rcprop": "title|redirect|timestamp|ids|loginfo|parsedcomment|sizes|flags|tags|user|userid", "rclimit": 500, "rctype": "edit|new|log|categorize", "siprop": "namespaces|general"}) wiki_json["wiki_rc"] = await wiki.api_request(params=params, timeout=5) except: wiki_json["wiki_rc"] = None json_string: str = json.dumps(wiki_json) for json_part in self.chunkstring(json_string, 7950): await connection.execute("select pg_notify('debugresponse', 'SITE CHUNK ' || $1 || ' ' || $2);", req_id, json_part) await connection.execute("select pg_notify('debugresponse', 'SITE END ' || $1);", req_id) else: logger.error("Unknown pub/sub command! Payload: {}".format(payload)) async def new_wiki(self, wiki: Wiki): """Finds a domain for the wiki and adds a wiki to the domain object. :parameter wiki - Wiki object to be added""" wiki_domain = self.get_domain(wiki.script_url) try: await self.domains[wiki_domain].add_wiki(wiki) except KeyError: new_domain = await self.new_domain(wiki_domain) new_domain.run_domain() await new_domain.add_wiki(wiki) def remove_domain(self, domain: Domain): logger.debug("Destroying domain and removing it from domain directory") domain.destroy() del self.domains[domain.name] def remove_wiki(self, script_url: str): wiki_domain = self.get_domain(script_url) try: domain = self.domains[wiki_domain] except KeyError: raise NoDomain else: domain.remove_wiki(script_url) logger.debug(f"Removed a wiki {script_url} from {domain.name}") if len(domain) == 0: self.remove_domain(domain) logger.debug(f"Removed domain {domain.name} due to removal of last queued wiki in its dictionary") @staticmethod def get_domain(url: str) -> str: """Returns a domain for given URL (for example fandom.com, wikipedia.org)""" return tldextract.extract(url).registered_domain def check_for_domain(self, domain: str): return domain in self.domains def return_domain(self, domain: str): return self.domains[domain] async def new_domain(self, name: str) -> Domain: logger.debug("Creating new domain object for {}".format(name)) domain_object = Domain(name) for irc_server in settings["irc_servers"].keys(): if name in settings["irc_servers"][irc_server]["domains"]: domain_object.set_irc(AioIRCCat(settings["irc_servers"][irc_server]["irc_channel_mapping"], domain_object, None, None)) domain_object.irc.connect(settings["irc_servers"][irc_server]["irc_host"], settings["irc_servers"][irc_server]["irc_port"], settings["irc_servers"][irc_server]["irc_name"], ircname=settings["irc_servers"][irc_server]["irc_nickname"]) break # Allow only one IRC for a domain self.domains[name] = domain_object return self.domains[name] def run_all_domains(self): for domain in self.domains.values(): domain.run_domain() domains = DomainManager()