diff --git a/src/discord/queue.py b/src/discord/queue.py index 8f24a91..6fa8b75 100644 --- a/src/discord/queue.py +++ b/src/discord/queue.py @@ -36,7 +36,7 @@ if TYPE_CHECKING: rate_limit = 0 logger = logging.getLogger("rcgcdb.discord.queue") - +message_ids = set() # Technically it gathers IDs forever but I don't anticipate it being a memory hog ever, optionally we could add it to DB but meh class QueueEntry: def __init__(self, discord_message, webhooks, wiki, method="POST"): @@ -340,3 +340,33 @@ async def send_to_discord_webhook(message: [StackedDiscordMessage, DiscordMessag elif method == "PATCH": async with session.request(method=method, url=f"https://discord.com/api/webhooks/{webhook_path}/messages/{message.discord_callback_message_id}", data=repr(message)) as resp: return await handle_discord_http(resp.status, repr(message), resp) + + +async def send_unique_generic_to_monitoring(message_id: str, title: str, description: str) -> None: + """ + Function to send messages to monitoring based on unique ID (if the same ID is passed for second time, the message will not send) + + :param message_id: ID for the given message + :param title: Title of the embed + :param description: Description of the embed + :return: None + """ + global message_ids + if message_id in message_ids: + return + message_ids.add(message_id) + discord_message = DiscordMessage("embed", "generic", [""]) + discord_message["title"] = title + discord_message["description"] = description + discord_message["description"] = discord_message["description"][0:2000] + header = settings["header"] + header['Content-Type'] = 'application/json' + header['X-RateLimit-Precision'] = "millisecond" + try: + async with aiohttp.ClientSession(headers=header, timeout=aiohttp.ClientTimeout(total=6)) as session: + async with session.post("https://discord.com/api/webhooks/{}".format(settings["monitoring_webhook"]), + data=repr(discord_message)) as resp: + pass + except (aiohttp.ServerConnectionError, 
aiohttp.ServerTimeoutError): + logger.exception( + f"Couldn't communicate with Discord as a result of Server Error when trying to signal generic monitoring message {title} for {message_id}!") \ No newline at end of file diff --git a/src/domain.py b/src/domain.py index 9216001..e03dcdd 100644 --- a/src/domain.py +++ b/src/domain.py @@ -11,6 +11,7 @@ import sys import aiohttp +from exceptions import WikiOnTimeout from src.exceptions import WikiNotFoundError from src.misc import LimitedList from src.discord.message import DiscordMessage @@ -124,16 +125,29 @@ class Domain: self.wikis.move_to_end(wiki.script_url, last=False) logger.debug(f"Added new wiki {wiki.script_url} to domain {self.name}") - async def run_wiki_scan(self, wiki: src.wiki.Wiki, reason: Optional[str] = None): + async def run_wiki_scan(self, wiki: Optional[src.wiki.Wiki], reason: Optional[str] = None): + """ + + + :param wiki: Wiki object to scan + :param reason: Reason for scanning put into the logs + :return: None + :raises StopIteration: When None has been passed as wiki, means there are no more wikis in the queue besides timeouted ones + """ + if wiki is None: + raise StopIteration try: await wiki.scan() except WikiNotFoundError as e: self.wikis.move_to_end(wiki.script_url) logs_for_wiki = wiki.statistics.filter_by_time(60*60) if all([x.type == LogType.HTTP_ERROR for x in logs_for_wiki]) and len(logs_for_wiki) > 10: - await wiki.remove_wiki_from_db("This recent changes webhook has been removed for `wiki returning code {}`!".format(e.code), send_reason=True) - wiki.statistics.update(Log(type=LogType.SCAN_REASON, title=str(reason))) - self.wikis.move_to_end(wiki.script_url) + await wiki.remove_wiki_from_db({404: "wiki deleted", 410: "wiki inaccessible"}.get(e.code), send_reason=True) + except WikiOnTimeout: + pass + else: + wiki.statistics.update(Log(type=LogType.SCAN_REASON, title=str(reason))) + self.wikis.move_to_end(wiki.script_url) def failure_rate_investigation(self) -> Optional[set]: 
"""Function is supposed to determine if a notification should be sent regarding a wiki/domain not working properly @@ -172,6 +186,11 @@ class Domain: def discord_message_registration(self): self.total_discord_messages_sent += 1 + def find_first_not_on_timeout(self): + for wiki in self.wikis.values(): + if not wiki.is_on_timeout(): + return wiki + async def irc_scheduler(self): try: while True: @@ -186,11 +205,15 @@ class Domain: continue await self.run_wiki_scan(wiki, "IRC feed event") while True: # Iterate until hitting return, we don't have to iterate using for since we are sending wiki to the end anyways - wiki: src.wiki.Wiki = next(iter(self.wikis.values())) + wiki: src.wiki.Wiki = self.find_first_not_on_timeout() + if wiki is None: raise StopIteration # Guard: all wikis on timeout, don't dereference None below if (int(time.time()) - (wiki.statistics.last_checked_rc or 0)) > settings.get("irc_overtime", 3600): await self.run_wiki_scan(wiki, "IRC backup check") else: return # Recently scanned wikis will get at the end of the self.wikis, so we assume what is first hasn't been checked for a while + except StopIteration: + logger.debug(f"Domain {self.name} received StopIteration, returning from irc_scheduler...") + return except Exception as e: if command_line_args.debug: logger.exception("IRC scheduler task for domain {} failed!".format(self.name)) @@ -209,7 +232,10 @@ class Domain: try: while True: await asyncio.sleep(self.calculate_sleep_time(len(self))) # To make sure that we don't spam domains with one wiki every second we calculate a sane timeout for domains with few wikis - await self.run_wiki_scan(self.find_first_not_on_timeout(), "regular check") + await self.run_wiki_scan(self.find_first_not_on_timeout(), "regular check") + except StopIteration: + logger.debug(f"Domain {self.name} received StopIteration, returning from regular_scheduler...") + return except Exception as e: if command_line_args.debug: logger.exception("Regular scheduler task for domain {} failed!".format(self.name)) diff --git a/src/domain_manager.py b/src/domain_manager.py index 
d26a80f..2d4666a 100644 --- a/src/domain_manager.py +++ b/src/domain_manager.py @@ -103,7 +103,7 @@ class DomainManager: reason = " ".join(split_payload[2:]) if wiki is not None: logger.debug("Wiki specified in pub/sub message has been found. Erasing the wiki from DB.") - await wiki.remove_wiki_from_db(reason, send_reason=True if reason else False) + await wiki.remove_wiki_from_db(reason, send_reason=True if reason else False, localize_reason=False) elif split_payload[0] == "DEBUG": asyncio.current_task().set_name("webhook_update") if split_payload[1] == "INFO": diff --git a/src/exceptions.py b/src/exceptions.py index c311867..f87fbef 100644 --- a/src/exceptions.py +++ b/src/exceptions.py @@ -1,3 +1,4 @@ +import datetime from typing import Union @@ -70,6 +71,13 @@ class MediaWikiError(Exception): self.message = f"MediaWiki returned the following errors: {errors}!" super().__init__(self.message) +class WikiOnTimeout(Exception): + """A wiki object is set on timeout""" + + def __init__(self, timeout: datetime.datetime): + self.message = f"Wiki is currently on timeout." + super().__init__(self.message) + class NoDomain(Exception): """When given domain does not exist""" pass diff --git a/src/statistics.py b/src/statistics.py index b6de2bd..0ec586d 100644 --- a/src/statistics.py +++ b/src/statistics.py @@ -4,7 +4,7 @@ import aiohttp.web_request from src.misc import LimitedList from src.config import settings -from typing import Union, Optional, List +from typing import Union, Optional, List, Tuple from enum import Enum @@ -14,7 +14,7 @@ class LogType(Enum): MEDIAWIKI_ERROR = 3 VALUE_UPDATE = 4 SCAN_REASON = 5 - + REDIRECT = 6 class Log: """Log class represents an event that happened to a wiki fetch. 
Main purpose of those logs is debug and error-tracking.""" @@ -59,6 +59,9 @@ class Statistics: for log in args: self.logs.append(log) + def last_connection_failures(self): + return last_failures(self.logs) + def filter_by_time(self, time_ago: int, logs: list = None) -> List[Log]: # cannot have self.logs in here as this is evaluated once """Returns logs with time between time_ago seconds ago and now""" time_limit = int(time.time()) - time_ago @@ -71,3 +74,15 @@ class Statistics: def recent_connection_errors(self) -> int: """Count how many connection errors there were recently (2 minutes)""" return len(self.filter_by_type(LogType.CONNECTION_ERROR, logs=self.filter_by_time(120))) # find connection errors from 2 minutes ago + + +def last_failures(logs: list[Log]) -> Tuple[List[Log], int]: + result = [] + success = None + for log in logs[::-1]: + if log.type.value < 4: + result.insert(0, log) + else: + success = log + break + return result, success or 0 \ No newline at end of file diff --git a/src/wiki.py b/src/wiki.py index b9d7258..12c35bf 100644 --- a/src/wiki.py +++ b/src/wiki.py @@ -2,18 +2,20 @@ from __future__ import annotations import datetime import functools +import math import pickle import time import re import logging, aiohttp import asyncio from contextlib import asynccontextmanager +from datetime import timedelta import requests from src.api.util import default_message from src.misc import prepare_settings, run_hooks -from src.discord.queue import messagequeue, QueueEntry, send_to_discord_webhook +from src.discord.queue import messagequeue, QueueEntry, send_to_discord_webhook, send_unique_generic_to_monitoring from src.mw_messages import MWMessages from src.exceptions import * from src.queue_handler import dbmanager @@ -132,6 +134,7 @@ class Wiki: self.session_requests = requests.Session() self.session_requests.headers.update(settings["header"]) self.request_cost = 0 # For tracking amount of times wiki has been requested in given context + 
self.dont_fetch_before: Optional[datetime.datetime] = None logger.debug("Creating new wiki object for {}".format(script_url)) def __str__(self): @@ -157,7 +160,8 @@ class Wiki: "logs": [x.json() for x in self.statistics.logs], "namespaces": self.namespaces, "tags": self.tags, - "recache_requested": self.recache_requested + "recache_requested": self.recache_requested, + "dont_fetch_before": self.dont_fetch_before.isoformat(timespec='seconds') if self.dont_fetch_before else "None" } if self.domain.name == "fandom.com": dict_obj.update(last_checked_discussion=self.statistics.last_checked_discussion, last_post=self.statistics.last_post) @@ -313,7 +317,6 @@ class Wiki: self.rc_targets = target_settings self.discussion_targets = discussion_targets - def parse_mw_request_info(self, request_data: dict, url: str): """A function parsing request JSON message from MediaWiki logging all warnings and raising on MediaWiki errors""" # any([True for k in request_data.keys() if k in ("error", "errors")]) @@ -342,13 +345,14 @@ class Wiki: Returns: request_content (dict): a dict resulting from json extraction of HTTP GET request with given json_path - OR - One of the following exceptions: + Raises: ServerError: When connection with the wiki failed due to server error ClientError: When connection with the wiki failed due to client error KeyError: When json_path contained keys that weren't found in response JSON response BadRequest: When params argument is of wrong type MediaWikiError: When MediaWiki returns an error + ServerRedirects: When status code for request is either 301 or 302 + WikiNotFoundError: When status code for request is either 410 or 404 """ # Making request try: @@ -361,7 +365,7 @@ class Wiki: allow_redirects=allow_redirects) else: raise BadRequest(params) - except (aiohttp.ServerConnectionError, aiohttp.ServerTimeoutError, aiohttp.ContentTypeError) as exc: + except (aiohttp.ServerConnectionError, aiohttp.ServerTimeoutError, aiohttp.ContentTypeError, 
aiohttp.ClientConnectionError) as exc: logger.warning("Reached {error} error for request on link {url}".format(error=repr(exc), url=self.script_url + str(params))) raise ServerError @@ -370,11 +374,12 @@ class Wiki: logger.warning(f"A request to {self.script_url} {params} resulted in {request.status}") raise ServerError elif request.status in (301, 302): - logger.critical( - "Redirect detected! Either the wiki given in the script settings (wiki field) is incorrect/the wiki got removed or is giving us the false value. Please provide the real URL to the wiki, current URL redirects to {}".format( + logger.error( + "Redirect detected! Either the wiki given in the script settings (wiki field) is incorrect/the wiki got removed or is giving us the false value. Current URL redirects to {}".format( request.url)) raise ServerRedirects(self.script_url, request.headers.get("Location", "unknown")) elif request.status in (410, 404): + self.statistics.update(Log(type=LogType.HTTP_ERROR, title="{} error".format(request.status))) raise WikiNotFoundError(request.status) elif 399 < request.status < 500: logger.error("Request returned ClientError status code on {url}".format(url=request.url)) @@ -457,29 +462,56 @@ class Wiki: "meta": "siteinfo", "utf8": 1, "rcshow": "!bot", "rcprop": "title|redirect|timestamp|ids|loginfo|parsedcomment|sizes|flags|tags|user|userid", "rclimit": amount, "rctype": "edit|new|log|categorize", "siprop": "namespaces|general"}) - try: - response = await self.api_request(params=params) - except (aiohttp.ClientConnectionError, aiohttp.ServerTimeoutError, asyncio.TimeoutError, ServerRedirects) as e: - logger.error("A connection error occurred while requesting {}".format(params)) - raise WikiServerError(e) + response = await self.api_request(params=params) return response + def put_wiki_on_timeout(self, timeout: datetime.timedelta): + """:param timeout datetime.timedelta object when the wiki should be processed again""" + logger.debug(f"Putting wiki 
{self.script_url} on timeout for {str(timeout)}") + self.dont_fetch_before = datetime.datetime.now() + timeout + + def is_on_timeout(self) -> bool: + return self.dont_fetch_before > datetime.datetime.now() if self.dont_fetch_before is not None else False + async def scan(self, amount=10): """Main track of fetching RecentChanges of a wiki. :raises WikiServerError """ + if self.is_on_timeout(): + raise WikiOnTimeout(self.dont_fetch_before) + while True: # Trap event in case there are more changes needed to be fetched try: request = await self.fetch_wiki(amount=amount) self.client.last_request = request - except WikiServerError as e: - # If WikiServerError comes up 2 times in recent 2 minutes, this will reraise the exception, otherwise waits 2 seconds and retries + except (aiohttp.ServerTimeoutError, asyncio.TimeoutError, WikiServerError, ServerError) as e: self.statistics.update(Log(type=LogType.CONNECTION_ERROR, title=str(e.exception))) - if self.statistics.recent_connection_errors() > 9: - raise e - await asyncio.sleep(2.0) - continue + match len(self.statistics.last_connection_failures()[0]): + case 0 | 1: + await asyncio.sleep(5.0) + continue + case 2: # Put the wiki on timeout 5 minutes per each + self.put_wiki_on_timeout(timedelta(minutes=5)) + raise WikiOnTimeout(self.dont_fetch_before) + case _: + x = len(self.statistics.last_connection_failures()[0]) + self.put_wiki_on_timeout(timedelta(seconds=min(21600, int(math.sqrt(2*x)*(x/0.4))*60))) # Max backoff is 6 hours + if x > 30 and self.domain.last_failure_report < (time.time() - 21600): # Report only if domain didn't report something within last 6 hours + await send_unique_generic_to_monitoring(self.script_url + "?CONNERROR", + f"{self.script_url} errors", repr(e)) + raise WikiOnTimeout(self.dont_fetch_before) + + except ServerRedirects as e: + self.statistics.update(Log(type=LogType.REDIRECT, title="")) + + if len(self.statistics.filter_by_type(LogType.REDIRECT)) > 1: + 
self.put_wiki_on_timeout(timedelta(hours=1) * len(self.statistics.filter_by_type(LogType.REDIRECT))) + await send_unique_generic_to_monitoring(self.script_url+"?REDIRECT", f"{self.script_url} redirects", e.message) + return + else: + self.put_wiki_on_timeout(timedelta(hours=1)) + return if not self.mw_messages or self.recache_requested: process_cachable(request, self) try: @@ -549,7 +581,7 @@ class Wiki: except: logger.exception("Webhook removal send_reason failure.") - async def remove_wiki_from_db(self, reason: str, send_reason=False): + async def remove_wiki_from_db(self, reason: str, send_reason=False, localize_reason = True): logger.info(f"Removing a wiki with script_url of {self.script_url} from the database due to {reason}.") dbmanager.add(("DELETE FROM rcgcdb WHERE wiki = $1", (self.script_url,))) if not send_reason: @@ -561,7 +593,7 @@ class Wiki: try: # This is best effort scenario, but I don't plan to add re-tries to this dc_msg = DiscordMessage("compact", "custom/webhook_removal", webhooks, content=lang.gettext( - "This recent changes webhook has been removed for `{reason}`!".format(reason=reason))) + "This recent changes webhook has been removed for `{reason}`!".format(reason=lang.gettext(reason) if localize_reason else reason))) for webhook in webhooks: await send_to_discord_webhook(dc_msg, webhook, "POST") except: