diff --git a/src/bot.py b/src/bot.py
index e8b9036..e1f7128 100644
--- a/src/bot.py
+++ b/src/bot.py
@@ -9,6 +9,8 @@
 from collections import defaultdict, namedtuple
 from typing import Generator
 from contextlib import asynccontextmanager
+
+from src.redis_connector import redis
 from src.argparser import command_line_args
 from src.config import settings
 from src.database import db
@@ -33,7 +35,7 @@ if command_line_args.debug:
 
 # Log Fail states with structure wiki_url: number of fail states
 all_wikis: dict = {}
-mw_msgs: dict = {} # will have the type of id: tuple
+
 main_tasks: dict = {}
 
 
@@ -50,15 +52,6 @@ async def populate_wikis():
 queue_limit = settings.get("queue_limit", 30)
 QueuedWiki = namedtuple("QueuedWiki", ['url', 'amount'])
 
-class LimitedList(list):
-    def __init__(self, *args):
-        list.__init__(self, *args)
-
-    def append(self, obj: QueuedWiki, forced: bool = False) -> None:
-        if len(self) < queue_limit or forced:
-            self.insert(len(self), obj)
-            return
-        raise ListFull
 
 
 
@@ -550,11 +543,16 @@ def shutdown(loop, signal=None):
 
 async def main_loop():
     global main_tasks
+    # Fix some asyncio problems
     loop = asyncio.get_event_loop()
     nest_asyncio.apply(loop)
+    # Setup database connection
     await db.setup_connection()
     logger.debug("Connection type: {}".format(db.connection))
     await populate_wikis()
+    await redis.connect()
+    await redis.pubsub()
+    await domains.run_all_domains()  # run_all_domains is a coroutine: without await no domain task is ever started
     try:
         signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
         for s in signals:
diff --git a/src/domain.py b/src/domain.py
index 580aa24..20a4f6a 100644
--- a/src/domain.py
+++ b/src/domain.py
@@ -35,8 +35,15 @@ class Domain:
     def set_irc(self, irc_client: src.irc_feed.AioIRCCat):
         self.irc = irc_client
 
+    def stop_task(self):
+        """Cancels the task"""
+        self.task.cancel()  # Be aware that cancelling the task may take time
+
     def run_domain(self):
-        self.task = asyncio.create_task(self.run_wiki_check())
+        if not self.task or self.task.done():  # done() covers cancelled tasks and tasks that ended with an exception
+            self.task = asyncio.create_task(self.run_wiki_check())
+        else:
+            logger.error(f"Tried to start a task for domain {self.name} however the task already exists!")
 
     def add_wiki(self, wiki: src.wiki.Wiki, first=False):
         """Adds a wiki to domain list.
@@ -54,7 +61,7 @@ class Domain:
         self.rate_limiter.timeout_add(1.0)
 
     async def irc_scheduler(self):
-        while 1:
+        while True:
             try:
                 wiki_url = self.irc.updated_wikis.pop()
             except KeyError:
@@ -72,14 +79,15 @@ class Domain:
                 return # Recently scanned wikis will get at the end of the self.wikis, so we assume what is first hasn't been checked for a while
 
     async def regular_scheduler(self):
-        while 1:
-            additional_time = max((-25*len(self))+150, 0)
-
+        while True:
+            await asyncio.sleep(max((-25*len(self))+150, 1))  # To make sure that we don't spam domains with one wiki every second we calculate a sane timeout for domains with few wikis
+            await self.run_wiki_scan(self.wikis.pop())
 
     async def run_wiki_check(self):
         if self.irc:
-            while:
+            while True:
                 await self.irc_scheduler()
+                await asyncio.sleep(10.0)
         else:
             await self.regular_scheduler()
 
diff --git a/src/domain_manager.py b/src/domain_manager.py
index 9ac0999..050ae66 100644
--- a/src/domain_manager.py
+++ b/src/domain_manager.py
@@ -39,5 +39,9 @@ class DomainManager:
         self.domains[name] = domain_object
         return self.domains[name]
 
+    async def run_all_domains(self):
+        """Call run_domain() on every domain stored in self.domains."""
+        for domain in self.domains.values():
+            domain.run_domain()
 
 domains = DomainManager()
diff --git a/src/exceptions.py b/src/exceptions.py
index a6c3f68..02f16da 100644
--- a/src/exceptions.py
+++ b/src/exceptions.py
@@ -23,4 +23,31 @@ class ListFull(Exception):
     pass
 
 class EmbedListFull(Exception):
-    pass
\ No newline at end of file
+    pass
+
+class ServerError(Exception):
+    """Exception for when a request fails because of Server error"""
+    pass
+
+
+class ClientError(Exception):
+    """Exception for when a request fails because of Client error; request is the aiohttp response (status/reason/url)"""
+
+    def __init__(self, request):
+        self.message = f"Client have made wrong request! {request.status}: {request.reason}. {request.url}"
+        super().__init__(self.message)
+
+
+class BadRequest(Exception):
+    """When type of parameter given to request making method is invalid"""
+    def __init__(self, object_type):
+        self.message = f"params must be either a string or OrderedDict object, not {type(object_type)}!"
+        super().__init__(self.message)
+
+
+class MediaWikiError(Exception):
+    """When MediaWiki responds with an error"""
+    def __init__(self, errors):
+        self.message = f"MediaWiki returned the following errors: {errors}!"
+        super().__init__(self.message)
+
diff --git a/src/redis_connector.py b/src/redis_connector.py
index 215d0b8..ffb1ad3 100644
--- a/src/redis_connector.py
+++ b/src/redis_connector.py
@@ -1,14 +1,36 @@
 import asyncio
 import aioredis
+import async_timeout
+import logging
+from typing import Optional
 from src.config import settings
 
+logger = logging.getLogger("rcgcdb.redisconnector")
 
 class Redis:
     def __init__(self):
-        self.connection = None
+        self.pub_connection: Optional[aioredis.connection] = None
+        self.stat_connection: Optional[aioredis.connection] = None
+
+    async def reader(self):
+        """Based on code from https://aioredis.readthedocs.io/en/latest/getting-started/#pubsub-mode"""
+        while True:
+            try:
+                async with async_timeout.timeout(1):
+                    message = await self.pub_connection.get_message(ignore_subscribe_messages=True)
+                    if message is not None:
+                        logger.debug(f"(Reader) Message Received: {message}")
+                    await asyncio.sleep(1.0)
+            except asyncio.TimeoutError:  # TODO Better handler
+                pass
 
     async def connect(self):
-        self.connection = await aioredis.create_pool("redis://" + settings["redis_host"], encoding="UTF-8")
+        self.pub_connection = await aioredis.create_connection("redis://" + settings["redis_host"], encoding="UTF-8")
+        self.stat_connection = await aioredis.create_connection("redis://" + settings["redis_host"], encoding="UTF-8")
+
+    async def pubsub(self):
+        await self.pub_connection.subscribe("rcgcdb_updates")
+        self.reader_task = asyncio.create_task(self.reader())  # keep a reference: the event loop only holds weak references to tasks
 
 
 redis = Redis()
diff --git a/src/statistics.py b/src/statistics.py
new file mode 100644
index 0000000..045aa3d
--- /dev/null
+++ b/src/statistics.py
@@ -0,0 +1,38 @@
+from src.config import settings
+from typing import Union, Optional
+
+queue_limit = settings.get("queue_limit", 30)
+
+class Log:
+    """Placeholder for a single log entry; its fields are not defined yet."""
+    def __init__(self, **kwargs):
+        pass  # TODO: decide what to keep from kwargs; the body was empty, which is a SyntaxError
+
+
+class LimitedList(list):
+    """List whose append() discards the oldest item once queue_limit items are held."""
+    def __init__(self, *args):
+        list.__init__(self, *args)
+
+    def append(self, obj: Log) -> None:
+        """Append obj, dropping the oldest entry first when the list is already full."""
+        if self and len(self) >= queue_limit:
+            self.pop(0)
+        super().append(obj)
+
+
+class Statistics:
+    """Holds the last seen/checked IDs passed in for a wiki plus a LimitedList of logs."""
+    def __init__(self, rc_id: int, discussion_id: int):
+        self.last_checked_rc: Optional[int] = None
+        self.last_action: Optional[int] = rc_id
+        self.last_checked_discussion: Optional[int] = None
+        self.last_post: Optional[int] = discussion_id
+        self.logs: LimitedList = LimitedList()
+
+    def update(self, *args: Log, **kwargs: Union[float, int]):
+        """Set every keyword argument as an attribute and append every positional Log to self.logs."""
+        for key, value in kwargs.items():
+            setattr(self, key, value)
+        for log in args:
+            self.logs.append(log)
\ No newline at end of file
diff --git a/src/wiki.py b/src/wiki.py
index 49d0e69..96c3399 100644
--- a/src/wiki.py
+++ b/src/wiki.py
@@ -8,24 +8,146 @@ from src.formatters.discussions import feeds_embed_formatter, feeds_compact_form
 from src.misc import parse_link
 from src.i18n import langs
 from src.wiki_ratelimiter import RateLimiter
+from src.statistics import Statistics
+from src.exceptions import ServerError, ClientError, BadRequest, MediaWikiError
 import sqlite3
 import src.discord
 import asyncio
 from src.config import settings
 # noinspection PyPackageRequirements
 from bs4 import BeautifulSoup
+from collections import OrderedDict
+from typing import Union
 
 logger = logging.getLogger("rcgcdb.wiki")
 
 class Wiki:
     def __init__(self, script_url: str, rc_id: int, discussion_id: int):
-        self.script_url = script_url
+        self.script_url: str = script_url
         self.session = aiohttp.ClientSession(headers=settings["header"], timeout=aiohttp.ClientTimeout(6.0))
-        self.statistics = Statistics()
+        self.statistics: Statistics = Statistics(rc_id, discussion_id)
+        self.fail_times: int = 0
+        self.mw_messages: MWMessagesHashmap = MWMessagesHashmap()
 
     @property
     def rc_id(self):
-        return self.statistics.rc_id
+        return self.statistics.last_action
+
+    def downtime_controller(self, down):
+        """Increase self.fail_times when down is truthy, decrease it otherwise."""
+        if down:
+            self.fail_times += 1
+        else:
+            self.fail_times -= 1
+
+    def parse_mw_request_info(self, request_data: dict, url: str):
+        """A function parsing request JSON message from MediaWiki logging all warnings and raising on MediaWiki errors"""
+        # any([True for k in request_data.keys() if k in ("error", "errors")])
+        errors: list = request_data.get("errors", {})  # Is it ugly? I don't know tbh
+        if errors:
+            raise MediaWikiError(str(errors))
+        warnings: list = request_data.get("warnings", {})
+        if warnings:
+            for warning in warnings:
+                logger.warning("MediaWiki returned the following warning: {code} - {text} on {url}.".format(
+                    code=warning["code"], text=warning.get("text", warning.get("*", "")), url=url
+                ))
+        return request_data
+
+    async def api_request(self, params: Union[str, OrderedDict], *json_path: str, timeout: int = 10,
+                          allow_redirects: bool = False) -> dict:
+        """Method to GET request data from the wiki's API with error handling including recognition of MediaWiki errors.
+
+        Parameters:
+
+            params (str, OrderedDict): a string or collections.OrderedDict object containing query parameters
+            json_path (str): *args taking strings as values. After request is parsed as json it will extract data from given json path
+            timeout (int, float) (default=10): int or float limiting time required for receiving a full response from a server before returning TimeoutError
+            allow_redirects (bool) (default=False): switches whether the request should follow redirects or not
+
+        Returns:
+
+            request_content (dict): a dict resulting from json extraction of HTTP GET request with given json_path
+            OR
+            One of the following exceptions:
+            ServerError: When connection with the wiki failed due to server error
+            ClientError: When connection with the wiki failed due to client error, or when the wiki answered with a 302 redirect
+            KeyError: When json_path contained keys that weren't found in response JSON response
+            BadRequest: When params argument is of wrong type
+            MediaWikiError: When MediaWiki returns an error
+        """
+        # Making request
+        try:
+            if isinstance(params,
+                          str):  # Todo Make it so there are some default arguments like warning/error format appended
+                request = await self.session.get(self.script_url + "api.php?" + params + "&errorformat=raw", timeout=timeout,
+                                                 allow_redirects=allow_redirects)
+            elif isinstance(params, OrderedDict):
+                params["errorformat"] = "raw"
+                request = await self.session.get(self.script_url + "api.php", params=params, timeout=timeout,
+                                                 allow_redirects=allow_redirects)
+            else:
+                raise BadRequest(params)
+        except (aiohttp.ServerConnectionError, aiohttp.ServerTimeoutError) as exc:
+            logger.warning("Reached {error} error for request on link {url}".format(error=repr(exc),
+                                                                                    url=self.script_url + str(params)))
+            self.downtime_controller(True)
+            raise ServerError
+        # Catching HTTP errors
+        if 499 < request.status < 600:
+            self.downtime_controller(True)
+            raise ServerError
+        elif request.status == 302:
+            logger.critical(
+                "Redirect detected! Either the wiki given in the script settings (wiki field) is incorrect/the wiki got removed or is giving us the false value. Please provide the real URL to the wiki, current URL redirects to {}".format(
+                    request.url))
+            raise ClientError(request)  # a redirect carries no usable payload; falling through would leave request_json unset
+        elif 399 < request.status < 500:
+            logger.error("Request returned ClientError status code on {url}".format(url=request.url))
+            raise ClientError(request)
+        else:
+            # JSON Extraction
+            try:
+                request_json = self.parse_mw_request_info(await request.json(encoding="UTF-8"), str(request.url))
+                for item in json_path:
+                    request_json = request_json[item]
+            except ValueError:
+                logger.warning("ValueError when extracting JSON data on {url}".format(url=request.url))
+                self.downtime_controller(True)
+                raise ServerError
+            except MediaWikiError:
+                logger.exception("MediaWiki error on request: {}".format(request.url))
+                raise
+            except KeyError:
+                logger.exception("KeyError while iterating over json_path, full response: {}".format(await request.json()))
+                raise
+            return request_json
+
+    async def fetch_wiki(self, extended, script_path, session: aiohttp.ClientSession, ratelimiter: RateLimiter,
+                         amount=20) -> dict:
+        """Wait on the rate limiter, then query the wiki API for tags, recent changes and siteinfo (plus allmessages when extended)."""
+        await ratelimiter.timeout_wait()
+        if extended:
+            params = OrderedDict({"action": "query", "format": "json", "uselang": "content", "list": "tags|recentchanges",
+                                  "meta": "allmessages|siteinfo",
+                                  "utf8": 1, "tglimit": "max", "tgprop": "displayname",
+                                  "rcprop": "title|redirect|timestamp|ids|loginfo|parsedcomment|sizes|flags|tags|user",
+                                  "rclimit": amount, "rcshow": "!bot", "rctype": "edit|new|log|categorize",
+                                  "ammessages": "recentchanges-page-added-to-category|recentchanges-page-removed-from-category|recentchanges-page-added-to-category-bundled|recentchanges-page-removed-from-category-bundled",
+                                  "amenableparser": 1, "amincludelocal": 1, "siprop": "namespaces|general"})
+        else:
+            params = OrderedDict({"action": "query", "format": "json", "uselang": "content", "list": "tags|recentchanges",
+                                  "meta": "siteinfo", "utf8": 1,
+                                  "tglimit": "max", "rcshow": "!bot", "tgprop": "displayname",
+                                  "rcprop": "title|redirect|timestamp|ids|loginfo|parsedcomment|sizes|flags|tags|user",
+                                  "rclimit": amount, "rctype": "edit|new|log|categorize", "siprop": "namespaces|general"})
+        try:
+            response = await self.api_request(params=params)
+            ratelimiter.timeout_add(1.0)
+        except (aiohttp.ClientConnectionError, aiohttp.ServerTimeoutError, asyncio.TimeoutError):
+            logger.error("A connection error occurred while requesting {}".format(params))
+            raise WikiServerError
+        return response
 
 @dataclass
 class Wiki_old:
diff --git a/src/wiki_ratelimiter.py b/src/wiki_ratelimiter.py
index 98d785c..f67bf63 100644
--- a/src/wiki_ratelimiter.py
+++ b/src/wiki_ratelimiter.py
@@ -17,3 +17,7 @@ class RateLimiter:
         #logger.debug("Waiting {}".format(calculated_timeout))
         if calculated_timeout > 0:
             await asyncio.sleep(calculated_timeout)
+
+    def timeout_raw(self):
+        """Return self.timeout_until minus the current time.time() (negative once that moment has passed)."""
+        return self.timeout_until - time.time()
\ No newline at end of file