from __future__ import annotations

import functools
import json
import time
import re
import logging
import aiohttp
import asyncio
from functools import cache
from api.util import default_message
from src.discord.queue import messagequeue, QueueEntry
from mw_messages import MWMessages
from src.exceptions import *
from src.queue_handler import dbmanager
from src.api.hooks import formatter_hooks
from src.api.client import Client
from src.api.context import Context
from src.discord.message import DiscordMessage, DiscordMessageMetadata, StackedDiscordMessage
from src.i18n import langs
from src.statistics import Statistics, Log, LogType
from src.config import settings
# noinspection PyPackageRequirements
from bs4 import BeautifulSoup
from collections import OrderedDict, defaultdict, namedtuple
from typing import Union, Optional, TYPE_CHECKING

Settings = namedtuple("Settings", ["lang", "display"])
logger = logging.getLogger("rcgcdb.wiki")

# wiki_removal_reasons = {410: _("wiki deleted"), 404: _("wiki deleted"), 401: _("wiki inaccessible"),
#                         402: _("wiki inaccessible"), 403: _("wiki inaccessible"), 1000: _("discussions disabled")}

if TYPE_CHECKING:
    from src.domain import Domain

MESSAGE_LIMIT = settings.get("message_limit", 30)


class Wiki:
    def __init__(self, script_url: str, rc_id: Optional[int], discussion_id: Optional[int]):
        self.script_url: str = script_url
        self.session: aiohttp.ClientSession = aiohttp.ClientSession(headers=settings["header"],
                                                                    timeout=aiohttp.ClientTimeout(total=6.0))
        self.statistics: Statistics = Statistics(rc_id, discussion_id)
        self.mw_messages: Optional[MWMessages] = None
        self.tags: dict[str, Optional[str]] = {}  # Tag can be None if hidden
        self.first_fetch_done: bool = False
        self.domain: Optional[Domain] = None
        self.targets: Optional[defaultdict[Settings, list[str]]] = None
        self.client: Client = Client(formatter_hooks, self)
        self.message_history: list[StackedDiscordMessage] = list()
        self.namespaces: Optional[dict] = None
        self.recache_requested: bool = False

    @property
    def rc_id(self):
        return self.statistics.last_action

    @property
    def last_request(self):
        return self.statistics.last_request

    @last_request.setter
    def last_request(self, value):
        self.statistics.last_request = value

    # async def remove(self, reason):
    #     logger.info("Removing a wiki {}".format(self.script_url))
    #     await src.discord.wiki_removal(self.script_url, reason)
    #     await src.discord.wiki_removal_monitor(self.script_url, reason)
    #     async with db.pool().acquire() as connection:
    #         result = await connection.execute('DELETE FROM rcgcdw WHERE wiki = $1', self.script_url)
    #         logger.warning('{} rows affected by DELETE FROM rcgcdw WHERE wiki = "{}"'.format(result, self.script_url))

    def add_message(self, message: StackedDiscordMessage):
        self.message_history.append(message)
        if len(self.message_history) > MESSAGE_LIMIT * len(self.targets):
            self.message_history = self.message_history[len(self.message_history) - MESSAGE_LIMIT * len(self.targets):]

    def set_domain(self, domain: Domain):
        self.domain = domain
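    # Illustrative sketch (not part of the bot): with the default MESSAGE_LIMIT of 30
    # and, say, two webhook targets, add_message() keeps only the 60 most recent
    # StackedDiscordMessage objects and drops the oldest ones:
    #
    #     wiki.targets = {Settings(lang="en", display=1): ["webhook-a"],
    #                     Settings(lang="de", display=0): ["webhook-b"]}
    #     for msg in sixty_one_messages:  # hypothetical iterable of StackedDiscordMessage
    #         wiki.add_message(msg)
    #     assert len(wiki.message_history) == 60  # the very first message was discarded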
    # def find_middle_next(ids: List[str], pageid: int) -> set:  # TODO Properly re-implement for RcGcDb
    #     """To address #235 RcGcDw should now remove diffs in next revs relative to redacted revs to protect
    #     information in revs that revert revdeleted information.
    #
    #     :arg ids - list
    #     :arg pageid - int
    #
    #     :return list"""
    #     ids = [int(x) for x in ids]
    #     result = set()
    #     ids.sort()  # Just to be sure, sort the list to make sure it's always sorted
    #     messages = db_cursor.execute("SELECT revid FROM event WHERE pageid = ? AND revid >= ? ORDER BY revid",
    #                                  (pageid, ids[0],))
    #     all_in_page = [x[0] for x in messages.fetchall()]
    #     for id in ids:
    #         try:
    #             result.add(all_in_page[all_in_page.index(id) + 1])
    #         except (KeyError, ValueError):
    #             logger.debug(f"Value {id} not in {all_in_page} or no value after that.")
    #     return result - set(ids)

    def search_message_history(self, params: dict) -> list[tuple[StackedDiscordMessage, list[int]]]:
        """Search self.message_history for messages which match all properties in params and return them in a list"""
        output = []
        for message in self.message_history:
            returned_matches_for_stacked = message.filter(params)
            if returned_matches_for_stacked:
                output.append((message, [x[0] for x in returned_matches_for_stacked]))
        return output

    def delete_messages(self, params: dict):
        """Delete messages from message_history whose DiscordMessageMetadata matches all properties in params"""
        # Delete all messages with given IDs
        for stacked_message, ids in self.search_message_history(params):
            stacked_message.delete_message_by_id(ids)
            # If all messages were removed, send a DELETE to Discord, otherwise PATCH the remaining content
            if len(stacked_message.message_list) == 0:
                messagequeue.add_message(QueueEntry(stacked_message, [stacked_message.webhook], self, method="DELETE"))
            else:
                messagequeue.add_message(QueueEntry(stacked_message, [stacked_message.webhook], self, method="PATCH"))

    def redact_messages(self, ids: list[int], mode: str, censored_properties: dict):
        # ids can refer to multiple events, and search does not support additive mode, so we have to loop over all ids
        for revlogid in ids:
            for stacked_message, message_ids in self.search_message_history({mode: revlogid}):
                for message in [message for num, message in enumerate(stacked_message.message_list) if num in message_ids]:
                    if "user" in censored_properties and "url" in message["author"]:
                        message["author"]["name"] = _("hidden")
                        message["author"].pop("url")
                    if "action" in censored_properties and "url" in message:
                        message["title"] = _("~~hidden~~")
                        message["embed"].pop("url")
                    if "content" in censored_properties and "fields" in message:
                        message["embed"].pop("fields")
                    if "comment" in censored_properties:
                        message["description"] = _("~~hidden~~")
                messagequeue.add_message(QueueEntry(stacked_message, [stacked_message.webhook], self, method="PATCH"))

    # async def downtime_controller(self, down, reason=None):
    #     if down:
    #         self.fail_times += 1
    #         if self.fail_times > 20:
    #             await self.remove(reason)
    #     else:
    #         self.fail_times -= 1

    async def update_targets(self) -> None:
        """Generate all possible variations of outputs we need to generate messages for and store them in
        self.targets as a defaultdict[Settings, list[str]] mapping webhook settings to lists of webhook URLs."""
        target_settings: defaultdict[Settings, list[str]] = defaultdict(list)
        async for webhook in dbmanager.fetch_rows("SELECT webhook, lang, display FROM rcgcdw WHERE wiki = $1 AND (rcid != -1 OR rcid IS NULL)", self.script_url):
            target_settings[Settings(webhook["lang"], webhook["display"])].append(webhook["webhook"])
        self.targets = target_settings
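    # Illustrative usage (not part of the bot): delete_messages() and redact_messages()
    # match against DiscordMessageMetadata fields, so cleaning up after a page deletion
    # or hiding a censored edit summary could look like:
    #
    #     wiki.delete_messages(dict(pageid=12345))
    #     wiki.redact_messages([56789], "rev_id", {"comment": True})
    #     # {"comment": True} is a hypothetical flag dict; real values come from logparams["new"]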
    def parse_mw_request_info(self, request_data: dict, url: str):
        """Parse a JSON response from MediaWiki, logging all warnings and raising MediaWikiError on errors"""
        # any([True for k in request_data.keys() if k in ("error", "errors")])
        errors: list = request_data.get("errors", [])  # Is it ugly? I don't know tbh
        if errors:
            raise MediaWikiError(str(errors))
        warnings: list = request_data.get("warnings", [])
        if warnings:
            for warning in warnings:
                logger.warning("MediaWiki returned the following warning: {code} - {text} on {url}.".format(
                    code=warning["code"], text=warning.get("text", warning.get("*", "")), url=url
                ))
        return request_data
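    # For reference, a raw-errorformat MediaWiki response that this parser would reject
    # might look roughly like the following (shape abbreviated, exact field set varies
    # between MediaWiki versions):
    #
    #     {"errors": [{"code": "unknown_action", "key": "apierror-unknownaction", "params": ["fetch"]}],
    #      "docref": "See https://www.mediawiki.org/w/api.php for API usage."}
    #
    # whereas entries under "warnings" are only logged and the payload is returned unchanged.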
    async def api_request(self, params: Union[str, OrderedDict], *json_path: str, timeout: int = 10,
                          allow_redirects: bool = False) -> dict:
        """Method to GET request data from the wiki's API with error handling including recognition of MediaWiki errors.

        Parameters:
            params (str, OrderedDict): a string or collections.OrderedDict object containing query parameters
            json_path (str): *args taking strings as values. After the request is parsed as JSON, data will be
                extracted from the given JSON path
            timeout (int, float) (default=10): int or float limiting the time allowed for receiving a full response
                from the server before raising TimeoutError
            allow_redirects (bool) (default=False): switches whether the request should follow redirects or not

        Returns:
            request_content (dict): a dict resulting from JSON extraction of the HTTP GET request with the given json_path

        Raises:
            ServerError: when the connection with the wiki failed due to a server error
            ClientError: when the connection with the wiki failed due to a client error
            KeyError: when json_path contains keys that weren't found in the JSON response
            BadRequest: when the params argument is of the wrong type
            MediaWikiError: when MediaWiki returns an error
        """
        # Making request
        try:
            if isinstance(params, str):  # TODO Make it so there are some default arguments like warning/error format appended
                request = await self.session.get(self.script_url + "api.php?" + params + "&errorformat=raw",
                                                 timeout=timeout, allow_redirects=allow_redirects)
            elif isinstance(params, OrderedDict):
                params["errorformat"] = "raw"
                request = await self.session.get(self.script_url + "api.php", params=params, timeout=timeout,
                                                 allow_redirects=allow_redirects)
            else:
                raise BadRequest(params)
        except (aiohttp.ServerConnectionError, aiohttp.ServerTimeoutError) as exc:
            logger.warning("Reached {error} error for request on link {url}".format(error=repr(exc),
                                                                                    url=self.script_url + str(params)))
            raise ServerError
        # Catching HTTP errors
        if 499 < request.status < 600:
            raise ServerError
        elif request.status == 302:
            logger.critical(
                "Redirect detected! Either the wiki given in the script settings (wiki field) is incorrect, the wiki "
                "got removed, or it is giving us a false value. Please provide the real URL to the wiki, current URL "
                "redirects to {}".format(request.url))
        elif 399 < request.status < 500:
            logger.error("Request returned ClientError status code on {url}".format(url=request.url))
            self.statistics.update(Log(type=LogType.HTTP_ERROR, title="{} error".format(request.status),
                                       details=str(request.headers) + "\n" + str(request.url)))
            raise ClientError(request)
        else:
            # JSON Extraction
            try:
                request_json = self.parse_mw_request_info(await request.json(encoding="UTF-8"), str(request.url))
                for item in json_path:
                    request_json = request_json[item]
            except ValueError:
                logger.warning("ValueError when extracting JSON data on {url}".format(url=request.url))
                raise ServerError
            except MediaWikiError:
                logger.exception("MediaWiki error on request: {}".format(request.url))
                raise
            except KeyError:
                logger.exception("KeyError while iterating over json_path, full response: {}".format(await request.text()))
                raise
            return request_json

    async def fetch_wiki(self, amount=10) -> dict:
        if self.mw_messages is None:
            params = OrderedDict({"action": "query", "format": "json", "uselang": "content",
                                  "list": "tags|recentchanges", "meta": "allmessages|siteinfo", "utf8": 1,
                                  "tglimit": "max", "tgprop": "displayname",
                                  "rcprop": "title|redirect|timestamp|ids|loginfo|parsedcomment|sizes|flags|tags|user",
                                  "rclimit": amount, "rcshow": "!bot", "rctype": "edit|new|log|categorize",
                                  "ammessages": "recentchanges-page-added-to-category|recentchanges-page-removed-from-category|recentchanges-page-added-to-category-bundled|recentchanges-page-removed-from-category-bundled",
                                  "amenableparser": 1, "amincludelocal": 1, "siprop": "namespaces|general"})
        else:
            params = OrderedDict({"action": "query", "format": "json", "uselang": "content",
                                  "list": "tags|recentchanges", "meta": "siteinfo", "utf8": 1, "rcshow": "!bot",
                                  "rcprop": "title|redirect|timestamp|ids|loginfo|parsedcomment|sizes|flags|tags|user",
                                  "rclimit": amount, "rctype": "edit|new|log|categorize", "siprop": "namespaces|general"})
        try:
            response = await self.api_request(params=params)
        except (aiohttp.ClientConnectionError, aiohttp.ServerTimeoutError, asyncio.TimeoutError) as e:
            logger.error("A connection error occurred while requesting {}".format(params))
            raise WikiServerError(e)
        return response
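    # Illustrative call (not part of the bot) showing json_path extraction; this would
    # need to run inside the bot's event loop:
    #
    #     site_info = await wiki.api_request(
    #         OrderedDict(action="query", format="json", meta="siteinfo", siprop="general"),
    #         "query", "general")  # returns the contents of response["query"]["general"]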
    async def scan(self, amount=10):
        """Main track of fetching RecentChanges of a wiki.

        :raises WikiServerError"""
        while True:  # Trap event in case there are more changes needed to be fetched
            try:
                request = await self.fetch_wiki(amount=amount)
                self.client.last_request = request
            except WikiServerError as e:
                # If WikiServerError comes up twice within the recent 2 minutes, reraise the exception,
                # otherwise wait 2 seconds and retry
                self.statistics.update(Log(type=LogType.CONNECTION_ERROR, title=str(e.exception)))
                if self.statistics.recent_connection_errors() > 1:
                    raise
                await asyncio.sleep(2.0)
                continue
            if not self.mw_messages or self.recache_requested:
                process_cachable(request, self)
            try:
                recent_changes = request["query"]["recentchanges"]
                recent_changes.reverse()
            except KeyError:
                raise WikiError
            if self.rc_id in (0, None, -1):
                if len(recent_changes) > 0:
                    self.statistics.last_action = recent_changes[-1]["rcid"]
                    dbmanager.add(("UPDATE rcgcdw SET rcid = $1 WHERE wiki = $2 AND ( rcid != -1 OR rcid IS NULL )",
                                   (recent_changes[-1]["rcid"], self.script_url)))
                else:
                    self.statistics.last_action = 0
                    dbmanager.add(("UPDATE rcgcdw SET rcid = 0 WHERE wiki = $1 AND ( rcid != -1 OR rcid IS NULL )",
                                   (self.script_url,)))
                return  # TODO Add a log entry?
            categorize_events = {}
            new_events = 0
            self.statistics.last_checked_rc = int(time.time())
            highest_id = self.rc_id  # Pretty sure that will be faster
            for change in recent_changes:
                if change["rcid"] > highest_id and amount != 450:
                    new_events += 1
                    if new_events == 10:
                        # Call the function again with the max limit for more results, ignore the ones in this request
                        logger.debug("There were too many new events, queuing wiki with 450 limit.")
                        amount = 450
                        break
                await process_cats(change, self, categorize_events)
            else:  # adequate amount of changes
                message_list = []  # Collect all messages so they can be efficiently merged in the Discord message sender
                for change in recent_changes:  # Yeah, second loop since the categories require to be all loaded up
                    if change["rcid"] > self.rc_id:
                        if highest_id is None or change["rcid"] > highest_id:
                            # Make sure highest_id is really the highest rcid, but allow entries with
                            # potentially lesser rcids to come after without breaking the cycle
                            highest_id = change["rcid"]
                        for combination, webhooks in self.targets.items():
                            message = await rc_processor(self, change, categorize_events, combination, webhooks)
                            message.wiki = self
                            message_list.append(QueueEntry(message, webhooks))
                messagequeue.add_messages(message_list)
                return


@cache
def prepare_settings(display_mode: int) -> dict:
    """Prepares a dict of RcGcDw-compatible settings based on a template and the display mode of a given call"""
    with open("src/api/template_settings.json", "r") as template_json:
        template = json.load(template_json)
        template["appearance"]["embed"]["embed_images"] = display_mode > 1
        template["appearance"]["embed"]["show_edit_changes"] = display_mode > 2
        return template


def process_cachable(response: dict, wiki_object: Wiki) -> None:
    """Processes cachable objects – such as MediaWiki system messages and wiki tag display names – to be used for
    processing of DiscordMessages and saves them in a wiki object."""
    mw_messages = response.get("query", {}).get("allmessages", [])
    final_mw_messages = dict()
    for msg in mw_messages:
        if "missing" not in msg:  # ignore missing strings
            final_mw_messages[msg["name"]] = re.sub(r'\[\[.*?]]', '', msg["*"])
        else:
            logger.warning("Could not fetch the MW message translation for: {}".format(msg["name"]))
    wiki_object.mw_messages = MWMessages(final_mw_messages)
    for tag in response["query"]["tags"]:
        try:
            wiki_object.tags[tag["name"]] = (BeautifulSoup(tag["displayname"], "lxml")).get_text()
        except KeyError:
            wiki_object.tags[tag["name"]] = None  # Tag with a hidden display name
    wiki_object.namespaces = response["query"]["namespaces"]
    wiki_object.recache_requested = False
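# Illustrative mapping (not part of the bot) of the display modes prepare_settings()
# distinguishes, assuming the template ships with embeds enabled:
#
#     prepare_settings(1)  # embeds without images or edit diffs
#     prepare_settings(2)  # adds embed_images
#     prepare_settings(3)  # adds embed_images and show_edit_changes
#
# Thanks to @cache, each display mode reads template_settings.json only once per process.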
async def rc_processor(wiki: Wiki, change: dict, changed_categories: dict, display_options: Settings,
                       webhooks: list) -> Optional[DiscordMessage]:
    """This function takes more vital information, communicates with a formatter and constructs a DiscordMessage with it.
    It creates a DiscordMessageMetadata object, a LinkParser and a Context, and prepares the parsed comment."""
    from src.misc import LinkParser
    link_parser = LinkParser()
    metadata = DiscordMessageMetadata("POST", rev_id=change.get("revid", None), log_id=change.get("logid", None),
                                      page_id=change.get("pageid", None))
    context = Context("embed" if display_options.display > 0 else "compact", "recentchanges", webhooks, wiki.client,
                      langs[display_options.lang]["rc_formatters"], prepare_settings(display_options.display))
    if ("actionhidden" in change or "suppressed" in change) and "suppressed" not in settings["ignored"]:
        # Event is hidden using suppression
        context.event = "suppressed"
        try:
            discord_message: Optional[DiscordMessage] = await asyncio.get_event_loop().run_in_executor(
                None, functools.partial(default_message("suppressed", display_options.display, formatter_hooks), context, change))
        except Exception:
            if settings.get("error_tolerance", 1) > 0:
                discord_message: Optional[DiscordMessage] = None  # It's handled by send_to_discord, we still want other code to run
            else:
                raise
    else:
        if "commenthidden" not in change:
            link_parser.feed(change.get("parsedcomment", ""))
            parsed_comment = link_parser.new_string
        else:
            parsed_comment = _("~~hidden~~")
        if not parsed_comment and context.message_type == "embed" and settings["appearance"].get("embed", {}).get(
                "show_no_description_provided", True):
            parsed_comment = _("No description provided")
        context.set_parsedcomment(parsed_comment)
        if "userhidden" in change:
            change["user"] = _("hidden")
        if change.get("ns", -1) in settings.get("ignored_namespaces", ()):
            return
        if change["type"] in ["edit", "new"]:
            logger.debug("List of categories in essential_info: {}".format(changed_categories))
            identification_string = change["type"]
            context.set_categories(changed_categories)
        elif change["type"] == "categorize":
            return
        elif change["type"] == "log":
            identification_string = "{logtype}/{logaction}".format(logtype=change["logtype"], logaction=change["logaction"])
        else:
            identification_string = change.get("type", "unknown")  # If the event doesn't have a type
        if identification_string in settings["ignored"]:
            return
        context.event = identification_string
        try:
            discord_message: Optional[DiscordMessage] = await asyncio.get_event_loop().run_in_executor(
                None, functools.partial(default_message(identification_string, display_options.display, formatter_hooks), context, change))
        except Exception:
            if settings.get("error_tolerance", 1) > 0:
                discord_message: Optional[DiscordMessage] = None  # It's handled by send_to_discord, we still want other code to run
            else:
                raise
        if identification_string in ("delete/delete", "delete/delete_redir"):  # TODO Move it into a hook?
            wiki.delete_messages(dict(pageid=change.get("pageid")))
        elif identification_string == "delete/event":
            logparams = change.get('logparams', {"ids": []})
            if settings["appearance"]["mode"] == "embed":
                wiki.redact_messages(logparams.get("ids", []), "log_id", logparams.get("new", {}))
            else:
                for logid in logparams.get("ids", []):
                    wiki.delete_messages(dict(logid=logid))
        elif identification_string == "delete/revision":
            logparams = change.get('logparams', {"ids": []})
            if settings["appearance"]["mode"] == "embed":
                wiki.redact_messages(logparams.get("ids", []), "rev_id", logparams.get("new", {}))
            else:
                for revid in logparams.get("ids", []):
                    wiki.delete_messages(dict(revid=revid))
    if discord_message:
        discord_message.finish_embed()
        discord_message.metadata = metadata
    return discord_message
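# For reference (not exhaustive), identification_string takes values such as:
#
#     "edit", "new"                    # regular edits and page creations
#     "block/block", "move/move"       # log events, formatted as "{logtype}/{logaction}"
#     "delete/delete", "delete/revision", "delete/event"
#
# so a settings["ignored"] entry like "block/block" silently skips block log entries.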
""" if event["type"] == "categorize": if "commenthidden" not in event: if local_wiki.mw_messages is not None: cat_title = event["title"].split(':', 1)[1] # I so much hate this, blame Markus for making me do this if event["revid"] not in categorize_events: categorize_events[event["revid"]] = {"new": set(), "removed": set()} comment_to_match = re.sub(r'<.*?a>', '', event["parsedcomment"]) if local_wiki.mw_messages["recentchanges-page-added-to-category"] in comment_to_match or local_wiki.mw_messages["recentchanges-page-added-to-category-bundled"] in comment_to_match: # Added to category categorize_events[event["revid"]]["new"].add(cat_title) #logger.debug("Matched {} to added category for {}".format(cat_title, event["revid"])) elif local_wiki.mw_messages["recentchanges-page-removed-from-category"] in comment_to_match or local_wiki.mw_messages["recentchanges-page-removed-from-category-bundled"] in comment_to_match: # Removed from category categorize_events[event["revid"]]["removed"].add(cat_title) #logger.debug("Matched {} to removed category for {}".format(cat_title, event["revid"])) else: logger.debug( "Unknown match for category change with messages {} and comment_to_match {}".format(local_wiki.mw_messages,comment_to_match)) else: logger.warning( "Init information not available, could not read category information. Please restart the bot.") else: logger.debug("Log entry got suppressed, ignoring entry.") # This function has been removed. While its implementation seems sound, it should be considered only if we find performance # concerns with RcGcDb # async def process_mwmsgs(wiki_response: dict, local_wiki: Wiki, mw_msgs: dict): # """ # This function is made to parse the initial wiki extended information to update local_wiki.mw_messages that stores the key # to mw_msgs that is a dict storing id: tuple where tuple is a set of MW messages for categories. # The reason it's constructed this way is to prevent duplication of data in memory so Markus doesn't complain about # high RAM usage. It does however affect CPU performance as every wiki requires to check the list for the matching # tuples of MW messages. 
# This function has been removed. While its implementation seems sound, it should be considered only if we find
# performance concerns with RcGcDb
# async def process_mwmsgs(wiki_response: dict, local_wiki: Wiki, mw_msgs: dict):
#     """
#     This function is made to parse the initial wiki extended information to update local_wiki.mw_messages that stores
#     the key to mw_msgs that is a dict storing id: tuple where tuple is a set of MW messages for categories.
#     The reason it's constructed this way is to prevent duplication of data in memory so Markus doesn't complain about
#     high RAM usage. It does however affect CPU performance as every wiki requires to check the list for the matching
#     tuples of MW messages.
#
#     :param wiki_response:
#     :param local_wiki:
#     :param mw_msgs:
#     :return:
#     """
#     msgs = []
#     for message in wiki_response["query"]["allmessages"]:
#         if not "missing" in message:  # ignore missing strings
#             msgs.append((message["name"], re.sub(r'\[\[.*?\]\]', '', message["*"])))
#         else:
#             logger.warning("Could not fetch the MW message translation for: {}".format(message["name"]))
#     msgs = tuple(msgs)
#     for key, set in mw_msgs.items():
#         if msgs == set:
#             local_wiki.mw_messages = key
#             return
#     # if the same entry is not in mw_msgs
#     key = len(mw_msgs)
#     mw_msgs[key] = msgs  # it may be a little bit messy for sure, however I don't expect any reason to remove mw_msgs entries one by one
#     local_wiki.mw_messages = key


async def essential_feeds(change: dict, comment_pages: dict, db_wiki, target: tuple) -> DiscordMessage:
    """Prepares essential information for both embed and compact message format."""
    appearance_mode = feeds_embed_formatter if target[0][1] > 0 else feeds_compact_formatter
    identification_string = change["_embedded"]["thread"][0]["containerType"]
    comment_page = None
    if identification_string == "ARTICLE_COMMENT" and comment_pages is not None:
        comment_page = comment_pages.get(change["forumId"], None)
        if comment_page is not None:
            comment_page["fullUrl"] = "/".join(db_wiki["wiki"].split("/", 3)[:3]) + comment_page["relativeUrl"]
    return await appearance_mode(identification_string, change, target, db_wiki["wiki"], article_page=comment_page)
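# Illustrative note (not part of the bot): the Fandom feeds payload identifies the
# discussion surface via containerType, e.g.
#
#     change["_embedded"]["thread"][0]["containerType"]  # "FORUM", "ARTICLE_COMMENT" or "WALL"
#
# and only ARTICLE_COMMENT events need the extra comment_pages lookup to build a
# fullUrl for the commented article.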