RcGcDb/src/domain_manager.py

166 lines
8 KiB
Python
Raw Normal View History

2021-05-30 11:23:48 +00:00
from __future__ import annotations
2024-07-13 12:31:01 +00:00
import inspect
2024-07-13 12:31:01 +00:00
import json
import time
from typing import TYPE_CHECKING, Callable, Optional
2021-05-30 11:23:48 +00:00
from urllib.parse import urlparse, urlunparse
2022-07-26 13:48:44 +00:00
import logging
2022-06-22 17:17:20 +00:00
import asyncpg
2023-08-14 16:29:33 +00:00
import asyncio
2024-07-16 18:51:45 +00:00
2024-07-18 14:30:30 +00:00
from src.discord.queue import messagequeue
2022-09-29 21:10:36 +00:00
from src.exceptions import NoDomain
2021-05-30 11:23:48 +00:00
from src.config import settings
from src.domain import Domain
2021-05-30 13:31:51 +00:00
from src.irc_feed import AioIRCCat
from io import StringIO
from contextlib import redirect_stdout
2022-11-04 14:59:26 +00:00
from src.wiki import Wiki
import tldextract
2021-05-30 11:23:48 +00:00
2022-07-26 13:48:44 +00:00
# Module-level logger; every log call in this module goes through the "rcgcdb.domain_manager" logger.
logger = logging.getLogger("rcgcdb.domain_manager")
2021-05-30 11:23:48 +00:00
2022-11-25 16:24:44 +00:00
def safe_type_for_id(unsafe_id: str, target: Callable):
    """Convert a raw identifier string with ``target``, treating placeholders as missing.

    :parameter unsafe_id - raw textual value taken from a pub/sub payload
    :parameter target - callable applied to ``unsafe_id`` (for example ``int`` or ``str``)
    :returns None when ``unsafe_id`` is ``"null"`` or an empty string, otherwise ``target(unsafe_id)``
    """
    if unsafe_id == "null" or unsafe_id == "":  # TODO Verify if correct
        return None
    return target(unsafe_id)
2021-05-30 11:23:48 +00:00
class DomainManager:
    """Keeps every Domain object and routes wikis to the domain they belong to.

    Domains are stored in ``self.domains`` under the key produced by ``get_domain``.
    ``webhook_update`` reacts to pub/sub messages that add, remove or update wikis
    and also serves a few DEBUG sub-commands.
    """

    def __init__(self):
        # Maps the value returned by get_domain() to the Domain object handling it.
        self.domains: dict[str, Domain] = {}
        # Creation timestamp; the DEBUG DUMP command reports "uptime" relative to it.
        self.start_time: float = time.time()

    @staticmethod
    def _describe_result(result) -> Optional[str]:
        """Turn a task result into something JSON-serializable (a string or None)."""
        if result is None:
            return None
        if isinstance(result, Exception):
            return str(result)
        if inspect.iscoroutinefunction(result):
            return result.__name__
        return str(type(result))

    @classmethod
    def _task_snapshot(cls, task) -> dict:
        """Describe a task/future as ``{"done": bool, "result": str | None}`` without raising.

        ``result()`` re-raises the task's exception (or CancelledError) instead of
        returning it, so cancellation and failure are checked first.
        """
        if not task.done():
            return {"done": False, "result": None}
        if task.cancelled():
            return {"done": True, "result": "cancelled"}
        error = task.exception()
        if error is not None:
            return {"done": True, "result": str(error)}
        return {"done": True, "result": cls._describe_result(task.result())}

    @staticmethod
    def _format_task_stack(task) -> str:
        """Return the text ``task.print_stack()`` would print, captured into a string."""
        buffer = StringIO()
        task.print_stack(file=buffer)
        return buffer.getvalue()

    async def webhook_update(self, connection: asyncpg.Connection, pid: int, channel: str, payload: str):
        """Callback for database listener. Used to update our domain cache on changes such as new wikis or removed wikis

        The payload is a space separated command:
            ADD <wiki> <id> <id>   - build a Wiki (ids of "null" or "" become None) and register it
            REMOVE <wiki>          - refresh targets if rows for the wiki remain in rcgcdb, otherwise drop the wiki
            UPDATE <wiki>          - refresh targets of the wiki
            DEBUG INFO [<wiki>]    - log state of all domains, the "discussions" task and optionally one wiki
            DEBUG EXEC <code>      - execute code and log what it printed
            DEBUG WIKI <wiki>      - log domain and wiki information
            DEBUG DUMP             - send a JSON status object through pg_notify on 'debugresponse'
            DEBUG RESPONSE         - ignored

        :parameter connection - connection used for the follow-up query and for pg_notify
        :parameter pid - not used by this method
        :parameter channel - not used by this method
        :parameter payload - raw text of the notification
        :raises ValueError when the payload has too few fields or an unknown command
        """
        split_payload = payload.split(" ")
        logger.debug("Received pub/sub message: {}".format(payload))
        if len(split_payload) < 2:
            raise ValueError("Improper pub/sub message! Pub/sub payload: {}".format(payload))
        if split_payload[0] == "ADD":
            # ADD needs two more fields after the wiki URL; fail with the same error as other malformed payloads
            if len(split_payload) < 4:
                raise ValueError("Improper pub/sub message! Pub/sub payload: {}".format(payload))
            await self.new_wiki(Wiki(split_payload[1], safe_type_for_id(split_payload[2], int), safe_type_for_id(split_payload[3], str)))
        elif split_payload[0] == "REMOVE":
            try:
                results = await connection.fetch("SELECT * FROM rcgcdb WHERE wiki = $1;", split_payload[1])
                if len(results) > 0:  # If there are still webhooks for this wiki - just update its targets
                    await self.return_domain(self.get_domain(split_payload[1])).get_wiki(split_payload[1]).update_targets()
                else:
                    self.remove_wiki(split_payload[1])
            except asyncpg.IdleSessionTimeoutError:
                logger.error("Couldn't check amount of webhooks with {} wiki!".format(split_payload[1]))
                return
        elif split_payload[0] == "UPDATE":
            await self.return_domain(self.get_domain(split_payload[1])).get_wiki(split_payload[1]).update_targets()
            logger.info("Successfully force updated information about {}".format(split_payload[1]))
        elif split_payload[0] == "DEBUG":
            if split_payload[1] == "INFO":
                logger.info(self.domains)
                for name, domain in self.domains.items():
                    # print_stack() itself returns None, so capture what it prints instead
                    logger.info("RCGCDBDEBUG {name} - Status: {status}, exception: {exception}, irc: {irc}".format(
                        name=name, status=domain.task.done(),
                        exception=self._format_task_stack(domain.task), irc=str(domain.irc)))
                for item in asyncio.all_tasks():  # Get discussions task
                    if item.get_name() == "discussions":
                        logger.info(item)
                # split_payload[1] is the literal "INFO" here, the optional wiki is the next field
                if len(split_payload) > 2 and self.check_for_domain(self.get_domain(split_payload[2])):
                    logger.info(str(self.return_domain(self.get_domain(split_payload[2])).get_wiki(split_payload[2])))
            elif split_payload[1] == "EXEC":
                # SECURITY: this executes arbitrary code received over the pub/sub channel.
                # NOTE(review): anyone able to send notifications on this channel gets code execution - confirm this is restricted.
                f = StringIO()
                with redirect_stdout(f):
                    exec(" ".join(split_payload[2:]))
                logger.info(f.getvalue())
            elif split_payload[1] == "WIKI" and len(split_payload) > 2:
                domain = self.return_domain(self.get_domain(split_payload[2]))
                logger.info("RCGCDBDEBUG Domain information for {}: {}".format(domain.name, str(domain)))
                logger.info("RCGCDBDEBUG Wiki information for {}: {}".format(split_payload[2], domain.get_wiki(split_payload[2])))
            elif split_payload[1] == "DUMP":
                # Dump debug info JSON object into postgres pubsub channel
                json_object = {
                    "uptime": time.time() - self.start_time,
                    "domain_count": len(self.domains),
                    "wiki_count": sum(len(x.wikis) for x in self.domains.values()),
                    "tasks": {task.get_name(): self._task_snapshot(task) for task in asyncio.all_tasks()},
                    "domains": {name: domain.json() for name, domain in self.domains.items()},
                    "queued_messages": [
                        {"metadata": str(message.discord_message.metadata), "url": message.wiki.script_url}
                        for message in messagequeue._queue
                    ],
                    "total_discord_messages_sent": sum(x.total_discord_messages_sent for x in self.domains.values()),
                }
                await connection.execute("select pg_notify('debugresponse', $1);", json.dumps(json_object))
            elif split_payload[1] == "RESPONSE":
                # NOTE(review): presumably a reply to a DUMP request arriving on the same channel - nothing to do
                return
        else:
            raise ValueError("Unknown pub/sub command! Payload: {}".format(payload))

    async def new_wiki(self, wiki: Wiki):
        """Finds a domain for the wiki and adds a wiki to the domain object.

        When no domain exists yet for the wiki, a new one is created and started first.

        :parameter wiki - Wiki object to be added"""
        wiki_domain = self.get_domain(wiki.script_url)
        # Look the domain up separately from add_wiki() so that a KeyError raised inside
        # add_wiki() is not mistaken for a missing domain (which would replace the existing one).
        domain = self.domains.get(wiki_domain)
        if domain is None:
            domain = await self.new_domain(wiki_domain)
            domain.run_domain()
        await domain.add_wiki(wiki)

    def remove_domain(self, domain: Domain):
        """Destroys the domain object and removes it from the domain directory.

        :parameter domain - Domain object to be removed"""
        logger.debug("Destroying domain and removing it from domain directory")
        domain.destroy()
        del self.domains[domain.name]

    def remove_wiki(self, script_url: str):
        """Removes a wiki from its domain; removes the domain as well once it reports a length of 0.

        :parameter script_url - URL of the wiki to be removed
        :raises NoDomain when there is no domain registered for the wiki"""
        wiki_domain = self.get_domain(script_url)
        try:
            domain = self.domains[wiki_domain]
        except KeyError:
            raise NoDomain
        domain.remove_wiki(script_url)
        logger.debug(f"Removed a wiki {script_url} from {domain.name}")
        if len(domain) == 0:
            self.remove_domain(domain)
            logger.debug(f"Removed domain {domain.name} due to removal of last queued wiki in its dictionary")

    @staticmethod
    def get_domain(url: str) -> str:
        """Returns a domain for given URL (for example fandom.com, wikipedia.org)"""
        return tldextract.extract(url).registered_domain

    def check_for_domain(self, domain: str) -> bool:
        """Tells whether a domain with given name is present in the domain directory."""
        return domain in self.domains

    def return_domain(self, domain: str) -> Domain:
        """Returns the Domain object stored under given name.

        :raises KeyError when there is no such domain"""
        return self.domains[domain]

    async def new_domain(self, name: str) -> Domain:
        """Creates a Domain object, connects it to IRC if settings list the domain for any IRC server
        and stores it in the domain directory.

        :parameter name - name of the domain, used as the key in the domain directory
        :returns the newly created Domain object"""
        logger.debug("Creating new domain object for {}".format(name))
        domain_object = Domain(name)
        for irc_server in settings["irc_servers"].values():
            if name in irc_server["domains"]:
                domain_object.set_irc(AioIRCCat(irc_server["irc_channel_mapping"], domain_object, None, None))
                domain_object.irc.connect(irc_server["irc_host"], irc_server["irc_port"],
                                          irc_server["irc_name"], ircname=irc_server["irc_nickname"])
                break  # Allow only one IRC for a domain
        self.domains[name] = domain_object
        return self.domains[name]

    def run_all_domains(self):
        """Calls run_domain() on every domain in the domain directory."""
        for domain in self.domains.values():
            domain.run_domain()
2021-05-30 11:23:48 +00:00
2021-05-30 11:23:48 +00:00
# Module-level DomainManager instance, created when this module is imported.
domains = DomainManager()