# RcGcDb/src/bot.py

import aiohttp
import asyncio
import logging.config
import signal
import sys
import traceback
from collections import defaultdict

import requests
from src.argparser import command_line_args
from src.config import settings
from src.database import db_cursor
from src.exceptions import *
from src.misc import get_paths
from src.msgqueue import messagequeue
from src.queue_handler import DBHandler
from src.wiki import Wiki, process_cats, process_mwmsgs, essential_info, essential_feeds
from src.discord import DiscordMessage, formatter_exception_logger, msg_sender_exception_logger

logging.config.dictConfig(settings["logging"])
logger = logging.getLogger("rcgcdb.bot")
logger.debug("Current settings: {settings}".format(settings=settings))
logger.info("RcGcDb v{} is starting up.".format("1.0"))
if command_line_args.debug:
    logger.info("Debug mode is active!")

# Tracks each wiki's state (including its number of fail states) with the structure wiki_url: Wiki object
all_wikis: dict = {}
mw_msgs: dict = {}  # will have the structure of id: tuple of MediaWiki messages
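
# An illustrative (assumed) shape for these two caches:
#   all_wikis = {"https://wiki.example.org/": <Wiki object>}
#   mw_msgs = {0: ("MediaWiki message text", ...)}, where the numeric id is shared by wikis whose
#   message sets are identical, so each distinct set is stored only once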

# First populate the all_wikis list with every wiki
# Reasons for this: 1. we require the amount of wikis to calculate the cooldown between requests
# 2. Easier to code

for wiki in db_cursor.execute('SELECT DISTINCT wiki FROM rcgcdw'):
    all_wikis[wiki["wiki"]] = Wiki()


# Start queueing logic


def calculate_delay() -> float:
    """Calculate the delay between fetching each wiki to avoid rate limits"""
    min_delay = 60 / settings["max_requests_per_minute"]
    if (len(all_wikis) * min_delay) < settings["minimal_cooldown_per_wiki_in_sec"]:
        return settings["minimal_cooldown_per_wiki_in_sec"] / len(all_wikis)
    else:
        return min_delay
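
# A worked example of the formula above (illustrative config values, not from any real settings file):
# with max_requests_per_minute = 30, min_delay = 60 / 30 = 2.0 s. For 10 wikis, 10 * 2.0 = 20 s is
# below a minimal_cooldown_per_wiki_in_sec of 60, so each wiki is fetched every 60 / 10 = 6.0 s instead;
# for 100 wikis, 100 * 2.0 = 200 >= 60 and the plain 2.0 s rate-limit delay is used.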


def generate_targets(wiki_url: str) -> defaultdict:
    """To minimize the number of requests, we generate a list of language/display-mode combinations to create
    messages for. This way we can send the same message to multiple webhooks which share the same wiki and
    settings, without making another request to the wiki just to duplicate the message.
    """
    combinations = defaultdict(list)
    for webhook in db_cursor.execute('SELECT webhook, lang, display FROM rcgcdw WHERE wiki = ?', (wiki_url,)):
        combination = (webhook["lang"], webhook["display"])
        combinations[combination].append(webhook["webhook"])
    return combinations
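
# For illustration, with three hypothetical subscriptions to the same wiki (two sharing ("en", 1)
# and one using ("de", 0)) the returned mapping would look like:
#   defaultdict(list, {("en", 1): ["webhook_a", "webhook_b"], ("de", 0): ["webhook_c"]})
# so each (lang, display) combination is formatted once and delivered to every webhook in its list.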


async def wiki_scanner():
    """wiki_scanner is spawned as a task whose purpose is to continuously run over the wikis in the DB, fetching
    recent changes and adding messages based on those changes to the message queue, which is later handled by the
    message_sender coroutine."""
    try:
        while True:
            calc_delay = calculate_delay()
            fetch_all = db_cursor.execute(
                'SELECT webhook, wiki, lang, display, wikiid, rcid, postid FROM rcgcdw GROUP BY wiki')
            for db_wiki in fetch_all.fetchall():
                logger.debug("Wiki {}".format(db_wiki["wiki"]))
                if db_wiki["wiki"] not in all_wikis:
                    logger.info("Registering new wiki locally: {}".format(db_wiki["wiki"]))
                    all_wikis[db_wiki["wiki"]] = Wiki()
                local_wiki = all_wikis[db_wiki["wiki"]]  # set a reference to a wiki object from memory
                if db_wiki["rcid"] != -1:
                    extended = False
                    if local_wiki.mw_messages is None:
                        extended = True
                    async with aiohttp.ClientSession(headers=settings["header"],
                                                     timeout=aiohttp.ClientTimeout(3.0)) as session:
                        try:
                            wiki_response = await local_wiki.fetch_wiki(extended, db_wiki["wiki"], session)
                            await local_wiki.check_status(db_wiki["wiki"], wiki_response.status)
                        except (WikiServerError, WikiError):
                            logger.error("Exception when fetching the wiki")
                            continue  # ignore this wiki if it throws errors
                        try:
                            recent_changes_resp = await wiki_response.json()
                            if "error" in recent_changes_resp or "errors" in recent_changes_resp:
                                error = recent_changes_resp.get("error", recent_changes_resp["errors"])
                                if error["code"] == "readapidenied":
                                    await local_wiki.fail_add(db_wiki["wiki"], 410)
                                    continue
                                raise WikiError
                            recent_changes = recent_changes_resp['query']['recentchanges']
                            recent_changes.reverse()
                        except aiohttp.ContentTypeError:
                            logger.exception("Wiki seems to be returning non-JSON content.")
                            await local_wiki.fail_add(db_wiki["wiki"], 410)
                            continue
                        except Exception:
                            logger.exception("Exception while loading JSON of the response.")
                            continue
                    if extended:
                        await process_mwmsgs(recent_changes_resp, local_wiki, mw_msgs)
                    if db_wiki["rcid"] is None:  # new wiki, just get the last rc to not spam the channel
                        if len(recent_changes) > 0:
                            DBHandler.add(db_wiki["wiki"], recent_changes[-1]["rcid"])
                        else:
                            DBHandler.add(db_wiki["wiki"], 0)
                        DBHandler.update_db()
                        continue
                    categorize_events = {}
                    targets = generate_targets(db_wiki["wiki"])
                    paths = get_paths(db_wiki["wiki"], recent_changes_resp)
                    for change in recent_changes:
                        await process_cats(change, local_wiki, mw_msgs, categorize_events)
                    for change in recent_changes:  # Yeah, second loop since the categories require to be all loaded up
                        if change["rcid"] > db_wiki["rcid"]:
                            for target in targets.items():
                                try:
                                    await essential_info(change, categorize_events, local_wiki, db_wiki,
                                                         target, paths, recent_changes_resp)
                                except Exception:
                                    if command_line_args.debug:
                                        raise  # reraise the issue
                                    else:
                                        logger.exception("Exception on RC formatter")
                                        await formatter_exception_logger(db_wiki["wiki"], change, traceback.format_exc())
                    if recent_changes:
                        DBHandler.add(db_wiki["wiki"], change["rcid"])
                    DBHandler.update_db()
                    await asyncio.sleep(delay=calc_delay)
                if db_wiki["wikiid"] is not None:
                    header = settings["header"]
                    header["Accept"] = "application/hal+json"
                    async with aiohttp.ClientSession(headers=header,
                                                     timeout=aiohttp.ClientTimeout(3.0)) as session:
                        try:
                            feeds_response = await local_wiki.fetch_feeds(db_wiki["wikiid"], session)
                            # await local_wiki.check_status(db_wiki["wiki"], wiki_response.status)
                            # NEED A GOAT TO CHECK THIS
                        except (WikiServerError, WikiError):
                            logger.error("Exception when fetching the wiki")
                            continue  # ignore this wiki if it throws errors
                        try:
                            discussion_feed_resp = await feeds_response.json()
                            if "title" in discussion_feed_resp:
                                error = discussion_feed_resp["error"]
                                if error == "site doesn't exists":
                                    db_cursor.execute("UPDATE rcgcdw SET wikiid = ? WHERE wiki = ?",
                                                      (None, db_wiki["wiki"],))
                                    DBHandler.update_db()
                                    continue
                                raise WikiError
                            discussion_feed = discussion_feed_resp["_embedded"]["doc:posts"]
                            discussion_feed.reverse()
                        except aiohttp.ContentTypeError:
                            logger.exception("Wiki seems to be returning non-JSON content.")
                            # NEED A GOAT TO CHECK THIS
                            # await local_wiki.fail_add(db_wiki["wiki"], 410)
                            continue
                        except Exception:
                            logger.exception("Exception while loading JSON of the response.")
                            continue
                    if db_wiki["postid"] is None:  # new wiki, just get the last post to not spam the channel
                        if len(discussion_feed) > 0:
                            DBHandler.add(db_wiki["wiki"], discussion_feed[-1]["id"], True)
                        else:
                            DBHandler.add(db_wiki["wiki"], "0", True)
                        DBHandler.update_db()
                        continue
                    targets = generate_targets(db_wiki["wiki"])
                    for post in discussion_feed:
                        if post["id"] > db_wiki["postid"]:
                            for target in targets.items():
                                try:
                                    await essential_feeds(post, db_wiki, target)
                                except Exception:
                                    if command_line_args.debug:
                                        raise  # reraise the issue
                                    else:
                                        logger.exception("Exception on Feeds formatter")
                                        await formatter_exception_logger(db_wiki["wiki"], post, traceback.format_exc())
                    if discussion_feed:
                        DBHandler.add(db_wiki["wiki"], post["id"], True)
                    DBHandler.update_db()
                await asyncio.sleep(delay=calc_delay)
    except asyncio.CancelledError:
        raise


async def message_sender():
    """message_sender is a coroutine responsible for handling Discord messages and sending them to Discord"""
    try:
        while True:
            await messagequeue.resend_msgs()
    except Exception:
        if command_line_args.debug:
            logger.exception("Exception on DC message sender")
            raise  # reraise the issue
        else:
            logger.exception("Exception on DC message sender")
            await msg_sender_exception_logger(traceback.format_exc())


def shutdown(loop, signal=None):
    DBHandler.update_db()
    if len(messagequeue) > 0:
        logger.warning("Some messages are still queued!")
    loop.stop()
    logger.info("Script has shut down due to signal {}.".format(signal))
    for task in asyncio.all_tasks(loop):
        logger.debug("Killing task {}".format(task.get_name()))
        task.cancel()
    sys.exit(0)


def global_exception_handler(loop, context):
    """Global exception handler for asyncio, lets us know when something crashes"""
    msg = context.get("exception", context["message"])
    logger.error("Global exception handler: {}".format(msg))
    if command_line_args.debug is False:
        requests.post("https://discord.com/api/webhooks/" + settings["monitoring_webhook"],
                      data=repr(DiscordMessage("compact", "monitoring", [settings["monitoring_webhook"]], wiki=None,
                                               content="[RcGcDb] Global exception handler: {}".format(msg))),
                      headers={'Content-Type': 'application/json'})
    else:
        shutdown(loop)
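
# For reference, asyncio hands this handler a context dict that always contains a "message" key and,
# when an exception object is available, an "exception" key (plus extras such as "future" or "task").
# A hypothetical example of what might arrive here:
#   {"message": "Task exception was never retrieved", "exception": ValueError("boom"), "task": <Task ...>}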


async def main_loop():
    loop = asyncio.get_event_loop()
    try:
        signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
        for s in signals:
            loop.add_signal_handler(
                s, lambda s=s: shutdown(loop, signal=s))
    except AttributeError:
        logger.info("Running on Windows, some things may not work as they should.")
        signals = (signal.SIGBREAK, signal.SIGTERM, signal.SIGINT)
    loop.set_exception_handler(global_exception_handler)
    try:
        task1 = asyncio.create_task(wiki_scanner())
        task2 = asyncio.create_task(message_sender())
        await task1
        await task2
    except KeyboardInterrupt:
        shutdown(loop)


asyncio.run(main_loop(), debug=command_line_args.debug)
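
# Assuming src.argparser exposes a --debug command-line flag (implied by command_line_args.debug above),
# the bot would be started from the repository root with something like:
#   python3 -m src.bot --debug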