From 2b09e7019ad2eb442439f464dd1c80951b9e1e6d Mon Sep 17 00:00:00 2001
From: Frisk
Date: Wed, 22 Jun 2022 19:17:20 +0200
Subject: [PATCH] Replace Redis pub/sub with PostgreSQL LISTEN/NOTIFY, move
 wiki scanning into the domain manager

Removes src/redis_connector.py in favour of a dedicated asyncpg listener
connection, drops the old RcQueue/wiki_scanner machinery from bot.py, and
adds Log/LogType to src/statistics.py for per-wiki error tracking.

---
 requirements.txt       |   2 +-
 src/bot.py             | 351 ++---------------------------------------
 src/database.py        |  35 ++--
 src/domain.py          |   2 +
 src/domain_manager.py  |  16 ++
 src/redis_connector.py |  53 -------
 src/statistics.py      |  19 ++-
 src/wiki.py            |  47 +++---
 8 files changed, 93 insertions(+), 432 deletions(-)
 delete mode 100644 src/redis_connector.py

diff --git a/requirements.txt b/requirements.txt
index ce2d6a5..e51df3e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,4 @@
-python_version >= '3.6'
+python_version >= '3.9'
 aiohttp >= 3.6.2
 lxml >= 4.2.1
 nest-asyncio >= 1.4.0
diff --git a/src/bot.py b/src/bot.py
index 235a2c1..ac9a522 100644
--- a/src/bot.py
+++ b/src/bot.py
@@ -10,10 +10,9 @@
 from typing import Generator
 from contextlib import asynccontextmanager
 
-from redis_connector import Redis
 from src.argparser import command_line_args
 from src.config import settings
-from src.database import db
+from src.database import db_connection
 from src.exceptions import *
 from src.misc import get_paths, get_domain
 from src.msgqueue import messagequeue, send_to_discord
@@ -28,7 +27,7 @@ from src.domain_manager import domains
 
 logging.config.dictConfig(settings["logging"])
 logger = logging.getLogger("rcgcdb.bot")
 logger.debug("Current settings: {settings}".format(settings=settings))
-logger.info("RcGcDb v{} is starting up.".format("1.1"))
+logger.info("RcGcDb v{} is starting up.".format("2.0"))
 
 if command_line_args.debug:
     logger.info("Debug mode is active!")
@@ -38,338 +37,23 @@
 all_wikis: dict = {}
 main_tasks: dict = {}
 
+db = db_connection()
 
 # First populate the all_wikis list with every wiki
 # Reasons for this: 1. we require amount of wikis to calculate the cooldown between requests
 # 2. Easier to code
 
 async def populate_wikis():
+    logger.info("Populating domain manager with wikis...")
+    start = time.time()
     async with db.pool().acquire() as connection:
         async with connection.transaction():
             async for db_wiki in connection.cursor('SELECT DISTINCT wiki, rcid, postid FROM rcgcdw'):
-                await domains.new_wiki(Wiki(db_wiki["wiki"], db_wiki["rcid"], db_wiki["postid"]))
-
-queue_limit = settings.get("queue_limit", 30)
-QueuedWiki = namedtuple("QueuedWiki", ['url', 'amount'])
-
-
-
-
-class RcQueue:
-    def __init__(self):
-        self.domain_list = {}
-        self.to_remove = []
-        self.irc_mapping = {}
-
-    async def start_group(self, group, initial_wikis):
-        """Starts a task for given domain group"""
-        if group not in self.domain_list:
-            if group in self.irc_mapping:  # Hopefully there are no race conditions....
- irc_connection = self.irc_mapping[group] - else: - for irc_server in settings["irc_servers"].keys(): - if group in settings["irc_servers"][irc_server]["domains"]: - irc_connection = AioIRCCat(settings["irc_servers"][irc_server]["irc_channel_mapping"], all_wikis) - for domain in settings["irc_servers"][irc_server]["domains"]: - self.irc_mapping[domain] = irc_connection - irc_connection.connect(settings["irc_servers"][irc_server]["irc_host"], settings["irc_servers"][irc_server]["irc_port"], settings["irc_servers"][irc_server]["irc_name"]) - break - else: - irc_connection = None - self.domain_list[group] = {"task": asyncio.create_task(scan_group(group)), "last_rowid": 0, "query": LimitedList(initial_wikis), "rate_limiter": RateLimiter(), "irc": irc_connection} - logger.debug(self.domain_list[group]) - else: - raise KeyError - - async def remove_wiki_from_group(self, wiki): - """Removes a wiki from query of given domain group""" - logger.debug(f"Removing {wiki} from group queue.") - group = get_domain(wiki) - self[group]["query"] = LimitedList([x for x in self[group]["query"] if x.url != wiki]) - all_wikis[wiki].rc_active = -1 - if not self[group]["query"]: # if there is no wiki left in the queue, get rid of the task - logger.debug(f"{group} no longer has any wikis queued!") - if not await self.check_if_domain_in_db(group): - await self.stop_task_group(group) - else: - logger.debug(f"But there are still wikis for it in DB!") - - async def stop_task_group(self, group): - self[group]["task"].cancel() - del self.domain_list[group] - - async def check_if_domain_in_db(self, domain): - async with db.pool().acquire() as connection: - async with connection.transaction(): - async for wiki in connection.cursor('SELECT DISTINCT wiki FROM rcgcdw WHERE rcid != -1;'): - if get_domain(wiki["wiki"]) == domain: - return True - return False - - @asynccontextmanager - async def retrieve_next_queued(self, group) -> Generator[QueuedWiki, None, None]: - """Retrives next wiki in the queue for given domain""" - if len(self.domain_list[group]["query"]) == 0: - # make sure we are not removing the group because entire domain group went down, it's expensive - yes, but could theoretically cause issues - raise QueueEmpty - # if self.check_if_domain_in_db(group): - # #logger.warning("Domain group {} has 0 elements yet there are still wikis in the db of the same domain group! This may indicate we ran over the list too fast. 
We are waiting...".format(group)) - # raise QueueEmpty - # else: - # await self.stop_task_group(group) - # return - try: - yield self.domain_list[group]["query"][0] - except asyncio.CancelledError: - raise - except: - if command_line_args.debug: - logger.exception("RC Group exception") - shutdown(asyncio.get_event_loop()) - else: - logger.exception("Group task returned error") - await generic_msg_sender_exception_logger(traceback.format_exc(), "Group task error logger", Group=group) - else: - self.domain_list[group]["query"].pop(0) - - - @staticmethod - def filter_rc_active(wiki_obj): - return wiki_obj[1].rc_active is None or wiki_obj[1].rc_active > -1 - - async def update_queues(self): - """Makes a round on rcgcdb DB and looks for updates to the queues in self.domain_list""" - try: - self.to_remove = [x[0] for x in filter(self.filter_rc_active, all_wikis.items())] # first populate this list and remove wikis that are still in the db, clean up the rest - full = set() - async with db.pool().acquire() as connection: - async with connection.transaction(): - async for db_wiki in connection.cursor('SELECT DISTINCT wiki, row_number() OVER (ORDER BY webhook) AS rowid, webhook, lang, display, rcid FROM rcgcdw WHERE rcid != -1 OR rcid IS NULL ORDER BY webhook'): - domain = get_domain(db_wiki["wiki"]) - try: - if db_wiki["wiki"] not in all_wikis: - raise AssertionError - self.to_remove.remove(db_wiki["wiki"]) - except AssertionError: - all_wikis[db_wiki["wiki"]] = Wiki() - all_wikis[db_wiki["wiki"]].rc_active = db_wiki["rcid"] - except ValueError: - pass - if domain in full: - continue - try: - current_domain: dict = self[domain] - if current_domain["irc"]: - logger.debug("DOMAIN LIST FOR IRC: {}".format(current_domain["irc"].updated)) - logger.debug("CURRENT DOMAIN INFO: {}".format(domain)) - logger.debug("IS WIKI IN A LIST?: {}".format(db_wiki["wiki"] in current_domain["irc"].updated)) - logger.debug("LAST CHECK FOR THE WIKI {} IS {}".format(db_wiki["wiki"], all_wikis[db_wiki["wiki"]].last_check)) - if db_wiki["wiki"] in current_domain["irc"].updated: # Priority wikis are the ones with IRC, if they get updated forcefully add them to queue - current_domain["irc"].updated.remove(db_wiki["wiki"]) - current_domain["query"].append(QueuedWiki(db_wiki["wiki"], 20), forced=True) - logger.debug("Updated in IRC so adding to queue.") - continue - elif all_wikis[db_wiki["wiki"]].last_check+settings["irc_overtime"] < time.time(): # if time went by and wiki should be updated now use default mechanics - logger.debug("Overtime so adding to queue.") - pass - else: # Continue without adding - logger.debug("No condition fulfilled so skipping.") - continue - if not db_wiki["rowid"] < current_domain["last_rowid"]: - current_domain["query"].append(QueuedWiki(db_wiki["wiki"], 20)) - except KeyError: - await self.start_group(domain, [QueuedWiki(db_wiki["wiki"], 20)]) - logger.info("A new domain group ({}) has been added since last time, adding it to the domain_list and starting a task...".format(domain)) - except ListFull: - full.add(domain) - current_domain["last_rowid"] = db_wiki["rowid"] - continue - for wiki in self.to_remove: - await self.remove_wiki_from_group(wiki) - for group, data in self.domain_list.items(): - if group not in full: - self[group]["last_rowid"] = 0 # iter reached the end without being stuck on full list - logger.debug("Current domain_list structure: {}".format(self.domain_list)) - except: - if command_line_args.debug: - logger.exception("Queue error!") - shutdown(asyncio.get_event_loop()) - else: 
- logger.exception("Exception on queue updater") - await generic_msg_sender_exception_logger(traceback.format_exc(), "Queue updator") - - - def __getitem__(self, item): - """Returns the query of given domain group""" - return self.domain_list[item] - - def __setitem__(self, key, value): - self.domain_list[key] = value - - -rcqueue = RcQueue() - - -# Start queueing logic - -def calculate_delay_for_group(group_length: int) -> float: - """Calculate the delay between fetching each wiki to avoid rate limits""" - min_delay = 60 / settings["max_requests_per_minute"] - if group_length == 0: - group_length = 1 - if (group_length * min_delay) < settings["minimal_cooldown_per_wiki_in_sec"]: - return settings["minimal_cooldown_per_wiki_in_sec"] / group_length - else: - return 0.0 - - -async def generate_targets(wiki_url: str, additional_requirements: str) -> defaultdict: - """To minimize the amount of requests, we generate a list of language/display mode combinations to create messages for - this way we can send the same message to multiple webhooks which have the same wiki and settings without doing another - request to the wiki just to duplicate the message. - """ - combinations = defaultdict(list) - async with db.pool().acquire() as connection: - async with connection.transaction(): - async for webhook in connection.cursor('SELECT webhook, lang, display FROM rcgcdw WHERE wiki = $1 {}'.format(additional_requirements), wiki_url): - combination = (webhook["lang"], webhook["display"]) - combinations[combination].append(webhook["webhook"]) - return combinations - - -async def generate_domain_groups(): - """Generate a list of wikis per domain (fandom.com, wikipedia.org etc.) - - :returns tuple[str, list]""" - domain_wikis = defaultdict(list) - async with db.pool().acquire() as connection: - async with connection.transaction(): - async for db_wiki in connection.cursor('SELECT DISTINCT wiki, webhook, lang, display, rcid FROM rcgcdw WHERE rcid != -1 OR rcid IS NULL'): - domain_wikis[get_domain(db_wiki["wiki"])].append(QueuedWiki(db_wiki["wiki"], 20)) - for group, db_wikis in domain_wikis.items(): - yield group, db_wikis - - -async def scan_group(group: str): - rate_limiter = rcqueue[group]["rate_limiter"] - while True: - try: - async with rcqueue.retrieve_next_queued(group) as queued_wiki: # acquire next wiki in queue - if "irc" not in rcqueue[group]: - await asyncio.sleep(calculate_delay_for_group(len(rcqueue[group]["query"]))) - logger.debug("Wiki {}".format(queued_wiki.url)) - local_wiki = all_wikis[queued_wiki.url] # set a reference to a wiki object from memory - extended = False - if local_wiki.mw_messages is None: - extended = True - async with aiohttp.ClientSession(headers=settings["header"], - timeout=aiohttp.ClientTimeout(6.0)) as session: - try: - wiki_response = await local_wiki.fetch_wiki(extended, queued_wiki.url, session, rate_limiter, amount=queued_wiki.amount) - await local_wiki.check_status(queued_wiki.url, wiki_response.status) - except (WikiServerError, WikiError): - logger.error("Exeption when fetching the wiki") - continue # ignore this wiki if it throws errors - try: - recent_changes_resp = await wiki_response.json() - if not isinstance(recent_changes_resp, dict): - logger.error(f"recent_changes_resp has a bad type, found {type(recent_changes_resp)}, __repr__ here: {recent_changes_resp}.") - raise TypeError - if "errors" in recent_changes_resp: - error = recent_changes_resp.get('errors') - if error["code"] == "readapidenied": - await local_wiki.fail_add(queued_wiki.url, 410) - continue 
- raise WikiError - except aiohttp.ContentTypeError: - logger.exception("Wiki seems to be resulting in non-json content.") - await local_wiki.fail_add(queued_wiki.url, 410) - continue - except: - logger.exception("On loading json of response.") - continue - try: - recent_changes = recent_changes_resp['query']['recentchanges'] - recent_changes.reverse() - except KeyError: - logger.error("recent_changes_resp returned KeyError. skipping this check. (usually this happens when the wiki doesn't respond properly, it's pretty normal)") - continue - if extended: - await process_mwmsgs(recent_changes_resp, local_wiki, mw_msgs) - if local_wiki.rc_active in (0, None, -1): # new wiki, just get the last rc to not spam the channel, -1 for -1 to NULL changes - if len(recent_changes) > 0: - local_wiki.rc_active = recent_changes[-1]["rcid"] - DBHandler.add(queued_wiki.url, recent_changes[-1]["rcid"]) - else: - local_wiki.rc_active = 0 - DBHandler.add(queued_wiki.url, 0) - await DBHandler.update_db() - continue - categorize_events = {} - targets = await generate_targets(queued_wiki.url, "AND (rcid != -1 OR rcid IS NULL)") - paths = get_paths(queued_wiki.url, recent_changes_resp) - new_events = 0 - local_wiki.last_check = time.time() # on successful check, save new last check time - for change in recent_changes: - if change["rcid"] > local_wiki.rc_active and queued_wiki.amount != 450: - new_events += 1 - if new_events == 20: - # call the function again with max limit for more results, ignore the ones in this request - logger.debug("There were too many new events, queuing wiki with 450 limit.") - rcqueue[group]["query"].insert(1, QueuedWiki(queued_wiki.url, 450)) - break - await process_cats(change, local_wiki, mw_msgs, categorize_events) - else: # If we broke from previous loop (too many changes) don't execute sending messages here - highest_rc = local_wiki.rc_active # setup var for later use - message_list = defaultdict(list) - for change in recent_changes: # Yeah, second loop since the categories require to be all loaded up - if change["rcid"] > local_wiki.rc_active: - if highest_rc is None or change["rcid"] > highest_rc: # make sure that the highest_rc is really highest rcid but do allow other entries with potentially lesser rcids come after without breaking the cycle - highest_rc = change["rcid"] - for target in targets.items(): - try: - message = await essential_info(change, categorize_events, local_wiki, target, paths, - recent_changes_resp, rate_limiter) - if message is not None: - message_list[target[0]].append(message) - except asyncio.CancelledError: - raise - except: - if command_line_args.debug: - logger.exception("Exception on RC formatter") - raise - else: - logger.exception("Exception on RC formatter") - await generic_msg_sender_exception_logger(traceback.format_exc(), "Exception in RC formatter", Wiki=queued_wiki.url, Change=str(change)[0:1000]) - # Lets stack the messages - for messages in message_list.values(): - messages = stack_message_list(messages) - for message in messages: - await send_to_discord(message) - if recent_changes: # we don't have to test for highest_rc being null, because if there are no RC entries recent_changes will be an empty list which will result in false in here and DO NOT save the value - local_wiki.rc_active = highest_rc - DBHandler.add(queued_wiki.url, highest_rc) - await DBHandler.update_db() - except asyncio.CancelledError: - return - except QueueEmpty: - await asyncio.sleep(10.0) - continue - - -async def wiki_scanner(): - """Wiki scanner is spawned as a task 
which purpose is to continuously run over wikis in the DB, fetching recent changes
-    to add messages based on the changes to message queue later handled by message_sender coroutine."""
-    try:
-        async for group, db_wikis in generate_domain_groups():  # First scan
-            await rcqueue.start_group(group, db_wikis)
-        while True:
-            await asyncio.sleep(20.0)
-            await rcqueue.update_queues()
-    except asyncio.CancelledError:
-        for item in rcqueue.domain_list.values():  # cancel running tasks
-            item["task"].cancel()
-        raise
+            try:
+                await domains.new_wiki(Wiki(db_wiki["wiki"], db_wiki["rcid"], db_wiki["postid"]))
+            except WikiExists:  # Can rarely happen when Pub/Sub registers wiki before population
+                pass
+    logger.info("Populating domain manager with wikis took {} seconds".format(time.time()-start))
 
 
 async def message_sender():
@@ -508,7 +192,6 @@ async def discussion_handler():
             await generic_msg_sender_exception_logger(traceback.format_exc(), "Discussion handler task exception", Wiki=db_wiki["wiki"])
 
 
-
 def shutdown(loop, signal=None):
     global main_tasks
     loop.remove_signal_handler(signal)
@@ -546,11 +229,10 @@ async def main_loop():
     nest_asyncio.apply(loop)
     # Setup database connection
     await db.setup_connection()
-    logger.debug("Connection type: {}".format(db.connection))
+    await db.create_pubsub_interface(domains.webhook_update)
+    logger.debug("Connection type: {}".format(db.connection_pool))
     await populate_wikis()
-    redis = Redis(domains)
-    await redis.connect()
-    await redis.pubsub()
+    # START LISTENER CONNECTION
     domains.run_all_domains()
     try:
         signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
@@ -562,9 +244,8 @@
         signals = (signal.SIGBREAK, signal.SIGTERM, signal.SIGINT)
     # loop.set_exception_handler(global_exception_handler)
     try:
-        main_tasks = {"wiki_scanner": asyncio.create_task(wiki_scanner()), "message_sender": asyncio.create_task(message_sender()),
-                      "discussion_handler": asyncio.create_task(discussion_handler()), "database_updates": asyncio.create_task(DBHandler.update_db()),
-                      "redis_updates": asyncio.create_task(redis.reader())}
+        main_tasks = {"message_sender": asyncio.create_task(message_sender()),
+                      "discussion_handler": asyncio.create_task(discussion_handler()), "database_updates": asyncio.create_task(DBHandler.update_db())}
         main_tasks["msg_queue_shield"] = asyncio.shield(main_tasks["message_sender"])
         main_tasks["database_updates_shield"] = asyncio.shield(main_tasks["database_updates"])
-        await asyncio.gather(main_tasks["wiki_scanner"], main_tasks["discussion_handler"], main_tasks["message_sender"], main_tasks["database_updates"])
+        await asyncio.gather(main_tasks["discussion_handler"], main_tasks["message_sender"], main_tasks["database_updates"])
@@ -573,4 +254,4 @@
     except asyncio.CancelledError:
         return
 
-asyncio.run(main_loop(), debug=False)
+asyncio.run(main_loop(), debug=command_line_args.debug)
diff --git a/src/database.py b/src/database.py
index deadb35..430c368 100644
--- a/src/database.py
+++ b/src/database.py
@@ -1,6 +1,6 @@
 import asyncpg
 import logging
-from typing import Optional
+from typing import Optional, Callable
 from src.config import settings
 
 logger = logging.getLogger("rcgcdb.database")
@@ -8,23 +8,31 @@ logger = logging.getLogger("rcgcdb.database")
 
 
 class db_connection:
-    connection: Optional[asyncpg.Pool] = None
+    listener_connection: Optional[asyncpg.Connection] = None
+    connection_pool: Optional[asyncpg.Pool] = None
+
+    async def create_pubsub_interface(self, callback: Callable):
+        await self.listener_connection.add_listener("webhookupdates", callback)
 
     async def setup_connection(self):
-        # Establish a connection to an existing database named "test"
-        # as a "postgres" user.
-        logger.debug("Setting up the Database connection...")
-        self.connection = await asyncpg.create_pool(user=settings["pg_user"], host=settings.get("pg_host", "localhost"),
-                                                    database=settings.get("pg_db", "rcgcdb"), password=settings.get("pg_pass"),
-                                                    port=settings.get("pg_port", 5432))
-        logger.debug("Database connection established! Connection: {}".format(self.connection))
+        logger.debug("Setting up the Database connections...")
+        # First, set up a separate connection for the pub/sub listener,
+        # mainly because the connection pool might be aggressive about closing inactive connections
+        self.listener_connection = await asyncpg.connect(user=settings["pg_user"], host=settings.get("pg_host", "localhost"),
+                                                         database=settings.get("pg_db", "rcgcdb"), password=settings.get("pg_pass"),
+                                                         port=settings.get("pg_port", 5432))
+        self.connection_pool = await asyncpg.create_pool(user=settings["pg_user"], host=settings.get("pg_host", "localhost"),
+                                                         database=settings.get("pg_db", "rcgcdb"), password=settings.get("pg_pass"),
+                                                         port=settings.get("pg_port", 5432))
+        logger.debug("Database connection established! Connection: {}".format(self.connection_pool))
 
     async def shutdown_connection(self):
         logger.debug("Shutting down database connection...")
-        await self.connection.close()
+        await self.listener_connection.close()
+        await self.connection_pool.close()
 
     def pool(self) -> asyncpg.Pool:
-        return self.connection
+        return self.connection_pool
 
 # Tried to make it a decorator but tbh won't probably work
 # async def in_transaction(self, func):
@@ -37,7 +45,4 @@ class db_connection:
 # async def query(self, string, *arg):
 #     async with self.connection.acquire() as connection:
 #         async with connection.transaction():
-#             return connection.cursor(string, *arg)
-
-
-db = db_connection()
+#             return connection.cursor(string, *arg)
\ No newline at end of file
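For reference, the new create_pubsub_interface() rides on PostgreSQL's LISTEN/NOTIFY. Below is a minimal, self-contained sketch of that flow with asyncpg; the DSN and the payload are illustrative only, not values taken from settings:

```python
import asyncio
import asyncpg

def on_update(connection, pid, channel, payload):
    # asyncpg invokes listeners with this signature: the listening
    # connection, the notifying backend PID, the channel name and the payload.
    print(f"{channel}: {payload}")

async def main():
    dsn = "postgresql://postgres@localhost/rcgcdb"  # illustrative DSN
    listener = await asyncpg.connect(dsn)
    await listener.add_listener("webhookupdates", on_update)
    # Any other session can now wake the listener:
    notifier = await asyncpg.connect(dsn)
    await notifier.execute("SELECT pg_notify('webhookupdates', 'ADD https://example.com/w/')")
    await asyncio.sleep(1)  # give the notification a moment to arrive
    await listener.close()
    await notifier.close()

asyncio.run(main())
```

Keeping the listener on its own connection, rather than one borrowed from the pool, matches the comment above: a pooled connection could be recycled while a LISTEN registration is still attached to it.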
diff --git a/src/domain.py b/src/domain.py
index bb3b40d..3fd7a5e 100644
--- a/src/domain.py
+++ b/src/domain.py
@@ -55,6 +55,8 @@ class Domain:
         :parameter wiki - Wiki object
         :parameter first (optional) - bool indicating if wikis should be added as first or last in the ordered dict"""
         wiki.set_domain(self)
+        if wiki.script_url in self.wikis:
+            raise WikiExists("Wiki {} exists in domain {}".format(wiki.script_url, self.name))
         self.wikis[wiki.script_url] = wiki
         if first:
             self.wikis.move_to_end(wiki.script_url, last=False)
diff --git a/src/domain_manager.py b/src/domain_manager.py
index 1e16b29..d8e5994 100644
--- a/src/domain_manager.py
+++ b/src/domain_manager.py
@@ -1,6 +1,9 @@
 from __future__ import annotations
 from typing import TYPE_CHECKING, Optional
 from urllib.parse import urlparse, urlunparse
+
+import asyncpg
+
 from src.config import settings
 from src.domain import Domain
 from src.irc_feed import AioIRCCat
@@ -14,6 +17,19 @@ class DomainManager:
     def __init__(self):
         self.domains: dict[str, Domain] = {}
 
+    async def webhook_update(self, connection: asyncpg.Connection, pid: int, channel: str, payload: str):
+        """Callback for database listener. Used to update our domain cache on changes such as new wikis or removed wikis"""
+        # TODO Write a trigger for pub/sub in database/Wiki-Bot repo
+        split_payload = payload.split(" ")
+        if len(split_payload) < 2:
+            raise ValueError("Improper pub/sub message! Pub/sub payload: {}".format(payload))
+        if split_payload[0] == "ADD":
+            await self.new_wiki(Wiki(split_payload[1], None, None))
+        elif split_payload[0] == "REMOVE":
+            self.remove_wiki(split_payload[1])
+        else:
+            raise ValueError("Unknown pub/sub command! Payload: {}".format(payload))
+
     async def new_wiki(self, wiki: Wiki):
         """Finds a domain for the wiki and adds a wiki to the domain object.
 
diff --git a/src/redis_connector.py b/src/redis_connector.py
deleted file mode 100644
index 4e24fb4..0000000
--- a/src/redis_connector.py
+++ /dev/null
@@ -1,53 +0,0 @@
-import asyncio
-import aioredis
-import async_timeout
-import logging
-from typing import Optional, TYPE_CHECKING
-from src.config import settings
-from src.wiki import Wiki
-
-logger = logging.getLogger("rcgcdb.redisconnector")
-
-if TYPE_CHECKING:
-    from src.domain_manager import DomainManager
-
-class Redis:
-    def __init__(self, domain_manager):
-        self.pub_connection: Optional[aioredis.connection] = None
-        self.stat_connection: Optional[aioredis.connection] = None
-        self.domain_manager: DomainManager = domain_manager
-
-    async def reader(self):
-        """Based on code from https://aioredis.readthedocs.io/en/latest/getting-started/#pubsub-mode"""
-        while True:
-            try:
-                async with async_timeout.timeout(1):
-                    message = await self.pub_connection.get_message(ignore_subscribe_messages=True)
-                    if message is not None:
-                        print(f"(Reader) Message Received: {message}")
-                        logger.debug(f"(Reader) Message Received: {message}")
-                        await self.process_changes(message["data"])
-                    await asyncio.sleep(1.0)
-            except asyncio.TimeoutError:  # TODO Better handler
-                pass
-            except aioredis.exceptions.ConnectionError:
-                pass
-            except asyncio.CancelledError:
-                # TODO Send a message about shutdown
-                raise NotImplementedError
-
-    async def process_changes(self, data: str):
-        data = data.split(" ")
-        if data[0] == "REMOVE":
-            self.domain_manager.remove_wiki(data[1])  # TODO Add response to source
-        elif data[0] == "ADD":  # ADD https://new_wiki.somedamain.com 43 1 where 43 stands for rc_id and 1 for discussion_id
-            wiki = Wiki(data[1], int(data[2]), int(data[3]))  # TODO This might raise an issue if non-int value
-            await self.domain_manager.new_wiki(wiki)
-
-
-    async def connect(self):
-        self.stat_connection = await aioredis.from_url("redis://" + settings["redis_host"], encoding="UTF-8")
-
-    async def pubsub(self):
-        self.pub_connection = self.stat_connection.pubsub()
-        await self.pub_connection.subscribe("rcgcdb_updates")
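The TODO in webhook_update() above still needs a database-side trigger to emit those ADD/REMOVE notifications. A hypothetical sketch of what it could look like; the real trigger is meant to live in the database/Wiki-Bot repo, the function and trigger names here are made up, and EXECUTE FUNCTION assumes PostgreSQL 11+:

```python
import asyncio
import asyncpg

# Hypothetical trigger: notify the bot whenever a row is inserted into or
# deleted from the rcgcdw table. Note it fires per webhook row, so a wiki
# with several webhooks would be announced more than once; the WikiExists
# guard added to Domain.add_wiki above absorbs such duplicates.
TRIGGER_SQL = """
CREATE OR REPLACE FUNCTION rcgcdw_notify() RETURNS trigger AS $$
BEGIN
    IF TG_OP = 'INSERT' THEN
        PERFORM pg_notify('webhookupdates', 'ADD ' || NEW.wiki);
        RETURN NEW;
    ELSE  -- DELETE
        PERFORM pg_notify('webhookupdates', 'REMOVE ' || OLD.wiki);
        RETURN OLD;
    END IF;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER rcgcdw_webhookupdates
AFTER INSERT OR DELETE ON rcgcdw
FOR EACH ROW EXECUTE FUNCTION rcgcdw_notify();
"""

async def install_trigger():
    connection = await asyncpg.connect("postgresql://postgres@localhost/rcgcdb")  # illustrative DSN
    await connection.execute(TRIGGER_SQL)  # multi-statement scripts are fine when passed without arguments
    await connection.close()

asyncio.run(install_trigger())
```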
diff --git a/src/statistics.py b/src/statistics.py
index 045aa3d..73c0890 100644
--- a/src/statistics.py
+++ b/src/statistics.py
@@ -1,12 +1,25 @@
+import time
 from src.config import settings
 from typing import Union, Optional
+from enum import Enum
+
+
+class LogType(Enum):
+    CONNECTION_ERROR = 1
+    HTTP_ERROR = 2
+    MEDIAWIKI_ERROR = 3
+    VALUE_UPDATE = 4
 
 queue_limit = settings.get("queue_limit", 30)
 
+
 class Log:
+    """A Log represents a single event that happened during a wiki fetch.
+    The main purpose of these logs is debugging and error tracking."""
     def __init__(self, **kwargs):
-
-
+        self.type: LogType = kwargs["type"]
+        self.time: int = int(time.time())
+        self.title: str = kwargs["title"]
+        self.details: Optional[str] = kwargs.get("details", None)
 
 class LimitedList(list):
     def __init__(self, *args):
         list.__init__(self, *args)
@@ -18,7 +31,7 @@
 
 
 class Statistics:
-    def __init__(self, rc_id: int, discussion_id: int):
+    def __init__(self, rc_id: Optional[int], discussion_id: Optional[int]):
         self.last_checked_rc: Optional[int] = None
         self.last_action: Optional[int] = rc_id
         self.last_checked_discussion: Optional[int] = None
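One thing this patch never shows is a Statistics.update() method, even though wiki.py below calls self.statistics.update(Log(...)). A self-contained sketch of how the new Log/LogType API presumably fits together; the update() implementation here is an assumption, not code from the repo:

```python
import time
from enum import Enum
from typing import Optional

class LogType(Enum):  # mirrors the enum added in src/statistics.py
    CONNECTION_ERROR = 1
    HTTP_ERROR = 2
    MEDIAWIKI_ERROR = 3
    VALUE_UPDATE = 4

class Log:  # mirrors the Log class added in src/statistics.py
    def __init__(self, **kwargs):
        self.type: LogType = kwargs["type"]
        self.time: int = int(time.time())
        self.title: str = kwargs["title"]
        self.details: Optional[str] = kwargs.get("details", None)

class Statistics:
    def __init__(self):
        self.logs: list[Log] = []

    def update(self, log: Log):  # hypothetical; the real method is not in this diff
        self.logs.append(log)

stats = Statistics()
stats.update(Log(type=LogType.HTTP_ERROR, title="410 error", details="https://example.com/w/"))
print(stats.logs[0].type, stats.logs[0].title)
```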
diff --git a/src/wiki.py b/src/wiki.py
index 54549f3..0ab9c2a 100644
--- a/src/wiki.py
+++ b/src/wiki.py
@@ -18,7 +18,7 @@ from src.api.context import Context
 from src.misc import parse_link
 from src.i18n import langs
 from src.wiki_ratelimiter import RateLimiter
-from statistics import Statistics
+from src.statistics import Statistics, Log, LogType
 import src.discord
 import asyncio
 from src.config import settings
@@ -36,11 +36,10 @@ if TYPE_CHECKING:
 from src.domain import Domain
 
 class Wiki:
-    def __init__(self, script_url: str, rc_id: int, discussion_id: int):
+    def __init__(self, script_url: str, rc_id: Optional[int], discussion_id: Optional[int]):
         self.script_url: str = script_url
         self.session = aiohttp.ClientSession(headers=settings["header"], timeout=aiohttp.ClientTimeout(6.0))
         self.statistics: Statistics = Statistics(rc_id, discussion_id)
-        self.fail_times: int = 0
         self.mw_messages: Optional[MWMessages] = None
         self.first_fetch_done: bool = False
         self.domain: Optional[Domain] = None
@@ -50,24 +49,24 @@
     def rc_id(self):
         return self.statistics.last_action
 
-    async def remove(self, reason):
-        logger.info("Removing a wiki {}".format(self.script_url))
-        await src.discord.wiki_removal(self.script_url, reason)
-        await src.discord.wiki_removal_monitor(self.script_url, reason)
-        async with db.pool().acquire() as connection:
-            result = await connection.execute('DELETE FROM rcgcdw WHERE wiki = $1', self.script_url)
-            logger.warning('{} rows affected by DELETE FROM rcgcdw WHERE wiki = "{}"'.format(result, self.script_url))
+    # async def remove(self, reason):
+    #     logger.info("Removing a wiki {}".format(self.script_url))
+    #     await src.discord.wiki_removal(self.script_url, reason)
+    #     await src.discord.wiki_removal_monitor(self.script_url, reason)
+    #     async with db.pool().acquire() as connection:
+    #         result = await connection.execute('DELETE FROM rcgcdw WHERE wiki = $1', self.script_url)
+    #         logger.warning('{} rows affected by DELETE FROM rcgcdw WHERE wiki = "{}"'.format(result, self.script_url))
 
     def set_domain(self, domain: Domain):
         self.domain = domain
 
-    async def downtime_controller(self, down, reason=None):
-        if down:
-            self.fail_times += 1
-            if self.fail_times > 20:
-                await self.remove(reason)
-        else:
-            self.fail_times -= 1
+    # async def downtime_controller(self, down, reason=None):
+    #     if down:
+    #         self.fail_times += 1
+    #         if self.fail_times > 20:
+    #             await self.remove(reason)
+    #     else:
+    #         self.fail_times -= 1
 
     async def generate_targets(self) -> defaultdict[namedtuple, list[str]]:
         """This function generates all possible varations of outputs that we need to generate messages for.
@@ -141,7 +140,7 @@
         elif 399 < request.status < 500:
             logger.error("Request returned ClientError status code on {url}".format(url=request.url))
             if request.status in wiki_reamoval_reasons:
-                await self.downtime_controller(True, reason=request.status)
+                self.statistics.update(Log(type=LogType.HTTP_ERROR, title="{} error".format(request.status), details=str(request.headers) + "\n" + str(request.url)))
                 raise ClientError(request)
         else:
             # JSON Extraction
@@ -158,11 +157,10 @@
             except KeyError:
                 logger.exception("KeyError while iterating over json_path, full response: {}".format(request.json()))
                 raise
-        self.first_fetch_done = True
         return request_json
 
     async def fetch_wiki(self, amount=10) -> dict:
-        if self.first_fetch_done is False:
+        if self.mw_messages is None:
             params = OrderedDict({"action": "query", "format": "json", "uselang": "content", "list": "tags|recentchanges",
                                   "meta": "allmessages|siteinfo", "utf8": 1, "tglimit": "max", "tgprop": "displayname",
@@ -188,10 +186,9 @@
         try:
             request = await self.fetch_wiki(amount=amount)
             self.client.last_request = request
-        except WikiServerError:
-            return  # TODO Add a log entry?
-        else:
-            await self.downtime_controller(False)
+        except WikiServerError as e:
+            self.statistics.update(Log(type=LogType.CONNECTION_ERROR, title=str(e)))  # We need more details in WikiServerError exception
+            return
         if not self.mw_messages:
             mw_messages = request.get("query", {}).get("allmessages", [])
             final_mw_messages = dict()
@@ -234,7 +230,7 @@
                 try:
                     self.tags[tag["name"]] = (BeautifulSoup(tag["displayname"], "lxml")).get_text()
                 except KeyError:
                     self.tags[tag["name"]] = None
-        targets = await self.generate_targets()
+        targets = await self.generate_targets()  # TODO Cache this in Wiki and update based on Redis updates
         message_list = defaultdict(list)
         for change in recent_changes:  # Yeah, second loop since the categories require to be all loaded up
             if change["rcid"] > self.rc_id:
 
                 highest_id = change["rcid"]
                 for combination, webhooks in targets.items():
                     message, metadata = await rc_processor(self, change, categorize_events, combination, webhooks)
+                    break
 
 
async def rc_processor(wiki: Wiki, change: dict, changed_categories: dict, display_options: namedtuple("Settings", ["lang", "display"]), webhooks: list) -> tuple[src.discord.DiscordMessage, src.discord.DiscordMessageMetadata]:
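Finally, a toy illustration of the target-grouping idea behind generate_targets(), which both the removed bot.py helper and the new Wiki method rely on: webhooks that share the same (lang, display) pair get one message rendered for all of them, so the wiki is only queried once. The rows below are made up, standing in for the rcgcdw query results:

```python
from collections import defaultdict

# Made-up rows mimicking 'SELECT webhook, lang, display FROM rcgcdw WHERE wiki = $1'.
rows = [
    {"webhook": "hooks/1", "lang": "en", "display": 1},
    {"webhook": "hooks/2", "lang": "en", "display": 1},
    {"webhook": "hooks/3", "lang": "de", "display": 0},
]
combinations = defaultdict(list)
for row in rows:
    # Group webhooks by their (lang, display) settings combination.
    combinations[(row["lang"], row["display"])].append(row["webhook"])
print(dict(combinations))
# {('en', 1): ['hooks/1', 'hooks/2'], ('de', 0): ['hooks/3']}
```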