Compare commits

...

3 commits

6 changed files with 141 additions and 31 deletions

View file

@@ -36,7 +36,7 @@ if TYPE_CHECKING:
rate_limit = 0
logger = logging.getLogger("rcgcdb.discord.queue")
message_ids = set() # Technically it gathers IDs forever, but I don't anticipate it ever being a memory hog; optionally we could add it to the DB, but meh
class QueueEntry:
def __init__(self, discord_message, webhooks, wiki, method="POST"):
@@ -340,3 +340,33 @@ async def send_to_discord_webhook(message: [StackedDiscordMessage, DiscordMessag
elif method == "PATCH":
async with session.request(method=method, url=f"https://discord.com/api/webhooks/{webhook_path}/messages/{message.discord_callback_message_id}", data=repr(message)) as resp:
return await handle_discord_http(resp.status, repr(message), resp)
async def send_unique_generic_to_monitoring(message_id: str, title: str, description: str) -> None:
"""
Send a message to monitoring keyed by a unique ID (if the same ID is passed a second time, the message will not be sent again)
:param message_id: ID for the given message
:param title: Title of the embed
:param description: Description of the embed
:return: None
"""
global message_ids
if message_id in message_ids:
return
message_ids.add(message_id)
discord_message = DiscordMessage("embed", "generic", [""])
discord_message["title"] = title
discord_message["description"] = description
discord_message["description"] = discord_message["description"][0:2000]
header = settings["header"]
header['Content-Type'] = 'application/json'
header['X-RateLimit-Precision'] = "millisecond"
try:
async with aiohttp.ClientSession(headers=header, timeout=aiohttp.ClientTimeout(total=6)) as session:
async with session.post("https://discord.com/api/webhooks/{}".format(settings["monitoring_webhook"]),
data=repr(discord_message)) as resp:
pass
except (aiohttp.ServerConnectionError, aiohttp.ServerTimeoutError):
logger.exception(
f"Couldn't communicate with Discord as a result of Server Error when trying to signal generic monitoring message {title} for {message_id}!")

View file

@@ -11,6 +11,7 @@ import sys
import aiohttp
from src.exceptions import WikiOnTimeout
from src.exceptions import WikiNotFoundError
from src.misc import LimitedList
from src.discord.message import DiscordMessage
@@ -124,14 +125,27 @@ class Domain:
self.wikis.move_to_end(wiki.script_url, last=False)
logger.debug(f"Added new wiki {wiki.script_url} to domain {self.name}")
async def run_wiki_scan(self, wiki: src.wiki.Wiki, reason: Optional[str] = None):
async def run_wiki_scan(self, wiki: Optional[src.wiki.Wiki], reason: Optional[str] = None):
"""
:param wiki: Wiki object to scan
:param reason: Reason for scanning put into the logs
:return: None
:raises StopIteration: When None is passed as wiki, meaning there are no more wikis in the queue besides ones on timeout
"""
if wiki is None:
raise StopIteration
try:
await wiki.scan()
except WikiNotFoundError as e:
self.wikis.move_to_end(wiki.script_url)
logs_for_wiki = wiki.statistics.filter_by_time(60*60)
if all([x.type == LogType.HTTP_ERROR for x in logs_for_wiki]) and len(logs_for_wiki) > 10:
await wiki.remove_wiki_from_db("This recent changes webhook has been removed for `wiki returning code {}`!".format(e.code), send_reason=True)
await wiki.remove_wiki_from_db({404: "wiki deleted", 410: "wiki inaccessible"}.get(e.code), send_reason=True)
except WikiOnTimeout:
pass
else:
wiki.statistics.update(Log(type=LogType.SCAN_REASON, title=str(reason)))
self.wikis.move_to_end(wiki.script_url)
@@ -172,6 +186,11 @@ class Domain:
def discord_message_registration(self):
self.total_discord_messages_sent += 1
def find_first_not_on_timeout(self):
for wiki in self.wikis.values():
if not wiki.is_on_timeout():
return wiki
async def irc_scheduler(self):
try:
while True:
@@ -186,11 +205,15 @@ class Domain:
continue
await self.run_wiki_scan(wiki, "IRC feed event")
while True: # Iterate until hitting return; we don't need a for loop since we move each scanned wiki to the end anyway
wiki: src.wiki.Wiki = next(iter(self.wikis.values()))
wiki: src.wiki.Wiki = self.find_first_not_on_timeout()
if (int(time.time()) - (wiki.statistics.last_checked_rc or 0)) > settings.get("irc_overtime", 3600):
await self.run_wiki_scan(wiki, "IRC backup check")
else:
return # Recently scanned wikis end up at the end of self.wikis, so we assume the first one hasn't been checked for a while
except StopIteration:
logger.debug(f"Domain {self.name} received StopIteration, returning from irc_scheduler...")
return
except Exception as e:
if command_line_args.debug:
logger.exception("IRC scheduler task for domain {} failed!".format(self.name))
@@ -209,7 +232,10 @@ class Domain:
try:
while True:
await asyncio.sleep(self.calculate_sleep_time(len(self))) # To make sure we don't spam domains with one wiki every second, we calculate a sane sleep time for domains with few wikis
await self.run_wiki_scan(next(iter(self.wikis.values())), "regular check")
await self.run_wiki_scan(self.find_first_not_on_timeout(), "regular check")
except StopIteration:
logger.debug(f"Domain {self.name} received StopIteration, returning from regular_scheduler...")
return
except Exception as e:
if command_line_args.debug:
logger.exception("Regular scheduler task for domain {} failed!".format(self.name))

View file

@@ -103,7 +103,7 @@ class DomainManager:
reason = " ".join(split_payload[2:])
if wiki is not None:
logger.debug("Wiki specified in pub/sub message has been found. Erasing the wiki from DB.")
await wiki.remove_wiki_from_db(reason, send_reason=True if reason else False)
await wiki.remove_wiki_from_db(reason, send_reason=True if reason else False, localize_reason=False)
elif split_payload[0] == "DEBUG":
asyncio.current_task().set_name("webhook_update")
if split_payload[1] == "INFO":

View file

@@ -1,3 +1,4 @@
import datetime
from typing import Union
@@ -70,6 +71,13 @@ class MediaWikiError(Exception):
self.message = f"MediaWiki returned the following errors: {errors}!"
super().__init__(self.message)
class WikiOnTimeout(Exception):
"""A wiki object is set on timeout"""
def __init__(self, timeout: datetime.datetime):
self.message = f"Wiki is currently on timeout."
super().__init__(self.message)
class NoDomain(Exception):
"""When given domain does not exist"""
pass

View file

@@ -4,7 +4,7 @@ import aiohttp.web_request
from src.misc import LimitedList
from src.config import settings
from typing import Union, Optional, List
from typing import Union, Optional, List, Tuple
from enum import Enum
@@ -14,7 +14,7 @@ class LogType(Enum):
MEDIAWIKI_ERROR = 3
VALUE_UPDATE = 4
SCAN_REASON = 5
REDIRECT = 6
class Log:
"""Log class represents an event that happened to a wiki fetch. Main purpose of those logs is debug and error-tracking."""
@@ -59,6 +59,9 @@ class Statistics:
for log in args:
self.logs.append(log)
def last_connection_failures(self):
return last_failures(self.logs)
def filter_by_time(self, time_ago: int, logs: list = None) -> List[Log]: # cannot have self.logs in here as this is evaluated once
"""Returns logs with time between time_ago seconds ago and now"""
time_limit = int(time.time()) - time_ago
@@ -71,3 +74,15 @@ class Statistics:
def recent_connection_errors(self) -> int:
"""Count how many connection errors there were recently (2 minutes)"""
return len(self.filter_by_type(LogType.CONNECTION_ERROR, logs=self.filter_by_time(120))) # find connection errors from 2 minutes ago
def last_failures(logs: list[Log]) -> Tuple[List[Log], Union[Log, int]]:
"""Walk logs from newest to oldest, collecting failure entries (type value below 4) until the first non-failure entry. Returns the failures in chronological order and the log that ended the streak (or 0 if the entire list consists of failures)."""
result = []
success = None
for log in logs[::-1]:
if log.type.value < 4:
result.insert(0, log)
else:
success = log
break
return result, success or 0
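
A worked example of last_failures with minimal stand-ins for Log and LogType (the function body below repeats the one above so the snippet runs on its own): the trailing run of failures comes back in chronological order, together with the log that ended the streak.

from enum import Enum
from typing import List, Tuple, Union

class LogType(Enum):  # trimmed stand-in for the real enum
    CONNECTION_ERROR = 1
    HTTP_ERROR = 2
    MEDIAWIKI_ERROR = 3
    VALUE_UPDATE = 4

class Log:  # minimal stand-in: only the field last_failures reads
    def __init__(self, type: LogType):
        self.type = type

def last_failures(logs: List[Log]) -> Tuple[List[Log], Union[Log, int]]:
    result, success = [], None
    for log in logs[::-1]:  # newest to oldest
        if log.type.value < 4:
            result.insert(0, log)  # keep chronological order
        else:
            success = log
            break
    return result, success or 0  # 0 when the whole list is failures

logs = [Log(LogType.VALUE_UPDATE),       # last non-failure entry
        Log(LogType.HTTP_ERROR),         # newer failures follow it
        Log(LogType.CONNECTION_ERROR)]
failures, streak_breaker = last_failures(logs)
assert [l.type for l in failures] == [LogType.HTTP_ERROR, LogType.CONNECTION_ERROR]
assert streak_breaker.type == LogType.VALUE_UPDATE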

View file

@@ -2,18 +2,20 @@ from __future__ import annotations
import datetime
import functools
import math
import pickle
import time
import re
import logging, aiohttp
import asyncio
from contextlib import asynccontextmanager
from datetime import timedelta
import requests
from src.api.util import default_message
from src.misc import prepare_settings, run_hooks
from src.discord.queue import messagequeue, QueueEntry, send_to_discord_webhook
from src.discord.queue import messagequeue, QueueEntry, send_to_discord_webhook, send_unique_generic_to_monitoring
from src.mw_messages import MWMessages
from src.exceptions import *
from src.queue_handler import dbmanager
@@ -132,6 +134,7 @@ class Wiki:
self.session_requests = requests.Session()
self.session_requests.headers.update(settings["header"])
self.request_cost = 0 # For tracking amount of times wiki has been requested in given context
self.dont_fetch_before: Optional[datetime.datetime] = None
logger.debug("Creating new wiki object for {}".format(script_url))
def __str__(self):
@@ -157,7 +160,8 @@ class Wiki:
"logs": [x.json() for x in self.statistics.logs],
"namespaces": self.namespaces,
"tags": self.tags,
"recache_requested": self.recache_requested
"recache_requested": self.recache_requested,
"dont_fetch_before": self.dont_fetch_before.isoformat(timespec='seconds') if self.dont_fetch_before else "None"
}
if self.domain.name == "fandom.com":
dict_obj.update(last_checked_discussion=self.statistics.last_checked_discussion, last_post=self.statistics.last_post)
@@ -313,7 +317,6 @@ class Wiki:
self.rc_targets = target_settings
self.discussion_targets = discussion_targets
def parse_mw_request_info(self, request_data: dict, url: str):
"""A function parsing request JSON message from MediaWiki logging all warnings and raising on MediaWiki errors"""
# any([True for k in request_data.keys() if k in ("error", "errors")])
@@ -342,13 +345,14 @@
Returns:
request_content (dict): a dict resulting from json extraction of HTTP GET request with given json_path
OR
One of the following exceptions:
Raises:
ServerError: When connection with the wiki failed due to server error
ClientError: When connection with the wiki failed due to client error
KeyError: When json_path contained keys that weren't found in the JSON response
BadRequest: When params argument is of wrong type
MediaWikiError: When MediaWiki returns an error
ServerRedirects: When status code for request is either 301 or 302
WikiNotFoundError: When status code for request is either 410 or 404
"""
# Making request
try:
@@ -361,7 +365,7 @@
allow_redirects=allow_redirects)
else:
raise BadRequest(params)
except (aiohttp.ServerConnectionError, aiohttp.ServerTimeoutError, aiohttp.ContentTypeError) as exc:
except (aiohttp.ServerConnectionError, aiohttp.ServerTimeoutError, aiohttp.ContentTypeError, aiohttp.ClientConnectionError) as exc:
logger.warning("Reached {error} error for request on link {url}".format(error=repr(exc),
url=self.script_url + str(params)))
raise ServerError
@@ -370,11 +374,12 @@
logger.warning(f"A request to {self.script_url} {params} resulted in {request.status}")
raise ServerError
elif request.status in (301, 302):
logger.critical(
"Redirect detected! Either the wiki given in the script settings (wiki field) is incorrect/the wiki got removed or is giving us the false value. Please provide the real URL to the wiki, current URL redirects to {}".format(
logger.error(
"Redirect detected! Either the wiki given in the script settings (wiki field) is incorrect/the wiki got removed or is giving us the false value. Current URL redirects to {}".format(
request.url))
raise ServerRedirects(self.script_url, request.headers.get("Location", "unknown"))
elif request.status in (410, 404):
self.statistics.update(Log(type=LogType.HTTP_ERROR, title="{} error".format(request.status)))
raise WikiNotFoundError(request.status)
elif 399 < request.status < 500:
logger.error("Request returned ClientError status code on {url}".format(url=request.url))
@@ -457,29 +462,55 @@
"meta": "siteinfo", "utf8": 1, "rcshow": "!bot",
"rcprop": "title|redirect|timestamp|ids|loginfo|parsedcomment|sizes|flags|tags|user|userid",
"rclimit": amount, "rctype": "edit|new|log|categorize", "siprop": "namespaces|general"})
try:
response = await self.api_request(params=params)
except (aiohttp.ClientConnectionError, aiohttp.ServerTimeoutError, asyncio.TimeoutError, ServerRedirects) as e:
logger.error("A connection error occurred while requesting {}".format(params))
raise WikiServerError(e)
return response
def put_wiki_on_timeout(self, timeout: datetime.timedelta):
""":param timeout datetime.timedelta object when the wiki should be processed again"""
logger.debug(f"Putting wiki {self.script_url} on timeout for {str(timeout)}")
self.dont_fetch_before = datetime.datetime.now() + timeout
def is_on_timeout(self) -> bool:
return self.dont_fetch_before > datetime.datetime.now() if self.dont_fetch_before is not None else False
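
Taken together, put_wiki_on_timeout and is_on_timeout form a simple gate; a self-contained sketch of the same logic (hypothetical Gate class, not project code):

import datetime

class Gate:  # mirrors the dont_fetch_before logic above
    def __init__(self):
        self.dont_fetch_before = None
    def put_on_timeout(self, timeout: datetime.timedelta):
        self.dont_fetch_before = datetime.datetime.now() + timeout
    def is_on_timeout(self) -> bool:
        return self.dont_fetch_before > datetime.datetime.now() if self.dont_fetch_before is not None else False

gate = Gate()
assert gate.is_on_timeout() is False  # never put on timeout
gate.put_on_timeout(datetime.timedelta(minutes=5))
assert gate.is_on_timeout() is True   # gated for the next 5 minutes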
async def scan(self, amount=10):
"""Main track of fetching RecentChanges of a wiki.
:raises WikiServerError
:raises WikiOnTimeout
"""
if self.is_on_timeout():
raise WikiOnTimeout(self.dont_fetch_before)
while True: # Trap event in case there are more changes needed to be fetched
try:
request = await self.fetch_wiki(amount=amount)
self.client.last_request = request
except WikiServerError as e:
# If WikiServerError comes up 2 times within the recent 2 minutes, this will reraise the exception; otherwise it waits 2 seconds and retries
except (aiohttp.ServerTimeoutError, asyncio.TimeoutError, WikiServerError, ServerError) as e:
self.statistics.update(Log(type=LogType.CONNECTION_ERROR, title=str(e.exception)))
if self.statistics.recent_connection_errors() > 9:
raise e
await asyncio.sleep(2.0)
amount_of_failures = len(self.statistics.last_connection_failures()[0])
if amount_of_failures < 2:
await asyncio.sleep(5.0)
continue
elif amount_of_failures == 2: # Put the wiki on a 5-minute timeout
self.put_wiki_on_timeout(timedelta(minutes=5))
raise WikiOnTimeout(self.dont_fetch_before)
else:
self.put_wiki_on_timeout(timedelta(seconds=min(21600, int(math.sqrt(2*amount_of_failures)*(amount_of_failures/0.4))*60))) # Max backoff is 6 hours
if amount_of_failures > 30 and self.domain.last_failure_report < (time.time() - 21600): # Report only if domain didn't report something within last 6 hours
await send_unique_generic_to_monitoring(self.script_url + "?CONNERROR",
f"{self.script_url} errors", repr(e))
raise WikiOnTimeout(self.dont_fetch_before)
except ServerRedirects as e:
self.statistics.update(Log(type=LogType.REDIRECT, title=""))
if len(self.statistics.filter_by_type(LogType.REDIRECT)) > 1:
self.put_wiki_on_timeout(timedelta(hours=1) * len(self.statistics.filter_by_type(LogType.REDIRECT)))
await send_unique_generic_to_monitoring(self.script_url+"?REDIRECT", f"{self.script_url} redirects", e.message)
return
else:
self.put_wiki_on_timeout(timedelta(hours=1))
return
if not self.mw_messages or self.recache_requested:
process_cachable(request, self)
try:
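
For a feel of the backoff curve used above, min(21600, int(sqrt(2*n) * (n / 0.4)) * 60) seconds for n consecutive failures grows roughly with n to the power 1.5 and caps at 6 hours; the first failure only triggers a 5-second retry and the second a flat 5-minute timeout, so the formula kicks in from the third failure onward. A quick check in plain Python:

import math

def backoff_seconds(n: int) -> int:
    # Same expression as in Wiki.scan for n consecutive connection failures.
    return min(21600, int(math.sqrt(2 * n) * (n / 0.4)) * 60)

for n in (3, 5, 10, 20, 25):
    print(n, backoff_seconds(n))
# 3  -> 1080   (18 min)
# 5  -> 2340   (39 min)
# 10 -> 6660   (1 h 51 min)
# 20 -> 18960  (5 h 16 min)
# 25 -> 21600  (capped at 6 h)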
@@ -549,7 +580,7 @@
except:
logger.exception("Webhook removal send_reason failure.")
async def remove_wiki_from_db(self, reason: str, send_reason=False):
async def remove_wiki_from_db(self, reason: str, send_reason=False, localize_reason=True):
logger.info(f"Removing a wiki with script_url of {self.script_url} from the database due to {reason}.")
dbmanager.add(("DELETE FROM rcgcdb WHERE wiki = $1", (self.script_url,)))
if not send_reason:
@@ -561,7 +592,7 @@
try: # This is best effort scenario, but I don't plan to add re-tries to this
dc_msg = DiscordMessage("compact", "custom/webhook_removal",
webhooks, content=lang.gettext(
"This recent changes webhook has been removed for `{reason}`!".format(reason=reason)))
"This recent changes webhook has been removed for `{reason}`!".format(reason=lang.gettext(reason) if localize_reason else reason)))
for webhook in webhooks:
await send_to_discord_webhook(dc_msg, webhook, "POST")
except:
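
The localize_reason flag added here pairs with the DomainManager change above: gettext can only translate strings that exist as msgids in the catalog, so a free-form reason arriving over pub/sub is sent verbatim. A rough sketch of the difference, using gettext.NullTranslations as a stand-in for the per-language translation object:

import gettext

lang = gettext.NullTranslations()  # stand-in; real code loads a language catalog

def removal_reason(reason: str, localize_reason: bool = True) -> str:
    # Mirrors the change above: translate known msgids, pass free-form text through.
    return lang.gettext(reason) if localize_reason else reason

print(removal_reason("wiki deleted"))  # catalog msgid: may come back translated
print(removal_reason("removed on operator request", localize_reason=False))  # verbatim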