# -*- coding: utf-8 -*- # This file is part of Recent changes Goat compatible Discord webhook (RcGcDw). # RcGcDw is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # RcGcDw is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # You should have received a copy of the GNU General Public License # along with RcGcDw. If not, see . import base64 import json, logging, sys, re, platform from html.parser import HTMLParser from typing import Callable, Tuple, List, Optional, Union from urllib.parse import urlparse, urlunparse import requests from src.api.context import Context from src.argparser import command_args from src.configloader import settings import src.api.util from src.discord.message import DiscordMessage, DiscordMessageMetadata from src.discord.queue import messagequeue, send_to_discord from src.exceptions import MediaWikiError from src.i18n import misc _ = misc.gettext # Create a custom logger misc_logger = logging.getLogger("rcgcdw.misc") data_template = {"rcid": None, "discussion_id": 0, "abuse_log_id": None, "daily_overview": {"edits": None, "new_files": None, "admin_actions": None, "bytes_changed": None, "new_articles": None, "unique_editors": None, "day_score": None, "days_tracked": 0}} WIKI_API_PATH: str = "" WIKI_ARTICLE_PATH: str = "" WIKI_SCRIPT_PATH: str = "" WIKI_JUST_DOMAIN: str = "" def send_simple(msgtype, message, name, avatar): discord_msg = DiscordMessage("compact", msgtype, settings["webhookURL"], content=message) discord_msg.set_avatar(avatar) discord_msg.set_name(name) messagequeue.resend_msgs() send_to_discord(discord_msg, meta=DiscordMessageMetadata("POST")) class DataFile: """Data class which instance of is shared by multiple modules to remain consistent and do not cause too many IO operations.""" def __init__(self): self.data_filename: str = settings.get("datafile_path", "data.json") self.data: dict = self.load_datafile() misc_logger.debug("Current contents of {} {}".format(self.data_filename, self.data)) self.changed: bool = False def generate_datafile(self): """Generate a data.json file from a template.""" try: with open(self.data_filename, 'w', encoding="utf-8") as data: data.write(json.dumps(data_template, indent=4)) except PermissionError: misc_logger.critical("Could not create a data file (no permissions). No way to store last edit.") sys.exit(1) def load_datafile(self) -> dict: """Read a data.json file and return a dictionary with contents :rtype: dict """ try: with open(self.data_filename, encoding="utf-8") as data: return json.loads(data.read()) except FileNotFoundError: self.generate_datafile() misc_logger.info("The data file could not be found. Generating a new one...") if not command_args.nowelcome: send_simple("welcome", _("RcGcDw is now running and checking {wiki}.").format(wiki=settings["wikiname"]), _("Welcome"), settings["avatars"].get("welcome", None)) return data_template def save_datafile(self): """Overwrites the data.json file with given dictionary""" if self.changed is False: # don't cause unnecessary write operations return try: with open(self.data_filename, "w", encoding="utf-8") as data_file: data_file.write(json.dumps(self.data, indent=4)) self.changed = False misc_logger.debug("Saving the database succeeded.") except PermissionError: misc_logger.critical("Could not modify a data file (no permissions). No way to store last edit.") sys.exit(1) except OSError as e: if settings.get("error_tolerance", 1) > 1: if platform.system() == "Windows": if "Invalid argument: '" + self.data_filename + "'" in str(e): misc_logger.error("Saving the data file failed due to Invalid argument exception, we've seen it " "before in issue #209, if you know the reason for it happening please reopen the " "issue with explanation, for now we are going to just ignore it.") # Reference #209 return raise def __setitem__(self, instance, value): if self.data[instance] != value: self.data[instance] = value self.changed = True def __getitem__(self, item): try: return self.data[item] except KeyError: # if such value doesn't exist, set to and return none self.__setitem__(item, None) self.save_datafile() return None datafile = DataFile() def weighted_average(value, weight, new_value): """Calculates weighted average of value number with weight weight and new_value with weight 1""" return round(((value * weight) + new_value) / (weight + 1), 2) def class_searcher(attribs: list) -> str: """Function to return classes of given element in HTMLParser on handle_starttag :returns a string with all of the classes of element """ for attr in attribs: if attr[0] == "class": return attr[1] return "" class ContentParser(HTMLParser): """ContentPerser is an implementation of HTMLParser that parses output of action=compare&prop=diff API request for two MediaWiki revisions. It extracts the following: small_prev_ins - storing up to 1000 characters of added text small_prev_del - storing up to 1000 chracters of removed text ins_length - storing length of inserted text del_length - storing length of deleted text """ more = _("\n__And more__") current_tag = "" last_ins = None last_del = None empty = False small_prev_ins = "" small_prev_del = "" ins_length = len(more) del_length = len(more) def handle_starttag(self, tagname, attribs): if tagname == "ins" or tagname == "del": self.current_tag = tagname if tagname == "td": classes = class_searcher(attribs).split(' ') if "diff-addedline" in classes and self.ins_length <= 1000: self.current_tag = "tda" self.last_ins = "" if "diff-deletedline" in classes and self.del_length <= 1000: self.current_tag = "tdd" self.last_del = "" if "diff-empty" in classes: self.empty = True def handle_data(self, data): def escape_formatting(data: str) -> str: """Escape Discord formatting""" return re.sub(r"([`_*~:<>{}@/|#\-\.\\\[\]\(\)])", "\\\\\\1", data) data = escape_formatting(data) if self.current_tag == "ins" and self.ins_length <= 1000: self.ins_length += len("**" + data + "**") if self.ins_length <= 1000: self.last_ins = self.last_ins + "**" + data + "**" if self.current_tag == "del" and self.del_length <= 1000: self.del_length += len("~~" + data + "~~") if self.del_length <= 1000: self.last_del = self.last_del + "~~" + data + "~~" if self.current_tag == "tda" and self.ins_length <= 1000: self.ins_length += len(data) if self.ins_length <= 1000: self.last_ins = self.last_ins + data if self.current_tag == "tdd" and self.del_length <= 1000: self.del_length += len(data) if self.del_length <= 1000: self.last_del = self.last_del + data def handle_endtag(self, tagname): if tagname == "ins": self.current_tag = "tda" elif tagname == "del": self.current_tag = "tdd" elif tagname == "td": self.current_tag = "" elif tagname == "tr": if self.last_ins is not None: self.ins_length += 1 if self.empty and not self.last_ins.isspace(): if "**" in self.last_ins: self.last_ins = self.last_ins.replace("**", "__") self.ins_length += 4 self.last_ins = "**" + self.last_ins + "**" self.small_prev_ins = self.small_prev_ins + "\n" + self.last_ins if self.ins_length > 1000: self.small_prev_ins = self.small_prev_ins + self.more self.last_ins = None if self.last_del is not None: self.del_length += 1 if self.empty and not self.last_del.isspace(): if "~~" in self.last_del: self.last_del = self.last_del.replace("~~", "__") self.del_length += 4 self.last_del = "~~" + self.last_del + "~~" self.small_prev_del = self.small_prev_del + "\n" + self.last_del if self.del_length > 1000: self.small_prev_del = self.small_prev_del + self.more self.last_del = None self.empty = False def safe_read(request, *keys): if request is None: return None try: request = request.json() for item in keys: request = request[item] except KeyError: misc_logger.warning( "Failure while extracting data from request on key {key} in {change}".format(key=item, change=request)) return None except ValueError: misc_logger.warning("Failure while extracting data from request in {change}".format(change=request)) return None return request def parse_mw_request_info(request_data: dict, url: str): """A function parsing request JSON message from MediaWiki logging all warnings and raising on MediaWiki errors""" # any([True for k in request_data.keys() if k in ("error", "errors")]) errors: dict = request_data.get("errors", {}) # Is it ugly? I don't know tbh if errors: raise MediaWikiError(str(errors)) warnings: dict = request_data.get("warnings", {}) if warnings: for module, warning_data in warnings.items(): misc_logger.warning("MediaWiki returned the following warning on module {module}: {text} on {url}.".format( module=module, text=warning_data.get("warnings", ""), url=url )) return request_data def add_to_dict(dictionary, key): if key in dictionary: dictionary[key] += 1 else: dictionary[key] = 1 return dictionary def prepare_paths(path: str, dry=False): """Set the URL paths for article namespace and script namespace WIKI_API_PATH will be: WIKI_DOMAIN/api.php WIKI_ARTICLE_PATH will be: WIKI_DOMAIN/articlepath/$1 where $1 is the replaced string WIKI_SCRIPT_PATH will be: WIKI_DOMAIN/ WIKI_JUST_DOMAIN will be: WIKI_DOMAIN""" global WIKI_API_PATH global WIKI_ARTICLE_PATH global WIKI_SCRIPT_PATH global WIKI_JUST_DOMAIN def quick_try_url(url): """Quickly test if URL is the proper script path, False if it appears invalid dictionary when it appears valid""" try: request = requests.get(url, timeout=5, headers=settings["header"]) if request.status_code == requests.codes.ok: if request.json()["query"]["general"] is not None: return request else: misc_logger.debug(f"Request to the wiki failed with code {request.status_code} and text: {request.text}") return False except (KeyError, requests.exceptions.ConnectionError): return False try: parsed_url = urlparse(path) except KeyError: misc_logger.critical("wiki_url is not specified in the settings. Please provide the wiki url in the settings and start the script again.") sys.exit(1) for url_scheme in (path, path.split("wiki")[0], urlunparse((*parsed_url[0:2], "", "", "", ""))): # check different combinations, it's supposed to be idiot-proof tested = quick_try_url(url_scheme.rstrip("/") + "/api.php?action=query&format=json&meta=siteinfo") if tested: if not dry: WIKI_API_PATH = urlunparse((*parsed_url[0:2], "", "", "", "")) + tested.json()["query"]["general"]["scriptpath"] + "/api.php" WIKI_SCRIPT_PATH = urlunparse((*parsed_url[0:2], "", "", "", "")) + tested.json()["query"]["general"]["scriptpath"] + "/" WIKI_ARTICLE_PATH = urlunparse((*parsed_url[0:2], "", "", "", "")) + tested.json()["query"]["general"]["articlepath"] WIKI_JUST_DOMAIN = urlunparse((*parsed_url[0:2], "", "", "", "")) break return urlunparse((*parsed_url[0:2], "", "", "", "")) else: misc_logger.critical("Could not verify wikis paths. Please make sure you have given the proper wiki URLs in settings.json ({path} should be script path to your wiki) and your Internet connection is working.".format(path=path)) sys.exit(1) prepare_paths(settings["wiki_url"]) def run_hooks(hooks: Union[List[Tuple[Callable[[Context, dict], None], int, Optional[Callable]]], List[Tuple[Callable[[DiscordMessage, DiscordMessageMetadata, Context, dict], None], int, Optional[Callable]]]], *arguments): for hook in hooks: if hook[2] is not None: try: if hook[2](*arguments) is False: misc_logger.debug(f"Ignoring hook {hook[0].__name__} due to conditions not being met for execution") continue except: if settings.get("error_tolerance", 1) > 0: misc_logger.exception("On running a hook check function, ignoring hook") else: raise try: misc_logger.debug(f"Running {hook[0].__name__} hook") hook[0](*arguments) except: if settings.get("error_tolerance", 1) > 0: misc_logger.exception("On running a hook, ignoring hook") else: raise def profile_field_name(name, embed, _): profile_fields = {"profile-location": _("Location"), "profile-aboutme": _("About me"), "profile-link-google": _("Google link"), "profile-link-facebook": _("Facebook link"), "profile-link-twitter": _("Twitter link"), "profile-link-reddit": _("Reddit link"), "profile-link-twitch": _("Twitch link"), "profile-link-psn": _("PSN link"), "profile-link-vk": _("VK link"), "profile-link-xbl": _("XBL link"), "profile-link-steam": _("Steam link"), "profile-link-discord": _("Discord handle"), "profile-link-battlenet": _("Battle.net handle")} try: return profile_fields[name] except KeyError: if embed: return _("Unknown") else: return _("unknown") class LinkParser(HTMLParser): new_string = "" recent_href = "" def __init__(self, DOMAIN_URL: str): self.WIKI_JUST_DOMAIN = DOMAIN_URL super().__init__() def handle_starttag(self, tag, attrs): for attr in attrs: if attr[0] == 'href': self.recent_href = attr[1] if self.recent_href.startswith("//"): self.recent_href = "https:{rest}".format(rest=self.recent_href) elif not self.recent_href.startswith("http"): self.recent_href = self.WIKI_JUST_DOMAIN + self.recent_href self.recent_href = self.recent_href.replace(")", "\\)") elif attr[0] == 'data-uncrawlable-url': self.recent_href = attr[1].encode('ascii') self.recent_href = base64.b64decode(self.recent_href) self.recent_href = self.WIKI_JUST_DOMAIN + self.recent_href.decode('ascii') def handle_data(self, data): if self.recent_href: self.new_string = self.new_string + "[{}](<{}>)".format(src.api.util.sanitize_to_markdown(data), self.recent_href) self.recent_href = "" else: self.new_string = self.new_string + src.api.util.sanitize_to_markdown(data) def handle_comment(self, data): self.new_string = self.new_string + src.api.util.sanitize_to_markdown(data) def handle_endtag(self, tag): misc_logger.debug(self.new_string)