# This file is part of Recent changes Goat compatible Discord webhook (RcGcDw). # # RcGcDw is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # RcGcDw is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with RcGcDw. If not, see . import time from typing import Optional import requests import markdown import json import re from src.api.context import Context from src.discord.message import DiscordMessage, DiscordMessageMetadata from src.api.hook import post_hook from src.api.util import sanitize_to_markdown from src.misc import class_searcher from html.parser import HTMLParser # { # "hooks": { # "matrix": { # "homeserver": "https://example.com/endpoint", # "rooms": [], # "access_token": "authtoken", # "matrix_only": true # } # } # } matrix_hook: Optional[dict] = None start_time = int(time.time()) class StrikethroughProcessor(markdown.inlinepatterns.AsteriskProcessor): PATTERNS = [ markdown.inlinepatterns.EmStrongItem(re.compile(r'(~{2})(.+?)\1', re.DOTALL | re.UNICODE), 'single', 'del') ] class StrikethroughExtension(markdown.extensions.Extension): def extendMarkdown(self, md): """ Modify inline patterns. """ md.inlinePatterns.register(StrikethroughProcessor(r'~'), 'strikethrough', 50) def check_if_exists(settings: dict): global matrix_hook matrix_hook = settings.get("hooks", {}).get("matrix", {}) return bool(matrix_hook) @post_hook(priority=50, register=check_if_exists) def matrix_posthook(message: DiscordMessage, metadata: DiscordMessageMetadata, context: Context, change: dict): if not matrix_hook.get("access_token"): raise KeyError("Matrix hook requires an access token to run!") content = (message.webhook_object.get("content", "") or "") if message.message_type == "embed": embed = "" images = [] if message.embed.get("author", {}).get("name", None): author = sanitize_to_markdown(message["author"]["name"]) if message["author"].get("url", None): author = "[{name}]({url})".format(name=author, url=message["author"]["url"]) embed += "{author}:\n".format(author=author) if message.embed.get("title", None): title = message["title"] if message.embed.get("url", None): title = "[{title}]({url})".format(title=title, url=message["url"]) thumbnail = "" if message.embed.get("thumbnail", {}).get("url", None): thumbnail = " [thumbnail]({url})".format(url=message["thumbnail"]["url"]) images.append( "[^]({url})".format(url=message["thumbnail"]["url"]) ) embed += "**{title}**{thumbnail}\n".format(title=title, thumbnail=thumbnail) elif message.embed.get("thumbnail", {}).get("url", None): embed += "[thumbnail]({url})\n".format(url=message["thumbnail"]["url"]) images.append( "[^]({url})".format(url=message["thumbnail"]["url"]) ) if message.embed.get("description", None): embed += message["description"] + "\n" if context.changed_content: edit_diff = ContentParser(context._) edit_diff.feed(context.changed_content) diff_text = edit_diff.text diff_text = re.sub(r'(~{2})(.+?)\1', '\\2', diff_text) diff_text = re.sub(r'(\*{2})(.+?)\1', '\\2', diff_text) embed += "\n" + diff_text + "\n" if message.embed.get("fields", []): for field in message["fields"]: if context.changed_content and field["name"] in (context._("Removed"), context._("Added")): continue embed += "**{name}**\n> {value}\n".format(name=field["name"],value=field["value"].replace("\n", "\n> ")) if message.embed.get("image", {}).get("url", None): embed += "**[image]({url})**\n".format(url=message["image"]["url"]) images.append( "**[^]({url})**".format(url=message["image"]["url"]) ) if message.embed.get("footer", {}).get("text", None): timestamp = "" if message.embed.get("timestamp", None): timestamp = " • {timestamp}".format(timestamp=message["timestamp"]) embed += "{footer}{timestamp}\n".format(footer=sanitize_to_markdown(message["footer"]["text"]), timestamp=timestamp) elif message.embed.get("timestamp", None): embed += message["timestamp"] + "\n" if content: content += "\n" content += "> " + embed[:-1].replace("\n", "\n> ") if len(images): content += "\n{images}".format(images=" ".join(images)) if matrix_hook.get("matrix_only"): context.event = None if not content: return html_content = markdown.markdown(content, extensions=[StrikethroughExtension(),'tables','nl2br']) data = json.dumps({ "msgtype": "m.text", "body": content, "format": "org.matrix.custom.html", "formatted_body": re.sub(r'\\(\\?)', '\\1', html_content) }) for room in matrix_hook.get("rooms", []): response = requests.put("{homeserver}/_matrix/client/v3/rooms/{roomId}/send/{eventType}/{txnId}".format(homeserver=matrix_hook.get("homeserver"), roomId=room, eventType="m.room.message", txnId=str(int(change["rcid"])+start_time)), data=data, headers={"Authorization": "Bearer "+matrix_hook.get("access_token"), "Content-Type": "application/json"}) class ContentParser(HTMLParser): """ContentPerser is an implementation of HTMLParser that parses output of action=compare&prop=diff API request for two MediaWiki revisions.""" max_length = 3000 current_tag = "" last_del = None last_ins = None done = False def __init__(self, lang): super().__init__() self.text = "| {removed} | {added}\n|---|---\n".format(removed=lang("Removed"), added=lang("Added")) def handle_starttag(self, tagname, attribs): if self.done: return if tagname == "ins" or tagname == "del": self.current_tag = tagname if tagname == "td": classes = class_searcher(attribs).split(' ') if "diff-deletedline" in classes: self.current_tag = "tdd" self.last_del = "\u200b" if "diff-addedline" in classes: self.current_tag = "tda" self.last_ins = "\u200b" if "diff-empty" in classes: if self.last_del is None: self.last_del = "" if self.last_ins is None: self.last_ins = "" def handle_data(self, data): def escape_formatting(data: str) -> str: """Escape Discord formatting""" return re.sub(r"([`_*~:<>{}@/|#\-\.\\\[\]\(\)])", "\\\\\\1", data) if not self.current_tag or self.done: return data = escape_formatting(data) if self.current_tag == "del": self.last_del += "~~" + data + "~~" if self.current_tag == "ins": self.last_ins += "**" + data + "**" if self.current_tag == "tdd": self.last_del += data if self.current_tag == "tda": self.last_ins += data def handle_endtag(self, tagname): if self.done: return if tagname == "ins": self.current_tag = "tda" elif tagname == "del": self.current_tag = "tdd" elif tagname == "td": self.current_tag = "" elif tagname == "tr": if self.last_ins is None: return # if self.last_ins == "" and self.last_del != "\u200b": # if "~~" in self.last_del: # self.last_del = self.last_del.replace("~~", "__") # self.last_del = "~~" + self.last_del + "~~" # if self.last_del == "" and self.last_ins != "\u200b": # if "**" in self.last_ins: # self.last_ins = self.last_ins.replace("**", "__") # self.last_ins = "**" + self.last_ins + "**" if len(self.text) + len(self.last_del) + len(self.last_ins) > self.max_length: remaining = self.max_length - len(self.text) if remaining < 0: remaining = 0 if remaining % 2 == 1: remaining = remaining + 1 part = int(remaining / 2) if len(self.last_del) < part: part += part - len(self.last_del) if len(self.last_ins) < part: part += part - len(self.last_ins) if len(self.last_del) > part: self.last_del = self.last_del[:part] if self.last_del.count("~~") % 2 == 1: self.last_del += "~~" self.last_del += " **And more**" if len(self.last_ins) > part: self.last_ins = self.last_ins[:part] if self.last_ins.count("**") % 2 == 1: self.last_ins += "**" self.last_ins += " ~~And more~~" self.done = True self.text += "| {removed} | {added}\n".format(removed=self.last_del, added=self.last_ins) self.last_del = None self.last_ins = None