from __future__ import annotations

import asyncio
import inspect
import json
import logging
import time
from collections import OrderedDict
from contextlib import redirect_stdout
from io import StringIO
from typing import TYPE_CHECKING, Callable, Optional
from urllib.parse import urlparse, urlunparse

import asyncpg
import tldextract

from src.config import settings
from src.discord.queue import messagequeue
from src.domain import Domain
from src.exceptions import NoDomain
from src.irc_feed import AioIRCCat
from src.misc import flatten_lists
from src.queue_handler import dbmanager
from src.statistics import Log, LogType
from src.wiki import Wiki

logger = logging.getLogger("rcgcdb.domain_manager")


def safe_type_for_id(unsafe_id: str, target: Callable):
    """Convert a raw pub/sub payload field to the given type, mapping "null" and empty strings to None."""
    if unsafe_id == "null" or unsafe_id == "":  # TODO Verify if correct
        return None
    return target(unsafe_id)
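
# A minimal usage sketch (the payload values are hypothetical):
#   safe_type_for_id("123", int)  -> 123
#   safe_type_for_id("null", int) -> None
#   safe_type_for_id("", str)     -> None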


class DomainManager:
    def __init__(self):
        self.domains: dict[str, Domain] = {}
        self.start_time: float = time.time()
        self.task_store: dict[str, asyncio.Task] = {}

    async def task_tracker(self, one_update=False):
        """Task tracker keeps track of all spawned tasks so that the /debug endpoint can show them.

        It replaces asyncio.all_tasks() in order to show even tasks that have finished running."""
        while True:
            for task in asyncio.all_tasks():
                self.task_store[task.get_name()] = task
            if one_update:
                return
            await asyncio.sleep(3600.0)
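
    # A usage sketch (assumed wiring, not shown in this module): startup code
    # would keep one long-lived tracker alive, e.g.
    #   asyncio.create_task(domains.task_tracker(), name="task_tracker")
    # while the DEBUG DUMP handler below calls task_tracker(one_update=True)
    # for a one-off snapshot of task_store before serializing it.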

    @staticmethod
    def chunkstring(payload, length):
        return (payload[i:i + length] for i in range(0, len(payload), length))
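
    # Example: list(DomainManager.chunkstring("abcdefgh", 3)) -> ["abc", "def", "gh"].
    # The debug handlers below chunk their JSON dumps at 7950 characters to stay
    # under PostgreSQL's default 8000-byte NOTIFY payload limit.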

    async def webhook_update(self, connection: asyncpg.Connection, pid: int, channel: str, payload: str):
        """Callback for the database listener. Used to update our domain cache on changes such as new wikis or removed wikis."""
        def result_handler(task: asyncio.Task):
            if task.cancelled():
                return "cancelled"
            try:
                result = task.result()
            except Exception as e:
                logger.exception("Re-raised exception from a tracked task while preparing a debug dump")
                result = e
            if result is None:
                return None
            if isinstance(result, Exception):
                return str(result)
            if inspect.iscoroutinefunction(result):
                return result.__name__
            return str(type(result))

        split_payload = payload.split(" ")
        logger.debug("Received pub/sub message: {}".format(payload))
        if len(split_payload) < 2:
            raise ValueError("Improper pub/sub message! Pub/sub payload: {}".format(payload))
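        # Command grammar handled below (field meanings reconstructed from the
        # branches, so treat them as an informed assumption):
        #   ADD <script_url> <rc_id> <discussion_id>
        #   REMOVE <script_url>
        #   UPDATE <script_url>
        #   ERASE <script_url> <reason...>
        #   DEBUG INFO|EXEC|WIKI|DUMP|SITE ...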
        if split_payload[0] == "ADD":
            await self.new_wiki(Wiki(split_payload[1], safe_type_for_id(split_payload[2], int), safe_type_for_id(split_payload[3], str)))
        elif split_payload[0] == "REMOVE":
            webhook_diff = set()
            try:
                results = await connection.fetch("SELECT * FROM rcgcdb WHERE wiki = $1;", split_payload[1])
                if len(results) > 0:  # If there are still webhooks for this wiki - just update its targets
                    wiki_obj: Wiki = self.return_domain(self.get_domain(split_payload[1])).get_wiki(split_payload[1], None)
                    if wiki_obj is not None:
                        flattened_set_of_webhooks = set(flatten_lists(wiki_obj.rc_targets.values())).union(set(flatten_lists(wiki_obj.discussion_targets.values())))
                        await wiki_obj.update_targets()
                        webhook_diff = flattened_set_of_webhooks - set(flatten_lists(wiki_obj.rc_targets.values())).union(set(flatten_lists(wiki_obj.discussion_targets.values())))
                else:
                    self.remove_wiki(split_payload[1])
                # Purge queued messages addressed to webhooks that no longer target this wiki
                for removed_webhook in webhook_diff:
                    messagequeue.nuke_all_messages_to_webhook(removed_webhook)
            except asyncpg.IdleSessionTimeoutError:
                logger.error("Couldn't check the amount of webhooks for wiki {}!".format(split_payload[1]))
                return
        elif split_payload[0] == "UPDATE":
            await self.return_domain(self.get_domain(split_payload[1])).get_wiki(split_payload[1]).update_targets()
            logger.info("Successfully force-updated information about {}".format(split_payload[1]))
        elif split_payload[0] == "ERASE" and len(split_payload) > 2:
            logger.info(f"Received {' '.join(split_payload)} on pub/sub.")
            domain = self.return_domain(self.get_domain(split_payload[1]))
            wiki = domain.get_wiki(split_payload[1])
            reason = " ".join(split_payload[2:])
            if wiki is not None:
                logger.debug("Wiki specified in pub/sub message has been found. Erasing the wiki from the DB.")
                await wiki.remove_wiki_from_db(reason, send_reason=bool(reason))
        elif split_payload[0] == "DEBUG":
            asyncio.current_task().set_name("webhook_update")
            if split_payload[1] == "INFO":
                logger.info(self.domains)
                for name, domain in self.domains.items():
                    logger.info("RCGCDBDEBUG {name} - Status: {status}, exception: {exception}, irc: {irc}".format(
                        name=name, status=domain.task.done(),
                        exception=domain.task.exception() if domain.task.done() and not domain.task.cancelled() else None,
                        irc=str(domain.irc)))
                for item in asyncio.all_tasks():  # Get discussions task
                    if item.get_name() == "discussions":
                        logger.info(item)
                if self.check_for_domain(self.get_domain(split_payload[1])):
                    logger.info(str(self.return_domain(self.get_domain(split_payload[1])).get_wiki(split_payload[1])))
            elif split_payload[1] == "EXEC":
                # Debug-only escape hatch: executes arbitrary Python received over
                # pub/sub and logs whatever it printed to stdout
                f = StringIO()
                with redirect_stdout(f):
                    exec(" ".join(split_payload[2:]))
                logger.info(f.getvalue())
            elif split_payload[1] == "WIKI" and len(split_payload) > 2:
                domain = self.return_domain(self.get_domain(split_payload[2]))
                logger.info("RCGCDBDEBUG Domain information for {}: {}".format(domain.name, str(domain)))
                logger.info("RCGCDBDEBUG Wiki information for {}: {}".format(split_payload[2], domain.get_wiki(split_payload[2])))
            elif split_payload[1] == "DUMP" and len(split_payload) > 2:
                # Dump a debug info JSON object into the postgres pub/sub channel
                try:
                    logger.info(f"Received {' '.join(split_payload)} on pub/sub. Preparing JSON with data...")
                    json_object = {"uptime": time.time() - self.start_time, "domain_count": len(self.domains),
                                   "wiki_count": sum(len(x.wikis) for x in self.domains.values()),
                                   "tasks": {},
                                   "domains": {},
                                   "queued_messages": [],
                                   "awaiting_DB_queries": dbmanager.json(),
                                   "total_discord_messages_sent": sum(x.total_discord_messages_sent for x in self.domains.values())
                                   }
                    await self.task_tracker(one_update=True)
                    for task_name, task in self.task_store.items():
                        json_object["tasks"][task_name] = {"done": task.done(), "result": result_handler(task) if task.done() else None}
                    for name, domain in self.domains.items():
                        json_object["domains"][name] = domain.json()
                    for message in messagequeue._queue:
                        json_object["queued_messages"].append({"metadata": str(message.discord_message.metadata), "url": message.wiki.script_url if hasattr(message, "wiki") else "#######"})
                    req_id: str = split_payload[2]
                    json_string: str = json.dumps(json_object)
                    for json_part in self.chunkstring(json_string, 7950):
                        await connection.execute("select pg_notify('debugresponse', 'DUMP CHUNK ' || $1 || ' ' || $2);", req_id, json_part)
                    await connection.execute("select pg_notify('debugresponse', 'DUMP END ' || $1);", req_id)
                except Exception:
                    logger.exception("DEBUG DUMP error")
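
                # A consumer-side sketch (an assumption - the listener lives outside
                # this module): a subscriber on the 'debugresponse' channel would
                # concatenate the payloads of 'DUMP CHUNK <req_id> <part>' messages
                # in arrival order and treat 'DUMP END <req_id>' as the signal to
                # json.loads() the assembled buffer.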
            elif split_payload[1] == "SITE" and len(split_payload) > 3:
                logger.info(f"Received {' '.join(split_payload)} on pub/sub. Preparing JSON with data...")
                req_id: str = split_payload[2]
                domain = self.return_domain(self.get_domain(split_payload[3]))
                wiki = domain.get_wiki(split_payload[3])
                if wiki is not None:
                    logger.debug("Wiki specified in pub/sub message has been found. Preparing and sending dump.")
                    wiki_json = wiki.json()
                    try:
                        wiki.statistics.update(Log(type=LogType.SCAN_REASON, title="Debug request for the wiki"))
                        # Fetch a sample of recent changes plus site metadata from the MediaWiki API
                        params = OrderedDict({"action": "query", "format": "json", "uselang": "content", "list": "tags|recentchanges",
                                              "meta": "siteinfo", "utf8": 1, "rcshow": "!bot",
                                              "rcprop": "title|redirect|timestamp|ids|loginfo|parsedcomment|sizes|flags|tags|user|userid",
                                              "rclimit": 500, "rctype": "edit|new|log|categorize", "siprop": "namespaces|general"})
                        wiki_json["wiki_rc"] = await wiki.api_request(params=params, timeout=5)
                    except Exception:
                        wiki_json["wiki_rc"] = None
                    json_string: str = json.dumps(wiki_json)
                    for json_part in self.chunkstring(json_string, 7950):
                        await connection.execute("select pg_notify('debugresponse', 'SITE CHUNK ' || $1 || ' ' || $2);",
                                                 req_id, json_part)
                    await connection.execute("select pg_notify('debugresponse', 'SITE END ' || $1);",
                                             req_id)
        else:
            logger.error("Unknown pub/sub command! Payload: {}".format(payload))

    async def new_wiki(self, wiki: Wiki):
        """Finds a domain for the wiki and adds the wiki to that domain object.

        :param wiki: Wiki object to be added"""
        wiki_domain = self.get_domain(wiki.script_url)
        try:
            await self.domains[wiki_domain].add_wiki(wiki)
        except KeyError:
            new_domain = await self.new_domain(wiki_domain)
            new_domain.run_domain()
            await new_domain.add_wiki(wiki)

    def remove_domain(self, domain: Domain):
        logger.debug("Destroying domain and removing it from the domain directory")
        domain.destroy()
        del self.domains[domain.name]

    def remove_wiki(self, script_url: str):
        wiki_domain = self.get_domain(script_url)
        try:
            domain = self.domains[wiki_domain]
        except KeyError:
            raise NoDomain
        else:
            domain.remove_wiki(script_url)
            logger.debug(f"Removed wiki {script_url} from {domain.name}")
            if len(domain) == 0:
                self.remove_domain(domain)
                logger.debug(f"Removed domain {domain.name} due to removal of the last wiki in its dictionary")

    @staticmethod
    def get_domain(url: str) -> str:
        """Returns the registered domain for a given URL (for example fandom.com, wikipedia.org)"""
        return tldextract.extract(url).registered_domain
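
    # Example: get_domain("https://community.fandom.com/wiki/") -> "fandom.com".
    # tldextract understands multi-part public suffixes, so a URL on foo.bar.co.uk
    # resolves to "bar.co.uk" rather than "co.uk".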

    def check_for_domain(self, domain: str) -> bool:
        return domain in self.domains

    def return_domain(self, domain: str) -> Domain:
        return self.domains[domain]

    async def new_domain(self, name: str) -> Domain:
        logger.debug("Creating new domain object for {}".format(name))
        domain_object = Domain(name)
        for irc_server in settings["irc_servers"].keys():
            if name in settings["irc_servers"][irc_server]["domains"]:
                domain_object.set_irc(AioIRCCat(settings["irc_servers"][irc_server]["irc_channel_mapping"], domain_object, None, None))
                domain_object.irc.connect(settings["irc_servers"][irc_server]["irc_host"], settings["irc_servers"][irc_server]["irc_port"],
                                          settings["irc_servers"][irc_server]["irc_name"], ircname=settings["irc_servers"][irc_server]["irc_nickname"])
                break  # Allow only one IRC connection per domain
        self.domains[name] = domain_object
        return self.domains[name]
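
    # The loop above implies a settings["irc_servers"] shape roughly like the
    # following sketch (values are made up; only the key names are taken from
    # the code):
    #   {"example_server": {
    #       "domains": ["fandom.com"],
    #       "irc_channel_mapping": {...},
    #       "irc_host": "irc.example.org", "irc_port": 6667,
    #       "irc_name": "RcGcDb", "irc_nickname": "RcGcDb"}}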

    def run_all_domains(self):
        for domain in self.domains.values():
            domain.run_domain()


domains = DomainManager()
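
# A wiring sketch (an assumption - registration happens outside this module):
# webhook_update matches asyncpg's listener callback signature
# (connection, pid, channel, payload), so startup code could register it with
#   await connection.add_listener("webhookupdates", domains.webhook_update)
# where the channel name here is hypothetical.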