# This file is part of Recent changes Goat compatible Discord webhook (RcGcDw).
#
# RcGcDw is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RcGcDw is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with RcGcDw. If not, see .
import time
from typing import Optional
import requests
import markdown
import json
import re
from src.api.context import Context
from src.discord.message import DiscordMessage, DiscordMessageMetadata
from src.api.hook import post_hook
from src.api.util import sanitize_to_markdown
from src.misc import class_searcher
from html.parser import HTMLParser
# {
# "hooks": {
# "matrix": {
# "homeserver": "https://example.com/endpoint",
# "rooms": [],
# "access_token": "authtoken",
# "matrix_only": true
# }
# }
# }
matrix_hook: Optional[dict] = None
start_time = int(time.time())
class StrikethroughProcessor(markdown.inlinepatterns.AsteriskProcessor):
PATTERNS = [
markdown.inlinepatterns.EmStrongItem(re.compile(r'(~{2})(.+?)\1', re.DOTALL | re.UNICODE), 'single', 'del')
]
class StrikethroughExtension(markdown.extensions.Extension):
def extendMarkdown(self, md):
""" Modify inline patterns. """
md.inlinePatterns.register(StrikethroughProcessor(r'~'), 'strikethrough', 50)
def check_if_exists(settings: dict):
global matrix_hook
matrix_hook = settings.get("hooks", {}).get("matrix", {})
return bool(matrix_hook)
@post_hook(priority=50, register=check_if_exists)
def matrix_posthook(message: DiscordMessage, metadata: DiscordMessageMetadata, context: Context, change: dict):
if not matrix_hook.get("access_token"):
raise KeyError("Matrix hook requires an access token to run!")
content = (message.webhook_object.get("content", "") or "")
if message.message_type == "embed":
embed = ""
images = []
if message.embed.get("author", {}).get("name", None):
author = sanitize_to_markdown(message["author"]["name"])
if message["author"].get("url", None):
author = "[{name}]({url})".format(name=author, url=message["author"]["url"])
embed += "{author}:\n".format(author=author)
if message.embed.get("title", None):
title = message["title"]
if message.embed.get("url", None):
title = "[{title}]({url})".format(title=title, url=message["url"])
thumbnail = ""
if message.embed.get("thumbnail", {}).get("url", None):
thumbnail = " [thumbnail]({url})".format(url=message["thumbnail"]["url"])
images.append( "[^]({url})".format(url=message["thumbnail"]["url"]) )
embed += "**{title}**{thumbnail}\n".format(title=title, thumbnail=thumbnail)
elif message.embed.get("thumbnail", {}).get("url", None):
embed += "[thumbnail]({url})\n".format(url=message["thumbnail"]["url"])
images.append( "[^]({url})".format(url=message["thumbnail"]["url"]) )
if message.embed.get("description", None):
embed += message["description"] + "\n"
if context.changed_content:
edit_diff = ContentParser(context._)
edit_diff.feed(context.changed_content)
diff_text = edit_diff.text
diff_text = re.sub(r'(~{2})(.+?)\1', '\\2', diff_text)
diff_text = re.sub(r'(\*{2})(.+?)\1', '\\2', diff_text)
embed += "\n" + diff_text + "\n"
if message.embed.get("fields", []):
for field in message["fields"]:
if context.changed_content and field["name"] in (context._("Removed"), context._("Added")):
continue
embed += "**{name}**\n> {value}\n".format(name=field["name"],value=field["value"].replace("\n", "\n> "))
if message.embed.get("image", {}).get("url", None):
embed += "**[image]({url})**\n".format(url=message["image"]["url"])
images.append( "**[^]({url})**".format(url=message["image"]["url"]) )
if message.embed.get("footer", {}).get("text", None):
timestamp = ""
if message.embed.get("timestamp", None):
timestamp = " • {timestamp}".format(timestamp=message["timestamp"])
embed += "{footer}{timestamp}\n".format(footer=sanitize_to_markdown(message["footer"]["text"]), timestamp=timestamp)
elif message.embed.get("timestamp", None):
embed += message["timestamp"] + "\n"
if content:
content += "\n"
content += "> " + embed[:-1].replace("\n", "\n> ")
if len(images):
content += "\n{images}".format(images=" ".join(images))
if matrix_hook.get("matrix_only"):
context.event = None
if not content:
return
html_content = markdown.markdown(content, extensions=[StrikethroughExtension(),'tables','nl2br'])
data = json.dumps({
"msgtype": "m.text",
"body": content,
"format": "org.matrix.custom.html",
"formatted_body": re.sub(r'\\(\\?)', '\\1', html_content)
})
for room in matrix_hook.get("rooms", []):
response = requests.put("{homeserver}/_matrix/client/v3/rooms/{roomId}/send/{eventType}/{txnId}".format(homeserver=matrix_hook.get("homeserver"),
roomId=room,
eventType="m.room.message",
txnId=str(int(change["rcid"])+start_time)),
data=data, headers={"Authorization": "Bearer "+matrix_hook.get("access_token"), "Content-Type": "application/json"})
class ContentParser(HTMLParser):
"""ContentPerser is an implementation of HTMLParser that parses output of action=compare&prop=diff API request
for two MediaWiki revisions."""
max_length = 3000
current_tag = ""
last_del = None
last_ins = None
done = False
def __init__(self, lang):
super().__init__()
self.text = "| {removed} | {added}\n|---|---\n".format(removed=lang("Removed"), added=lang("Added"))
def handle_starttag(self, tagname, attribs):
if self.done:
return
if tagname == "ins" or tagname == "del":
self.current_tag = tagname
if tagname == "td":
classes = class_searcher(attribs).split(' ')
if "diff-deletedline" in classes:
self.current_tag = "tdd"
self.last_del = "\u200b"
if "diff-addedline" in classes:
self.current_tag = "tda"
self.last_ins = "\u200b"
if "diff-empty" in classes:
if self.last_del is None:
self.last_del = ""
if self.last_ins is None:
self.last_ins = ""
def handle_data(self, data):
def escape_formatting(data: str) -> str:
"""Escape Discord formatting"""
return re.sub(r"([`_*~:<>{}@/|#\-\.\\\[\]\(\)])", "\\\\\\1", data)
if not self.current_tag or self.done:
return
data = escape_formatting(data)
if self.current_tag == "del":
self.last_del += "~~" + data + "~~"
if self.current_tag == "ins":
self.last_ins += "**" + data + "**"
if self.current_tag == "tdd":
self.last_del += data
if self.current_tag == "tda":
self.last_ins += data
def handle_endtag(self, tagname):
if self.done:
return
if tagname == "ins":
self.current_tag = "tda"
elif tagname == "del":
self.current_tag = "tdd"
elif tagname == "td":
self.current_tag = ""
elif tagname == "tr":
if self.last_ins is None:
return
# if self.last_ins == "" and self.last_del != "\u200b":
# if "~~" in self.last_del:
# self.last_del = self.last_del.replace("~~", "__")
# self.last_del = "~~" + self.last_del + "~~"
# if self.last_del == "" and self.last_ins != "\u200b":
# if "**" in self.last_ins:
# self.last_ins = self.last_ins.replace("**", "__")
# self.last_ins = "**" + self.last_ins + "**"
if len(self.text) + len(self.last_del) + len(self.last_ins) > self.max_length:
remaining = self.max_length - len(self.text)
if remaining < 0:
remaining = 0
if remaining % 2 == 1:
remaining = remaining + 1
part = int(remaining / 2)
if len(self.last_del) < part:
part += part - len(self.last_del)
if len(self.last_ins) < part:
part += part - len(self.last_ins)
if len(self.last_del) > part:
self.last_del = self.last_del[:part]
if self.last_del.count("~~") % 2 == 1:
self.last_del += "~~"
self.last_del += " **And more**"
if len(self.last_ins) > part:
self.last_ins = self.last_ins[:part]
if self.last_ins.count("**") % 2 == 1:
self.last_ins += "**"
self.last_ins += " ~~And more~~"
self.done = True
self.text += "| {removed} | {added}\n".format(removed=self.last_del, added=self.last_ins)
self.last_del = None
self.last_ins = None