Merge remote-tracking branch 'origin/testing' into testing

This commit is contained in:
Frisk 2025-01-27 10:43:25 +01:00
commit ace09fb6ac
2 changed files with 366 additions and 53 deletions

View file

@ -18,11 +18,14 @@ from typing import Optional
import requests
import markdown
import json
import re
from src.configloader import settings
from src.api.context import Context
from src.discord.message import DiscordMessage, DiscordMessageMetadata
from src.api.hook import post_hook
from src.api.util import sanitize_to_markdown
from src.misc import class_searcher
from html.parser import HTMLParser
# {
# "hooks": {
@ -39,25 +42,87 @@ matrix_hook: Optional[dict] = None
start_time = int(time.time())
class StrikethroughProcessor(markdown.inlinepatterns.AsteriskProcessor):
PATTERNS = [
markdown.inlinepatterns.EmStrongItem(re.compile(r'(~{2})(.+?)\1', re.DOTALL | re.UNICODE), 'single', 'del')
]
class StrikethroughExtension(markdown.extensions.Extension):
def extendMarkdown(self, md):
""" Modify inline patterns. """
md.inlinePatterns.register(StrikethroughProcessor(r'~'), 'strikethrough', 50)
def check_if_exists(settings: dict):
global matrix_hook
matrix_hook = settings.get("hooks", {}).get("matrix", {})
return bool(matrix_hook)
@post_hook(priority=50, register=check_if_exists)
def matrix_posthook(message: DiscordMessage, metadata: DiscordMessageMetadata, context: Context, change: dict):
if not matrix_hook.get("access_token"):
raise KeyError("Matrix hook requires an access token to run!")
if settings["appearance"]["mode"] == "embed":
matrix_json_formatted_body = discord_embed_converter(message).replace("\\:", ":").replace("\\|", "|")
else:
matrix_json_formatted_body = discord_compact_converter(message)
content = (message.webhook_object.get("content", "") or "")
if message.message_type == "embed":
embed = ""
images = []
if message.embed.get("author", {}).get("name", None):
author = sanitize_to_markdown(message["author"]["name"])
if message["author"].get("url", None):
author = "[{name}]({url})".format(name=author, url=message["author"]["url"])
embed += "{author}:\n".format(author=author)
if message.embed.get("title", None):
title = message["title"]
if message.embed.get("url", None):
title = "[{title}]({url})".format(title=title, url=message["url"])
thumbnail = ""
if message.embed.get("thumbnail", {}).get("url", None):
thumbnail = " [thumbnail]({url})".format(url=message["thumbnail"]["url"])
images.append( "[^]({url})".format(url=message["thumbnail"]["url"]) )
embed += "**{title}**{thumbnail}\n".format(title=title, thumbnail=thumbnail)
elif message.embed.get("thumbnail", {}).get("url", None):
embed += "[thumbnail]({url})\n".format(url=message["thumbnail"]["url"])
images.append( "[^]({url})".format(url=message["thumbnail"]["url"]) )
if message.embed.get("description", None):
embed += message["description"] + "\n"
if context.changed_content:
edit_diff = ContentParser(context._)
edit_diff.feed(context.changed_content)
diff_text = edit_diff.text
diff_text = re.sub(r'(~{2})(.+?)\1', '<span data-mx-bg-color="#9f0000">\\2</span>', diff_text)
diff_text = re.sub(r'(\*{2})(.+?)\1', '<span data-mx-bg-color="#005e20">\\2</span>', diff_text)
embed += "\n" + diff_text + "\n"
if message.embed.get("fields", []):
for field in message["fields"]:
if context.changed_content and field["name"] in (context._("Removed"), context._("Added")):
continue
embed += "**{name}**\n> {value}\n".format(name=field["name"],value=field["value"].replace("\n", "\n> "))
if message.embed.get("image", {}).get("url", None):
embed += "**[image]({url})**\n".format(url=message["image"]["url"])
images.append( "**[^]({url})**".format(url=message["image"]["url"]) )
if message.embed.get("footer", {}).get("text", None):
timestamp = ""
if message.embed.get("timestamp", None):
timestamp = "{timestamp}".format(timestamp=message["timestamp"])
embed += "{footer}{timestamp}\n".format(footer=sanitize_to_markdown(message["footer"]["text"]), timestamp=timestamp)
elif message.embed.get("timestamp", None):
embed += message["timestamp"] + "\n"
if content:
content += "\n"
content += "> " + embed[:-1].replace("\n", "\n> ")
if len(images):
content += "\n{images}".format(images=" ".join(images))
if matrix_hook.get("matrix_only"):
context.event = None
data = json.dumps({"msgtype": "m.text", "body": "test", "format": "org.matrix.custom.html", "formatted_body": matrix_json_formatted_body})
if not content:
return
html_content = markdown.markdown(content, extensions=[StrikethroughExtension(),'tables','nl2br'])
data = json.dumps({
"msgtype": "m.text",
"body": content,
"format": "org.matrix.custom.html",
"formatted_body": re.sub(r'\\(\\?)', '\\1', html_content)
})
for room in matrix_hook.get("rooms", []):
response = requests.put("{homeserver}/_matrix/client/v3/rooms/{roomId}/send/{eventType}/{txnId}".format(homeserver=matrix_hook.get("homeserver"),
roomId=room,
@ -66,51 +131,96 @@ def matrix_posthook(message: DiscordMessage, metadata: DiscordMessageMetadata, c
data=data, headers={"Authorization": "Bearer "+matrix_hook.get("access_token"), "Content-Type": "application/json"})
def discord_embed_converter(embed: DiscordMessage) -> str:
discord_embed = embed.embed
matrix_soup = []
class ContentParser(HTMLParser):
"""ContentPerser is an implementation of HTMLParser that parses output of action=compare&prop=diff API request
for two MediaWiki revisions."""
max_length = 3000
current_tag = ""
last_del = None
last_ins = None
done = False
matrix_soup.append("<a href=\"{url}\">{author}</a><br>".format(author=discord_embed.get("author", {}).get("name", "unknown"), url=discord_embed.get("author", {}).get("url", "")))
matrix_soup.append("<b><a href=\"{url}\">{title}</a></b><br>".format(title=discord_embed.get("title", "Unknown title"), url=discord_embed.get("url", "")))
if discord_embed.get("description"):
matrix_soup.append(markdown.markdown(discord_embed.get("description", ""), extensions=['pymdownx.tilde']))
for field in discord_embed.get("fields", []):
if field.get("inline"):
matrix_soup.append((markdown.markdown(field.get("value", ""), extensions=['pymdownx.tilde']), True))
else:
matrix_soup.append(("<b>{name}</b><br>{data}".format(name=field.get("name", ""), data=removep(markdown.markdown(field.get("value", ""), extensions=['pymdownx.tilde']))), False))
final_output = ""
in_table_mode = False
while matrix_soup:
element = matrix_soup.pop(0)
if isinstance(element, str):
element = removep(element)
if in_table_mode:
final_output += "</tbody></table>"
in_table_mode = False
final_output += element
else:
if in_table_mode is False:
final_output+="<table><tbody>"
in_table_mode = True
if element[1] is False:
final_output+="<tr><td colspan=\"2\">"+removep(element[0])+"</td></tr>"
else:
if matrix_soup and matrix_soup[0][1] is True:
another_element = matrix_soup.pop(0)
final_output+="<tr><td>"+removep(element[0])+"</td><td>"+removep(another_element[0])+"</td></tr>"
else:
final_output += "<tr><td colspan=\"2\">" + removep(element[0]) + "</td></tr>"
if in_table_mode:
final_output += "</tbody></table>"
return final_output.replace("\n", "")
def __init__(self, lang):
super().__init__()
self.text = "| {removed} | {added}\n|---|---\n".format(removed=lang("Removed"), added=lang("Added"))
def handle_starttag(self, tagname, attribs):
if self.done:
return
if tagname == "ins" or tagname == "del":
self.current_tag = tagname
if tagname == "td":
classes = class_searcher(attribs).split(' ')
if "diff-deletedline" in classes:
self.current_tag = "tdd"
self.last_del = "\u200b"
if "diff-addedline" in classes:
self.current_tag = "tda"
self.last_ins = "\u200b"
if "diff-empty" in classes:
if self.last_del is None:
self.last_del = ""
if self.last_ins is None:
self.last_ins = ""
def removep(element: str) -> str:
if element.startswith("<p>") and element.endswith("</p>"):
return element[3:-4]
return element
def handle_data(self, data):
def escape_formatting(data: str) -> str:
"""Escape Discord formatting"""
return re.sub(r"([`_*~:<>{}@/|#\-\.\\\[\]\(\)])", "\\\\\\1", data)
if not self.current_tag or self.done:
return
data = escape_formatting(data)
if self.current_tag == "del":
self.last_del += "~~" + data + "~~"
if self.current_tag == "ins":
self.last_ins += "**" + data + "**"
if self.current_tag == "tdd":
self.last_del += data
if self.current_tag == "tda":
self.last_ins += data
def discord_compact_converter(embed: DiscordMessage) -> str:
return markdown.markdown(embed.webhook_object["content"], extensions=['pymdownx.tilde'])
def handle_endtag(self, tagname):
if self.done:
return
if tagname == "ins":
self.current_tag = "tda"
elif tagname == "del":
self.current_tag = "tdd"
elif tagname == "td":
self.current_tag = ""
elif tagname == "tr":
if self.last_ins is None:
return
# if self.last_ins == "" and self.last_del != "\u200b":
# if "~~" in self.last_del:
# self.last_del = self.last_del.replace("~~", "__")
# self.last_del = "~~" + self.last_del + "~~"
# if self.last_del == "" and self.last_ins != "\u200b":
# if "**" in self.last_ins:
# self.last_ins = self.last_ins.replace("**", "__")
# self.last_ins = "**" + self.last_ins + "**"
if len(self.text) + len(self.last_del) + len(self.last_ins) > self.max_length:
remaining = self.max_length - len(self.text)
if remaining < 0:
remaining = 0
if remaining % 2 == 1:
remaining = remaining + 1
part = int(remaining / 2)
if len(self.last_del) < part:
part += part - len(self.last_del)
if len(self.last_ins) < part:
part += part - len(self.last_ins)
if len(self.last_del) > part:
self.last_del = self.last_del[:part]
if self.last_del.count("~~") % 2 == 1:
self.last_del += "~~"
self.last_del += " **And more**"
if len(self.last_ins) > part:
self.last_ins = self.last_ins[:part]
if self.last_ins.count("**") % 2 == 1:
self.last_ins += "**"
self.last_ins += " ~~And more~~"
self.done = True
self.text += "| {removed} | {added}\n".format(removed=self.last_del, added=self.last_ins)
self.last_del = None
self.last_ins = None

203
extensions/hooks/zulip.py Normal file
View file

@ -0,0 +1,203 @@
# This file is part of Recent changes Goat compatible Discord webhook (RcGcDw).
#
# RcGcDw is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RcGcDw is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with RcGcDw. If not, see <http://www.gnu.org/licenses/>.
from src.api.context import Context
from src.discord.message import DiscordMessage, DiscordMessageMetadata
from src.api.hook import post_hook
from src.api.util import sanitize_to_markdown
from src.configloader import settings
from src.misc import class_searcher
from html.parser import HTMLParser
import requests
import base64
import re
# Bridge messages to Zulip
# {
# "hooks": {
# "zulip": {
# "realm": "https://mc-wiki.zulipchat.com",
# "email": "bot@zulip.com",
# "api_key": "12345",
# "target": {
# "type": "stream",
# "to": "Channel",
# "topic": "RcGcdw"
# }
# }
# }
# }
zulip = settings.get("hooks", {}).get("zulip", {})
auth = ""
if len(zulip):
auth = base64.b64encode("{email}:{api_key}".format(email=zulip["email"], api_key=zulip["api_key"]).encode('utf-8'))
@post_hook
def zulip_hook(message: DiscordMessage, metadata: DiscordMessageMetadata, context: Context, change: dict):
if not len(zulip) or context.feed_type == "discussion":
return
content = (message.webhook_object.get("content", "") or "")
if message.message_type == "embed":
if content:
content += "\n"
images = []
content += "`````quote\n"
if message.embed.get("author", {}).get("name", None):
author = sanitize_to_markdown(message["author"]["name"])
if message["author"].get("url", None):
author = "[{name}]({url})".format(name=author, url=message["author"]["url"])
content += "{author}:\n".format(author=author)
if message.embed.get("title", None):
title = message["title"]
if message.embed.get("url", None):
title = "[{title}]({url})".format(title=title, url=message["url"])
thumbnail = ""
if message.embed.get("thumbnail", {}).get("url", None):
thumbnail = " [thumbnail]({url})".format(url=message["thumbnail"]["url"])
images.append( "[^]({url})".format(url=message["thumbnail"]["url"]) )
content += "**{title}**{thumbnail}\n".format(title=title, thumbnail=thumbnail)
elif message.embed.get("thumbnail", {}).get("url", None):
content += "[thumbnail]({url})\n".format(url=message["thumbnail"]["url"])
images.append( "[^]({url})".format(url=message["thumbnail"]["url"]) )
if message.embed.get("description", None):
content += message["description"] + "\n"
if context.changed_content:
edit_diff = ContentParser(context._)
edit_diff.feed(context.changed_content)
content += "\n" + edit_diff.text + "\n"
if message.embed.get("fields", []):
for field in message["fields"]:
if context.changed_content and field["name"] in (context._("Removed"), context._("Added")):
continue
content += "- **{name}**\n````quote\n{value}\n````\n".format(name=field["name"],value=field["value"])
if message.embed.get("image", {}).get("url", None):
content += "**[image]({url})**\n".format(url=message["image"]["url"])
images.append( "**[^]({url})**".format(url=message["image"]["url"]) )
if message.embed.get("footer", {}).get("text", None):
timestamp = ""
if message.embed.get("timestamp", None):
timestamp = " • <time:{timestamp}>".format(timestamp=message["timestamp"])
content += "{footer}{timestamp}\n".format(footer=sanitize_to_markdown(message["footer"]["text"]), timestamp=timestamp)
elif message.embed.get("timestamp", None):
content += "<time:{timestamp}>\n".format(timestamp=message["timestamp"])
content += "`````"
if len(images):
content += "\n{images}".format(images=" ".join(images))
content = re.sub(r"@\*\*(all|everyone|channel|topic)\*\*", "@\u200b**\\1**", content)
if not content:
return
data = {**zulip["target"], 'content': content}
header = {'Authorization': "Basic {auth}".format(auth=auth.decode('utf-8'))}
req = requests.Request("POST", "{realm}/api/v1/messages".format(realm=zulip["realm"]), data=data, headers=header)
try:
req = req.prepare()
requests.Session().send(req, timeout=10)
except:
pass
class ContentParser(HTMLParser):
"""ContentPerser is an implementation of HTMLParser that parses output of action=compare&prop=diff API request
for two MediaWiki revisions."""
max_length = 3000
current_tag = ""
last_del = None
last_ins = None
done = False
def __init__(self, lang):
super().__init__()
self.text = "| {removed} | {added}\n|---|---\n".format(removed=lang("Removed"), added=lang("Added"))
def handle_starttag(self, tagname, attribs):
if self.done:
return
if tagname == "ins" or tagname == "del":
self.current_tag = tagname
if tagname == "td":
classes = class_searcher(attribs).split(' ')
if "diff-deletedline" in classes:
self.current_tag = "tdd"
self.last_del = "\u200b"
if "diff-addedline" in classes:
self.current_tag = "tda"
self.last_ins = "\u200b"
if "diff-empty" in classes:
if self.last_del is None:
self.last_del = ""
if self.last_ins is None:
self.last_ins = ""
def handle_data(self, data):
def escape_formatting(data: str) -> str:
"""Escape Discord formatting"""
return re.sub(r"([`_*~:<>{}@/|#\-\.\\\[\]\(\)])", "\\\\\\1", data)
if not self.current_tag or self.done:
return
data = escape_formatting(data)
if self.current_tag == "del":
self.last_del += "~~" + data + "~~"
if self.current_tag == "ins":
self.last_ins += "**" + data + "**"
if self.current_tag == "tdd":
self.last_del += data
if self.current_tag == "tda":
self.last_ins += data
def handle_endtag(self, tagname):
if self.done:
return
if tagname == "ins":
self.current_tag = "tda"
elif tagname == "del":
self.current_tag = "tdd"
elif tagname == "td":
self.current_tag = ""
elif tagname == "tr":
if self.last_ins is None:
return
# if self.last_ins == "" and self.last_del != "\u200b":
# if "~~" in self.last_del:
# self.last_del = self.last_del.replace("~~", "__")
# self.last_del = "~~" + self.last_del + "~~"
# if self.last_del == "" and self.last_ins != "\u200b":
# if "**" in self.last_ins:
# self.last_ins = self.last_ins.replace("**", "__")
# self.last_ins = "**" + self.last_ins + "**"
if len(self.text) + len(self.last_del) + len(self.last_ins) > self.max_length:
remaining = self.max_length - len(self.text)
if remaining < 0:
remaining = 0
if remaining % 2 == 1:
remaining = remaining + 1
part = int(remaining / 2)
if len(self.last_del) < part:
part += part - len(self.last_del)
if len(self.last_ins) < part:
part += part - len(self.last_ins)
if len(self.last_del) > part:
self.last_del = self.last_del[:part]
if self.last_del.count("~~") % 2 == 1:
self.last_del += "~~"
self.last_del += " **And more**"
if len(self.last_ins) > part:
self.last_ins = self.last_ins[:part]
if self.last_ins.count("**") % 2 == 1:
self.last_ins += "**"
self.last_ins += " ~~And more~~"
self.done = True
self.text += "| {removed} | {added}\n".format(removed=self.last_del, added=self.last_ins)
self.last_del = None
self.last_ins = None