Additional work done on the rate limiting

Frisk 2020-08-03 16:44:42 +02:00
parent c1831b992b
commit a4462369bb
No known key found for this signature in database
GPG key ID: 213F7C15068AF8AC
2 changed files with 91 additions and 68 deletions


@@ -12,7 +12,7 @@ from src.argparser import command_line_args
 from src.config import settings
 from src.database import db_cursor
 from src.exceptions import *
-from src.misc import get_paths
+from src.misc import get_paths, get_domain
 from src.msgqueue import messagequeue
 from src.queue_handler import DBHandler
 from src.wiki import Wiki, process_cats, process_mwmsgs, essential_info, essential_feeds
@@ -41,11 +41,11 @@ for wiki in db_cursor.execute('SELECT DISTINCT wiki FROM rcgcdw'):
 
 # Start queueing logic
 
-def calculate_delay() -> float:
+def calculate_delay_for_group(group_length: int) -> float:
 	"""Calculate the delay between fetching each wiki to avoid rate limits"""
 	min_delay = 60 / settings["max_requests_per_minute"]
-	if (len(all_wikis) * min_delay) < settings["minimal_cooldown_per_wiki_in_sec"]:
-		return settings["minimal_cooldown_per_wiki_in_sec"] / len(all_wikis)
+	if (group_length * min_delay) < settings["minimal_cooldown_per_wiki_in_sec"]:
+		return settings["minimal_cooldown_per_wiki_in_sec"] / group_length
 	else:
 		return min_delay
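
The practical effect: the delay between two requests is now derived from the size of one domain group rather than from the whole wiki table, so wikis on one domain no longer wait on behalf of every other domain. A quick worked sketch, assuming hypothetical settings of 30 requests per minute and a 60-second minimal cooldown per wiki:

settings = {"max_requests_per_minute": 30, "minimal_cooldown_per_wiki_in_sec": 60}

def calculate_delay_for_group(group_length: int) -> float:
	min_delay = 60 / settings["max_requests_per_minute"]  # 2.0 s between requests
	if (group_length * min_delay) < settings["minimal_cooldown_per_wiki_in_sec"]:
		return settings["minimal_cooldown_per_wiki_in_sec"] / group_length
	else:
		return min_delay

print(calculate_delay_for_group(10))  # 6.0 – ten wikis are spread evenly over the 60 s cooldown
print(calculate_delay_for_group(50))  # 2.0 – large groups are capped by max_requests_per_minute
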
@@ -62,78 +62,95 @@ def generate_targets(wiki_url: str) -> defaultdict:
 	return combinations
 
+
+async def generate_domain_groups():  # oh boy, I cannot wait to learn about async generators
+	"""Yield groups of wikis sharing the same domain, so each domain can be rate limited independently"""
+	combinations = defaultdict(list)
+	fetch_all = db_cursor.execute('SELECT webhook, wiki, lang, display, wikiid, rcid FROM rcgcdw GROUP BY wiki')
+	for db_wiki in fetch_all.fetchall():
+		combinations[get_domain(db_wiki["wiki"])].append(db_wiki)
+	for item in combinations.values():
+		yield item
+
+
+async def scan_group(group: list):
+	"""Scan all wikis within one domain group sequentially, sleeping the calculated delay between fetches"""
+	calc_delay = calculate_delay_for_group(len(group))
+	for db_wiki in group:
+		logger.debug("Wiki {}".format(db_wiki["wiki"]))
+		if db_wiki["wiki"] not in all_wikis:
+			logger.info("Registering new wiki locally: {}".format(db_wiki["wiki"]))
+			all_wikis[db_wiki["wiki"]] = Wiki()
+		local_wiki = all_wikis[db_wiki["wiki"]]  # set a reference to a wiki object from memory
+		if db_wiki["rcid"] != -1:
+			extended = False
+			if local_wiki.mw_messages is None:
+				extended = True
+			async with aiohttp.ClientSession(headers=settings["header"],
+					timeout=aiohttp.ClientTimeout(3.0)) as session:
+				try:
+					wiki_response = await local_wiki.fetch_wiki(extended, db_wiki["wiki"], session)
+					await local_wiki.check_status(db_wiki["wiki"], wiki_response.status)
+				except (WikiServerError, WikiError):
+					logger.error("Exception when fetching the wiki")
+					continue  # ignore this wiki if it throws errors
+				try:
+					recent_changes_resp = await wiki_response.json()
+					if "error" in recent_changes_resp or "errors" in recent_changes_resp:
+						error = recent_changes_resp.get("error", recent_changes_resp["errors"])
+						if error["code"] == "readapidenied":
+							await local_wiki.fail_add(db_wiki["wiki"], 410)
+							continue
+						raise WikiError
+					recent_changes = recent_changes_resp['query']['recentchanges']
+					recent_changes.reverse()
+				except aiohttp.ContentTypeError:
+					logger.exception("Wiki seems to be returning non-JSON content.")
+					await local_wiki.fail_add(db_wiki["wiki"], 410)
+					continue
+				except:
+					logger.exception("On loading JSON of response.")
+					continue
+				if extended:
+					await process_mwmsgs(recent_changes_resp, local_wiki, mw_msgs)
+				if db_wiki["rcid"] is None:  # new wiki, just get the last rc to not spam the channel
+					if len(recent_changes) > 0:
+						DBHandler.add(db_wiki["wiki"], recent_changes[-1]["rcid"])
+					else:
+						DBHandler.add(db_wiki["wiki"], 0)
+					DBHandler.update_db()
+					continue
+				categorize_events = {}
+				targets = generate_targets(db_wiki["wiki"])
+				paths = get_paths(db_wiki["wiki"], recent_changes_resp)
+				for change in recent_changes:
+					await process_cats(change, local_wiki, mw_msgs, categorize_events)
+				for change in recent_changes:  # Yeah, second loop since the categories require to be all loaded up
+					if change["rcid"] > db_wiki["rcid"]:
+						for target in targets.items():
+							try:
+								await essential_info(change, categorize_events, local_wiki, db_wiki,
+										target, paths, recent_changes_resp)
+							except:
+								if command_line_args.debug:
+									raise  # reraise the issue
+								else:
+									logger.exception("Exception on RC formatter")
+									await formatter_exception_logger(db_wiki["wiki"], change, traceback.format_exc())
+				if recent_changes:
+					DBHandler.add(db_wiki["wiki"], change["rcid"])
+		await asyncio.sleep(delay=calc_delay)
+
 
 async def wiki_scanner():
 	"""Wiki scanner is spawned as a task whose purpose is to continuously run over wikis in the DB, fetching recent changes
 	to add messages based on the changes to message queue later handled by message_sender coroutine."""
 	try:
 		while True:
-			calc_delay = calculate_delay()
+			async for group in generate_domain_groups():
+				asyncio.create_task(scan_group(group))
 			fetch_all = db_cursor.execute(
 				'SELECT webhook, wiki, lang, display, wikiid, rcid, postid FROM rcgcdw GROUP BY wiki')
 			for db_wiki in fetch_all.fetchall():
logger.debug("Wiki {}".format(db_wiki["wiki"]))
if db_wiki["wiki"] not in all_wikis:
logger.info("Registering new wiki locally: {}".format(db_wiki["wiki"]))
all_wikis[db_wiki["wiki"]] = Wiki()
local_wiki = all_wikis[db_wiki["wiki"]] # set a reference to a wiki object from memory
if db_wiki["rcid"] != -1:
extended = False
if local_wiki.mw_messages is None:
extended = True
async with aiohttp.ClientSession(headers=settings["header"],
timeout=aiohttp.ClientTimeout(3.0)) as session:
try:
wiki_response = await local_wiki.fetch_wiki(extended, db_wiki["wiki"], session)
await local_wiki.check_status(db_wiki["wiki"], wiki_response.status)
except (WikiServerError, WikiError):
logger.error("Exeption when fetching the wiki")
continue # ignore this wiki if it throws errors
try:
recent_changes_resp = await wiki_response.json()
if "error" in recent_changes_resp or "errors" in recent_changes_resp:
error = recent_changes_resp.get("error", recent_changes_resp["errors"])
if error["code"] == "readapidenied":
await local_wiki.fail_add(db_wiki["wiki"], 410)
continue
raise WikiError
recent_changes = recent_changes_resp['query']['recentchanges']
recent_changes.reverse()
except aiohttp.ContentTypeError:
logger.exception("Wiki seems to be resulting in non-json content.")
await local_wiki.fail_add(db_wiki["wiki"], 410)
continue
except:
logger.exception("On loading json of response.")
continue
if extended:
await process_mwmsgs(recent_changes_resp, local_wiki, mw_msgs)
if db_wiki["rcid"] is None: # new wiki, just get the last rc to not spam the channel
if len(recent_changes) > 0:
DBHandler.add(db_wiki["wiki"], recent_changes[-1]["rcid"])
else:
DBHandler.add(db_wiki["wiki"], 0)
DBHandler.update_db()
continue
categorize_events = {}
targets = generate_targets(db_wiki["wiki"])
paths = get_paths(db_wiki["wiki"], recent_changes_resp)
for change in recent_changes:
await process_cats(change, local_wiki, mw_msgs, categorize_events)
for change in recent_changes: # Yeah, second loop since the categories require to be all loaded up
if change["rcid"] > db_wiki["rcid"]:
for target in targets.items():
try:
await essential_info(change, categorize_events, local_wiki, db_wiki,
target, paths, recent_changes_resp)
except:
if command_line_args.debug:
raise # reraise the issue
else:
logger.exception("Exception on RC formatter")
await formatter_exception_logger(db_wiki["wiki"], change, traceback.format_exc())
if recent_changes:
DBHandler.add(db_wiki["wiki"], change["rcid"])
await asyncio.sleep(delay=2.0) # temporary measure until rate limiting is not implemented
if db_wiki["wikiid"] is not None: if db_wiki["wikiid"] is not None:
header = settings["header"] header = settings["header"]
header["Accept"] = "application/hal+json" header["Accept"] = "application/hal+json"


@@ -17,6 +17,12 @@ def get_paths(wiki: str, request) -> tuple:
 	return WIKI_API_PATH, WIKI_SCRIPT_PATH, WIKI_ARTICLE_PATH, WIKI_JUST_DOMAIN
 
 
+def get_domain(url: str) -> str:
+	"""Get domain of given URL"""
+	parsed_url = urlparse(url)
+	return ".".join(urlunparse((*parsed_url[0:2], "", "", "", "")).split(".")[-2:])  # something like gamepedia.com, fandom.com
+
+
 class LinkParser(HTMLParser):
 	new_string = ""
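
For reference, the new helper builds on urllib.parse (which this module presumably already imports): it keeps only the scheme and netloc of the URL, then takes the last two dot-separated labels. A standalone check:

from urllib.parse import urlparse, urlunparse

def get_domain(url: str) -> str:
	"""Get domain of given URL"""
	parsed_url = urlparse(url)
	return ".".join(urlunparse((*parsed_url[0:2], "", "", "", "")).split(".")[-2:])

print(get_domain("https://minecraft.gamepedia.com/api.php"))  # gamepedia.com
print(get_domain("https://leagueoflegends.fandom.com/"))      # fandom.com

Note that the two-label heuristic would return co.uk for a site under such a suffix; that is presumably acceptable here, since the function only needs to tell the supported wiki farms apart.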