import asyncio
import logging.config
import signal
import sqlite3
import sys
from collections import defaultdict

import aiohttp
import requests

from src.config import settings
from src.database import db_cursor
from src.discord import DiscordMessage
from src.exceptions import *
from src.misc import get_paths
from src.msgqueue import messagequeue
from src.queue_handler import DBHandler
from src.wiki import Wiki, process_cats, process_mwmsgs, essential_info

logging.config.dictConfig(settings["logging"])
logger = logging.getLogger("rcgcdb.bot")
logger.debug("Current settings: {settings}".format(settings=settings))

# Log fail states with structure wiki_url: number of fail states
all_wikis: dict = {}
mw_msgs: dict = {}  # will have the type of id: tuple

# First populate the all_wikis list with every wiki
# Reasons for this: 1. we require amount of wikis to calculate the cooldown between requests
# 2. Easier to code
for wiki in db_cursor.execute('SELECT DISTINCT wiki FROM rcgcdw'):
    # sqlite rows are 1-tuples; key by the URL string itself so that the
    # "db_wiki[1] not in all_wikis" check in wiki_scanner() can ever match.
    # (Keying by the tuple was a bug: every wiki looked "new" on each pass
    # and the wiki count used for the request cooldown was inflated.)
    all_wikis[wiki[0]] = Wiki()

# Start queueing logic


def calculate_delay() -> float:
    """Return the cooldown (seconds) between requests to consecutive wikis.

    Honors both the global rate limit (settings["max_requests_per_minute"])
    and the per-wiki floor (settings["minimal_cooldown_per_wiki_in_sec"]).
    """
    min_delay = 60 / settings["max_requests_per_minute"]
    wiki_count = len(all_wikis)
    # Guard: before any wiki is registered the original code divided by zero.
    if wiki_count == 0:
        return min_delay
    if (wiki_count * min_delay) < settings["minimal_cooldown_per_wiki_in_sec"]:
        return settings["minimal_cooldown_per_wiki_in_sec"] / wiki_count
    return min_delay


def generate_targets(wiki_url: str) -> defaultdict:
    """Group webhooks of one wiki by their (lang, display) combination.

    Returns a defaultdict mapping (lang, display) -> list of webhook URLs,
    so that one formatted message can be fanned out to all matching hooks.
    """
    combinations = defaultdict(list)
    for webhook in db_cursor.execute('SELECT webhook, lang, display FROM rcgcdw WHERE wiki = ?', (wiki_url,)):
        combination = (webhook[1], webhook[2])  # lang, display
        combinations[combination].append(webhook[0])
    return combinations


async def wiki_scanner():
    """Main scan loop: poll every registered wiki for recent changes and
    dispatch each new change to its webhook targets.

    Runs forever; re-raises CancelledError so shutdown() can stop it cleanly.
    """
    try:
        while True:
            calc_delay = calculate_delay()
            fetch_all = db_cursor.execute(
                'SELECT webhook, wiki, lang, display, wikiid, rcid, postid FROM rcgcdw GROUP BY wiki')
            for db_wiki in fetch_all.fetchall():  # webhook, wiki, lang, display, wikiid, rcid, postid
                logger.debug("Wiki {}".format(db_wiki[1]))
                extended = False
                if db_wiki[1] not in all_wikis:
                    logger.debug("New wiki: {}".format(db_wiki[1]))
                    all_wikis[db_wiki[1]] = Wiki()
                local_wiki = all_wikis[db_wiki[1]]  # set a reference to a wiki object from memory
                if local_wiki.mw_messages is None:
                    # MediaWiki messages not cached yet -> request the extended payload
                    extended = True
                async with aiohttp.ClientSession(headers=settings["header"],
                                                 timeout=aiohttp.ClientTimeout(2.0)) as session:
                    try:
                        wiki_response = await local_wiki.fetch_wiki(extended, db_wiki[1], session)
                        await local_wiki.check_status(db_wiki[1], wiki_response.status)
                    except (WikiServerError, WikiError):
                        logger.exception("Exception when fetching the wiki")
                        continue  # ignore this wiki if it throws errors
                    try:
                        recent_changes_resp = await wiki_response.json()
                        if "error" in recent_changes_resp or "errors" in recent_changes_resp:
                            # NOTE(review): "errors" is presumably a list in the MW API;
                            # error["code"] would then raise and fall into the handler below.
                            error = recent_changes_resp.get("error", recent_changes_resp["errors"])
                            if error["code"] == "readapidenied":
                                await local_wiki.fail_add(db_wiki[1], 410)
                                continue
                            raise WikiError
                        recent_changes = recent_changes_resp['query']['recentchanges']
                        recent_changes.reverse()  # process oldest change first
                    except aiohttp.ContentTypeError:
                        logger.exception("Wiki seems to be resulting in non-json content.")
                        await local_wiki.fail_add(db_wiki[1], 410)
                        continue
                    except Exception:
                        # Was a bare "except:", which also swallowed
                        # asyncio.CancelledError and prevented clean shutdown.
                        logger.exception("On loading json of response.")
                        continue
                if extended:
                    await process_mwmsgs(recent_changes_resp, local_wiki, mw_msgs)
                if db_wiki[5] is None:  # new wiki, just store the last rcid to not spam the channel
                    if len(recent_changes) > 0:
                        DBHandler.add(db_wiki[1], recent_changes[-1]["rcid"])
                    else:
                        DBHandler.add(db_wiki[1], 0)
                    DBHandler.update_db()
                    continue
                categorize_events = {}
                targets = generate_targets(db_wiki[1])
                paths = get_paths(db_wiki[1], recent_changes_resp)
                for change in recent_changes:
                    await process_cats(change, local_wiki, mw_msgs, categorize_events)
                # Second loop: categories must be fully loaded before formatting
                for change in recent_changes:
                    if change["rcid"] > db_wiki[5]:
                        for target in targets.items():
                            await essential_info(change, categorize_events, local_wiki, db_wiki,
                                                 target, paths, recent_changes_resp)
                if recent_changes:
                    # "change" still holds the newest (last-processed) rcid here
                    DBHandler.add(db_wiki[1], change["rcid"])
                DBHandler.update_db()
                await asyncio.sleep(delay=calc_delay)
    except asyncio.CancelledError:
        raise


async def message_sender():
    """Forever drain the Discord message queue, retrying failed sends."""
    while True:
        await messagequeue.resend_msgs()


def shutdown(loop, signal=None):
    """Flush pending DB updates, stop the loop, cancel tasks and exit."""
    DBHandler.update_db()
    loop.stop()
    logger.info("Script has shut down due to signal {}.".format(signal))
    for task in asyncio.all_tasks(loop):
        logger.debug("Killing task {}".format(task.get_name()))
        task.cancel()
    sys.exit(0)


def global_exception_handler(loop, context):
    """Global exception handler for asyncio, lets us know when something crashes"""
    msg = context.get("exception", context["message"])
    # Lazy %s formatting instead of "+": msg may be an Exception instance,
    # and string concatenation with it raised TypeError (bug in original).
    logger.error("Global exception handler: %s", msg)


async def main_loop():
    """Install signal handlers and run the scanner and sender tasks."""
    loop = asyncio.get_event_loop()
    try:
        signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
        for s in signals:
            loop.add_signal_handler(
                s, lambda s=s: shutdown(loop, signal=s))
    except AttributeError:
        logger.info("Running on Windows huh? This complicates things")
        # NOTE(review): add_signal_handler is unsupported on Windows, so this
        # tuple is collected but never installed; shutdown there relies on
        # the KeyboardInterrupt branch below — confirm intended behavior.
        signals = (signal.SIGBREAK, signal.SIGTERM, signal.SIGINT)
    loop.set_exception_handler(global_exception_handler)
    try:
        task1 = asyncio.create_task(wiki_scanner())
        task2 = asyncio.create_task(message_sender())
        await task1
        await task2
    except KeyboardInterrupt:
        shutdown(loop)


asyncio.run(main_loop())