2025-01-25 09:35:57 +00:00
|
|
|
# This file is part of Recent changes Goat compatible Discord webhook (RcGcDw).
|
|
|
|
#
|
|
|
|
# RcGcDw is free software: you can redistribute it and/or modify
|
|
|
|
# it under the terms of the GNU General Public License as published by
|
|
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
|
|
# (at your option) any later version.
|
|
|
|
#
|
|
|
|
# RcGcDw is distributed in the hope that it will be useful,
|
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
# GNU General Public License for more details.
|
|
|
|
#
|
|
|
|
# You should have received a copy of the GNU General Public License
|
|
|
|
# along with RcGcDw. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import time
|
|
|
|
from typing import Optional
|
|
|
|
|
|
|
|
import requests
|
|
|
|
import markdown
|
|
|
|
import json
|
2025-01-25 22:07:45 +00:00
|
|
|
import re
|
2025-01-25 09:35:57 +00:00
|
|
|
|
|
|
|
from src.api.context import Context
|
|
|
|
from src.discord.message import DiscordMessage, DiscordMessageMetadata
|
|
|
|
from src.api.hook import post_hook
|
2025-01-25 22:07:45 +00:00
|
|
|
from src.api.util import sanitize_to_markdown
|
|
|
|
from src.misc import class_searcher
|
|
|
|
from html.parser import HTMLParser
|
2025-01-25 09:35:57 +00:00
|
|
|
|
|
|
|
# {
|
|
|
|
# "hooks": {
|
|
|
|
# "matrix": {
|
|
|
|
# "homeserver": "https://example.com/endpoint",
|
|
|
|
# "rooms": [],
|
|
|
|
# "access_token": "authtoken",
|
|
|
|
# "matrix_only": true
|
|
|
|
# }
|
|
|
|
# }
|
|
|
|
# }
|
|
|
|
|
|
|
|
matrix_hook: Optional[dict] = None
|
|
|
|
start_time = int(time.time())
|
|
|
|
|
|
|
|
|
2025-01-25 22:07:45 +00:00
|
|
|
class StrikethroughProcessor(markdown.inlinepatterns.AsteriskProcessor):
|
|
|
|
PATTERNS = [
|
|
|
|
markdown.inlinepatterns.EmStrongItem(re.compile(r'(~{2})(.+?)\1', re.DOTALL | re.UNICODE), 'single', 'del')
|
|
|
|
]
|
|
|
|
|
|
|
|
class StrikethroughExtension(markdown.extensions.Extension):
|
|
|
|
def extendMarkdown(self, md):
|
|
|
|
""" Modify inline patterns. """
|
|
|
|
md.inlinePatterns.register(StrikethroughProcessor(r'~'), 'strikethrough', 50)
|
|
|
|
|
|
|
|
|
2025-01-25 09:35:57 +00:00
|
|
|
def check_if_exists(settings: dict):
|
|
|
|
global matrix_hook
|
|
|
|
matrix_hook = settings.get("hooks", {}).get("matrix", {})
|
|
|
|
return bool(matrix_hook)
|
|
|
|
|
|
|
|
@post_hook(priority=50, register=check_if_exists)
|
|
|
|
def matrix_posthook(message: DiscordMessage, metadata: DiscordMessageMetadata, context: Context, change: dict):
|
|
|
|
if not matrix_hook.get("access_token"):
|
|
|
|
raise KeyError("Matrix hook requires an access token to run!")
|
2025-01-25 22:07:45 +00:00
|
|
|
content = (message.webhook_object.get("content", "") or "")
|
|
|
|
if message.message_type == "embed":
|
|
|
|
embed = ""
|
|
|
|
images = []
|
|
|
|
if message.embed.get("author", {}).get("name", None):
|
|
|
|
author = sanitize_to_markdown(message["author"]["name"])
|
|
|
|
if message["author"].get("url", None):
|
|
|
|
author = "[{name}]({url})".format(name=author, url=message["author"]["url"])
|
|
|
|
embed += "{author}:\n".format(author=author)
|
|
|
|
if message.embed.get("title", None):
|
|
|
|
title = message["title"]
|
|
|
|
if message.embed.get("url", None):
|
|
|
|
title = "[{title}]({url})".format(title=title, url=message["url"])
|
|
|
|
thumbnail = ""
|
|
|
|
if message.embed.get("thumbnail", {}).get("url", None):
|
|
|
|
thumbnail = " [thumbnail]({url})".format(url=message["thumbnail"]["url"])
|
|
|
|
images.append( "[^]({url})".format(url=message["thumbnail"]["url"]) )
|
|
|
|
embed += "**{title}**{thumbnail}\n".format(title=title, thumbnail=thumbnail)
|
|
|
|
elif message.embed.get("thumbnail", {}).get("url", None):
|
|
|
|
embed += "[thumbnail]({url})\n".format(url=message["thumbnail"]["url"])
|
|
|
|
images.append( "[^]({url})".format(url=message["thumbnail"]["url"]) )
|
|
|
|
if message.embed.get("description", None):
|
|
|
|
embed += message["description"] + "\n"
|
|
|
|
if context.changed_content:
|
|
|
|
edit_diff = ContentParser(context._)
|
|
|
|
edit_diff.feed(context.changed_content)
|
|
|
|
diff_text = edit_diff.text
|
|
|
|
diff_text = re.sub(r'(~{2})(.+?)\1', '<span data-mx-bg-color="#9f0000">\\2</span>', diff_text)
|
|
|
|
diff_text = re.sub(r'(\*{2})(.+?)\1', '<span data-mx-bg-color="#005e20">\\2</span>', diff_text)
|
|
|
|
embed += "\n" + diff_text + "\n"
|
|
|
|
if message.embed.get("fields", []):
|
|
|
|
for field in message["fields"]:
|
|
|
|
if context.changed_content and field["name"] in (context._("Removed"), context._("Added")):
|
|
|
|
continue
|
|
|
|
embed += "**{name}**\n> {value}\n".format(name=field["name"],value=field["value"].replace("\n", "\n> "))
|
|
|
|
if message.embed.get("image", {}).get("url", None):
|
|
|
|
embed += "**[image]({url})**\n".format(url=message["image"]["url"])
|
|
|
|
images.append( "**[^]({url})**".format(url=message["image"]["url"]) )
|
|
|
|
if message.embed.get("footer", {}).get("text", None):
|
|
|
|
timestamp = ""
|
|
|
|
if message.embed.get("timestamp", None):
|
|
|
|
timestamp = " • {timestamp}".format(timestamp=message["timestamp"])
|
|
|
|
embed += "{footer}{timestamp}\n".format(footer=sanitize_to_markdown(message["footer"]["text"]), timestamp=timestamp)
|
|
|
|
elif message.embed.get("timestamp", None):
|
|
|
|
embed += message["timestamp"] + "\n"
|
|
|
|
if content:
|
|
|
|
content += "\n"
|
|
|
|
content += "> " + embed[:-1].replace("\n", "\n> ")
|
|
|
|
if len(images):
|
|
|
|
content += "\n{images}".format(images=" ".join(images))
|
2025-01-25 09:35:57 +00:00
|
|
|
if matrix_hook.get("matrix_only"):
|
|
|
|
context.event = None
|
2025-01-25 22:07:45 +00:00
|
|
|
if not content:
|
|
|
|
return
|
|
|
|
html_content = markdown.markdown(content, extensions=[StrikethroughExtension(),'tables','nl2br'])
|
|
|
|
data = json.dumps({
|
|
|
|
"msgtype": "m.text",
|
|
|
|
"body": content,
|
|
|
|
"format": "org.matrix.custom.html",
|
|
|
|
"formatted_body": re.sub(r'\\(\\?)', '\\1', html_content)
|
|
|
|
})
|
2025-01-25 09:35:57 +00:00
|
|
|
for room in matrix_hook.get("rooms", []):
|
|
|
|
response = requests.put("{homeserver}/_matrix/client/v3/rooms/{roomId}/send/{eventType}/{txnId}".format(homeserver=matrix_hook.get("homeserver"),
|
|
|
|
roomId=room,
|
|
|
|
eventType="m.room.message",
|
|
|
|
txnId=str(int(change["rcid"])+start_time)),
|
|
|
|
data=data, headers={"Authorization": "Bearer "+matrix_hook.get("access_token"), "Content-Type": "application/json"})
|
|
|
|
|
|
|
|
|
2025-01-25 22:07:45 +00:00
|
|
|
class ContentParser(HTMLParser):
|
|
|
|
"""ContentPerser is an implementation of HTMLParser that parses output of action=compare&prop=diff API request
|
|
|
|
for two MediaWiki revisions."""
|
|
|
|
max_length = 3000
|
|
|
|
current_tag = ""
|
|
|
|
last_del = None
|
|
|
|
last_ins = None
|
|
|
|
done = False
|
|
|
|
|
|
|
|
def __init__(self, lang):
|
|
|
|
super().__init__()
|
|
|
|
self.text = "| {removed} | {added}\n|---|---\n".format(removed=lang("Removed"), added=lang("Added"))
|
|
|
|
|
|
|
|
def handle_starttag(self, tagname, attribs):
|
|
|
|
if self.done:
|
|
|
|
return
|
|
|
|
if tagname == "ins" or tagname == "del":
|
|
|
|
self.current_tag = tagname
|
|
|
|
if tagname == "td":
|
|
|
|
classes = class_searcher(attribs).split(' ')
|
|
|
|
if "diff-deletedline" in classes:
|
|
|
|
self.current_tag = "tdd"
|
|
|
|
self.last_del = "\u200b"
|
|
|
|
if "diff-addedline" in classes:
|
|
|
|
self.current_tag = "tda"
|
|
|
|
self.last_ins = "\u200b"
|
|
|
|
if "diff-empty" in classes:
|
|
|
|
if self.last_del is None:
|
|
|
|
self.last_del = ""
|
|
|
|
if self.last_ins is None:
|
|
|
|
self.last_ins = ""
|
|
|
|
|
|
|
|
def handle_data(self, data):
|
|
|
|
def escape_formatting(data: str) -> str:
|
|
|
|
"""Escape Discord formatting"""
|
|
|
|
return re.sub(r"([`_*~:<>{}@/|#\-\.\\\[\]\(\)])", "\\\\\\1", data)
|
|
|
|
if not self.current_tag or self.done:
|
|
|
|
return
|
|
|
|
data = escape_formatting(data)
|
|
|
|
if self.current_tag == "del":
|
|
|
|
self.last_del += "~~" + data + "~~"
|
|
|
|
if self.current_tag == "ins":
|
|
|
|
self.last_ins += "**" + data + "**"
|
|
|
|
if self.current_tag == "tdd":
|
|
|
|
self.last_del += data
|
|
|
|
if self.current_tag == "tda":
|
|
|
|
self.last_ins += data
|
|
|
|
|
|
|
|
def handle_endtag(self, tagname):
|
|
|
|
if self.done:
|
|
|
|
return
|
|
|
|
if tagname == "ins":
|
|
|
|
self.current_tag = "tda"
|
|
|
|
elif tagname == "del":
|
|
|
|
self.current_tag = "tdd"
|
|
|
|
elif tagname == "td":
|
|
|
|
self.current_tag = ""
|
|
|
|
elif tagname == "tr":
|
|
|
|
if self.last_ins is None:
|
|
|
|
return
|
|
|
|
# if self.last_ins == "" and self.last_del != "\u200b":
|
|
|
|
# if "~~" in self.last_del:
|
|
|
|
# self.last_del = self.last_del.replace("~~", "__")
|
|
|
|
# self.last_del = "~~" + self.last_del + "~~"
|
|
|
|
# if self.last_del == "" and self.last_ins != "\u200b":
|
|
|
|
# if "**" in self.last_ins:
|
|
|
|
# self.last_ins = self.last_ins.replace("**", "__")
|
|
|
|
# self.last_ins = "**" + self.last_ins + "**"
|
|
|
|
if len(self.text) + len(self.last_del) + len(self.last_ins) > self.max_length:
|
|
|
|
remaining = self.max_length - len(self.text)
|
|
|
|
if remaining < 0:
|
|
|
|
remaining = 0
|
|
|
|
if remaining % 2 == 1:
|
|
|
|
remaining = remaining + 1
|
|
|
|
part = int(remaining / 2)
|
|
|
|
if len(self.last_del) < part:
|
|
|
|
part += part - len(self.last_del)
|
|
|
|
if len(self.last_ins) < part:
|
|
|
|
part += part - len(self.last_ins)
|
|
|
|
if len(self.last_del) > part:
|
|
|
|
self.last_del = self.last_del[:part]
|
|
|
|
if self.last_del.count("~~") % 2 == 1:
|
|
|
|
self.last_del += "~~"
|
|
|
|
self.last_del += " **And more**"
|
|
|
|
if len(self.last_ins) > part:
|
|
|
|
self.last_ins = self.last_ins[:part]
|
|
|
|
if self.last_ins.count("**") % 2 == 1:
|
|
|
|
self.last_ins += "**"
|
|
|
|
self.last_ins += " ~~And more~~"
|
|
|
|
self.done = True
|
|
|
|
self.text += "| {removed} | {added}\n".format(removed=self.last_del, added=self.last_ins)
|
|
|
|
self.last_del = None
|
|
|
|
self.last_ins = None
|