mirror of
https://gitlab.com/chicken-riders/RcGcDb.git
synced 2025-02-23 00:54:09 +00:00
260 lines
12 KiB
Python
260 lines
12 KiB
Python
from __future__ import annotations
|
||
import asyncio
|
||
import datetime
|
||
import logging
|
||
import time
|
||
import traceback
|
||
from collections import OrderedDict
|
||
from typing import TYPE_CHECKING, Optional
|
||
from functools import cache
|
||
import sys
|
||
|
||
import aiohttp
|
||
|
||
from src.misc import LimitedList
|
||
from src.discord.message import DiscordMessage
|
||
from src.config import settings
|
||
from src.argparser import command_line_args
|
||
# from src.discussions import Discussions
|
||
from src.statistics import Log, LogType
|
||
|
||
logger = logging.getLogger("rcgcdb.domain")
|
||
|
||
if TYPE_CHECKING:
|
||
import src.wiki
|
||
import src.irc_feed
|
||
|
||
|
||
class Domain:
    """A group of wikis that share one domain and are scanned by a single asyncio task.

    Wikis are stored in an ``OrderedDict`` keyed by their script URL. Every scan moves the
    scanned wiki to the end of the dict, so the first entry is always the one that has
    waited the longest. When an IRC client is attached the scans are driven by IRC events
    (with a periodic backup check), otherwise the wikis are polled round-robin.
    """

    # Minimum number of seconds between two failure reports sent to the monitoring webhook (2 days)
    FAILURE_REPORT_COOLDOWN = 172800

    def __init__(self, name: str):
        """Create an empty domain.

        :parameter name - domain name, expected in topname.extension format (for example fandom.com)
        """
        self.name = name  # This should be always in format of topname.extension for example fandom.com
        self.task: Optional[asyncio.Task] = None
        self.wikis: OrderedDict[str, src.wiki.Wiki] = OrderedDict()
        self.irc: Optional[src.irc_feed.AioIRCCat] = None
        self.last_failure_report = 0  # Unix timestamp of the last report sent to monitoring, 0 if never
        # Delays (in seconds) between an edit and its Discord message; presumably capped at 100 entries by LimitedList
        self.message_timings: LimitedList = LimitedList(limit=100)
        self.total_discord_messages_sent: int = 0
        # self.discussions_handler: Optional[Discussions] = Discussions(self.wikis) if name == "fandom.com" else None

    def __iter__(self):
        """Iterate over script URLs of the wikis in scan order."""
        return iter(self.wikis)

    def __str__(self) -> str:
        """Return a human readable report about the domain, used for debugging."""
        if len(self.message_timings) > 0:  # min throws exception when used on empty iterable
            tmin, avg, tmax = (self.convert_seconds_to_readable(min(self.message_timings)),
                               self.convert_seconds_to_readable(int(sum(self.message_timings)/len(self.message_timings))),
                               self.convert_seconds_to_readable(max(self.message_timings)))
        else:
            tmin, avg, tmax = 0, 0, 0
        return (f"<Domain name='{self.name}' task='{self.task}' wikis='{self.wikis}' "
                f"irc='{self.irc.connection.connected if self.irc else False}' "
                f"calculated_delay={self.calculate_sleep_time(len(self)) if not self.irc else 'handled by IRC scheduler'} "
                f"msgdelays=(min={tmin}, avg={avg}, max={tmax})>")

    def json(self) -> dict:
        """Return the domain status as a dictionary built from plain, serializable values."""
        timings = self.message_timings
        return {
            "wikis": list(self.wikis),
            "irc": self.irc.connection.connected if self.irc else False,
            "delay": self.calculate_sleep_time(len(self)) if not self.irc else 'handled by IRC scheduler',
            "msgdelay": {"min": min(timings or [0]), "avg": int(sum(timings)/(len(timings) or 1)),
                         "max": max(timings or [0])},
            "discord_messages": self.total_discord_messages_sent,
            "last_failure_report": self.last_failure_report
        }

    def __repr__(self):
        return self.__str__()

    def __getitem__(self, item):
        """Return the wiki stored under given script URL.

        :raises KeyError - when there is no such wiki in the domain (use get_wiki for a default instead)
        """
        return self.wikis[item]

    def __len__(self):
        """Return the number of wikis in the domain."""
        return len(self.wikis)

    @staticmethod
    def convert_seconds_to_readable(seconds: int) -> str:
        """Helper function to prepare human readable times for domain report"""
        return f"{int(seconds/60)}m{seconds % 60}s"

    def destroy(self):
        """Destroy the domain – do all of the tasks that should make sure there is no leftovers before being collected by GC"""
        if self.irc:
            logger.debug("Leaving IRC due to destroy() for domain {}".format(self.name))
            self.irc.connection.die("Leaving")
        # if self.discussions_handler:
        #     self.discussions_handler.close()
        if self.task:
            self.task.cancel()

    def get_wiki(self, item: str, default=None) -> Optional[src.wiki.Wiki]:
        """Return a wiki stored under given script URL, or default when there is none"""
        return self.wikis.get(item, default)

    def set_irc(self, irc_client: src.irc_feed.AioIRCCat):
        """Sets IRC"""
        self.irc = irc_client

    def stop_task(self):
        """Cancels the task, does nothing when the task has never been started"""
        if self.task is not None:
            self.task.cancel()  # Be aware that cancelling the task may take time

    def run_domain(self):
        """Starts asyncio task for domain"""
        if not self.task or self.task.cancelled():
            self.task = asyncio.create_task(self.run_wiki_check(), name=self.name)
        else:
            logger.error(f"Tried to start a task for domain {self.name} however the task already exists!")

    def remove_wiki(self, script_url: str):
        """Removes a wiki from the domain.

        :raises KeyError - when there is no wiki with given script URL
        """
        self.wikis.pop(script_url)

    async def add_wiki(self, wiki: src.wiki.Wiki, first=False):
        """Adds a wiki to domain list. If a wiki with the same script URL is already there,
        only the targets of the already stored wiki are updated.

        :parameter wiki - Wiki object
        :parameter first (optional) - bool indicating if wikis should be added as first or last in the ordered dict"""
        wiki.set_domain(self)
        if wiki.script_url in self.wikis:
            await self.wikis[wiki.script_url].update_targets()
        else:
            self.wikis[wiki.script_url] = wiki
            await wiki.update_targets()
            if first:
                self.wikis.move_to_end(wiki.script_url, last=False)
        logger.debug(f"Added new wiki {wiki.script_url} to domain {self.name}")

    async def run_wiki_scan(self, wiki: src.wiki.Wiki, reason: Optional[str] = None):
        """Scans given wiki, logs the reason of the scan in its statistics and moves the wiki to the end of the queue"""
        await wiki.scan()
        wiki.statistics.update(Log(type=LogType.SCAN_REASON, title=str(reason)))
        self.wikis.move_to_end(wiki.script_url)

    def failure_rate_investigation(self) -> Optional[set]:
        """Function is supposed to determine if a notification should be sent regarding a wiki/domain not working properly

        Cases considered worthy of notification:
            An entire farm (over 20% of wikis when there are more than 15 wikis in the domain) is responding with
            connection errors for at least half of the logs from the past 10 minutes
            A single wiki (in smaller domains) having nothing but connection/MediaWiki/HTTP errors logged for the past hour

        :returns set of script URLs of affected wikis or None when there is nothing to report
        """
        affected = set()
        if len(self) > 15:
            for wiki_url, wiki_obj in self.wikis.items():
                logs_last_10 = wiki_obj.statistics.filter_by_time(10 * 60)
                failures = sum(1 for log in logs_last_10 if log.type == LogType.CONNECTION_ERROR)
                # At least half of the recent logs (and at least one) have to be connection errors
                if max(len(logs_last_10)/2, 1) <= failures:
                    affected.add(wiki_url)
            if len(affected) > len(self)/5:
                return affected
        else:
            error_types = (LogType.CONNECTION_ERROR, LogType.MEDIAWIKI_ERROR, LogType.HTTP_ERROR)
            for wiki_url, wiki_obj in self.wikis.items():
                # NOTE(review): all() is True for an empty sequence, so a wiki with no logs at all
                # in the past hour is counted as affected too – confirm this is intended
                if all([x.type in error_types for x in wiki_obj.statistics.filter_by_time(60*60)]):
                    affected.add(wiki_url)
            if affected:
                return affected
        return None

    def register_message_timing_report(self, initial_time: datetime.datetime, send_time: Optional[datetime.datetime] = None) -> None:
        """This function registers time between edit being made and message with given edit being sent on Discord
        For metrics and debugging

        :parameter initial_time - time of the edit
        :parameter send_time (optional) - time of sending the message, defaults to current UTC time"""
        if send_time is None:
            send_time = datetime.datetime.now(tz=datetime.timezone.utc)
        # timedelta.seconds ignores whole days (and wraps around for negative deltas), total_seconds() does not.
        # Negative values can only come from clock differences so they are registered as no delay.
        self.message_timings.append(max(int((send_time - initial_time).total_seconds()), 0))

    def discord_message_registration(self):
        """Counts one more Discord message sent for the domain"""
        self.total_discord_messages_sent += 1

    async def _handle_scheduler_exception(self, ex: Exception, scheduler_name: str) -> None:
        """Shared reaction to an exception raised in one of the schedulers.

        In debug mode the exception is logged and the domain task gets cancelled. In production
        the exception is printed and, if failure_rate_investigation() finds affected wikis,
        sent to the monitoring webhook – at most once per FAILURE_REPORT_COOLDOWN.

        :parameter ex - exception caught by the scheduler
        :parameter scheduler_name - name of the scheduler used in the log message ("IRC" or "Regular")
        :raises asyncio.CancelledError - in debug mode, to stop the domain task
        """
        if command_line_args.debug:
            logger.error("{} scheduler task for domain {} failed!".format(scheduler_name, self.name), exc_info=ex)
            raise asyncio.exceptions.CancelledError()
        # production
        if not (time.time() - self.FAILURE_REPORT_COOLDOWN > self.last_failure_report):  # If we haven't reported for more than 2 days or at all
            return
        traceback.print_exception(type(ex), ex, ex.__traceback__)
        wikis = self.failure_rate_investigation()
        if wikis:
            await self.send_exception_to_monitoring(ex, wikis)
            self.last_failure_report = time.time()

    async def irc_scheduler(self):
        """Scans all of the wikis reported as updated by IRC feed, then scans the wikis that
        haven't been checked for longer than irc_overtime setting (3600 seconds by default)"""
        try:
            while True:
                try:
                    wiki_url = self.irc.updated_wikis.pop()
                except KeyError:  # Nothing more has been reported by IRC
                    break
                try:
                    wiki = self.wikis[wiki_url]
                except KeyError:
                    logger.error(f"Could not find a wiki with URL {wiki_url} in the domain group!")
                    continue
                await self.run_wiki_scan(wiki, "IRC feed event")
            # Iterate until hitting return, we don't have to iterate using for since we are sending wiki to the end anyways
            while self.wikis:  # An empty domain has nothing to check, next() would raise StopIteration
                wiki: src.wiki.Wiki = next(iter(self.wikis.values()))
                if (int(time.time()) - (wiki.statistics.last_checked_rc or 0)) > settings.get("irc_overtime", 3600):
                    await self.run_wiki_scan(wiki, "IRC backup check")
                else:
                    return  # Recently scanned wikis will get at the end of the self.wikis, so we assume what is first hasn't been checked for a while
        except Exception as e:
            await self._handle_scheduler_exception(e, "IRC")

    async def regular_scheduler(self):
        """Scans the wikis one by one in an endless loop, always taking the one that waited the longest"""
        try:
            while True:
                await asyncio.sleep(self.calculate_sleep_time(len(self)))  # To make sure that we don't spam domains with one wiki every second we calculate a sane timeout for domains with few wikis
                if not self.wikis:  # Nothing to scan (yet), keep waiting
                    continue
                await self.run_wiki_scan(next(iter(self.wikis.values())), "regular check")
        except Exception as e:
            await self._handle_scheduler_exception(e, "Regular")

    def calculate_sleep_time(self, queue_length: int):
        """Returns amount of seconds to wait between scans so a full pass over the queue takes
        around 125 seconds, but never less than 1 second per wiki.

        Empty queue is treated like a queue with one wiki instead of dividing by zero.
        Not wrapped in functools.cache on purpose – cache on a method keeps every instance alive.
        """
        return max(int(125/max(queue_length, 1)), 1)

    async def run_wiki_check(self):
        """Runs appropriate scheduler depending on existence of IRC

        On cancellation sessions of all wikis are closed (and IRC connection is disconnected)
        before the cancellation is propagated."""
        uses_irc = bool(self.irc)  # Decided once, when the task starts
        try:
            while True:
                if uses_irc:
                    await self.irc_scheduler()
                    await asyncio.sleep(10.0)
                else:
                    await self.regular_scheduler()
                    await asyncio.sleep(self.calculate_sleep_time(len(self)))
        except asyncio.exceptions.CancelledError:
            for wiki in self.wikis.values():
                await wiki.session.close()
            if uses_irc:
                self.irc.connection.disconnect()
            raise

    async def send_exception_to_monitoring(self, ex: Exception, wikis: set):
        """Sends information about scheduler exception to the monitoring webhook.
        Connection problems are only logged, they are never raised to the caller.

        :parameter ex - exception to report
        :parameter wikis - script URLs of affected wikis"""
        discord_message = DiscordMessage("embed", "generic", [""])
        discord_message["title"] = "Domain scheduler exception for {} (recovered)".format(self.name)
        discord_message["description"] = "Affected wikis: {}".format(", ".join(wikis)) + "\n" + str(ex)
        discord_message["description"] = discord_message["description"][0:2000]
        # discord_message.add_field("Failure count", str(self.failures))
        header = dict(settings["header"])  # Copy, so the shared settings are not modified by the additions below
        header['Content-Type'] = 'application/json'
        header['X-RateLimit-Precision'] = "millisecond"
        try:
            async with aiohttp.ClientSession(headers=header, timeout=aiohttp.ClientTimeout(total=6)) as session:
                async with session.post("https://discord.com/api/webhooks/{}".format(settings["monitoring_webhook"]),
                                        data=repr(discord_message)) as resp:
                    if resp.status >= 400:
                        logger.warning("Monitoring webhook responded with HTTP status {} when trying to signal domain task issue!".format(resp.status))
        except (aiohttp.ClientError, asyncio.TimeoutError):
            # This runs inside of exception handling of the schedulers, raising here would kill the domain task
            logger.exception("Couldn't communicate with Discord as a result of Server Error when trying to signal domain task issue!")
|