# -*- coding: utf-8 -*-

# Recent changes Goat compatible Discord webhook is a project for using a webhook as recent changes page from MediaWiki.
# Copyright (C) 2018 Frisk

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import base64
import copy
import json
import logging
import math
import random
import re
import sys
import time
from collections import defaultdict
from html.parser import HTMLParser
from urllib.parse import urlparse, urlunparse, quote

import requests

from src.configloader import settings
from src.i18n import misc

_ = misc.gettext

# Create a custom logger
misc_logger = logging.getLogger("rcgcdw.misc")

# Default content of data.json, used when the file does not exist yet.
data_template = {"rcid": None, "discussion_id": 0, "abuse_log_id": None,
                 "daily_overview": {"edits": None, "new_files": None, "admin_actions": None, "bytes_changed": None,
                                    "new_articles": None, "unique_editors": None, "day_score": None,
                                    "days_tracked": 0}}

# Filled in by prepare_paths() once the wiki has been reached.
WIKI_API_PATH: str = ""
WIKI_ARTICLE_PATH: str = ""
WIKI_SCRIPT_PATH: str = ""
WIKI_JUST_DOMAIN: str = ""

# Seconds to sleep before the next webhook request, maintained by update_ratelimit().
rate_limit = 0

profile_fields = {"profile-location": _("Location"), "profile-aboutme": _("About me"),
                  "profile-link-google": _("Google link"), "profile-link-facebook": _("Facebook link"),
                  "profile-link-twitter": _("Twitter link"), "profile-link-reddit": _("Reddit link"),
                  "profile-link-twitch": _("Twitch link"), "profile-link-psn": _("PSN link"),
                  "profile-link-vk": _("VK link"), "profile-link-xbl": _("XBL link"),
                  "profile-link-steam": _("Steam link"), "profile-link-discord": _("Discord handle"),
                  "profile-link-battlenet": _("Battle.net handle")}


class DataFile:
    """Data class which instance of is shared by multiple modules to remain consistent and do not cause too many IO operations."""

    def __init__(self):
        self.data = self.load_datafile()
        self.changed = False

    @staticmethod
    def generate_datafile():
        """Generate a data.json file from a template. Exits the process when the file cannot be written."""
        try:
            with open("data.json", 'w') as data:
                data.write(json.dumps(data_template, indent=4))
        except PermissionError:
            misc_logger.critical("Could not create a data file (no permissions). No way to store last edit.")
            sys.exit(1)

    def load_datafile(self) -> dict:
        """Read a data.json file and return a dictionary with contents

        When the file is missing, a new one is generated and a copy of the template is returned.

        :rtype: dict
        """
        try:
            with open("data.json") as data:
                return json.loads(data.read())
        except FileNotFoundError:
            self.generate_datafile()
            misc_logger.info("The data file could not be found. Generating a new one...")
            # Return a copy so later writes through __setitem__ never modify the module-level template.
            return copy.deepcopy(data_template)

    def save_datafile(self):
        """Overwrites the data.json file with the current data, but only if something changed since the last save"""
        if self.changed is False:  # don't cause unnecessary write operations
            return
        try:
            with open("data.json", "w") as data_file:
                data_file.write(json.dumps(self.data, indent=4))
            self.changed = False
        except PermissionError:
            misc_logger.critical("Could not modify a data file (no permissions). No way to store last edit.")
            sys.exit(1)

    def __setitem__(self, instance, value):
        self.data[instance] = value
        self.changed = True

    def __getitem__(self, item):
        return self.data[item]


class MessageQueue:
    """Message queue class for undelivered messages"""

    def __init__(self):
        self._queue = []

    def __repr__(self):
        # __repr__ must return a string; returning the list itself makes repr() raise TypeError.
        return repr(self._queue)

    def __len__(self):
        return len(self._queue)

    def __iter__(self):
        return iter(self._queue)

    def clear(self):
        """Drop every queued message."""
        self._queue.clear()

    def add_message(self, message):
        """Append a message to the end of the queue."""
        self._queue.append(message)

    def cut_messages(self, item_num):
        """Drop the first item_num messages from the queue."""
        self._queue = self._queue[item_num:]

    def resend_msgs(self):
        """Try to deliver queued messages in order, stopping at the first one that fails.

        Messages delivered before the failure are removed; the failed one and everything after it stay queued.
        """
        if self._queue:
            misc_logger.info(
                "{} messages waiting to be delivered to Discord due to Discord throwing errors/no connection to Discord servers.".format(
                    len(self._queue)))
            for num, item in enumerate(self._queue):
                misc_logger.debug(
                    "Trying to send a message to Discord from the queue with id of {} and content {}".format(str(num),
                                                                                                             str(item)))
                if send_to_discord_webhook(item) < 2:
                    misc_logger.debug("Sending message succeeded")
                else:
                    misc_logger.debug("Sending message failed")
                    break
            else:
                # Loop finished without break: everything was delivered.
                self.clear()
                misc_logger.debug("Queue emptied, all messages delivered")
            # After a break, num is the index of the message that failed.
            self.cut_messages(num)
            misc_logger.debug(self._queue)


messagequeue = MessageQueue()
datafile = DataFile()


def weighted_average(value, weight, new_value):
    """Calculates weighted average of value number with weight weight and new_value with weight 1"""
    return round(((value * weight) + new_value) / (weight + 1), 2)


def link_formatter(link):
    """Formats a link to not embed it"""
    return "<" + quote(link.replace(" ", "_"), "/:?") + ">"


def escape_formatting(data):
    """Escape Discord formatting by prefixing every formatting character with a backslash"""
    # count is left at its default (0 = replace all); passing it positionally is deprecated since Python 3.13.
    return re.sub(r"([`_*~<>{}@/|\\])", "\\\\\\1", data)


class ContentParser(HTMLParser):
    """Parser which builds short previews of added and removed text out of the HTML of a diff.

    The results are collected in small_prev_ins and small_prev_del.
    """
    more = _("\n__And more__")
    current_tag = ""
    small_prev_ins = ""
    small_prev_del = ""
    # The counters start at len(more) so that the "and more" notice still fits within the limit.
    ins_length = len(more)
    del_length = len(more)
    added = False
    _PREVIEW_LIMIT = 1000

    def handle_starttag(self, tagname, attribs):
        if tagname == "ins" or tagname == "del":
            self.current_tag = tagname
        if tagname == "td":
            # Only the first attribute is inspected; a bare <td> has none, so guard against IndexError.
            first_attr = attribs[0] if attribs else ()
            if 'diff-addedline' in first_attr:
                self.current_tag = tagname + "a"
            if 'diff-deletedline' in first_attr:
                self.current_tag = tagname + "d"
            if 'diff-marker' in first_attr:
                self.added = True

    def _append_ins(self, text):
        """Append text to the insertion preview, or the "and more" notice once the limit gets crossed."""
        if self.ins_length > self._PREVIEW_LIMIT:
            return  # limit was already crossed, the notice has been added
        self.ins_length += len(text)
        if self.ins_length <= self._PREVIEW_LIMIT:
            self.small_prev_ins = self.small_prev_ins + text
        else:
            self.small_prev_ins = self.small_prev_ins + self.more

    def _append_del(self, text):
        """Append text to the deletion preview, or the "and more" notice once the limit gets crossed."""
        if self.del_length > self._PREVIEW_LIMIT:
            return  # limit was already crossed, the notice has been added
        self.del_length += len(text)
        if self.del_length <= self._PREVIEW_LIMIT:
            self.small_prev_del = self.small_prev_del + text
        else:
            self.small_prev_del = self.small_prev_del + self.more

    def handle_data(self, data):
        data = escape_formatting(data)
        if self.current_tag == "ins":
            self._append_ins("**" + data + '**')
        if self.current_tag == "del":
            self._append_del("~~" + data + '~~')
        if self.current_tag == "afterins" or self.current_tag == "tda":
            self._append_ins(data)
        if self.current_tag == "afterdel" or self.current_tag == "tdd":
            self._append_del(data)
        if self.added:
            # Data of a diff-marker cell: a marker starts a new line in the matching preview.
            if data == '+':
                self._append_ins('\n')
            if data == '−':
                self._append_del('\n')
            self.added = False

    def handle_endtag(self, tagname):
        if tagname == "ins":
            self.current_tag = "afterins"
        elif tagname == "del":
            self.current_tag = "afterdel"
        else:
            self.current_tag = ""


def safe_read(request, *keys):
    """Decode the JSON body of request and follow keys into it.

    Returns the value found, or None when request is None, the body is not valid JSON or a key is missing.
    """
    if request is None:
        return None
    try:
        request = request.json()
        for item in keys:
            request = request[item]
    except KeyError:
        misc_logger.warning(
            "Failure while extracting data from request on key {key} in {change}".format(key=item, change=request))
        return None
    except ValueError:
        misc_logger.warning("Failure while extracting data from request in {change}".format(change=request))
        return None
    return request


def handle_discord_http(code, formatted_embed, result):
    """Translate the HTTP status code of a webhook request into an internal result code.

    Returns 0 when the message went through, 1 when it has been rejected and should not be re-sent,
    2 when we are rate limited and 3 on errors on Discord's side. Exits the process on 401 and 404.
    """
    if 300 > code > 199:  # message went through
        return 0
    elif code == 400:  # HTTP BAD REQUEST
        misc_logger.error(
            "Following message has been rejected by Discord, please submit a bug on our bugtracker adding it:")
        misc_logger.error(formatted_embed)
        misc_logger.error(result.text)
        return 1
    elif code == 401 or code == 404:  # HTTP UNAUTHORIZED AND NOT FOUND
        misc_logger.error("Webhook URL is invalid or no longer in use, please replace it with proper one.")
        sys.exit(1)
    elif code == 429:
        misc_logger.error("We are sending too many requests to the Discord, slowing down...")
        return 2
    elif 499 < code < 600:
        misc_logger.error(
            "Discord have trouble processing the event, and because the HTTP code returned is {} it means we blame them.".format(
                code))
        return 3
    # Callers compare the result with integers, so never fall through returning None.
    misc_logger.error("Discord returned an unexpected HTTP code {}, the message will not be re-sent.".format(code))
    return 1


def add_to_dict(dictionary, key):
    """Increase the counter stored under key by one (starting at 1) and return the dictionary."""
    if key in dictionary:
        dictionary[key] += 1
    else:
        dictionary[key] = 1
    return dictionary


def prepare_paths(path, dry=False):
    """Set the URL paths for article namespace and script namespace
    WIKI_API_PATH will be: WIKI_DOMAIN/api.php
    WIKI_ARTICLE_PATH will be: WIKI_DOMAIN/articlepath/$1 where $1 is the replaced string
    WIKI_SCRIPT_PATH will be: WIKI_DOMAIN/
    WIKI_JUST_DOMAIN will be: WIKI_DOMAIN

    With dry=True the globals are left untouched and the wiki domain is returned instead.
    Exits the process when none of the tried URLs answers like a MediaWiki API.
    """
    global WIKI_API_PATH
    global WIKI_ARTICLE_PATH
    global WIKI_SCRIPT_PATH
    global WIKI_JUST_DOMAIN

    def quick_try_url(url):
        """Quickly test if URL is the proper script path,
        False if it appears invalid
        the response object when it appears valid"""
        try:
            request = requests.get(url, timeout=5)
            if request.status_code == requests.codes.ok:
                if request.json()["query"]["general"] is not None:
                    return request
            return False
        except (KeyError, TypeError, ValueError, requests.exceptions.RequestException):
            # ValueError: the body was not JSON (for example a HTML page served on a wrong path).
            # RequestException: connection problems, timeouts and malformed candidate URLs.
            return False

    parsed_url = urlparse(path)
    domain = urlunparse((*parsed_url[0:2], "", "", "", ""))
    for url_scheme in (path, path.split("wiki")[0], domain):  # check different combinations, it's supposed to be idiot-proof
        tested = quick_try_url(url_scheme + "/api.php?action=query&format=json&meta=siteinfo")
        if tested:
            if not dry:
                general = tested.json()["query"]["general"]
                WIKI_API_PATH = domain + general["scriptpath"] + "/api.php"
                WIKI_SCRIPT_PATH = domain + general["scriptpath"] + "/"
                WIKI_ARTICLE_PATH = domain + general["articlepath"]
                WIKI_JUST_DOMAIN = domain
                break
            return domain
    else:
        misc_logger.critical("Could not verify wikis paths. Please make sure you have given the proper wiki URLs in settings.json ({path} should be script path to your wiki) and your Internet connection is working.".format(path=path))
        sys.exit(1)


prepare_paths(settings["wiki_url"])


def create_article_path(article: str) -> str:
    """Takes the string and creates an URL with it as the article name"""
    return WIKI_ARTICLE_PATH.replace("$1", article)


def send_simple(msgtype, message, name, avatar):
    """Send a plain text message with the given webhook name and avatar, after retrying queued messages."""
    discord_msg = DiscordMessage("compact", msgtype, settings["webhookURL"], content=message)
    discord_msg.set_avatar(avatar)
    discord_msg.set_name(name)
    messagequeue.resend_msgs()
    send_to_discord(discord_msg)


def update_ratelimit(request):
    """Updates rate limit time"""
    global rate_limit
    remaining = int(request.headers.get('x-ratelimit-remaining', "-1"))
    if remaining > 0:
        rate_limit = 0
    else:
        # The reset-after header may contain decimals, which int() refuses to parse.
        rate_limit = float(request.headers.get('x-ratelimit-reset-after', 0))
    rate_limit += settings.get("discord_message_cooldown", 0)


def send_to_discord_webhook(data):
    """Post the message to its webhook URL, waiting out the current rate limit first.

    Returns the code from handle_discord_http, or 3 on a timeout or connection error.
    """
    global rate_limit
    # Work on a copy so the shared settings dictionary is not modified.
    header = dict(settings["header"])
    header['Content-Type'] = 'application/json'
    try:
        time.sleep(rate_limit)
        rate_limit = 0
        result = requests.post(data.webhook_url, data=repr(data), headers=header, timeout=10)
        update_ratelimit(result)
    except requests.exceptions.Timeout:
        misc_logger.warning("Timeouted while sending data to the webhook.")
        return 3
    except requests.exceptions.ConnectionError:
        misc_logger.warning("Connection error while sending the data to a webhook")
        return 3
    else:
        return handle_discord_http(result.status_code, data, result)


def send_to_discord(data):
    """Send the message unless it matches one of the disallow_regexes filters.

    When older messages are still queued the message is queued behind them to keep the order,
    it is also queued when sending fails in a way which can be retried.
    """
    for regex in settings["disallow_regexes"]:
        pattern = re.compile(regex)
        content = data.webhook_object.get("content", None)
        if content:
            if pattern.search(content):
                misc_logger.info("Message {} has been rejected due to matching filter ({}).".format(content, regex))
                return  # discard the message without anything
        else:
            # The texts live inside the embeds, not at the top level of the webhook object.
            # .get() is used because embeds are defaultdicts and indexing would insert empty keys.
            for embed in data.webhook_object.get("embeds", []):
                candidates = [embed.get("description", ""), embed.get("title", ""),
                              *[field["value"] for field in embed.get("fields", [])],
                              embed.get("author", {"name": ""}).get("name", "")]
                for to_check in candidates:
                    if isinstance(to_check, str) and pattern.search(to_check):
                        misc_logger.info("Message \"{}\" has been rejected due to matching filter ({}).".format(
                            to_check, regex))
                        return  # discard the message without anything
    if messagequeue:
        messagequeue.add_message(data)
    else:
        code = send_to_discord_webhook(data)
        if code == 3:
            messagequeue.add_message(data)
        elif code == 2:
            time.sleep(5.0)
            messagequeue.add_message(data)
        elif code < 2:
            pass


class DiscordMessage:
    """A class defining a typical Discord JSON representation of webhook payload."""

    def __init__(self, message_type: str, event_type: str, webhook_url: str, content=None):
        self.webhook_object = dict(allowed_mentions={"parse": []}, avatar_url=settings["avatars"].get(message_type, ""))
        self.webhook_url = webhook_url

        if message_type == "embed":
            self.__setup_embed()
        elif message_type == "compact":
            self.webhook_object["content"] = content

        self.event_type = event_type

    def __setitem__(self, key, value):
        """Set item is used only in embeds."""
        try:
            self.embed[key] = value
        except AttributeError:
            # self.embed exists only for embed messages; a missing attribute raises AttributeError, not NameError.
            raise TypeError("Tried to assign a value when message type is plain message!")

    def __getitem__(self, item):
        return self.embed[item]

    def __repr__(self):
        """Return the Discord webhook object ready to be sent"""
        return json.dumps(self.webhook_object)

    def __setup_embed(self):
        """Start a new embed and register it in the webhook object."""
        self.embed = defaultdict(dict)
        if "embeds" not in self.webhook_object:
            self.webhook_object["embeds"] = [self.embed]
        else:
            self.webhook_object["embeds"].append(self.embed)
        self.embed["color"] = None

    def add_embed(self):
        """Finish the current embed and start another one in the same message."""
        self.finish_embed()
        self.__setup_embed()

    def finish_embed(self):
        """Make sure the current embed has an integer color.

        A missing color is taken from the appearance settings of the event type, or randomized when not set there.
        """
        if self.embed["color"] is None:
            if settings["appearance"]["embed"].get(self.event_type, {"color": None})["color"] is None:
                self.embed["color"] = random.randrange(1, 16777215)
            else:
                self.embed["color"] = settings["appearance"]["embed"][self.event_type]["color"]
        else:
            self.embed["color"] = math.floor(self.embed["color"])

    def set_author(self, name, url, icon_url=""):
        self.embed["author"]["name"] = name
        self.embed["author"]["url"] = url
        self.embed["author"]["icon_url"] = icon_url

    def add_field(self, name, value, inline=False):
        if "fields" not in self.embed:
            self.embed["fields"] = []
        self.embed["fields"].append(dict(name=name, value=value, inline=inline))

    def set_avatar(self, url):
        self.webhook_object["avatar_url"] = url

    def set_name(self, name):
        self.webhook_object["username"] = name


def profile_field_name(name, embed):
    """Return the translated label of a profile field, or a translated "Unknown"/"unknown" for unrecognized ones."""
    try:
        return profile_fields[name]
    except KeyError:
        if embed:
            return _("Unknown")
        else:
            return _("unknown")


class LinkParser(HTMLParser):
    """Parser which turns HTML links into Markdown links, the result is collected in new_string."""
    new_string = ""
    recent_href = ""

    def handle_starttag(self, tag, attrs):
        for attr in attrs:
            if attr[0] == 'href':
                self.recent_href = attr[1]
                if self.recent_href.startswith("//"):
                    self.recent_href = "https:{rest}".format(rest=self.recent_href)
                elif not self.recent_href.startswith("http"):
                    self.recent_href = WIKI_JUST_DOMAIN + self.recent_href
                self.recent_href = self.recent_href.replace(")", "\\)")
            elif attr[0] == 'data-uncrawlable-url':
                # The value is a base64 encoded path which gets appended to the wiki domain.
                self.recent_href = attr[1].encode('ascii')
                self.recent_href = base64.b64decode(self.recent_href)
                self.recent_href = WIKI_JUST_DOMAIN + self.recent_href.decode('ascii')

    def handle_data(self, data):
        if self.recent_href:
            self.new_string = self.new_string + "[{}](<{}>)".format(data.replace("//", "/\\/"), self.recent_href)
            self.recent_href = ""
        else:
            self.new_string = self.new_string + data.replace("//", "/\\/")

    def handle_comment(self, data):
        self.new_string = self.new_string + data.replace("//", "/\\/")

    def handle_endtag(self, tag):
        misc_logger.debug(self.new_string)