Backing up work, added statistics class and Redis content

This commit is contained in:
Frisk 2021-06-05 13:12:23 +02:00
parent 04f45c33e9
commit d9ddd30b1b
No known key found for this signature in database
GPG key ID: 213F7C15068AF8AC
8 changed files with 233 additions and 22 deletions

View file

@@ -9,6 +9,8 @@ from collections import defaultdict, namedtuple
from typing import Generator
from contextlib import asynccontextmanager
from redis_connector import redis
from src.argparser import command_line_args
from src.config import settings
from src.database import db
@@ -33,7 +35,7 @@ if command_line_args.debug:
# Log Fail states with structure wiki_url: number of fail states
all_wikis: dict = {}
mw_msgs: dict = {} # will have the type of id: tuple
main_tasks: dict = {}
@@ -50,15 +52,6 @@ async def populate_wikis():
queue_limit = settings.get("queue_limit", 30)
QueuedWiki = namedtuple("QueuedWiki", ['url', 'amount'])
class LimitedList(list):
def __init__(self, *args):
list.__init__(self, *args)
def append(self, obj: QueuedWiki, forced: bool = False) -> None:
if len(self) < queue_limit or forced:
self.insert(len(self), obj)
return
raise ListFull
@@ -550,11 +543,16 @@ def shutdown(loop, signal=None):
async def main_loop():
global main_tasks
# Fix some asyncio problems
loop = asyncio.get_event_loop()
nest_asyncio.apply(loop)
# Setup database connection
await db.setup_connection()
logger.debug("Connection type: {}".format(db.connection))
await populate_wikis()
await redis.connect()
await redis.pubsub()
await domains.run_all_domains()
try:
signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
for s in signals:

View file

@@ -35,8 +35,15 @@ class Domain:
def set_irc(self, irc_client: src.irc_feed.AioIRCCat):
self.irc = irc_client
def stop_task(self):
"""Cancels the task"""
self.task.cancel() # Be aware that cancelling the task may take time
def run_domain(self):
if not self.task or self.task.cancelled():
self.task = asyncio.create_task(self.run_wiki_check())
else:
logger.error(f"Tried to start a task for domain {self.name}, but the task already exists!")
def add_wiki(self, wiki: src.wiki.Wiki, first=False):
"""Adds a wiki to domain list.
@@ -54,7 +61,7 @@ class Domain:
self.rate_limiter.timeout_add(1.0)
async def irc_scheduler(self):
while True:
try:
wiki_url = self.irc.updated_wikis.pop()
except KeyError:
@@ -72,14 +79,15 @@ class Domain:
return # Recently scanned wikis end up at the end of self.wikis, so we assume that whatever is first hasn't been checked for a while
async def regular_scheduler(self):
while True:
await asyncio.sleep(max((-25*len(self))+150, 1)) # To make sure that we don't spam domains with one wiki every second we calculate a sane timeout for domains with few wikis
await self.run_wiki_scan(self.wikis.pop())
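For the sleep formula above, max((-25*len(self)) + 150, 1): a domain holding a single wiki waits max(125, 1) = 125 seconds between scans, a domain with 4 wikis waits max(50, 1) = 50 seconds, and any domain with 6 or more wikis hits the 1-second floor.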
async def run_wiki_check(self):
if self.irc:
while True:
await self.irc_scheduler()
await asyncio.sleep(10.0)
else:
await self.regular_scheduler()

View file

@@ -39,5 +39,8 @@ class DomainManager:
self.domains[name] = domain_object
return self.domains[name]
async def run_all_domains(self):
for domain in self.domains.values():
domain.run_domain()
domains = DomainManager()

View file

@@ -23,4 +23,31 @@ class ListFull(Exception):
pass
class EmbedListFull(Exception):
pass
class ServerError(Exception):
"""Exception for when a request fails because of a server error"""
pass
class ClientError(Exception):
"""Exception for when a request fails because of a client error"""
def __init__(self, request):
self.message = f"Client has made a wrong request! {request.status}: {request.reason}. {request.text}"
super().__init__(self.message)
class BadRequest(Exception):
"""When type of parameter given to request making method is invalid"""
def __init__(self, object_type):
self.message = f"params must be either a string or OrderedDict object, not {type(object_type)}!"
super().__init__(self.message)
class MediaWikiError(Exception):
"""When MediaWiki responds with an error"""
def __init__(self, errors):
self.message = f"MediaWiki returned the following errors: {errors}!"
super().__init__(self.message)

View file

@@ -1,14 +1,36 @@
import asyncio
import aioredis
import async_timeout
import logging
from typing import Optional
from src.config import settings
logger = logging.getLogger("rcgcdb.redisconnector")
class Redis:
def __init__(self):
self.pub_connection: Optional[aioredis.connection] = None
self.stat_connection: Optional[aioredis.connection] = None
async def reader(self):
"""Based on code from https://aioredis.readthedocs.io/en/latest/getting-started/#pubsub-mode"""
while True:
try:
async with async_timeout.timeout(1):
message = await self.pub_connection.get_message(ignore_subscribe_messages=True)
if message is not None:
logger.debug(f"(Reader) Message Received: {message}")
await asyncio.sleep(1.0)
except asyncio.TimeoutError: # TODO Better handler
pass
async def connect(self):
self.pub_connection = await aioredis.create_connection("redis://" + settings["redis_host"], encoding="UTF-8")
self.stat_connection = await aioredis.create_connection("redis://" + settings["redis_host"], encoding="UTF-8")
async def pubsub(self):
await self.pub_connection.subscribe("rcgcdb_updates")
asyncio.create_task(self.reader())
redis = Redis()
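A minimal startup sketch (illustrative only, not part of the commit) of how bot.py above is expected to drive this module from inside its own event loop; the import path mirrors the one used in bot.py:

    from redis_connector import redis  # import path as used in bot.py above

    async def start_redis():
        # Open the pub/sub and statistics connections, then subscribe to
        # "rcgcdb_updates" and spawn the background reader task.
        await redis.connect()
        await redis.pubsub()

main_loop() awaits these two calls right after populate_wikis(), so the reader task runs on the same loop as the rest of the bot.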

src/statistics.py Normal file
View file

@@ -0,0 +1,32 @@
from src.config import settings
from typing import Union, Optional
queue_limit = settings.get("queue_limit", 30)
class Log:
def __init__(self, **kwargs):
class LimitedList(list):
def __init__(self, *args):
list.__init__(self, *args)
def append(self, obj: Log) -> None:
if len(self) > queue_limit:
self.pop()  # drop an entry so the list stays within queue_limit
super().append(obj)
class Statistics:
def __init__(self, rc_id: int, discussion_id: int):
self.last_checked_rc: Optional[int] = None
self.last_action: Optional[int] = rc_id
self.last_checked_discussion: Optional[int] = None
self.last_post: Optional[int] = discussion_id
self.logs: LimitedList = LimitedList()
def update(self, *args: Log, **kwargs: dict[str, Union[float, int]]):
for key, value in kwargs.items():
self.__setattr__(key, value)
for log in args:
self.logs.append(log)
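A short illustration (not part of the commit; the id and timestamp values below are made up) of how the Statistics container is meant to be used:

    stats = Statistics(rc_id=100, discussion_id=0)
    stats.update(last_action=105, last_checked_rc=1622894543)  # keyword arguments become attributes
    print(stats.last_action)  # 105
    print(len(stats.logs))    # 0 until Log objects are passed positionally to update()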

View file

@@ -8,24 +8,142 @@ from src.formatters.discussions import feeds_embed_formatter, feeds_compact_form
from src.misc import parse_link
from src.i18n import langs
from src.wiki_ratelimiter import RateLimiter
from src.statistics import Statistics
import sqlite3
import src.discord
import asyncio
from src.config import settings
# noinspection PyPackageRequirements
from bs4 import BeautifulSoup
from collections import OrderedDict
from typing import Union
logger = logging.getLogger("rcgcdb.wiki")
class Wiki:
def __init__(self, script_url: str, rc_id: int, discussion_id: int):
self.script_url: str = script_url
self.session = aiohttp.ClientSession(headers=settings["header"], timeout=aiohttp.ClientTimeout(6.0))
self.statistics: Statistics = Statistics(rc_id, discussion_id)
self.fail_times: int = 0
self.mw_messages: MWMessagesHashmap = MWMessagesHashmap()
@property
def rc_id(self):
return self.statistics.last_action
def downtime_controller(self, down):
if down:
self.fail_times += 1
else:
self.fail_times -= 1
def parse_mw_request_info(self, request_data: dict, url: str):
"""Parses the JSON response from MediaWiki, logging all warnings and raising MediaWikiError on MediaWiki errors"""
# any([True for k in request_data.keys() if k in ("error", "errors")])
errors: list = request_data.get("errors", {}) # Is it ugly? I don't know tbh
if errors:
raise MediaWikiError(str(errors))
warnings: list = request_data.get("warnings", {})
if warnings:
for warning in warnings:
logger.warning("MediaWiki returned the following warning: {code} - {text} on {url}.".format(
code=warning["code"], text=warning.get("text", warning.get("*", "")), url=url
))
return request_data
async def api_request(self, params: Union[str, OrderedDict], *json_path: str, timeout: int = 10,
allow_redirects: bool = False) -> dict:
"""Method to GET request data from the wiki's API with error handling including recognition of MediaWiki errors.
Parameters:
params (str, OrderedDict): a string or collections.OrderedDict object containing query parameters
json_path (str): *args taking strings as values. After the request is parsed as JSON, data is extracted by following the given JSON path
timeout (int, float) (default=10): limits the time allowed for receiving a full response from the server before raising TimeoutError
allow_redirects (bool) (default=False): switches whether the request should follow redirects or not
Returns:
request_content (dict): a dict resulting from json extraction of HTTP GET request with given json_path
OR
One of the following exceptions:
ServerError: When connection with the wiki failed due to server error
ClientError: When connection with the wiki failed due to client error
KeyError: When json_path contains keys that weren't found in the JSON response
BadRequest: When params argument is of wrong type
MediaWikiError: When MediaWiki returns an error
"""
# Making request
try:
if isinstance(params,
str): # Todo Make it so there are some default arguments like warning/error format appended
request = await self.session.get(self.script_url + "api.php?" + params + "&errorformat=raw", timeout=timeout,
allow_redirects=allow_redirects)
elif isinstance(params, OrderedDict):
params["errorformat"] = "raw"
request = await self.session.get(self.script_url + "api.php", params=params, timeout=timeout,
allow_redirects=allow_redirects)
else:
raise BadRequest(params)
except (aiohttp.ServerConnectionError, aiohttp.ServerTimeoutError) as exc:
logger.warning("Reached {error} error for request on link {url}".format(error=repr(exc),
url=self.script_url + str(params)))
self.downtime_controller(True)
raise ServerError
# Catching HTTP errors
if 499 < request.status < 600:
self.downtime_controller(True)
raise ServerError
elif request.status == 302:
logger.critical(
"Redirect detected! Either the wiki given in the script settings (wiki field) is incorrect, the wiki has been removed, or it is returning a false value. Please provide the real URL of the wiki; the current URL redirects to {}".format(
request.url))
elif 399 < request.status < 500:
logger.error("Request returned ClientError status code on {url}".format(url=request.url))
raise ClientError(request)
else:
# JSON Extraction
try:
request_json = self.parse_mw_request_info(await request.json(encoding="UTF-8"), str(request.url))
for item in json_path:
request_json = request_json[item]
except ValueError:
logger.warning("ValueError when extracting JSON data on {url}".format(url=request.url))
self.downtime_controller(True)
raise ServerError
except MediaWikiError:
logger.exception("MediaWiki error on request: {}".format(request.url))
raise
except KeyError:
logger.exception("KeyError while iterating over json_path, full response: {}".format(request.json()))
raise
return request_json
async def fetch_wiki(self, extended, script_path, session: aiohttp.ClientSession, ratelimiter: RateLimiter,
amount=20) -> dict:
await ratelimiter.timeout_wait()
if extended:
params = OrderedDict({"action": "query", "format": "json", "uselang": "content", "list": "tags|recentchanges",
"meta": "allmessages|siteinfo",
"utf8": 1, "tglimit": "max", "tgprop": "displayname",
"rcprop": "title|redirect|timestamp|ids|loginfo|parsedcomment|sizes|flags|tags|user",
"rclimit": amount, "rcshow": "!bot", "rctype": "edit|new|log|categorize",
"ammessages": "recentchanges-page-added-to-category|recentchanges-page-removed-from-category|recentchanges-page-added-to-category-bundled|recentchanges-page-removed-from-category-bundled",
"amenableparser": 1, "amincludelocal": 1, "siprop": "namespaces|general"})
else:
params = OrderedDict({"action": "query", "format": "json", "uselang": "content", "list": "tags|recentchanges",
"meta": "siteinfo", "utf8": 1,
"tglimit": "max", "rcshow": "!bot", "tgprop": "displayname",
"rcprop": "title|redirect|timestamp|ids|loginfo|parsedcomment|sizes|flags|tags|user",
"rclimit": amount, "rctype": "edit|new|log|categorize", "siprop": "namespaces|general"})
try:
response = await self.api_request(params=params)
ratelimiter.timeout_add(1.0)
except (aiohttp.ClientConnectionError, aiohttp.ServerTimeoutError, asyncio.TimeoutError):
logger.error("A connection error occurred while requesting {}".format(params))
raise WikiServerError
return response
@dataclass
class Wiki_old:
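As a quick illustration of the api_request contract documented above, a hypothetical caller (names and values invented, not part of the commit) could fetch the wiki's site name like this:

    from collections import OrderedDict

    async def fetch_sitename(wiki: Wiki) -> str:
        # Walk the parsed JSON along the path query -> general and read the sitename field.
        params = OrderedDict({"action": "query", "format": "json", "meta": "siteinfo", "siprop": "general"})
        general = await wiki.api_request(params, "query", "general")
        return general["sitename"]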

View file

@@ -17,3 +17,6 @@ class RateLimiter:
#logger.debug("Waiting {}".format(calculated_timeout))
if calculated_timeout > 0:
await asyncio.sleep(calculated_timeout)
def timeout_raw(self):
return self.timeout_until - time.time()