Frisk 2022-11-07 15:46:15 +01:00
parent 090a14c6c4
commit 879de217ed
No known key found for this signature in database
GPG key ID: 213F7C15068AF8AC
11 changed files with 106 additions and 35 deletions

.gitignore

@@ -5,4 +5,5 @@ logs/
*.db
*.code-workspace
*.bat
source-file-list.txt
*.po~

(SQL pub/sub trigger, file name not preserved)

@@ -3,10 +3,11 @@ $BODY$
begin
    IF (TG_OP = 'DELETE') THEN
        perform pg_notify('webhookupdates', concat('REMOVE ', old.wiki));
        return old;
    ELSIF (TG_OP = 'INSERT') then
        perform pg_notify('webhookupdates', concat('ADD ', new.wiki));
        return new;
    end if;
    return new;
end;
$BODY$
language plpgsql;
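For context, the bot consumes these notifications through asyncpg's LISTEN support. A minimal sketch of that (the DSN is an assumption; the callback signature matches DomainManager.webhook_update further down):

import asyncio
import asyncpg

async def webhook_update(connection: asyncpg.Connection, pid: int, channel: str, payload: str):
    # payload is whatever the trigger sent, e.g. "ADD somewiki" or "REMOVE somewiki"
    print(f"{channel}: {payload}")

async def main():
    # assumed DSN for the sketch; RcGcDb reads its own connection settings
    connection = await asyncpg.connect(dsn="postgresql://localhost/rcgcdb")
    await connection.add_listener("webhookupdates", webhook_update)
    await asyncio.sleep(float("inf"))  # keep the connection alive to receive notifications

asyncio.run(main())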

src/bot.py

@@ -9,6 +9,8 @@ from collections import defaultdict, namedtuple
from typing import Generator
import importlib
from contextlib import asynccontextmanager
from src.discussions import Discussions
from src.discord.queue import messagequeue
from src.argparser import command_line_args
from src.config import settings
@@ -213,6 +215,7 @@ def shutdown(loop, signal=None):
    except asyncio.CancelledError:
        loop.stop()
    logger.info("Script has shut down due to signal {}.".format(signal))
    logging.shutdown()
    # sys.exit(0)
@@ -239,7 +242,7 @@ async def main_loop():
    await populate_wikis()
    # START LISTENER CONNECTION
    domains.run_all_domains()
    # Set up the Fandom discussions handler; pass None when fandom.com isn't among our domains
    discussions = Discussions(domains.return_domain("fandom.com") if domains.check_for_domain("fandom.com") else None)
    try:
        signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
        for s in signals:
@@ -251,7 +254,8 @@ async def main_loop():
        # loop.set_exception_handler(global_exception_handler)
        try:
            main_tasks = {"message_sender": asyncio.create_task(message_sender()),
                          "database_updates": asyncio.create_task(dbmanager.update_db()),
                          "fandom_discussions": asyncio.create_task(discussions.tick_discussions())}  # "discussion_handler": asyncio.create_task(discussion_handler()),
            main_tasks["msg_queue_shield"] = asyncio.shield(main_tasks["message_sender"])
            main_tasks["database_updates_shield"] = asyncio.shield(main_tasks["database_updates"])
            await asyncio.gather(main_tasks["message_sender"], main_tasks["database_updates"])
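The shield wrappers exist so that a cancellation aimed at the shield future does not propagate into the underlying task. A toy illustration of that asyncio behavior, not part of the commit:

import asyncio

async def worker():
    await asyncio.sleep(1)
    return "done"

async def main():
    task = asyncio.create_task(worker())
    shielded = asyncio.shield(task)
    await asyncio.sleep(0)
    shielded.cancel()          # cancels only the outer shield future
    await asyncio.sleep(0)
    print(task.cancelled())    # False: the inner task keeps running
    print(await task)          # "done"

asyncio.run(main())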

src/discord/message.py

@@ -20,6 +20,8 @@ from collections import defaultdict
from typing import Optional, TYPE_CHECKING
from src.exceptions import EmbedListFull

if TYPE_CHECKING:
    from src.wiki import Wiki
@@ -157,12 +159,6 @@ class DiscordMessage:
        return self.webhook_object["content"]


class MessageTooBig(BaseException):
    pass

src/discussions.py

@@ -1,29 +1,46 @@
from __future__ import annotations
import asyncio
import logging
from src.config import settings
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    from src.domain import Domain
    from src.wiki import Wiki

logger = logging.getLogger("rcgcdb.discussions")


class Discussions:
    def __init__(self, domain):
        self.domain_object: Optional[Domain] = domain

    async def tick_discussions(self):
        if self.domain_object is None:
            raise asyncio.CancelledError("fandom.com is not a domain we have any wikis for.")
        while True:
            try:
                wiki_url = self.domain_object.irc.updated_discussions.pop()
            except KeyError:
                break
            wiki = self.domain_object.get_wiki(wiki_url)
            if wiki is None:
                logger.error(f"Could not find a wiki with URL {wiki_url} in the domain group!")
                continue
            await self.run_discussion_scan(wiki)
        for wiki in self.filter_and_sort():
            if wiki.statistics.last_checked_discussion < settings.get("irc_overtime", 3600):
                await self.run_discussion_scan(wiki)
            else:
                return  # Recently scanned wikis get moved to the end, so we assume the first one hasn't been checked for a while

    def filter_and_sort(self) -> list[Wiki]:
        """Filter out wikis whose discussion_id is -1 and sort the rest from least to most recently checked"""
        # return OrderedDict(sorted(filter(lambda wiki: wiki[1].discussion_id != -1, self.domain_object.wikis.items()), key=lambda wiki: wiki[1].statistics.last_checked_discussion))
        return sorted(filter(lambda wiki: wiki.discussion_id != -1, self.domain_object.wikis.values()),
                      key=lambda wiki: wiki.statistics.last_checked_discussion)

    async def run_discussion_scan(self, wiki: Wiki):
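A quick illustration of the ordering filter_and_sort produces, using stand-in objects rather than the real Wiki class:

from types import SimpleNamespace

wikis = [
    SimpleNamespace(discussion_id=-1, statistics=SimpleNamespace(last_checked_discussion=10)),   # filtered out
    SimpleNamespace(discussion_id=50, statistics=SimpleNamespace(last_checked_discussion=900)),
    SimpleNamespace(discussion_id=70, statistics=SimpleNamespace(last_checked_discussion=30)),
]
ordered = sorted(filter(lambda w: w.discussion_id != -1, wikis),
                 key=lambda w: w.statistics.last_checked_discussion)
print([w.statistics.last_checked_discussion for w in ordered])  # [30, 900]: least recently checked first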

src/domain.py

@@ -2,12 +2,16 @@ from __future__ import annotations
import asyncio
import logging
from collections import OrderedDict
from typing import TYPE_CHECKING, Optional
from functools import cache
import aiohttp
from src.discord.message import DiscordMessage
from src.config import settings
from src.argparser import command_line_args
# from src.discussions import Discussions
from src.statistics import Log, LogType
logger = logging.getLogger("rcgcdb.domain")
@@ -22,6 +26,7 @@ class Domain:
        self.task: Optional[asyncio.Task] = None
        self.wikis: OrderedDict[str, src.wiki.Wiki] = OrderedDict()
        self.irc: Optional[src.irc_feed.AioIRCCat] = None
        self.failures = 0
        # self.discussions_handler: Optional[Discussions] = Discussions(self.wikis) if name == "fandom.com" else None

    def __iter__(self):
@@ -98,29 +103,32 @@ class Domain:
                await self.run_wiki_scan(wiki)
            while True:  # Iterate until hitting return, we don't have to iterate using for since we are sending wiki to the end anyways
                wiki: src.wiki.Wiki = next(iter(self.wikis.values()))
                if (wiki.statistics.last_checked_rc or 0) < settings.get("irc_overtime", 3600):  # TODO This makes no sense, comparing last_checked_rc to nothing
                    await self.run_wiki_scan(wiki)
                else:
                    return  # Recently scanned wikis will get at the end of the self.wikis, so we assume what is first hasn't been checked for a while
        except Exception as e:
            if command_line_args.debug:
                logger.exception("IRC scheduler task for domain {} failed!".format(self.name))
            else:
                await self.send_exception_to_monitoring(e)
            self.failures += 1
            if self.failures > 2:
                raise asyncio.exceptions.CancelledError

    async def regular_scheduler(self):
        try:
            while True:
                await asyncio.sleep(self.calculate_sleep_time(len(self)))  # To make sure that we don't spam domains with one wiki every second we calculate a sane timeout for domains with few wikis
                await self.run_wiki_scan(next(iter(self.wikis.values())))
        except Exception as e:
            if command_line_args.debug:
                logger.exception("IRC task for domain {} failed!".format(self.name))
            else:
                await self.send_exception_to_monitoring(e)
            self.failures += 1
            if self.failures > 2:
                raise asyncio.exceptions.CancelledError

    @cache
    def calculate_sleep_time(self, queue_length: int):
@@ -143,3 +151,20 @@ class Domain:
        except asyncio.exceptions.CancelledError:
            for wiki in self.wikis.values():
                await wiki.session.close()

    async def send_exception_to_monitoring(self, ex: Exception):
        discord_message = DiscordMessage("embed", "generic", [""])
        discord_message["title"] = "Domain scheduler exception for {} (recovered)".format(self.name)
        discord_message["content"] = str(ex)[0:1995]
        discord_message.add_field("Failure count", self.failures)
        discord_message.finish_embed_message()
        header = settings["header"].copy()  # copy so we don't mutate the shared settings header
        header['Content-Type'] = 'application/json'
        header['X-RateLimit-Precision'] = "millisecond"
        try:
            async with aiohttp.ClientSession(headers=header, timeout=aiohttp.ClientTimeout(total=6)) as session:
                async with session.post("https://discord.com/api/webhooks/{}".format(settings["monitoring_webhook"]),
                                        data=repr(discord_message)) as resp:
                    pass
        except (aiohttp.ServerConnectionError, aiohttp.ServerTimeoutError):
            logger.exception("Couldn't communicate with Discord as a result of Server Error when trying to signal domain task issue!")

src/domain_manager.py

@@ -19,7 +19,6 @@ class DomainManager:
    async def webhook_update(self, connection: asyncpg.Connection, pid: int, channel: str, payload: str):
        """Callback for database listener. Used to update our domain cache on changes such as new wikis or removed wikis"""
        # TODO Write a trigger for pub/sub in database/Wiki-Bot repo
        split_payload = payload.split(" ")
        logger.debug("Received pub/sub message: {}".format(payload))
        if len(split_payload) < 2:
@@ -70,6 +69,9 @@ class DomainManager:
        parsed_url = urlparse(url)
        return ".".join(urlunparse((*parsed_url[0:2], "", "", "", "")).split(".")[-2:])

    def check_for_domain(self, domain: str):
        return domain in self.domains

    def return_domain(self, domain: str):
        return self.domains[domain]
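The return expression above reduces a full wiki URL to its last two domain labels, which is the key the domain cache is organized by. For example:

from urllib.parse import urlparse, urlunparse

parsed_url = urlparse("https://community.fandom.com/wiki/Special:RecentChanges")
print(".".join(urlunparse((*parsed_url[0:2], "", "", "", "")).split(".")[-2:]))  # fandom.com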

src/irc_feed.py

@@ -9,7 +9,7 @@ import logging
from typing import TYPE_CHECKING, Callable, Optional
from urllib.parse import urlparse, quote

logger = logging.getLogger("rcgcdb.irc_feed")

if TYPE_CHECKING:
    from src.domain import Domain
@@ -24,6 +24,7 @@ class AioIRCCat(irc.client_aio.AioSimpleIRCClient):
        irc.client_aio.SimpleIRCClient.__init__(self)
        self.targets = targets
        self.updated_wikis: set[str] = set()
        self.updated_discussions: set[str] = set()
        self.rc_callback = rc_callback
        self.discussion_callback = discussion_callback
        self.domain = domain_object
@@ -72,6 +73,9 @@ class AioIRCCat(irc.client_aio.AioSimpleIRCClient):
        if post.get('action', 'unknown') != "deleted":  # ignore deletion events
            url = urlparse(post.get('url'))
            full_url = "https://" + url.netloc + recognize_langs(url.path)
            wiki = self.domain.get_wiki(full_url)
            if wiki and wiki.discussion_id != -1:
                self.updated_discussions.add(full_url)
            # if full_url in self.domain:
            #     self.discussion_callback(full_url)
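recognize_langs is defined elsewhere in this file; a hedged sketch of the behavior the call above relies on (keeping a language prefix such as /pl/ while dropping the rest of the path; this is an assumption, not the implementation from the commit):

def recognize_langs(path: str) -> str:
    # assumed behavior: "/pl/f/p/123" -> "/pl/", "/f/p/123" -> "/"
    segments = path.split("/")
    if len(segments) > 2 and segments[1] not in ("f", "wiki"):
        return "/" + segments[1] + "/"
    return "/"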

src/misc.py

@@ -4,7 +4,6 @@ import base64, re
import logging
from typing import Callable
from urllib.parse import urlparse, urlunparse
from src.i18n import langs
logger = logging.getLogger("rcgcdw.misc")
@@ -185,3 +184,5 @@ class ContentParser(HTMLParser):
self.small_prev_del = self.small_prev_del + self.more
self.last_del = None
self.empty = False

src/statistics.py

@@ -25,6 +25,7 @@ class Log:
        self.title: str = kwargs["title"]
        self.details: Optional[str] = kwargs.get("details", None)


class LimitedList(list):
    def __init__(self, *args):
        list.__init__(self, *args)

src/wiki.py

@@ -59,6 +59,10 @@ class Wiki:
    def rc_id(self):
        return self.statistics.last_action

    @property
    def discussion_id(self):
        return self.statistics.last_post

    @property
    def last_request(self):
        return self.statistics.last_request
@@ -277,7 +281,6 @@ class Wiki:
            raise
        return request_json

    async def fetch_wiki(self, amount=10) -> dict:
        if self.mw_messages is None:
            params = OrderedDict({"action": "query", "format": "json", "uselang": "content", "list": "tags|recentchanges",
@@ -361,6 +364,22 @@ class Wiki:
        dbmanager.add(("UPDATE rcgcdw SET rcid = $1 WHERE wiki = $2", (highest_id, self.script_url)))  # If this is not enough for the future, save rcid in message sending function to make sure we always send all of the changes
        return

    async def scan_discussions(self):
        header = settings["header"].copy()  # copy so we don't mutate the shared settings header
        header["Accept"] = "application/hal+json"
        async with aiohttp.ClientSession(headers=header,
                                         timeout=aiohttp.ClientTimeout(6.0)) as session:
            url_path = "{wiki}wikia.php".format(wiki=self.script_url)
            params = {"controller": "DiscussionPost", "method": "getPosts", "includeCounters": "false",
                      "sortDirection": "descending", "sortKey": "creation_date", "limit": 20}
            try:
                feeds_response = await session.get(url_path, params=params)
                feeds_response.raise_for_status()
            except (aiohttp.ClientConnectionError, aiohttp.ServerTimeoutError, asyncio.TimeoutError,
                    aiohttp.ClientResponseError, aiohttp.TooManyRedirects):
                logger.error("A connection error occurred while requesting {}".format(url_path))
                raise WikiServerError
@cache
def prepare_settings(display_mode: int) -> dict:
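The hunk ends before feeds_response is consumed. A hedged sketch of what reading the HAL payload inside scan_discussions could look like (the "_embedded" and "doc:posts" keys are assumptions based on Fandom's Discussions API, not shown in this commit):

        discussion_feed = await feeds_response.json(content_type="application/hal+json")
        posts = discussion_feed.get("_embedded", {}).get("doc:posts", [])
        for post in posts:
            # posts arrive sorted by creation_date descending; report only what's newer than our checkpoint
            if int(post["id"]) > self.discussion_id:
                ...  # format the post and queue a Discord message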