diff --git a/.gitignore b/.gitignore index 07a55a8..f510280 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,5 @@ logs/ *.db *.code-workspace *.bat -source-file-list.txt \ No newline at end of file +source-file-list.txt +*.po~ \ No newline at end of file diff --git a/scripts/trigger.psql b/scripts/trigger.psql index 72dd3c3..aa4f85b 100644 --- a/scripts/trigger.psql +++ b/scripts/trigger.psql @@ -3,10 +3,11 @@ $BODY$ begin IF (TG_OP = 'DELETE') THEN perform pg_notify('webhookupdates', concat('REMOVE ', old.wiki)); + return old; ELSIF (TG_OP = 'INSERT') then perform pg_notify('webhookupdates', concat('ADD ', new.wiki)); + return new; end if; - return new; end; $BODY$ language plpgsql; diff --git a/src/bot.py b/src/bot.py index 8239710..30f9b5a 100644 --- a/src/bot.py +++ b/src/bot.py @@ -9,6 +9,8 @@ from collections import defaultdict, namedtuple from typing import Generator import importlib from contextlib import asynccontextmanager + +from src.discussions import Discussions from src.discord.queue import messagequeue from src.argparser import command_line_args from src.config import settings @@ -213,6 +215,7 @@ def shutdown(loop, signal=None): except asyncio.CancelledError: loop.stop() logger.info("Script has shut down due to signal {}.".format(signal)) + logging.shutdown() # sys.exit(0) @@ -239,7 +242,7 @@ async def main_loop(): await populate_wikis() # START LISTENER CONNECTION domains.run_all_domains() - # We are here + discussions = Discussions(domains.return_domain("fandom.com") if domains.check_for_domain("fandom.com") else None) try: signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT) for s in signals: @@ -251,7 +254,8 @@ async def main_loop(): # loop.set_exception_handler(global_exception_handler) try: main_tasks = {"message_sender": asyncio.create_task(message_sender()), - "database_updates": 
asyncio.create_task(dbmanager.update_db()), + "fandom_discussions": asyncio.create_task(discussions.tick_discussions())} # "discussion_handler": asyncio.create_task(discussion_handler()), main_tasks["msg_queue_shield"] = asyncio.shield(main_tasks["message_sender"]) main_tasks["database_updates_shield"] = asyncio.shield(main_tasks["database_updates"]) await asyncio.gather(main_tasks["message_sender"], main_tasks["database_updates"]) diff --git a/src/discord/message.py b/src/discord/message.py index 635c178..ec3c614 100644 --- a/src/discord/message.py +++ b/src/discord/message.py @@ -20,6 +20,8 @@ from collections import defaultdict from typing import Optional, TYPE_CHECKING +from src.exceptions import EmbedListFull + if TYPE_CHECKING: from wiki import Wiki @@ -157,12 +159,6 @@ class DiscordMessage: return self.webhook_object["content"] -class DiscordMessageRaw(DiscordMessage): - def __init__(self, content: dict, webhook_url: str): - self.webhook_object = content - self.webhook_url = webhook_url - - class MessageTooBig(BaseException): pass diff --git a/src/discussions.py b/src/discussions.py index dbdf53b..cd77f92 100644 --- a/src/discussions.py +++ b/src/discussions.py @@ -1,29 +1,46 @@ +from __future__ import annotations + +import asyncio +import logging +import typing from collections import OrderedDict from src.config import settings +from typing import TYPE_CHECKING, Optional -import src.wiki +if TYPE_CHECKING: + from src.domain import Domain + from src.wiki import Wiki +logger = logging.getLogger("rcgcdb.discussions") -class Discussions(): - def __init__(self, wikis: OrderedDict[str, src.wiki.Wiki]): - self.wikis = wikis +class Discussions: + def __init__(self, domain): + self.domain_object: Optional[Domain] = domain async def tick_discussions(self): + if self.domain_object is None: + raise asyncio.CancelledError("fandom.com is not a domain we have any wikis for.") + while True: try: - wiki_url = self.irc.updated_wikis.pop() + wiki_url = 
self.domain_object.irc.updated_discussions.pop() except KeyError: break - try: - wiki = self.wikis[wiki_url] - except KeyError: + wiki = self.domain_object.get_wiki(wiki_url) + if wiki is None: logger.error(f"Could not find a wiki with URL {wiki_url} in the domain group!") continue await self.run_discussion_scan(wiki) - for wiki in self.wikis.values(): + + for wiki in self.filter_and_sort(): if wiki.statistics.last_checked_discussion < settings.get("irc_overtime", 3600): await self.run_discussion_scan(wiki) else: return # Recently scanned wikis will get at the end of the self.wikis, so we assume what is first hasn't been checked for a while - async def add_wiki(self, wiki): \ No newline at end of file + def filter_and_sort(self) -> list[Wiki]: + """Filters and sorts wikis from domain to return only the ones that aren't -1 and sorts them from oldest in checking to newest""" + # return OrderedDict(sorted(filter(lambda wiki: wiki[1].discussion_id != -1, self.domain_object.wikis.items()), key=lambda wiki: wiki[1].statistics.last_checked_discussion)) + return sorted(filter(lambda wiki: wiki.discussion_id != -1, self.domain_object.wikis.values()), key=lambda wiki: wiki.statistics.last_checked_discussion) + + async def run_discussion_scan(self, wiki: Wiki): raise NotImplementedError  # TODO: implement; a def with no body is a SyntaxError diff --git a/src/domain.py b/src/domain.py index 29f704c..6acb71e 100644 --- a/src/domain.py +++ b/src/domain.py @@ -2,12 +2,16 @@ from __future__ import annotations import asyncio import logging from collections import OrderedDict -from src.config import settings from typing import TYPE_CHECKING, Optional -from src.argparser import command_line_args from functools import cache + +import aiohttp + +from src.discord.message import DiscordMessage +from src.config import settings +from src.argparser import command_line_args # from src.discussions import Discussions -from statistics import Log, LogType +from src.statistics import Log, LogType logger = logging.getLogger("rcgcdb.domain") @@ -22,6 +26,7 @@ class Domain: 
self.task: Optional[asyncio.Task] = None self.wikis: OrderedDict[str, src.wiki.Wiki] = OrderedDict() self.irc: Optional[src.irc_feed.AioIRCCat] = None + self.failures = 0 # self.discussions_handler: Optional[Discussions] = Discussions(self.wikis) if name == "fandom.com" else None def __iter__(self): @@ -98,29 +103,32 @@ class Domain: await self.run_wiki_scan(wiki) while True: # Iterate until hitting return, we don't have to iterate using for since we are sending wiki to the end anyways wiki: src.wiki.Wiki = next(iter(self.wikis.values())) - if (wiki.statistics.last_checked_rc or 0) < settings.get("irc_overtime", 3600): + if (wiki.statistics.last_checked_rc or 0) < settings.get("irc_overtime", 3600): # TODO This makes no sense, comparing last_checked_rc to nothing await self.run_wiki_scan(wiki) else: return # Recently scanned wikis will get at the end of the self.wikis, so we assume what is first hasn't been checked for a while - except: + except Exception as e: if command_line_args.debug: logger.exception("IRC scheduler task for domain {} failed!".format(self.name)) else: - # TODO Write - pass - + await self.send_exception_to_monitoring(e) + self.failures += 1 + if self.failures > 2: + raise asyncio.exceptions.CancelledError async def regular_scheduler(self): try: while True: await asyncio.sleep(self.calculate_sleep_time(len(self))) # To make sure that we don't spam domains with one wiki every second we calculate a sane timeout for domains with few wikis await self.run_wiki_scan(next(iter(self.wikis.values()))) - except: + except Exception as e: if command_line_args.debug: logger.exception("IRC task for domain {} failed!".format(self.name)) else: - # TODO Write - pass + await self.send_exception_to_monitoring(e) + self.failures += 1 + if self.failures > 2: + raise asyncio.exceptions.CancelledError @cache def calculate_sleep_time(self, queue_length: int): @@ -143,3 +151,20 @@ class Domain: except asyncio.exceptions.CancelledError: for wiki in self.wikis.values(): 
await wiki.session.close() + + async def send_exception_to_monitoring(self, ex: Exception): + discord_message = DiscordMessage("embed", "generic", [""]) + discord_message["title"] = "Domain scheduler exception for {} (recovered)".format(self.name) + discord_message["content"] = str(ex)[0:1995] + discord_message.add_field("Failure count", self.failures) + discord_message.finish_embed_message() + header = settings["header"].copy()  # copy so the global settings header isn't mutated + header['Content-Type'] = 'application/json' + header['X-RateLimit-Precision'] = "millisecond" + try: + async with aiohttp.ClientSession(headers=header, timeout=aiohttp.ClientTimeout(total=6)) as session: + async with session.post("https://discord.com/api/webhooks/{}".format(settings["monitoring_webhook"]), + data=repr(discord_message)) as resp: + pass + except (aiohttp.ServerConnectionError, aiohttp.ServerTimeoutError): + logger.exception("Couldn't communicate with Discord as a result of Server Error when trying to signal domain task issue!") \ No newline at end of file diff --git a/src/domain_manager.py b/src/domain_manager.py index 07dddc4..41dd158 100644 --- a/src/domain_manager.py +++ b/src/domain_manager.py @@ -19,7 +19,6 @@ class DomainManager: async def webhook_update(self, connection: asyncpg.Connection, pid: int, channel: str, payload: str): """Callback for database listener. 
Used to update our domain cache on changes such as new wikis or removed wikis""" - # TODO Write a trigger for pub/sub in database/Wiki-Bot repo split_payload = payload.split(" ") logger.debug("Received pub/sub message: {}".format(payload)) if len(split_payload) < 2: @@ -70,6 +69,9 @@ class DomainManager: parsed_url = urlparse(url) return ".".join(urlunparse((*parsed_url[0:2], "", "", "", "")).split(".")[-2:]) + def check_for_domain(self, domain: str): + return domain in self.domains + def return_domain(self, domain: str): return self.domains[domain] diff --git a/src/irc_feed.py b/src/irc_feed.py index 7ef4184..9c97aca 100644 --- a/src/irc_feed.py +++ b/src/irc_feed.py @@ -9,7 +9,7 @@ import logging from typing import TYPE_CHECKING, Callable, Optional from urllib.parse import urlparse, quote -logger = logging.getLogger("rcgcdw.irc_feed") +logger = logging.getLogger("rcgcdb.irc_feed") if TYPE_CHECKING: from src.domain import Domain @@ -24,6 +24,7 @@ class AioIRCCat(irc.client_aio.AioSimpleIRCClient): irc.client_aio.SimpleIRCClient.__init__(self) self.targets = targets self.updated_wikis: set[str] = set() + self.updated_discussions: set[str] = set() self.rc_callback = rc_callback self.discussion_callback = discussion_callback self.domain = domain_object @@ -72,6 +73,9 @@ class AioIRCCat(irc.client_aio.AioSimpleIRCClient): if post.get('action', 'unknown') != "deleted": # ignore deletion events url = urlparse(post.get('url')) full_url ="https://"+ url.netloc + recognize_langs(url.path) + wiki = self.domain.get_wiki(full_url) + if wiki and wiki.discussion_id != -1: + self.updated_discussions.add(full_url) # if full_url in self.domain: # self.discussion_callback(full_url) diff --git a/src/misc.py b/src/misc.py index cc52b2b..ebba7fc 100644 --- a/src/misc.py +++ b/src/misc.py @@ -4,7 +4,6 @@ import base64, re import logging from typing import Callable from urllib.parse import urlparse, urlunparse -from src.i18n import langs logger = logging.getLogger("rcgcdw.misc") @@ 
-185,3 +184,5 @@ class ContentParser(HTMLParser): self.small_prev_del = self.small_prev_del + self.more self.last_del = None self.empty = False + + diff --git a/src/statistics.py b/src/statistics.py index 2814625..e08ad8a 100644 --- a/src/statistics.py +++ b/src/statistics.py @@ -25,6 +25,7 @@ class Log: self.title: str = kwargs["title"] self.details: Optional[str] = kwargs.get("details", None) + class LimitedList(list): def __init__(self, *args): list.__init__(self, *args) diff --git a/src/wiki.py b/src/wiki.py index 9962276..8732fd1 100644 --- a/src/wiki.py +++ b/src/wiki.py @@ -59,6 +59,10 @@ class Wiki: def rc_id(self): return self.statistics.last_action + @property + def discussion_id(self): + return self.statistics.last_post + @property def last_request(self): return self.statistics.last_request @@ -277,7 +281,6 @@ class Wiki: raise return request_json - async def fetch_wiki(self, amount=10) -> dict: if self.mw_messages is None: params = OrderedDict({"action": "query", "format": "json", "uselang": "content", "list": "tags|recentchanges", @@ -361,6 +364,22 @@ class Wiki: dbmanager.add(("UPDATE rcgcdw SET rcid = $1 WHERE wiki = $2", (highest_id, self.script_url))) # If this is not enough for the future, save rcid in message sending function to make sure we always send all of the changes return + async def scan_discussions(self): + header = settings["header"].copy()  # copy so the global settings header isn't mutated + header["Accept"] = "application/hal+json" + async with aiohttp.ClientSession(headers=header, + timeout=aiohttp.ClientTimeout(6.0)) as session: + url_path = "{wiki}wikia.php".format(wiki=self.script_url) + params = {"controller": "DiscussionPost", "method": "getPosts", "includeCounters": "false", + "sortDirection": "descending", "sortKey": "creation_date", "limit": 20} + try: + feeds_response = await session.get(url_path, params=params) + feeds_response.raise_for_status() + except (aiohttp.ClientConnectionError, aiohttp.ServerTimeoutError, asyncio.TimeoutError, + aiohttp.ClientResponseError, aiohttp.TooManyRedirects): 
+ logger.error("A connection error occurred while requesting {}".format(url_path)) + raise WikiServerError + @cache def prepare_settings(display_mode: int) -> dict: