mirror of https://gitlab.com/chicken-riders/RcGcDb.git
synced 2025-02-22 00:44:10 +00:00

Compare commits: 3 commits (17b82f8782 ... 3d6a2c5ac0)

- 3d6a2c5ac0
- 76b19f03b3
- fd5ae05740
src/discord/queue.py

@@ -36,7 +36,7 @@ if TYPE_CHECKING:
 rate_limit = 0

 logger = logging.getLogger("rcgcdb.discord.queue")

+message_ids = set()  # Technically it gathers IDs forever but I don't anticipate it being a memory hog ever, optionally we could add it to DB but meh

 class QueueEntry:
     def __init__(self, discord_message, webhooks, wiki, method="POST"):
@@ -340,3 +340,33 @@ async def send_to_discord_webhook(message: [StackedDiscordMessage, DiscordMessage]
     elif method == "PATCH":
         async with session.request(method=method, url=f"https://discord.com/api/webhooks/{webhook_path}/messages/{message.discord_callback_message_id}", data=repr(message)) as resp:
             return await handle_discord_http(resp.status, repr(message), resp)
+
+
+async def send_unique_generic_to_monitoring(message_id: str, title: str, description: str) -> None:
+    """
+    Function to send messages to monitoring based on a unique ID (if the same ID is passed a second time, the message will not be sent)
+
+    :param message_id: ID for the given message
+    :param title: Title of the embed
+    :param description: Description of the embed
+    :return: None
+    """
+    global message_ids
+    if message_id in message_ids:
+        return
+    message_ids.add(message_id)
+    discord_message = DiscordMessage("embed", "generic", [""])
+    discord_message["title"] = title
+    discord_message["description"] = description
+    discord_message["description"] = discord_message["description"][0:2000]
+    header = settings["header"]
+    header['Content-Type'] = 'application/json'
+    header['X-RateLimit-Precision'] = "millisecond"
+    try:
+        async with aiohttp.ClientSession(headers=header, timeout=aiohttp.ClientTimeout(total=6)) as session:
+            async with session.post("https://discord.com/api/webhooks/{}".format(settings["monitoring_webhook"]),
+                                    data=repr(discord_message)) as resp:
+                pass
+    except (aiohttp.ServerConnectionError, aiohttp.ServerTimeoutError):
+        logger.exception(
+            f"Couldn't communicate with Discord as a result of Server Error when trying to signal generic monitoring message {title} for {message_id}!")
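The gate above is a plain in-memory set keyed by caller-chosen IDs. A minimal standalone sketch of the same dedupe pattern (the names and the stubbed POST are illustrative, not part of the module):

import asyncio

sent_ids: set = set()

async def send_once(message_id: str, title: str) -> bool:
    """Mirrors the message_ids gate: only the first call per ID sends anything."""
    if message_id in sent_ids:
        return False  # a repeat of the same ID is a silent no-op
    sent_ids.add(message_id)
    print(f"would POST embed '{title}' to the monitoring webhook")
    return True

async def main():
    assert await send_once("https://wiki.example/?CONNERROR", "wiki errors")
    assert not await send_once("https://wiki.example/?CONNERROR", "duplicate")

asyncio.run(main())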
src/domain.py

@@ -11,6 +11,7 @@ import sys

 import aiohttp

+from src.exceptions import WikiOnTimeout
 from src.exceptions import WikiNotFoundError
 from src.misc import LimitedList
 from src.discord.message import DiscordMessage
@@ -124,14 +125,27 @@ class Domain:
         self.wikis.move_to_end(wiki.script_url, last=False)
         logger.debug(f"Added new wiki {wiki.script_url} to domain {self.name}")

-    async def run_wiki_scan(self, wiki: src.wiki.Wiki, reason: Optional[str] = None):
+    async def run_wiki_scan(self, wiki: Optional[src.wiki.Wiki], reason: Optional[str] = None):
+        """
+        :param wiki: Wiki object to scan
+        :param reason: Reason for scanning, put into the logs
+        :return: None
+        :raises StopIteration: When None has been passed as wiki, meaning there are no more wikis in the queue besides timed-out ones
+        """
+        if wiki is None:
+            raise StopIteration
         try:
             await wiki.scan()
         except WikiNotFoundError as e:
             self.wikis.move_to_end(wiki.script_url)
             logs_for_wiki = wiki.statistics.filter_by_time(60*60)
             if all([x.type == LogType.HTTP_ERROR for x in logs_for_wiki]) and len(logs_for_wiki) > 10:
-                await wiki.remove_wiki_from_db("This recent changes webhook has been removed for `wiki returning code {}`!".format(e.code), send_reason=True)
+                await wiki.remove_wiki_from_db({404: "wiki deleted", 410: "wiki inaccessible"}.get(e.code), send_reason=True)
+        except WikiOnTimeout:
+            pass
         else:
             wiki.statistics.update(Log(type=LogType.SCAN_REASON, title=str(reason)))
             self.wikis.move_to_end(wiki.script_url)
@@ -172,6 +186,11 @@ class Domain:
     def discord_message_registration(self):
         self.total_discord_messages_sent += 1

+    def find_first_not_on_timeout(self):
+        for wiki in self.wikis.values():
+            if not wiki.is_on_timeout():
+                return wiki
+
     async def irc_scheduler(self):
         try:
             while True:
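Taken together with run_wiki_scan above: when every wiki in the domain is on timeout, find_first_not_on_timeout falls off the loop and returns None, run_wiki_scan turns that None into StopIteration, and the schedulers below catch it to end their pass. A compressed synchronous sketch of that control flow (stand-in classes, not the real ones):

class FakeWiki:
    def __init__(self, on_timeout: bool):
        self._on_timeout = on_timeout

    def is_on_timeout(self) -> bool:
        return self._on_timeout

def find_first_not_on_timeout(wikis):
    for wiki in wikis:
        if not wiki.is_on_timeout():
            return wiki  # implicit None when everything is on timeout

def run_wiki_scan(wiki):
    if wiki is None:
        raise StopIteration  # only timed-out wikis remain in the queue

try:
    run_wiki_scan(find_first_not_on_timeout([FakeWiki(True), FakeWiki(True)]))
except StopIteration:
    print("scheduler returns; every wiki is on timeout")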
@@ -186,11 +205,15 @@ class Domain:
                     continue
                 await self.run_wiki_scan(wiki, "IRC feed event")
             while True:  # Iterate until hitting return, we don't have to iterate using for since we are sending wiki to the end anyways
-                wiki: src.wiki.Wiki = next(iter(self.wikis.values()))
+                wiki: src.wiki.Wiki = self.find_first_not_on_timeout()
+
                 if (int(time.time()) - (wiki.statistics.last_checked_rc or 0)) > settings.get("irc_overtime", 3600):
                     await self.run_wiki_scan(wiki, "IRC backup check")
                 else:
                     return  # Recently scanned wikis will get at the end of the self.wikis, so we assume what is first hasn't been checked for a while
+        except StopIteration:
+            logger.debug(f"Domain {self.name} received StopIteration, returning from irc_scheduler...")
+            return
         except Exception as e:
             if command_line_args.debug:
                 logger.exception("IRC scheduler task for domain {} failed!".format(self.name))
@@ -209,7 +232,10 @@ class Domain:
         try:
             while True:
                 await asyncio.sleep(self.calculate_sleep_time(len(self)))  # To make sure that we don't spam domains with one wiki every second we calculate a sane timeout for domains with few wikis
-                await self.run_wiki_scan(next(iter(self.wikis.values())), "regular check")
+                await self.run_wiki_scan(self.find_first_not_on_timeout(), "regular check")
+        except StopIteration:
+            logger.debug(f"Domain {self.name} received StopIteration, returning from regular_scheduler...")
+            return
         except Exception as e:
             if command_line_args.debug:
                 logger.exception("Regular scheduler task for domain {} failed!".format(self.name))
src/domain_manager.py

@@ -103,7 +103,7 @@ class DomainManager:
                 reason = " ".join(split_payload[2:])
                 if wiki is not None:
                     logger.debug("Wiki specified in pub/sub message has been found. Erasing the wiki from DB.")
-                    await wiki.remove_wiki_from_db(reason, send_reason=True if reason else False)
+                    await wiki.remove_wiki_from_db(reason, send_reason=True if reason else False, localize_reason=False)
             elif split_payload[0] == "DEBUG":
                 asyncio.current_task().set_name("webhook_update")
                 if split_payload[1] == "INFO":
src/exceptions.py

@@ -1,3 +1,4 @@
+import datetime
 from typing import Union
@@ -70,6 +71,13 @@ class MediaWikiError(Exception):
         self.message = f"MediaWiki returned the following errors: {errors}!"
         super().__init__(self.message)

+class WikiOnTimeout(Exception):
+    """A wiki object is set on timeout"""
+
+    def __init__(self, timeout: datetime.datetime):
+        self.message = f"Wiki is currently on timeout."
+        super().__init__(self.message)
+
 class NoDomain(Exception):
     """When given domain does not exist"""
     pass
src/statistics.py

@@ -4,7 +4,7 @@ import aiohttp.web_request

 from src.misc import LimitedList
 from src.config import settings
-from typing import Union, Optional, List
+from typing import Union, Optional, List, Tuple
 from enum import Enum
@@ -14,7 +14,7 @@ class LogType(Enum):
     MEDIAWIKI_ERROR = 3
     VALUE_UPDATE = 4
     SCAN_REASON = 5
+    REDIRECT = 6

 class Log:
     """Log class represents an event that happened to a wiki fetch. Main purpose of those logs is debug and error-tracking."""
@@ -59,6 +59,9 @@ class Statistics:
         for log in args:
             self.logs.append(log)

+    def last_connection_failures(self):
+        return last_failures(self.logs)
+
     def filter_by_time(self, time_ago: int, logs: list = None) -> List[Log]:  # cannot have self.logs in here as this is evaluated once
         """Returns logs with time between time_ago seconds ago and now"""
         time_limit = int(time.time()) - time_ago
@@ -71,3 +74,15 @@ class Statistics:
     def recent_connection_errors(self) -> int:
         """Count how many connection errors there were recently (2 minutes)"""
         return len(self.filter_by_type(LogType.CONNECTION_ERROR, logs=self.filter_by_time(120)))  # find connection errors from 2 minutes ago
+
+
+def last_failures(logs: list[Log]) -> Tuple[List[Log], int]:
+    result = []
+    success = None
+    for log in logs[::-1]:
+        if log.type.value < 4:
+            result.insert(0, log)
+        else:
+            success = log
+            break
+    return result, success or 0
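What last_failures computes: walking the log from newest to oldest, it collects the trailing run of failure-type entries (LogType values below 4) and stops at the first non-failure entry, returning the run in chronological order plus that entry (or 0 when none is found). A self-contained walk-through, assuming CONNECTION_ERROR and HTTP_ERROR carry the enum values 1 and 2 (only MEDIAWIKI_ERROR = 3 and up are visible in this diff):

from enum import Enum
from dataclasses import dataclass

class LogType(Enum):
    CONNECTION_ERROR = 1  # assumed value
    HTTP_ERROR = 2        # assumed value
    MEDIAWIKI_ERROR = 3
    VALUE_UPDATE = 4
    SCAN_REASON = 5
    REDIRECT = 6

@dataclass
class Log:
    type: LogType
    title: str = ""

def last_failures(logs):
    result, success = [], None
    for log in logs[::-1]:
        if log.type.value < 4:   # failure-type entries
            result.insert(0, log)
        else:                    # first success-type entry ends the run
            success = log
            break
    return result, success or 0

logs = [Log(LogType.SCAN_REASON, "regular check"),
        Log(LogType.CONNECTION_ERROR), Log(LogType.HTTP_ERROR), Log(LogType.CONNECTION_ERROR)]
failures, boundary = last_failures(logs)
print(len(failures), boundary.title)  # 3 regular check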
src/wiki.py (69 changes)
@@ -2,18 +2,20 @@ from __future__ import annotations

 import datetime
 import functools
+import math
 import pickle
 import time
 import re
 import logging, aiohttp
 import asyncio
 from contextlib import asynccontextmanager
+from datetime import timedelta

 import requests

 from src.api.util import default_message
 from src.misc import prepare_settings, run_hooks
-from src.discord.queue import messagequeue, QueueEntry, send_to_discord_webhook
+from src.discord.queue import messagequeue, QueueEntry, send_to_discord_webhook, send_unique_generic_to_monitoring
 from src.mw_messages import MWMessages
 from src.exceptions import *
 from src.queue_handler import dbmanager
@@ -132,6 +134,7 @@ class Wiki:
         self.session_requests = requests.Session()
         self.session_requests.headers.update(settings["header"])
         self.request_cost = 0  # For tracking amount of times wiki has been requested in given context
+        self.dont_fetch_before: Optional[datetime.datetime] = None
         logger.debug("Creating new wiki object for {}".format(script_url))

     def __str__(self):
@@ -157,7 +160,8 @@ class Wiki:
             "logs": [x.json() for x in self.statistics.logs],
             "namespaces": self.namespaces,
             "tags": self.tags,
-            "recache_requested": self.recache_requested
+            "recache_requested": self.recache_requested,
+            "dont_fetch_before": self.dont_fetch_before.isoformat(timespec='seconds') if self.dont_fetch_before else "None"
         }
         if self.domain.name == "fandom.com":
             dict_obj.update(last_checked_discussion=self.statistics.last_checked_discussion, last_post=self.statistics.last_post)
@@ -313,7 +317,6 @@ class Wiki:
         self.rc_targets = target_settings
         self.discussion_targets = discussion_targets
-

     def parse_mw_request_info(self, request_data: dict, url: str):
         """A function parsing request JSON message from MediaWiki logging all warnings and raising on MediaWiki errors"""
         # any([True for k in request_data.keys() if k in ("error", "errors")])
@@ -342,13 +345,14 @@ class Wiki:
         Returns:

             request_content (dict): a dict resulting from json extraction of HTTP GET request with given json_path
-            OR
-            One of the following exceptions:
+        Raises:
             ServerError: When connection with the wiki failed due to server error
             ClientError: When connection with the wiki failed due to client error
             KeyError: When json_path contained keys that weren't found in response JSON response
             BadRequest: When params argument is of wrong type
             MediaWikiError: When MediaWiki returns an error
+            ServerRedirects: When status code for request is either 301 or 302
+            WikiNotFoundError: When status code for request is either 410 or 404
         """
         # Making request
         try:
@@ -361,7 +365,7 @@ class Wiki:
                                             allow_redirects=allow_redirects)
             else:
                 raise BadRequest(params)
-        except (aiohttp.ServerConnectionError, aiohttp.ServerTimeoutError, aiohttp.ContentTypeError) as exc:
+        except (aiohttp.ServerConnectionError, aiohttp.ServerTimeoutError, aiohttp.ContentTypeError, aiohttp.ClientConnectionError) as exc:
             logger.warning("Reached {error} error for request on link {url}".format(error=repr(exc),
                                                                                     url=self.script_url + str(params)))
             raise ServerError
@@ -370,11 +374,12 @@ class Wiki:
             logger.warning(f"A request to {self.script_url} {params} resulted in {request.status}")
             raise ServerError
         elif request.status in (301, 302):
-            logger.critical(
-                "Redirect detected! Either the wiki given in the script settings (wiki field) is incorrect/the wiki got removed or is giving us the false value. Please provide the real URL to the wiki, current URL redirects to {}".format(
+            logger.error(
+                "Redirect detected! Either the wiki given in the script settings (wiki field) is incorrect/the wiki got removed or is giving us the false value. Current URL redirects to {}".format(
                     request.url))
+            raise ServerRedirects(self.script_url, request.headers.get("Location", "unknown"))
         elif request.status in (410, 404):
+            self.statistics.update(Log(type=LogType.HTTP_ERROR, title="{} error".format(request.status)))
             raise WikiNotFoundError(request.status)
         elif 399 < request.status < 500:
             logger.error("Request returned ClientError status code on {url}".format(url=request.url))
@@ -457,29 +462,55 @@ class Wiki:
                              "meta": "siteinfo", "utf8": 1, "rcshow": "!bot",
                              "rcprop": "title|redirect|timestamp|ids|loginfo|parsedcomment|sizes|flags|tags|user|userid",
                              "rclimit": amount, "rctype": "edit|new|log|categorize", "siprop": "namespaces|general"})
         try:
             response = await self.api_request(params=params)
         except (aiohttp.ClientConnectionError, aiohttp.ServerTimeoutError, asyncio.TimeoutError, ServerRedirects) as e:
             logger.error("A connection error occurred while requesting {}".format(params))
             raise WikiServerError(e)
         return response

+    def put_wiki_on_timeout(self, timeout: datetime.timedelta):
+        """:param timeout: datetime.timedelta object for when the wiki should be processed again"""
+        logger.debug(f"Putting wiki {self.script_url} on timeout for {str(timeout)}")
+        self.dont_fetch_before = datetime.datetime.now() + timeout
+
+    def is_on_timeout(self) -> bool:
+        return self.dont_fetch_before > datetime.datetime.now() if self.dont_fetch_before is not None else False
+
     async def scan(self, amount=10):
         """Main track of fetching RecentChanges of a wiki.

         :raises WikiServerError
         """
+        if self.is_on_timeout():
+            raise WikiOnTimeout(self.dont_fetch_before)
         while True:  # Trap event in case there are more changes needed to be fetched
             try:
                 request = await self.fetch_wiki(amount=amount)
                 self.client.last_request = request
-            except WikiServerError as e:
-                # If WikiServerError comes up 2 times in recent 2 minutes, this will reraise the exception, otherwise waits 2 seconds and retries
+            except (aiohttp.ServerTimeoutError, asyncio.TimeoutError, WikiServerError, ServerError) as e:
                 self.statistics.update(Log(type=LogType.CONNECTION_ERROR, title=str(e.exception)))
-                if self.statistics.recent_connection_errors() > 9:
-                    raise e
-                await asyncio.sleep(2.0)
+                amount_of_failures = len(self.statistics.last_connection_failures()[0])
+                if amount_of_failures < 2:
+                    await asyncio.sleep(5.0)
+                    continue
+                elif amount_of_failures == 2:  # Put the wiki on timeout for 5 minutes
+                    self.put_wiki_on_timeout(timedelta(minutes=5))
+                    raise WikiOnTimeout(self.dont_fetch_before)
+                else:
+                    self.put_wiki_on_timeout(timedelta(seconds=min(21600, int(math.sqrt(2*amount_of_failures)*(amount_of_failures/0.4))*60)))  # Max backoff is 6 hours
+                    if amount_of_failures > 30 and self.domain.last_failure_report < (time.time() - 21600):  # Report only if domain didn't report something within last 6 hours
+                        await send_unique_generic_to_monitoring(self.script_url + "?CONNERROR",
+                                                                f"{self.script_url} errors", repr(e))
+                    raise WikiOnTimeout(self.dont_fetch_before)
+            except ServerRedirects as e:
+                self.statistics.update(Log(type=LogType.REDIRECT, title=""))
+                if len(self.statistics.filter_by_type(LogType.REDIRECT)) > 1:
+                    self.put_wiki_on_timeout(timedelta(hours=1) * len(self.statistics.filter_by_type(LogType.REDIRECT)))
+                    await send_unique_generic_to_monitoring(self.script_url+"?REDIRECT", f"{self.script_url} redirects", e.message)
+                    return
+                else:
+                    self.put_wiki_on_timeout(timedelta(hours=1))
+                    return
             if not self.mw_messages or self.recache_requested:
                 process_cachable(request, self)
             try:
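The escalation in the connection-error branch above: the first failure sleeps 5 seconds and retries, the second puts the wiki on a 5-minute timeout, and from the third consecutive failure on, the timeout is min(21600, int(sqrt(2n) * (n / 0.4)) * 60) seconds, growing roughly as n^1.5 and capping at 6 hours. A quick check of the curve, with the expression copied from the hunk:

import math

def backoff_seconds(amount_of_failures: int) -> int:
    # Same expression as in scan() above, capped at 21600 s (6 hours)
    return min(21600, int(math.sqrt(2 * amount_of_failures) * (amount_of_failures / 0.4)) * 60)

for n in (3, 5, 10, 21, 22, 30):
    print(n, backoff_seconds(n))
# 3 -> 1080 s (18 min), 5 -> 2340 s (39 min), 10 -> 6660 s (~1.9 h),
# 21 -> 20400 s (~5.7 h), 22 and up -> 21600 s (the 6-hour cap)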
@@ -549,7 +580,7 @@ class Wiki:
         except:
             logger.exception("Webhook removal send_reason failure.")

-    async def remove_wiki_from_db(self, reason: str, send_reason=False):
+    async def remove_wiki_from_db(self, reason: str, send_reason=False, localize_reason=True):
         logger.info(f"Removing a wiki with script_url of {self.script_url} from the database due to {reason}.")
         dbmanager.add(("DELETE FROM rcgcdb WHERE wiki = $1", (self.script_url,)))
         if not send_reason:
@@ -561,7 +592,7 @@ class Wiki:
         try:  # This is best effort scenario, but I don't plan to add re-tries to this
             dc_msg = DiscordMessage("compact", "custom/webhook_removal",
                                     webhooks, content=lang.gettext(
-                    "This recent changes webhook has been removed for `{reason}`!".format(reason=reason)))
+                    "This recent changes webhook has been removed for `{reason}`!".format(reason=lang.gettext(reason) if localize_reason else reason)))
             for webhook in webhooks:
                 await send_to_discord_webhook(dc_msg, webhook, "POST")
         except:
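The new localize_reason switch separates canned reasons that should go through the translation catalog (such as "wiki deleted" from run_wiki_scan) from free-form operator reasons arriving over pub/sub, which are sent verbatim. A minimal sketch of the same branching, using gettext.NullTranslations as a stand-in for the bot's lang object:

import gettext

lang = gettext.NullTranslations()  # stand-in; the bot loads a real catalog

def removal_message(reason: str, localize_reason: bool = True) -> str:
    # Canned reasons are looked up in the catalog; free-form ones pass through
    return "This recent changes webhook has been removed for `{reason}`!".format(
        reason=lang.gettext(reason) if localize_reason else reason)

print(removal_message("wiki deleted"))                             # translated when a catalog is loaded
print(removal_message("operator request", localize_reason=False))  # sent verbatim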