From 573efaf7fa19e8e008697d00bd53c0320ce398c2 Mon Sep 17 00:00:00 2001
From: Frisk
Date: Tue, 16 Aug 2022 12:50:49 +0200
Subject: [PATCH] Progress

---
 src/discord/message.py |  16 ++-
 src/discussions.py     |   1 +
 src/domain.py          |   5 +
 src/irc_feed.py        |   2 +-
 src/msgqueue.py        |  34 +++++-
 src/wiki.py            | 223 +++++++++++------------------------
 6 files changed, 107 insertions(+), 174 deletions(-)

diff --git a/src/discord/message.py b/src/discord/message.py
index bc867f1..23313b2 100644
--- a/src/discord/message.py
+++ b/src/discord/message.py
@@ -18,20 +18,22 @@ import math
 import random
 from collections import defaultdict
-from typing import Optional
+from typing import Optional, TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from wiki import Wiki


 with open("src/api/template_settings.json", "r") as template_json:
     settings: dict = json.load(template_json)


 class DiscordMessageMetadata:
-    def __init__(self, method, log_id = None, page_id = None, rev_id = None, webhook_url = None, new_data = None):
+    def __init__(self, method, log_id = None, page_id = None, rev_id = None, webhook_url = None):
         self.method = method
         self.page_id = page_id
         self.log_id = log_id
         self.rev_id = rev_id
         self.webhook_url = webhook_url
-        self.new_data = new_data

     def matches(self, other: dict):
         for key, value in other.items():
@@ -49,6 +51,7 @@ class DiscordMessage:
         self.webhook_object = dict(allowed_mentions={"parse": []})
         self.length = 0
         self.metadata: Optional[DiscordMessageMetadata] = None
+        self.wiki: Optional[Wiki] = None

         if message_type == "embed":
             self.__setup_embed()
@@ -160,11 +163,16 @@ class DiscordMessageRaw(DiscordMessage):
         self.webhook_url = webhook_url


+class MessageTooBig(BaseException):
+    pass
+
+
 class StackedDiscordMessage():
     def __init__(self, m_type: int):
         self.message_list: list[DiscordMessage] = []
         self.length = 0
         self.message_type: int = m_type  # 0 for compact, 1 for embed
+        self.discord_callback_message_ids: list[int] = []

     def __len__(self):
         return self.length
@@ -182,7 +190,7 @@ class StackedDiscordMessage():
         return [(num, message) for num, message in enumerate(self.message_list)]

     def add_message(self, message: DiscordMessage):
-        if len(self) + len(message) > 6000:
+        if len(self) + len(message) > 6000 or len(self.message_list) > 9:
             raise MessageTooBig
         self.length += len(message)
         self.message_list.append(message)
diff --git a/src/discussions.py b/src/discussions.py
index 53ffba4..dbdf53b 100644
--- a/src/discussions.py
+++ b/src/discussions.py
@@ -1,4 +1,5 @@
 from collections import OrderedDict
+from src.config import settings

 import src.wiki

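[Editor's note, not part of the patch] The new guard in add_message enforces both of Discord's per-payload limits: at most 6000 characters across all embeds and at most 10 embeds per webhook message. A minimal sketch of the overflow behaviour; DummyMessage is a hypothetical stand-in for DiscordMessage, used only for illustration:

    # Sketch under the assumption that only __len__ and message_type matter here.
    class DummyMessage:
        message_type = "embed"
        def __len__(self):
            return 500

    stack = StackedDiscordMessage(1)  # 1 = embed mode
    try:
        for _ in range(11):
            stack.add_message(DummyMessage())  # the 11th call trips the 10-embed cap
    except MessageTooBig:
        pass  # the caller yields the full stack and starts a new one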
diff --git a/src/domain.py b/src/domain.py
index bd307d2..d141f1e 100644
--- a/src/domain.py
+++ b/src/domain.py
@@ -35,6 +35,7 @@ class Domain:
         return len(self.wikis)

     def destroy(self):
+        """Destroy the domain – do all of the tasks that make sure there are no leftovers before the object is collected by the GC"""
         if self.irc:
             self.irc.connection.disconnect("Leaving")
         if self.discussions_handler:
@@ -43,9 +44,11 @@
             self.task.cancel()

     def get_wiki(self, item, default=None) -> Optional[src.wiki.Wiki]:
+        """Returns the wiki with the given domain name"""
         return self.wikis.get(item, default)

     def set_irc(self, irc_client: src.irc_feed.AioIRCCat):
+        """Sets the IRC client for this domain"""
         self.irc = irc_client

     def stop_task(self):
@@ -53,6 +56,7 @@ class Domain:
         self.task.cancel()  # Be aware that cancelling the task may take time

     def run_domain(self):
+        """Starts the asyncio task for this domain"""
         if not self.task or self.task.cancelled():
             self.task = asyncio.create_task(self.run_wiki_check(), name=self.name)
         else:
@@ -109,6 +113,7 @@ class Domain:
         return max((-25 * queue_length) + 150, 1)

     async def run_wiki_check(self):
+        """Runs the appropriate scheduler depending on whether IRC is available"""
         if self.irc:
             while True:
                 await self.irc_scheduler()
diff --git a/src/irc_feed.py b/src/irc_feed.py
index ebdfa98..dfb5663 100644
--- a/src/irc_feed.py
+++ b/src/irc_feed.py
@@ -55,7 +55,7 @@ class AioIRCCat(irc.client_aio.AioSimpleIRCClient):
         # print(message)
         url = urlparse(message)
         full_url = "https://"+url.netloc + recognize_langs(url.path)
-        wiki = self.domain.get_wiki(full_url)
+        wiki = self.domain.get_wiki(full_url)  # TODO Perhaps something less performance-hurting?
         if wiki and wiki.rc_id != -1:
             self.updated_wikis.add(full_url)
             logger.debug("New website appended to the list! {}".format(full_url))
diff --git a/src/msgqueue.py b/src/msgqueue.py
index 8f1ef53..b856f90 100644
--- a/src/msgqueue.py
+++ b/src/msgqueue.py
@@ -1,10 +1,21 @@
 import asyncio, logging, aiohttp
-from src.discord import send_to_discord_webhook, DiscordMessage, StackedDiscordMessage
+import typing
+
+from src.discord.message import DiscordMessage, StackedDiscordMessage, MessageTooBig
 from src.config import settings
 from src.exceptions import EmbedListFull
 from collections import defaultdict

 logger = logging.getLogger("rcgcdw.msgqueue")

+class QueueEntry:
+    def __init__(self, discord_message, webhooks):
+        self.discord_message: DiscordMessage = discord_message
+        self.webhooks: list[str] = webhooks
+
+    def __iter__(self):
+        return iter(self.webhooks)
+
+
 class MessageQueue:
     """Message queue class for undelivered messages"""
     def __init__(self):
@@ -23,9 +34,9 @@
     def clear(self):
         self._queue.clear()

-    def add_message(self, message):
-        self._queue.append(message)
-        logger.debug("Adding new message")
+    def add_messages(self, messages: list[QueueEntry]):
+        self._queue.extend(messages)
+        logger.debug("Adding new messages")
 #
 #   def replace_message(self, to_replace: DiscordMessage, with_replace: StackedDiscordMessage):
 #       try:
@@ -36,7 +47,6 @@
     def cut_messages(self, item_num):
         self._queue = self._queue[item_num:]

-
     async def group_by_webhook(self):  # TODO Change into iterable
         """Group Discord messages in the queue by the dictionary, allowing to send multiple messages to different
         webhooks at the same time avoiding ratelimits per Discord webhook route."""
@@ -48,9 +58,21 @@
             message_dict[webhook].append(msg)  # defaultdict{"dadibadyvbdmadgqueh23/dihjd8agdandashd": [DiscordMessage, DiscordMessage]}
         return message_dict.items()  # dict_items([('daosdkosakda/adkahfwegr34', [DiscordMessage]), ('daosdkosakda/adkahfwegr33', [DiscordMessage, DiscordMessage])])

+    async def pack_messages(self, messages: list[DiscordMessage]) -> typing.AsyncGenerator:
+        """Pack messages into StackedDiscordMessage objects; this is an async generator."""
+        current_pack = StackedDiscordMessage(0 if messages[0].message_type == "compact" else 1)  # first message
+        for message in messages:
+            try:
+                current_pack.add_message(message)
+            except MessageTooBig:
+                yield current_pack
+                current_pack = StackedDiscordMessage(0 if message.message_type == "compact" else 1)  # next messages
+                current_pack.add_message(message)
+        yield current_pack
+
     async def send_msg_set(self, msg_set: tuple):
         webhook_url, messages = msg_set  # str("daosdkosakda/adkahfwegr34", list(DiscordMessage, DiscordMessage, DiscordMessage)
-        for msg in messages:
+        async for msg in self.pack_messages(messages):
             if self.global_rate_limit:
                 return  # if we are globally rate limited just wait for first gblocked request to finish
             status = await send_to_discord_webhook(msg, webhook_url)
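[Editor's note, not part of the patch] send_msg_set now consumes the queue through pack_messages, which greedily fills one StackedDiscordMessage until MessageTooBig fires and then opens the next stack, so a single webhook request carries as many messages as the limits allow. A rough usage sketch, assuming a MessageQueue instance and a non-empty list of DiscordMessage objects of one type:

    async def demo(queue: MessageQueue, messages: list[DiscordMessage]):
        # Each yielded item is one webhook payload's worth of messages.
        async for stacked in queue.pack_messages(messages):
            print(f"{len(stacked.message_list)} messages, {len(stacked)} characters")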
diff --git a/src/wiki.py b/src/wiki.py
index 474427b..4a7cd4f 100644
--- a/src/wiki.py
+++ b/src/wiki.py
@@ -10,6 +10,7 @@ import logging, aiohttp
 from functools import cache

 from api.util import default_message
+from msgqueue import QueueEntry, messagequeue
 from mw_messages import MWMessages
 from src.exceptions import *
 from src.database import db
@@ -188,8 +189,7 @@ class Wiki:
                                   "amenableparser": 1, "amincludelocal": 1, "siprop": "namespaces|general"})
         else:
             params = OrderedDict({"action": "query", "format": "json", "uselang": "content", "list": "tags|recentchanges",
-                                  "meta": "siteinfo", "utf8": 1,
-                                  "tglimit": "max", "rcshow": "!bot", "tgprop": "displayname",
+                                  "meta": "siteinfo", "utf8": 1, "rcshow": "!bot",
                                   "rcprop": "title|redirect|timestamp|ids|loginfo|parsedcomment|sizes|flags|tags|user",
                                   "rclimit": amount, "rctype": "edit|new|log|categorize", "siprop": "namespaces|general"})
         try:
@@ -200,7 +200,7 @@ class Wiki:
         return response

     async def scan(self, amount=10):
-        """Fetches recent changes of a wiki
+        """Main routine for fetching RecentChanges of a wiki.

         :raises WikiServerError
         """
         try:
             request = await self.fetch_wiki(amount=amount)
             self.client.last_request = request
         except WikiServerError as e:
+            # If a WikiServerError has come up twice within the last 2 minutes, reraise the exception; otherwise wait 2 seconds
             self.statistics.update(Log(type=LogType.CONNECTION_ERROR, title=str(e.exception)))
             if self.statistics.recent_connection_errors() > 1:
                 raise
             await asyncio.sleep(2.0)
         if not self.mw_messages:
-            # TODO Split into other function
-            mw_messages = request.get("query", {}).get("allmessages", [])
-            final_mw_messages = dict()
-            for msg in mw_messages:
-                if "missing" not in msg:  # ignore missing strings
-                    final_mw_messages[msg["name"]] = re.sub(r'\[\[.*?]]', '', msg["*"])
-                else:
-                    logger.warning("Could not fetch the MW message translation for: {}".format(msg["name"]))
-            self.mw_messages = MWMessages(final_mw_messages)
-        # TODO Split into other function
+            process_cachable(request, self)
         try:
             recent_changes = request["query"]["recentchanges"]
             recent_changes.reverse()
@@ -252,19 +244,17 @@ class Wiki:
                         break
                 await process_cats(change, self, categorize_events)
         else:  # adequate amount of changes
-            for tag in request["query"]["tags"]:
-                try:
-                    self.tags[tag["name"]] = (BeautifulSoup(tag["displayname"], "lxml")).get_text()
-                except KeyError:
-                    self.tags[tag["name"]] = None
-            message_list = defaultdict(list)
+            message_list = []  # Collect all messages so they can be efficiently merged in the Discord message sender
             for change in recent_changes:  # Yeah, second loop since the categories require to be all loaded up
                 if change["rcid"] > self.rc_id:
                     if highest_id is None or change["rcid"] > highest_id:  # make sure that the highest_rc is really highest rcid but do allow other entries with potentially lesser rcids come after without breaking the cycle
                         highest_id = change["rcid"]
                     for combination, webhooks in self.targets.items():
                         message = await rc_processor(self, change, categorize_events, combination, webhooks)
-            break
+                        message.wiki = self
+                        message_list.append(QueueEntry(message, webhooks))
+            messagequeue.add_messages(message_list)
+        return

 @cache
 def prepare_settings(display_mode: int) -> dict:
@@ -276,7 +266,27 @@ def prepare_settings(display_mode: int) -> dict:
         return template


+def process_cachable(response: dict, wiki_object: Wiki) -> None:
+    """Processes cacheable objects – such as MediaWiki system messages and wiki tag display names – that are used when
+    rendering DiscordMessages, and saves them on the wiki object."""
+    mw_messages = response.get("query", {}).get("allmessages", [])
+    final_mw_messages = dict()
+    for msg in mw_messages:
+        if "missing" not in msg:  # ignore missing strings
+            final_mw_messages[msg["name"]] = re.sub(r'\[\[.*?]]', '', msg["*"])
+        else:
+            logger.warning("Could not fetch the MW message translation for: {}".format(msg["name"]))
+    wiki_object.mw_messages = MWMessages(final_mw_messages)
+    for tag in response["query"]["tags"]:
+        try:
+            wiki_object.tags[tag["name"]] = (BeautifulSoup(tag["displayname"], "lxml")).get_text()
+        except KeyError:
+            wiki_object.tags[tag["name"]] = None


 async def rc_processor(wiki: Wiki, change: dict, changed_categories: dict, display_options: namedtuple("Settings", ["lang", "display"]), webhooks: list) -> DiscordMessage:
+    """Takes the vital information about a change, communicates with a formatter and constructs a DiscordMessage with it.
+    Creates the DiscordMessageMetadata object, the LinkParser and the Context, and prepares the parsed comment."""
     from src.misc import LinkParser
     LinkParser = LinkParser()
     metadata = DiscordMessageMetadata("POST", rev_id=change.get("revid", None), log_id=change.get("logid", None),
@@ -353,120 +363,6 @@ async def rc_processor(wiki: Wiki, change: dict, changed_categories: dict, displ
     return discord_message


-@dataclass
-class Wiki_old:
-    mw_messages: int = None
-    fail_times: int = 0  # corresponding to amount of times connection with wiki failed for client reasons (400-499)
-    session: aiohttp.ClientSession = None
-    rc_active: int = 0
-    last_check: float = 0.0
-    last_discussion_check: float = 0.0
-
-    @staticmethod
-    async def fetch_wiki(extended, script_path, session: aiohttp.ClientSession, ratelimiter: RateLimiter, amount=20) -> aiohttp.ClientResponse:
-        await ratelimiter.timeout_wait()
-        url_path = script_path + "api.php"
-        if extended:
-            params = {"action": "query", "format": "json", "uselang": "content", "list": "tags|recentchanges",
-                      "meta": "allmessages|siteinfo",
-                      "utf8": 1, "tglimit": "max", "tgprop": "displayname",
-                      "rcprop": "title|redirect|timestamp|ids|loginfo|parsedcomment|sizes|flags|tags|user",
-                      "rclimit": amount, "rcshow": "!bot", "rctype": "edit|new|log|categorize",
-                      "ammessages": "recentchanges-page-added-to-category|recentchanges-page-removed-from-category|recentchanges-page-added-to-category-bundled|recentchanges-page-removed-from-category-bundled",
-                      "amenableparser": 1, "amincludelocal": 1, "siprop": "namespaces|general"}
-        else:
-            params = {"action": "query", "format": "json", "uselang": "content", "list": "tags|recentchanges",
-                      "meta": "siteinfo", "utf8": 1,
-                      "tglimit": "max", "rcshow": "!bot", "tgprop": "displayname",
-                      "rcprop": "title|redirect|timestamp|ids|loginfo|parsedcomment|sizes|flags|tags|user",
-                      "rclimit": amount, "rctype": "edit|new|log|categorize", "siprop": "namespaces|general"}
-        try:
-            response = await session.get(url_path, params=params)
-            ratelimiter.timeout_add(1.0)
-        except (aiohttp.ClientConnectionError, aiohttp.ServerTimeoutError, asyncio.TimeoutError):
-            logger.error("A connection error occurred while requesting {}".format(url_path))
-            raise WikiServerError
-        return response
-
-    @staticmethod
-    async def fetch_feeds(wiki, session: aiohttp.ClientSession) -> aiohttp.ClientResponse:
-        url_path = "{wiki}wikia.php".format(wiki=wiki)
-        params = {"controller": "DiscussionPost", "method": "getPosts", "includeCounters": "false", "sortDirection": "descending", "sortKey": "creation_date", "limit": 20}
-        try:
-            response = await session.get(url_path, params=params)
-            response.raise_for_status()
-        except (aiohttp.ClientConnectionError, aiohttp.ServerTimeoutError, asyncio.TimeoutError, aiohttp.ClientResponseError):
-            logger.error("A connection error occurred while requesting {}".format(url_path))
-            raise WikiServerError
-        return response
-
-    @staticmethod
-    async def safe_request(url, ratelimiter, *keys):
-        await ratelimiter.timeout_wait()
-        try:
-            async with aiohttp.ClientSession(headers=settings["header"], timeout=aiohttp.ClientTimeout(6.0)) as session:
-                request = await session.get(url)
-                ratelimiter.timeout_add(1.0)
-                request.raise_for_status()
-                json_request = await request.json(encoding="UTF-8")
-        except (aiohttp.ClientConnectionError, aiohttp.ServerTimeoutError, asyncio.TimeoutError):
-            logger.error("Reached connection error for request on link {url}".format(url=url))
-        else:
-            try:
-                for item in keys:
-                    json_request = json_request[item]
-            except KeyError:
-                logger.warning(
-                    "Failure while extracting data from request on key {key} in {change}".format(key=item, change=json_request))
-                return None
-            return json_request
-
-    async def fail_add(self, wiki_url, status):
-        logger.debug("Increasing fail_times to {}".format(self.fail_times+3))
-        self.fail_times += 3
-        if self.fail_times > 9:
-            await self.remove(wiki_url, status)
-
-    async def check_status(self, wiki_url, status):
-        if 199 < status < 300:
-            self.fail_times -= 1
-            pass
-        elif 400 < status < 500:  # ignore 400 error since this might be our fault
-            await self.fail_add(wiki_url, status)
-            logger.warning("Wiki {} responded with HTTP code {}, increased fail_times to {}, skipping...".format(wiki_url, status, self.fail_times))
-            raise WikiError
-        elif 499 < status < 600:
-            logger.warning("Wiki {} responded with HTTP code {}, skipping...".format(wiki_url, status, self.fail_times))
-            raise WikiServerError
-
-    @staticmethod
-    async def remove(wiki_url, reason):
-        logger.info("Removing a wiki {}".format(wiki_url))
-        await discord.discord.wiki_removal(wiki_url, reason)
-        await discord.discord.wiki_removal_monitor(wiki_url, reason)
-        async with db.pool().acquire() as connection:
-            result = await connection.execute('DELETE FROM rcgcdw WHERE wiki = $1', wiki_url)
-            logger.warning('{} rows affected by DELETE FROM rcgcdw WHERE wiki = "{}"'.format(result, wiki_url))
-
-    async def pull_comment(self, comment_id, WIKI_API_PATH, rate_limiter):
-        try:
-            comment = await self.safe_request(
-                "{wiki}?action=comment&do=getRaw&comment_id={comment}&format=json".format(wiki=WIKI_API_PATH,
-                                                                                          comment=comment_id), rate_limiter, "text")
-            logger.debug("Got the following comment from the API: {}".format(comment))
-            if comment is None:
-                raise TypeError
-        except (TypeError, AttributeError):
-            logger.exception("Could not resolve the comment text.")
-        except KeyError:
-            logger.exception("CurseProfile extension API did not respond with a valid comment content.")
-        else:
-            if len(comment) > 1000:
-                comment = comment[0:1000] + "…"
-            return comment
-        return ""
-
-
 async def process_cats(event: dict, local_wiki: Wiki, categorize_events: dict):
     """Process categories based on local MW messages. """
     if event["type"] == "categorize":
@@ -492,35 +388,36 @@ async def process_cats(event: dict, local_wiki: Wiki, categorize_events: dict):
         else:
             logger.debug("Log entry got suppressed, ignoring entry.")

-
-async def process_mwmsgs(wiki_response: dict, local_wiki: Wiki, mw_msgs: dict):
-    """
-    This function is made to parse the initial wiki extended information to update local_wiki.mw_messages that stores the key
-    to mw_msgs that is a dict storing id: tuple where tuple is a set of MW messages for categories.
-    The reason it's constructed this way is to prevent duplication of data in memory so Markus doesn't complain about
-    high RAM usage. It does however affect CPU performance as every wiki requires to check the list for the matching
-    tuples of MW messages.
-
-    :param wiki_response:
-    :param local_wiki:
-    :param mw_msgs:
-    :return:
-    """
-    msgs = []
-    for message in wiki_response["query"]["allmessages"]:
-        if not "missing" in message:  # ignore missing strings
-            msgs.append((message["name"], re.sub(r'\[\[.*?\]\]', '', message["*"])))
-        else:
-            logger.warning("Could not fetch the MW message translation for: {}".format(message["name"]))
-    msgs = tuple(msgs)
-    for key, set in mw_msgs.items():
-        if msgs == set:
-            local_wiki.mw_messages = key
-            return
-    # if same entry is not in mw_msgs
-    key = len(mw_msgs)
-    mw_msgs[key] = msgs  # it may be a little bit messy for sure, however I don't expect any reason to remove mw_msgs entries by one
-    local_wiki.mw_messages = key
+# This function has been removed. While its implementation seems sound, it should be considered only if we find performance
+# concerns with RcGcDb
+# async def process_mwmsgs(wiki_response: dict, local_wiki: Wiki, mw_msgs: dict):
+#     """
+#     This function is made to parse the initial wiki extended information to update local_wiki.mw_messages that stores the key
+#     to mw_msgs that is a dict storing id: tuple where tuple is a set of MW messages for categories.
+#     The reason it's constructed this way is to prevent duplication of data in memory so Markus doesn't complain about
+#     high RAM usage. It does however affect CPU performance as every wiki requires to check the list for the matching
+#     tuples of MW messages.
+#
+#     :param wiki_response:
+#     :param local_wiki:
+#     :param mw_msgs:
+#     :return:
+#     """
+#     msgs = []
+#     for message in wiki_response["query"]["allmessages"]:
+#         if not "missing" in message:  # ignore missing strings
+#             msgs.append((message["name"], re.sub(r'\[\[.*?\]\]', '', message["*"])))
+#         else:
+#             logger.warning("Could not fetch the MW message translation for: {}".format(message["name"]))
+#     msgs = tuple(msgs)
+#     for key, set in mw_msgs.items():
+#         if msgs == set:
+#             local_wiki.mw_messages = key
+#             return
+#     # if same entry is not in mw_msgs
+#     key = len(mw_msgs)
+#     mw_msgs[key] = msgs  # it may be a little bit messy for sure, however I don't expect any reason to remove mw_msgs entries by one
+#     local_wiki.mw_messages = key

 # db_wiki: webhook, wiki, lang, display, rcid, postid
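[Editor's note, not part of the patch] process_cachable only inspects query.allmessages and query.tags, so its behaviour can be sanity-checked against a handcrafted response. The fragment below is fabricated for illustration and assumes wiki is an already-constructed Wiki object:

    response = {"query": {
        "allmessages": [
            {"name": "recentchanges-page-added-to-category", "*": "[[$1]] added to category"},
            {"name": "recentchanges-page-removed-from-category", "missing": ""},  # logged and skipped
        ],
        "tags": [
            {"name": "mw-undo", "displayname": "<span>Undo</span>"},  # HTML stripped to "Undo"
            {"name": "mobile edit"},  # no displayname, so stored as None
        ],
    }}
    process_cachable(response, wiki)
    # wiki.mw_messages now holds the first message with its [[...]] link syntax stripped,
    # and wiki.tags maps "mw-undo" to "Undo" and "mobile edit" to None.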