Wrapping up the work on rate-limiting

This commit is contained in:
Frisk 2020-08-06 15:26:06 +02:00
parent 71a3bdd91d
commit 1b6be292d9
No known key found for this signature in database
GPG key ID: 213F7C15068AF8AC
4 changed files with 61 additions and 86 deletions

View file

@ -6,7 +6,6 @@ import sys
import traceback import traceback
from collections import defaultdict from collections import defaultdict
import functools
import requests import requests
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
@ -18,8 +17,7 @@ from src.misc import get_paths, get_domain
from src.msgqueue import messagequeue from src.msgqueue import messagequeue
from src.queue_handler import DBHandler from src.queue_handler import DBHandler
from src.wiki import Wiki, process_cats, process_mwmsgs, essential_info, essential_feeds from src.wiki import Wiki, process_cats, process_mwmsgs, essential_info, essential_feeds
from src.discord import DiscordMessage, formatter_exception_logger, msg_sender_exception_logger, \ from src.discord import DiscordMessage, generic_msg_sender_exception_logger
group_task_exception_logger, discussion_task_exception_logger
from src.wiki_ratelimiter import RateLimiter from src.wiki_ratelimiter import RateLimiter
logging.config.dictConfig(settings["logging"]) logging.config.dictConfig(settings["logging"])
@ -38,6 +36,9 @@ mw_msgs: dict = {} # will have the type of id: tuple
# Reasons for this: 1. we require amount of wikis to calculate the cooldown between requests # Reasons for this: 1. we require amount of wikis to calculate the cooldown between requests
# 2. Easier to code # 2. Easier to code
for db_wiki in db_cursor.execute('SELECT wiki FROM rcgcdw GROUP BY wiki ORDER BY ROWID'):
all_wikis[db_wiki["wiki"]] = Wiki() # populate all_wikis
queue_limit = settings.get("queue_limit", 30) queue_limit = settings.get("queue_limit", 30)
class LimitedList(list): class LimitedList(list):
@ -65,9 +66,11 @@ class RcQueue:
async def remove_wiki_from_group(self, wiki): async def remove_wiki_from_group(self, wiki):
"""Removes a wiki from query of given domain group""" """Removes a wiki from query of given domain group"""
logger.debug(f"Removing {wiki} from group queue.")
group = get_domain(wiki) group = get_domain(wiki)
self[group]["query"] = [x for x in self[group]["query"] if x["wiki"] == wiki] self[group]["query"] = [x for x in self[group]["query"] if x["wiki"] == wiki]
if not self[group]["query"]: # if there is no wiki left in the queue, get rid of the task if not self[group]["query"]: # if there is no wiki left in the queue, get rid of the task
all_wikis[wiki].rc_active = False
self[group]["task"].cancel() self[group]["task"].cancel()
del self.domain_list[group] del self.domain_list[group]
@ -76,23 +79,29 @@ class RcQueue:
"""Retrives next wiki in the queue for given domain""" """Retrives next wiki in the queue for given domain"""
try: try:
yield self.domain_list[group]["query"][0] yield self.domain_list[group]["query"][0]
except asyncio.CancelledError:
raise
except: except:
if command_line_args.debug: if command_line_args.debug:
logger.exception("RC Group exception") logger.exception("RC Group exception")
raise # reraise the issue shutdown(asyncio.get_event_loop())
else: else:
logger.exception("Group task returned error") logger.exception("Group task returned error")
await group_task_exception_logger(group, traceback.format_exc()) await generic_msg_sender_exception_logger(traceback.format_exc(), "Group task error logger", Group=group)
else: else:
self.domain_list[group]["query"].pop(0) self.domain_list[group]["query"].pop(0)
@staticmethod
def filter_rc_active(wiki_obj):
return wiki_obj[1].rc_active
async def update_queues(self): async def update_queues(self):
"""Makes a round on rcgcdw DB and looks for updates to the queues in self.domain_list""" """Makes a round on rcgcdw DB and looks for updates to the queues in self.domain_list"""
try: try:
fetch_all = db_cursor.execute( fetch_all = db_cursor.execute(
'SELECT ROWID, webhook, wiki, lang, display, wikiid, rcid FROM rcgcdw WHERE NOT rcid = -1 GROUP BY wiki ORDER BY ROWID') 'SELECT ROWID, webhook, wiki, lang, display, wikiid, rcid FROM rcgcdw WHERE NOT rcid = -1 GROUP BY wiki ORDER BY ROWID')
self.to_remove = list(all_wikis.keys()) # first populate this list and remove wikis that are still in the db, clean up the rest self.to_remove = [x[0] for x in filter(self.filter_rc_active, all_wikis.items())] # first populate this list and remove wikis that are still in the db, clean up the rest
full = [] full = []
for db_wiki in fetch_all.fetchall(): for db_wiki in fetch_all.fetchall():
domain = get_domain(db_wiki["wiki"]) domain = get_domain(db_wiki["wiki"])
@ -109,13 +118,19 @@ class RcQueue:
current_domain["last_rowid"] = db_wiki["ROWID"] current_domain["last_rowid"] = db_wiki["ROWID"]
continue continue
for wiki in self.to_remove: for wiki in self.to_remove:
del all_wikis[wiki]
await self.remove_wiki_from_group(wiki) await self.remove_wiki_from_group(wiki)
for group, data in self.domain_list.items(): for group, data in self.domain_list.items():
if group not in full: if group not in full:
self[group]["last_rowid"] = 0 # iter reached the end without being stuck on full list self[group]["last_rowid"] = 0 # iter reached the end without being stuck on full list
logger.debug("Current domain_list structure: {}".format(self.domain_list))
except: except:
logger.exception("Queue error!") if command_line_args.debug:
logger.exception("Queue error!")
shutdown(asyncio.get_event_loop())
else:
logger.exception("Exception on queue updater")
await generic_msg_sender_exception_logger(traceback.format_exc(), "Queue updator")
def __getitem__(self, item): def __getitem__(self, item):
"""Returns the query of given domain group""" """Returns the query of given domain group"""
@ -133,6 +148,8 @@ rcqueue = RcQueue()
def calculate_delay_for_group(group_length: int) -> float: def calculate_delay_for_group(group_length: int) -> float:
"""Calculate the delay between fetching each wiki to avoid rate limits""" """Calculate the delay between fetching each wiki to avoid rate limits"""
min_delay = 60 / settings["max_requests_per_minute"] min_delay = 60 / settings["max_requests_per_minute"]
if group_length == 0:
group_length = 1
if (group_length * min_delay) < settings["minimal_cooldown_per_wiki_in_sec"]: if (group_length * min_delay) < settings["minimal_cooldown_per_wiki_in_sec"]:
return settings["minimal_cooldown_per_wiki_in_sec"] / group_length return settings["minimal_cooldown_per_wiki_in_sec"] / group_length
else: else:
@ -156,8 +173,8 @@ async def generate_domain_groups():
domain_wikis = defaultdict(list) domain_wikis = defaultdict(list)
fetch_all = db_cursor.execute('SELECT ROWID, webhook, wiki, lang, display, wikiid, rcid FROM rcgcdw WHERE NOT rcid = -1 GROUP BY wiki ORDER BY ROWID ASC') fetch_all = db_cursor.execute('SELECT ROWID, webhook, wiki, lang, display, wikiid, rcid FROM rcgcdw WHERE NOT rcid = -1 GROUP BY wiki ORDER BY ROWID ASC')
for db_wiki in fetch_all.fetchall(): for db_wiki in fetch_all.fetchall():
all_wikis[db_wiki["wiki"]].rc_active = True
domain_wikis[get_domain(db_wiki["wiki"])].append(db_wiki) domain_wikis[get_domain(db_wiki["wiki"])].append(db_wiki)
all_wikis[db_wiki["wiki"]] = Wiki() # populate all_wikis
for group, db_wikis in domain_wikis.items(): for group, db_wikis in domain_wikis.items():
yield group, db_wikis yield group, db_wikis
@ -216,13 +233,15 @@ async def scan_group(group: str):
try: try:
await essential_info(change, categorize_events, local_wiki, db_wiki, await essential_info(change, categorize_events, local_wiki, db_wiki,
target, paths, recent_changes_resp, rate_limiter) target, paths, recent_changes_resp, rate_limiter)
except asyncio.CancelledError:
raise
except: except:
if command_line_args.debug: if command_line_args.debug:
logger.exception("Exception on RC formatter") logger.exception("Exception on RC formatter")
raise raise
else: else:
logger.exception("Exception on RC formatter") logger.exception("Exception on RC formatter")
await formatter_exception_logger(db_wiki["wiki"], change, traceback.format_exc()) await generic_msg_sender_exception_logger(traceback.format_exc(), "Exception in RC formatter", Wiki=db_wiki["wiki"], Change=str(change)[0:1000])
if recent_changes: if recent_changes:
DBHandler.add(db_wiki["wiki"], change["rcid"]) DBHandler.add(db_wiki["wiki"], change["rcid"])
delay_between_wikis = calculate_delay_for_group(len(rcqueue[group]["query"])) delay_between_wikis = calculate_delay_for_group(len(rcqueue[group]["query"]))
@ -248,13 +267,15 @@ async def message_sender():
try: try:
while True: while True:
await messagequeue.resend_msgs() await messagequeue.resend_msgs()
except asyncio.CancelledError:
pass
except: except:
if command_line_args.debug: if command_line_args.debug:
logger.exception("Exception on DC message sender") logger.exception("Exception on DC message sender")
raise # reraise the issue shutdown(loop=asyncio.get_event_loop())
else: else:
logger.exception("Exception on DC message sender") logger.exception("Exception on DC message sender")
await msg_sender_exception_logger(traceback.format_exc()) await generic_msg_sender_exception_logger(traceback.format_exc(), "Message sender exception")
async def discussion_handler(): async def discussion_handler():
try: try:
@ -304,22 +325,27 @@ async def discussion_handler():
for target in targets.items(): for target in targets.items():
try: try:
await essential_feeds(post, db_wiki, target) await essential_feeds(post, db_wiki, target)
except asyncio.CancelledError:
raise
except: except:
if command_line_args.debug: if command_line_args.debug:
raise # reraise the issue logger.exception("Exception on Feeds formatter")
shutdown(loop=asyncio.get_event_loop())
else: else:
logger.exception("Exception on Feeds formatter") logger.exception("Exception on Feeds formatter")
await formatter_exception_logger(db_wiki["wiki"], post, traceback.format_exc()) await generic_msg_sender_exception_logger(traceback.format_exc(), "Exception in feed formatter", Post=str(post)[0:1000], Wiki=db_wiki["wiki"])
if discussion_feed: if discussion_feed:
DBHandler.add(db_wiki["wiki"], post["id"], True) DBHandler.add(db_wiki["wiki"], post["id"], True)
await asyncio.sleep(delay=2.0) # hardcoded really doesn't need much more await asyncio.sleep(delay=2.0) # hardcoded really doesn't need much more
DBHandler.update_db() DBHandler.update_db()
except asyncio.CancelledError:
pass
except: except:
if command_line_args.debug: if command_line_args.debug:
raise # reraise the issue raise # reraise the issue
else: else:
logger.exception("Exception on Feeds formatter") logger.exception("Exception on Feeds formatter")
await discussion_task_exception_logger(db_wiki["wiki"], traceback.format_exc()) await generic_msg_sender_exception_logger(traceback.format_exc(), "Discussion handler task exception", Wiki=db_wiki["wiki"])
@ -327,22 +353,23 @@ def shutdown(loop, signal=None):
DBHandler.update_db() DBHandler.update_db()
if len(messagequeue) > 0: if len(messagequeue) > 0:
logger.warning("Some messages are still queued!") logger.warning("Some messages are still queued!")
loop.stop()
logger.info("Script has shut down due to signal {}.".format(signal))
for task in asyncio.all_tasks(loop): for task in asyncio.all_tasks(loop):
logger.debug("Killing task {}".format(task.get_name())) logger.debug("Killing task {}".format(task.get_name()))
task.cancel() task.cancel()
sys.exit(0) loop.run_until_complete(asyncio.gather(*asyncio.all_tasks(loop)))
loop.stop()
logger.info("Script has shut down due to signal {}.".format(signal))
# sys.exit(0)
def global_exception_handler(loop, context): # def global_exception_handler(loop, context):
"""Global exception handler for asyncio, lets us know when something crashes""" # """Global exception handler for asyncio, lets us know when something crashes"""
msg = context.get("exception", context["message"]) # msg = context.get("exception", context["message"])
logger.error("Global exception handler: {}".format(msg)) # logger.error("Global exception handler: {}".format(msg))
if command_line_args.debug is False: # if command_line_args.debug is False:
requests.post("https://discord.com/api/webhooks/"+settings["monitoring_webhook"], data=repr(DiscordMessage("compact", "monitoring", [settings["monitoring_webhook"]], wiki=None, content="[RcGcDb] Global exception handler: {}".format(msg))), headers={'Content-Type': 'application/json'}) # requests.post("https://discord.com/api/webhooks/"+settings["monitoring_webhook"], data=repr(DiscordMessage("compact", "monitoring", [settings["monitoring_webhook"]], wiki=None, content="[RcGcDb] Global exception handler: {}".format(msg))), headers={'Content-Type': 'application/json'})
else: # else:
shutdown(loop) # shutdown(loop)
async def main_loop(): async def main_loop():
@ -355,7 +382,7 @@ async def main_loop():
except AttributeError: except AttributeError:
logger.info("Running on Windows, some things may not work as they should.") logger.info("Running on Windows, some things may not work as they should.")
signals = (signal.SIGBREAK, signal.SIGTERM, signal.SIGINT) signals = (signal.SIGBREAK, signal.SIGTERM, signal.SIGINT)
loop.set_exception_handler(global_exception_handler) # loop.set_exception_handler(global_exception_handler)
try: try:
task1 = asyncio.create_task(wiki_scanner()) task1 = asyncio.create_task(wiki_scanner())
task2 = asyncio.create_task(message_sender()) task2 = asyncio.create_task(message_sender())
@ -366,5 +393,4 @@ async def main_loop():
except KeyboardInterrupt: except KeyboardInterrupt:
shutdown(loop) shutdown(loop)
asyncio.run(main_loop(), debug=command_line_args.debug)
asyncio.run(main_loop(), debug=command_line_args.debug)

View file

@ -110,41 +110,13 @@ async def wiki_removal_monitor(wiki_url, status):
await send_to_discord_webhook_monitoring(DiscordMessage("compact", "webhook/remove", content="Removing {} because {}.".format(wiki_url, status), webhook_url=[None], wiki=None)) await send_to_discord_webhook_monitoring(DiscordMessage("compact", "webhook/remove", content="Removing {} because {}.".format(wiki_url, status), webhook_url=[None], wiki=None))
async def discussion_task_exception_logger(wiki, exception): async def generic_msg_sender_exception_logger(exception: str, title: str, **kwargs):
"""Creates a Discord message reporting a crash"""
message = DiscordMessage("embed", "bot/exception", [None], wiki=None) message = DiscordMessage("embed", "bot/exception", [None], wiki=None)
message["description"] = exception message["description"] = exception
message["title"] = "Discussion task exception logger" message["title"] = title
message.add_field("Wiki", wiki) for key, value in kwargs:
message.finish_embed() message.add_field(key, value)
await send_to_discord_webhook_monitoring(message)
async def group_task_exception_logger(group, exception):
message = DiscordMessage("embed", "bot/exception", [None], wiki=None)
message["description"] = exception
message["title"] = "Group task exception logger"
message.add_field("Group", group)
message.finish_embed()
await send_to_discord_webhook_monitoring(message)
async def formatter_exception_logger(wiki_url, change, exception):
"""Creates a Discord message reporting a crash in RC formatter area"""
message = DiscordMessage("embed", "bot/exception", [None], wiki=None)
message["description"] = exception
message["title"] = "RC Exception Report"
change = str(change)[0:1000]
message.add_field("Wiki URL", wiki_url)
message.add_field("Change", change)
message.finish_embed()
await send_to_discord_webhook_monitoring(message)
async def msg_sender_exception_logger(exception):
"""Creates a Discord message reporting a crash in RC formatter area"""
message = DiscordMessage("embed", "bot/exception", [None], wiki=None)
message["description"] = exception
message["title"] = "MSGSENDER Exception Report"
message.finish_embed() message.finish_embed()
await send_to_discord_webhook_monitoring(message) await send_to_discord_webhook_monitoring(message)

View file

@ -1,23 +0,0 @@
import aiohttp
import logging
from src.config import settings
logger = logging.getLogger("rcgcdb.request_tracking")
class WikiRequestTracking:
def __init__(self):
self.current_timeout = 0
async def add_timeout(self, time: float):
self.current_timeout += time
def is_fandom(self, url):
if any(x in url for x in ("fandom.com", "gamepedia.com", "wikia.org")):
return True
return False
async def on_request_start(session, trace_config_ctx, params):
if
trace_config = aiohttp.TraceConfig()
trace_config.on_request_start.append(on_request_start)

View file

@ -24,7 +24,7 @@ class Wiki:
mw_messages: int = None mw_messages: int = None
fail_times: int = 0 # corresponding to amount of times connection with wiki failed for client reasons (400-499) fail_times: int = 0 # corresponding to amount of times connection with wiki failed for client reasons (400-499)
session: aiohttp.ClientSession = None session: aiohttp.ClientSession = None
rc_active: bool = False
@staticmethod @staticmethod
async def fetch_wiki(extended, script_path, session: aiohttp.ClientSession, ratelimiter: RateLimiter) -> aiohttp.ClientResponse: async def fetch_wiki(extended, script_path, session: aiohttp.ClientSession, ratelimiter: RateLimiter) -> aiohttp.ClientResponse: