From 493a1c8194cf188687b9b20616c5b8f4d0906ad0 Mon Sep 17 00:00:00 2001
From: Frisk
Date: Thu, 6 Aug 2020 02:46:43 +0200
Subject: [PATCH] More progress, it's almost working?

---
 src/bot.py              | 211 ++++++++++++++++++++++------------------
 src/discord.py          |  20 ++++
 src/formatters/rc.py    |  16 +--
 src/wiki.py             |  19 ++--
 src/wiki_ratelimiter.py |  17 +++-
 5 files changed, 171 insertions(+), 112 deletions(-)

diff --git a/src/bot.py b/src/bot.py
index 4abf06f..167f9ec 100644
--- a/src/bot.py
+++ b/src/bot.py
@@ -18,7 +18,9 @@ from src.misc import get_paths, get_domain
 from src.msgqueue import messagequeue
 from src.queue_handler import DBHandler
 from src.wiki import Wiki, process_cats, process_mwmsgs, essential_info, essential_feeds
-from src.discord import DiscordMessage, formatter_exception_logger, msg_sender_exception_logger
+from src.discord import DiscordMessage, formatter_exception_logger, msg_sender_exception_logger, \
+    group_task_exception_logger, discussion_task_exception_logger
+from src.wiki_ratelimiter import RateLimiter
 
 logging.config.dictConfig(settings["logging"])
 logger = logging.getLogger("rcgcdb.bot")
@@ -44,7 +46,7 @@ class LimitedList(list):
 
     def append(self, object) -> None:
         if len(self) < queue_limit:
-            self.append(object)
+            self.insert(len(self), object)
             return
         raise ListFull
@@ -57,7 +59,7 @@ class RcQueue:
     async def start_group(self, group, initial_wikis):
         """Starts a task for given domain group"""
         if group not in self.domain_list:
-            self.domain_list[group] = {"task": asyncio.create_task(scan_group(group)), "last_rowid": 0, "query": LimitedList(initial_wikis)}
+            self.domain_list[group] = {"task": asyncio.create_task(scan_group(group), name=group), "last_rowid": 0, "query": LimitedList(initial_wikis), "rate_limiter": RateLimiter()}
         else:
             raise KeyError
@@ -74,38 +76,46 @@
         """Retrieves next wiki in the queue for given domain"""
         try:
             yield self.domain_list[group]["query"][0]
-        except IndexError:
-            logger.warning("Queue for {} domain group is empty.".format(group))
-            yield None
-        finally:  # add exception handling?
+        except Exception:
+            if command_line_args.debug:
+                logger.exception("RC Group exception")
+                raise  # reraise the issue
+            else:
+                logger.exception("Group task returned error")
+                await group_task_exception_logger(group, traceback.format_exc())
+        else:
+            self.domain_list[group]["query"].pop(0)
+
     async def update_queues(self):
         """Makes a round on rcgcdw DB and looks for updates to the queues in self.domain_list"""
-        fetch_all = db_cursor.execute(
-            'SELECT ROWID, webhook, wiki, lang, display, wikiid, rcid FROM rcgcdw WHERE NOT rcid = -1 GROUP BY wiki ORDER BY ROWID')
-        self.to_remove = list(all_wikis.keys())  # first populate this list and remove wikis that are still in the db, clean up the rest
-        full = []
-        for db_wiki in fetch_all.fetchall():
-            domain = get_domain(db_wiki["wiki"])
-            current_domain = self[domain]
-            try:
-                if not db_wiki["ROWID"] < current_domain["last_rowid"]:
-                    current_domain["query"].append(db_wiki)
-                self.to_remove.remove(db_wiki["wiki"])
-            except KeyError:
-                await self.start_group(domain, db_wiki)
-                logger.info("A new domain group has been added since last time, adding it to the domain_list and starting a task...")
-            except ListFull:
-                full.append(domain)
-                current_domain["last_rowid"] = db_wiki["ROWID"]
-                continue
-        for wiki in self.to_remove:
-            del all_wikis[wiki]
-            await self.remove_wiki_from_group(wiki)
-        for group, data in self.domain_list:
-            if group not in full:
-                self["domain"]["last_rowid"] = 0  # iter reached the end without being stuck on full list
+        try:
+            fetch_all = db_cursor.execute(
+                'SELECT ROWID, webhook, wiki, lang, display, wikiid, rcid FROM rcgcdw WHERE NOT rcid = -1 GROUP BY wiki ORDER BY ROWID')
+            self.to_remove = list(all_wikis.keys())  # first populate this list and remove wikis that are still in the db, clean up the rest
+            full = []
+            for db_wiki in fetch_all.fetchall():
+                domain = get_domain(db_wiki["wiki"])
+                current_domain = self[domain]
+                try:
+                    if not db_wiki["ROWID"] < current_domain["last_rowid"]:
+                        current_domain["query"].append(db_wiki)
+                    self.to_remove.remove(db_wiki["wiki"])
+                except KeyError:
+                    await self.start_group(domain, db_wiki)
+                    logger.info("A new domain group has been added since last time, adding it to the domain_list and starting a task...")
+                except ListFull:
+                    full.append(domain)
+                    current_domain["last_rowid"] = db_wiki["ROWID"]
+                    continue
+            for wiki in self.to_remove:
+                del all_wikis[wiki]
+                await self.remove_wiki_from_group(wiki)
+            for group, data in self.domain_list.items():
+                if group not in full:
+                    self[group]["last_rowid"] = 0  # iter reached the end without being stuck on full list
+        except Exception:
+            logger.exception("Queue error!")
 
     def __getitem__(self, item):
         """Returns the query of given domain group"""
@@ -120,14 +130,13 @@
 rcqueue = RcQueue()
 
 
 # Start queueing logic
-
 def calculate_delay_for_group(group_length: int) -> float:
     """Calculate the delay between fetching each wiki to avoid rate limits"""
     min_delay = 60 / settings["max_requests_per_minute"]
     if (group_length * min_delay) < settings["minimal_cooldown_per_wiki_in_sec"]:
         return settings["minimal_cooldown_per_wiki_in_sec"] / group_length
     else:
-        return min_delay
+        return 0.0
 
 
 def generate_targets(wiki_url: str) -> defaultdict:
@@ -154,10 +163,9 @@ async def generate_domain_groups():
 
 
 async def scan_group(group: str):
+    rate_limiter = rcqueue[group]["rate_limiter"]
     while True:
         async with rcqueue.retrieve_next_queued(group) as db_wiki:  # acquire next wiki in queue
-            if db_wiki is None:
-                raise QueueEmpty
             logger.debug("Wiki {}".format(db_wiki["wiki"]))
             local_wiki = all_wikis[db_wiki["wiki"]]  # set a reference to a wiki object from memory
             if db_wiki["rcid"] != -1:
@@ -167,7 +175,7 @@ async def scan_group(group: str):
                 async with aiohttp.ClientSession(headers=settings["header"], timeout=aiohttp.ClientTimeout(3.0)) as session:
                     try:
-                        wiki_response = await local_wiki.fetch_wiki(extended, db_wiki["wiki"], session)
+                        wiki_response = await local_wiki.fetch_wiki(extended, db_wiki["wiki"], session, rate_limiter)
                         await local_wiki.check_status(db_wiki["wiki"], wiki_response.status)
                     except (WikiServerError, WikiError):
                         logger.error("Exception when fetching the wiki")
@@ -208,17 +216,19 @@ async def scan_group(group: str):
                     for target in targets.items():
                         try:
                             await essential_info(change, categorize_events, local_wiki, db_wiki,
-                                                 target, paths, recent_changes_resp)
+                                                 target, paths, recent_changes_resp, rate_limiter)
                         except:
                             if command_line_args.debug:
-                                raise  # reraise the issue
+                                logger.exception("Exception on RC formatter")
+                                raise
                             else:
                                 logger.exception("Exception on RC formatter")
                                 await formatter_exception_logger(db_wiki["wiki"], change, traceback.format_exc())
                 if recent_changes:
                     DBHandler.add(db_wiki["wiki"], change["rcid"])
-            await asyncio.sleep(delay=calc_delay)
-        return group
+        delay_between_wikis = calculate_delay_for_group(len(rcqueue[group]["query"]))
+        await asyncio.sleep(delay_between_wikis)
+        DBHandler.update_db()
 
 
 async def wiki_scanner():
@@ -230,59 +240,6 @@ async def wiki_scanner():
         while True:
             await asyncio.sleep(20.0)
             await rcqueue.update_queues()
-            #
-            #
-            # if db_wiki["wikiid"] is not None:
-            #     header = settings["header"]
-            #     header["Accept"] = "application/hal+json"
-            #     async with aiohttp.ClientSession(headers=header,
-            #                                      timeout=aiohttp.ClientTimeout(3.0)) as session:
-            #         try:
-            #             feeds_response = await local_wiki.fetch_feeds(db_wiki["wikiid"], session)
-            #         except (WikiServerError, WikiError):
-            #             logger.error("Exeption when fetching the wiki")
-            #             continue  # ignore this wiki if it throws errors
-            #         try:
-            #             discussion_feed_resp = await feeds_response.json(encoding="UTF-8")
-            #             if "title" in discussion_feed_resp:
-            #                 error = discussion_feed_resp["error"]
-            #                 if error == "site doesn't exists":
-            #                     db_cursor.execute("UPDATE rcgcdw SET wikiid = ? WHERE wiki = ?",
-            #                                       (None, db_wiki["wiki"],))
-            #                     DBHandler.update_db()
-            #                     continue
-            #                 raise WikiError
-            #             discussion_feed = discussion_feed_resp["_embedded"]["doc:posts"]
-            #             discussion_feed.reverse()
-            #         except aiohttp.ContentTypeError:
-            #             logger.exception("Wiki seems to be resulting in non-json content.")
-            #             continue
-            #         except:
-            #             logger.exception("On loading json of response.")
-            #             continue
-            #         if db_wiki["postid"] is None:  # new wiki, just get the last post to not spam the channel
-            #             if len(discussion_feed) > 0:
-            #                 DBHandler.add(db_wiki["wiki"], discussion_feed[-1]["id"], True)
-            #             else:
-            #                 DBHandler.add(db_wiki["wiki"], "0", True)
-            #             DBHandler.update_db()
-            #             continue
-            #         targets = generate_targets(db_wiki["wiki"])
-            #         for post in discussion_feed:
-            #             if post["id"] > db_wiki["postid"]:
-            #                 for target in targets.items():
-            #                     try:
-            #                         await essential_feeds(post, db_wiki, target)
-            #                     except:
-            #                         if command_line_args.debug:
-            #                             raise  # reraise the issue
-            #                         else:
-            #                             logger.exception("Exception on Feeds formatter")
-            #                             await formatter_exception_logger(db_wiki["wiki"], post, traceback.format_exc())
-            #         if discussion_feed:
-            #             DBHandler.add(db_wiki["wiki"], post["id"], True)
-            # await asyncio.sleep(delay=calc_delay)
-            # DBHandler.update_db()
         except asyncio.CancelledError:
             raise
 
@@ -300,6 +257,72 @@ async def message_sender():
             logger.exception("Exception on DC message sender")
             await msg_sender_exception_logger(traceback.format_exc())
 
+async def discussion_handler():
+    try:
+        while True:
+            fetch_all = db_cursor.execute(
+                'SELECT ROWID, webhook, wiki, lang, display, wikiid, rcid, postid FROM rcgcdw WHERE NOT wikiid = ""')
+            for db_wiki in fetch_all.fetchall():
+                if db_wiki["wikiid"] is not None:
+                    header = settings["header"].copy()  # copy so the shared header set is not mutated
+                    header["Accept"] = "application/hal+json"
+                    async with aiohttp.ClientSession(headers=header,
+                                                     timeout=aiohttp.ClientTimeout(3.0)) as session:
+                        local_wiki = all_wikis[db_wiki["wiki"]]  # set a reference to a wiki object from memory
+                        try:
+                            feeds_response = await local_wiki.fetch_feeds(db_wiki["wikiid"], session)
+                        except (WikiServerError, WikiError):
+                            logger.error("Exception when fetching the wiki")
+                            continue  # ignore this wiki if it throws errors
+                        try:
+                            discussion_feed_resp = await feeds_response.json(encoding="UTF-8")
+                            if "title" in discussion_feed_resp:
+                                error = discussion_feed_resp["error"]
+                                if error == "site doesn't exists":
+                                    db_cursor.execute("UPDATE rcgcdw SET wikiid = ? WHERE wiki = ?",
+                                                      (None, db_wiki["wiki"],))
+                                    DBHandler.update_db()
+                                    continue
+                                raise WikiError
+                            discussion_feed = discussion_feed_resp["_embedded"]["doc:posts"]
+                            discussion_feed.reverse()
+                        except aiohttp.ContentTypeError:
+                            logger.exception("Wiki seems to be returning non-JSON content.")
+                            continue
+                        except Exception:
+                            logger.exception("Failed to parse the JSON response.")
+                            continue
+                        if db_wiki["postid"] is None:  # new wiki, just get the last post to not spam the channel
+                            if len(discussion_feed) > 0:
+                                DBHandler.add(db_wiki["wiki"], discussion_feed[-1]["id"], True)
+                            else:
+                                DBHandler.add(db_wiki["wiki"], "0", True)
+                            DBHandler.update_db()
+                            continue
+                        targets = generate_targets(db_wiki["wiki"])
+                        for post in discussion_feed:
+                            if post["id"] > db_wiki["postid"]:
+                                for target in targets.items():
+                                    try:
+                                        await essential_feeds(post, db_wiki, target)
+                                    except Exception:
+                                        if command_line_args.debug:
+                                            raise  # reraise the issue
+                                        else:
+                                            logger.exception("Exception on Feeds formatter")
+                                            await formatter_exception_logger(db_wiki["wiki"], post, traceback.format_exc())
+                        if discussion_feed:
+                            DBHandler.add(db_wiki["wiki"], post["id"], True)
+            await asyncio.sleep(delay=2.0)  # hardcoded, it really doesn't need much more
+            DBHandler.update_db()
+    except Exception:
+        if command_line_args.debug:
+            raise  # reraise the issue
+        else:
+            logger.exception("Exception on discussion handler task")
+            await discussion_task_exception_logger(db_wiki["wiki"], traceback.format_exc())
+
 
 def shutdown(loop, signal=None):
     DBHandler.update_db()
@@ -337,8 +360,10 @@ async def main_loop():
     try:
         task1 = asyncio.create_task(wiki_scanner())
         task2 = asyncio.create_task(message_sender())
+        task3 = asyncio.create_task(discussion_handler())
         await task1
         await task2
+        await task3
     except KeyboardInterrupt:
         shutdown(loop)
 
diff --git a/src/discord.py b/src/discord.py
index 5bf5a3e..659bbf9 100644
--- a/src/discord.py
+++ b/src/discord.py
@@ -110,6 +110,26 @@ async def wiki_removal_monitor(wiki_url, status):
     await send_to_discord_webhook_monitoring(DiscordMessage("compact", "webhook/remove", content="Removing {} because {}.".format(wiki_url, status), webhook_url=[None], wiki=None))
 
 
+async def discussion_task_exception_logger(wiki, exception):
+    """Creates a Discord message reporting a crash in the discussion task"""
+    message = DiscordMessage("embed", "bot/exception", [None], wiki=None)
+    message["description"] = exception
+    message["title"] = "Discussion task exception logger"
+    message.add_field("Wiki", wiki)
+    message.finish_embed()
+    await send_to_discord_webhook_monitoring(message)
+
+
+async def group_task_exception_logger(group, exception):
+    """Creates a Discord message reporting a crash of a domain group task"""
+    message = DiscordMessage("embed", "bot/exception", [None], wiki=None)
+    message["description"] = exception
+    message["title"] = "Group task exception logger"
+    message.add_field("Group", group)
+    message.finish_embed()
+    await send_to_discord_webhook_monitoring(message)
+
+
 async def formatter_exception_logger(wiki_url, change, exception):
     """Creates a Discord message reporting a crash in RC formatter area"""
     message = DiscordMessage("embed", "bot/exception", [None], wiki=None)
diff --git a/src/formatters/rc.py b/src/formatters/rc.py
index bfcc2dd..786b052 100644
--- a/src/formatters/rc.py
+++ b/src/formatters/rc.py
@@ -18,7 +18,7 @@ if 1 == 2:  # additional translation strings in unreachable code
     print(_("director"), _("bot"), _("editor"), _("directors"), _("sysop"), _("bureaucrat"), _("reviewer"), _("autoreview"), _("autopatrol"), _("wiki_guardian"), ngettext("second", "seconds", 1), ngettext("minute", "minutes", 1), ngettext("hour", "hours", 1), ngettext("day", "days", 1), ngettext("week", "weeks", 1), ngettext("month", "months",1), ngettext("year", "years", 1), ngettext("millennium", "millennia", 1), ngettext("decade", "decades", 1), ngettext("century", "centuries", 1))
 
-async def compact_formatter(action, change, parsed_comment, categories, recent_changes, message_target, _, ngettext, paths,
+async def compact_formatter(action, change, parsed_comment, categories, recent_changes, message_target, _, ngettext, paths, rate_limiter,
                             additional_data=None):
     """Recent Changes compact formatter, part of RcGcDw"""
     if additional_data is None:
@@ -330,7 +330,7 @@ async def compact_formatter(action, change, parsed_comment, categories, recent_c
     await send_to_discord(DiscordMessage("compact", action, message_target[1], content=content, wiki=WIKI_SCRIPT_PATH))
 
 
-async def embed_formatter(action, change, parsed_comment, categories, recent_changes, message_target, _, ngettext, paths, additional_data=None):
+async def embed_formatter(action, change, parsed_comment, categories, recent_changes, message_target, _, ngettext, paths, rate_limiter, additional_data=None):
     """Recent Changes embed formatter, part of RcGcDw"""
     if additional_data is None:
         additional_data = {"namespaces": {}, "tags": {}}
@@ -374,12 +374,12 @@ async def embed_formatter(action, change, parsed_comment, categories, recent_cha
                 changed_content = await recent_changes.safe_request(
                     "{wiki}?action=compare&format=json&fromtext=&torev={diff}&topst=1&prop=diff".format(
                         wiki=WIKI_API_PATH, diff=change["revid"]
-                    ), "compare", "*")
+                    ), rate_limiter, "compare", "*")
             else:
                 changed_content = await recent_changes.safe_request(
                     "{wiki}?action=compare&format=json&fromrev={oldrev}&torev={diff}&topst=1&prop=diff".format(
                         wiki=WIKI_API_PATH, diff=change["revid"], oldrev=change["old_revid"]
-                    ), "compare", "*")
+                    ), rate_limiter, "compare", "*")
             if changed_content:
                 EditDiff = ContentParser(_)
                 EditDiff.feed(changed_content)
@@ -404,7 +404,7 @@ async def embed_formatter(action, change, parsed_comment, categories, recent_cha
             license = None
             urls = await recent_changes.safe_request(
                 "{wiki}?action=query&format=json&prop=imageinfo&list=&meta=&titles={filename}&iiprop=timestamp%7Curl%7Carchivename&iilimit=5".format(
-                    wiki=WIKI_API_PATH, filename=change["title"]), "query", "pages")
+                    wiki=WIKI_API_PATH, filename=change["title"]), rate_limiter, "query", "pages")
             link = create_article_path(change["title"], WIKI_ARTICLE_PATH)
             additional_info_retrieved = False
             if urls is not None:
@@ -520,21 +520,21 @@ async def embed_formatter(action, change, parsed_comment, categories, recent_cha
             embed["title"] = _("Unblocked {blocked_user}").format(blocked_user=user)
     elif action == "curseprofile/comment-created":
         if settings["appearance"]["embed"]["show_edit_changes"]:
-            parsed_comment = await recent_changes.pull_comment(change["logparams"]["4:comment_id"], WIKI_API_PATH)
+            parsed_comment = await recent_changes.pull_comment(change["logparams"]["4:comment_id"], WIKI_API_PATH, rate_limiter)
         link = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]), WIKI_ARTICLE_PATH)
         embed["title"] = _("Left a comment on {target}'s profile").format(target=change["title"].split(':')[1]) if change["title"].split(':')[1] != \
                          change["user"] else _(
             "Left a comment on their own profile")
     elif action == "curseprofile/comment-replied":
         if settings["appearance"]["embed"]["show_edit_changes"]:
-            parsed_comment = await recent_changes.pull_comment(change["logparams"]["4:comment_id"], WIKI_API_PATH)
+            parsed_comment = await recent_changes.pull_comment(change["logparams"]["4:comment_id"], WIKI_API_PATH, rate_limiter)
         link = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]), WIKI_ARTICLE_PATH)
         embed["title"] = _("Replied to a comment on {target}'s profile").format(target=change["title"].split(':')[1]) if change["title"].split(':')[1] != \
                          change["user"] else _(
             "Replied to a comment on their own profile")
     elif action == "curseprofile/comment-edited":
         if settings["appearance"]["embed"]["show_edit_changes"]:
-            parsed_comment = await recent_changes.pull_comment(change["logparams"]["4:comment_id"], WIKI_API_PATH)
+            parsed_comment = await recent_changes.pull_comment(change["logparams"]["4:comment_id"], WIKI_API_PATH, rate_limiter)
         link = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]), WIKI_ARTICLE_PATH)
         embed["title"] = _("Edited a comment on {target}'s profile").format(target=change["title"].split(':')[1]) if change["title"].split(':')[1] != \
                          change["user"] else _(
diff --git a/src/wiki.py b/src/wiki.py
index 7a1dd47..727cf00 100644
--- a/src/wiki.py
+++ b/src/wiki.py
@@ -7,6 +7,7 @@ from src.formatters.rc import embed_formatter, compact_formatter
 from src.formatters.discussions import feeds_embed_formatter, feeds_compact_formatter
 from src.misc import parse_link
 from src.i18n import langs
+from src.wiki_ratelimiter import RateLimiter
 import src.discord
 import asyncio
 from src.config import settings
@@ -26,7 +27,8 @@ class Wiki:
 
     @staticmethod
-    async def fetch_wiki(extended, script_path, session: aiohttp.ClientSession) -> aiohttp.ClientResponse:
+    async def fetch_wiki(extended, script_path, session: aiohttp.ClientSession, ratelimiter: RateLimiter) -> aiohttp.ClientResponse:
+        await ratelimiter.timeout_wait()
         url_path = script_path + "api.php"
         amount = 20
         if extended:
@@ -45,6 +47,7 @@ class Wiki:
                   "rclimit": amount, "rctype": "edit|new|log|categorize", "siprop": "namespaces|general"}
         try:
             response = await session.get(url_path, params=params)
+            ratelimiter.timeout_add(1.0)
         except (aiohttp.ClientConnectionError, aiohttp.ServerTimeoutError, asyncio.TimeoutError):
             logger.exception("A connection error occurred while requesting {}".format(url_path))
             raise WikiServerError
@@ -63,10 +66,12 @@ class Wiki:
         return response
 
     @staticmethod
-    async def safe_request(url, *keys):
+    async def safe_request(url, ratelimiter, *keys):
+        await ratelimiter.timeout_wait()
         try:
             async with aiohttp.ClientSession(headers=settings["header"], timeout=aiohttp.ClientTimeout(3.0)) as session:
                 request = await session.get(url, allow_redirects=False)
+                ratelimiter.timeout_add(1.0)
                 request.raise_for_status()
                 json_request = await request.json(encoding="UTF-8")
         except (aiohttp.ClientConnectionError, aiohttp.ServerTimeoutError, asyncio.TimeoutError):
@@ -108,11 +113,11 @@ class Wiki:
                 logger.warning('{} rows affected by DELETE FROM rcgcdw WHERE wiki = "{}"'.format(db_cursor.rowcount, wiki_url))
             db_connection.commit()
 
-    async def pull_comment(self, comment_id, WIKI_API_PATH):
+    async def pull_comment(self, comment_id, WIKI_API_PATH, rate_limiter):
         try:
             comment = await self.safe_request(
                 "{wiki}?action=comment&do=getRaw&comment_id={comment}&format=json".format(wiki=WIKI_API_PATH,
-                                                                                          comment=comment_id), "text")
+                                                                                          comment=comment_id), rate_limiter, "text")
             logger.debug("Got the following comment from the API: {}".format(comment))
             if comment is None:
                 raise TypeError
@@ -186,7 +191,7 @@ async def process_mwmsgs(wiki_response: dict, local_wiki: Wiki, mw_msgs: dict):
     local_wiki.mw_messages = key
 
 # db_wiki: webhook, wiki, lang, display, wikiid, rcid, postid
-async def essential_info(change: dict, changed_categories, local_wiki: Wiki, db_wiki: tuple, target: tuple, paths: tuple, request: dict):
+async def essential_info(change: dict, changed_categories, local_wiki: Wiki, db_wiki: tuple, target: tuple, paths: tuple, request: dict, rate_limiter: RateLimiter):
     """Prepares essential information for both embed and compact message format."""
     def _(string: str) -> str:
         """Our own translation string to make it compatible with async"""
@@ -199,7 +204,7 @@ async def essential_info(change: dict, changed_categories, local_wiki: Wiki, db_
     logger.debug("List of categories in essential_info: {}".format(changed_categories))
     appearance_mode = embed_formatter if target[0][1] > 0 else compact_formatter
     if "actionhidden" in change or "suppressed" in change:  # if event is hidden using suppression
-        await appearance_mode("suppressed", change, "", changed_categories, local_wiki, target, _, ngettext, paths)
+        await appearance_mode("suppressed", change, "", changed_categories, local_wiki, target, _, ngettext, paths, rate_limiter)
         return
     if "commenthidden" not in change:
         parsed_comment = parse_link(paths[3], change["parsedcomment"])
@@ -223,7 +228,7 @@ async def essential_info(change: dict, changed_categories, local_wiki: Wiki, db_
                 additional_data["tags"][tag["name"]] = (BeautifulSoup(tag["displayname"], "lxml")).get_text()
             except KeyError:
                 additional_data["tags"][tag["name"]] = None  # Tags with no displ
-    await appearance_mode(identification_string, change, parsed_comment, changed_categories, local_wiki, target, _, ngettext, paths, additional_data=additional_data)
+    await appearance_mode(identification_string, change, parsed_comment, changed_categories, local_wiki, target, _, ngettext, paths, rate_limiter, additional_data=additional_data)
 
 
 async def essential_feeds(change: dict, db_wiki: tuple, target: tuple):
diff --git a/src/wiki_ratelimiter.py b/src/wiki_ratelimiter.py
index 62efb28..21acbd5 100644
--- a/src/wiki_ratelimiter.py
+++ b/src/wiki_ratelimiter.py
@@ -1,10 +1,19 @@
-import logging
-from urllib.parse import urlparse
+import logging, time, asyncio
 
 logger = logging.getLogger("rcgcdw.ratelimiter")
 
 class RateLimiter:
     def __init__(self):
-        self.domain_requests: dict = {}
+        self.timeout_until = 0
 
-    def get_timeout(self, url):
+    def timeout_add(self, timeout: float):
+        """Sets a new timeout of the given length, starting now"""
+        self.timeout_until = time.time() + timeout
+        logger.debug("Added {} timeout".format(timeout))
+
+    async def timeout_wait(self):
+        """Waits out the remainder of timeout_until; returns immediately when the timeout has already passed so no cycle is skipped"""
+        calculated_timeout = self.timeout_until - time.time()
+        logger.debug("Waiting {}".format(calculated_timeout))
+        if calculated_timeout > 0:
+            await asyncio.sleep(calculated_timeout)
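---

Note (editorial sketch, not part of the diff): the intent of the changes above is that every domain group created in RcQueue.start_group owns a single RateLimiter; scan_group passes it down into fetch_wiki and safe_request, which await timeout_wait() before each request and arm timeout_add(1.0) right after it. The minimal, self-contained illustration below shows that pacing pattern under those assumptions; the RateLimiter mirrors src/wiki_ratelimiter.py, while fetch() is a hypothetical stand-in for the bot's real HTTP calls.

    # Sketch only. RateLimiter mirrors src/wiki_ratelimiter.py above;
    # fetch() is a hypothetical stand-in for Wiki.fetch_wiki()/Wiki.safe_request().
    import asyncio
    import time

    class RateLimiter:
        def __init__(self):
            self.timeout_until = 0

        def timeout_add(self, timeout: float):
            # Remember the point in time before which no new request should be made
            self.timeout_until = time.time() + timeout

        async def timeout_wait(self):
            # Sleep out whatever remains of the timeout; return at once if it already passed
            remaining = self.timeout_until - time.time()
            if remaining > 0:
                await asyncio.sleep(remaining)

    async def fetch(url: str):
        print("fetching", url)  # a real implementation would perform the HTTP request here

    async def main():
        limiter = RateLimiter()  # one limiter per domain group, as in RcQueue.start_group
        for url in ("https://wiki-a.example/api.php", "https://wiki-b.example/api.php"):
            await limiter.timeout_wait()  # paces requests against the same domain group
            await fetch(url)
            limiter.timeout_add(1.0)  # as in fetch_wiki/safe_request: a 1 s gap per request

    asyncio.run(main())

Because the limiter only stores a timestamp, a wait whose deadline is already in the past costs nothing, which is why the fetch paths can call timeout_wait() unconditionally.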