# This file is part of Recent changes Goat compatible Discord webhook (RcGcDw).
# RcGcDw is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# RcGcDw is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with RcGcDw. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
import datetime
import json
import math
import random
from collections import defaultdict
from typing import Optional, TYPE_CHECKING
from src.exceptions import EmbedListFull
if TYPE_CHECKING:
from wiki import Wiki
from domain import Domain
# Load the message appearance templates once at import time. The parsed
# mapping is kept in the module-level ``settings`` name; its
# "event_appearance" entry is consulted when messages are built.
# JSON documents are UTF-8, so the encoding is given explicitly instead of
# relying on the platform default (which would mangle emoji on some systems).
with open("src/api/template_settings.json", "r", encoding="utf-8") as template_json:
    settings: dict = json.load(template_json)
class DiscordMessageMetadata:
    """Identifiers and bookkeeping data attached to a single Discord message.

    Stores the wiki-side IDs (log, page, revision), the webhook URL, the
    message display value, the time of the change and the owning domain.
    """

    def __init__(self, method, log_id = None, page_id = None, rev_id = None, webhook_url = None, message_display = None,
                 time_of_change: Optional[datetime.datetime] = None, domain: Domain = None):
        self.method = method  # unused, remains for compatibility reasons
        self.page_id = page_id
        self.log_id = log_id
        self.rev_id = rev_id
        self.webhook_url = webhook_url
        self.message_display = message_display
        self.time_of_change = time_of_change
        self.domain = domain

    def __str__(self):
        """Return a short human-readable summary of the stored IDs."""
        return f"<DiscordMessageMetadata page_id={self.page_id} log_id={self.log_id} rev_id={self.rev_id}>"

    def __getstate__(self):
        """Return the instance state for pickling, leaving out ``domain``.

        The state is always built from a copy of ``__dict__`` so this does
        not depend on ``object.__getstate__`` (available on Python 3.11+ only).
        """
        state = self.__dict__.copy()
        state.pop("domain", None)
        return state

    def json(self) -> dict:
        """Return a JSON-friendly dict of the metadata.

        ``webhook_url`` and ``domain`` are not included; ``time_of_change``
        is converted with ``str()``.
        """
        return {
            "method": self.method,
            "page_id": self.page_id,
            "log_id": self.log_id,
            "rev_id": self.rev_id,
            "message_display": self.message_display,
            "time_of_change": str(self.time_of_change),
        }

    def matches(self, other: dict):
        """Checks if all keys and values match given dictionary

        A list or set value matches when the stored attribute is one of its
        members; any other value must be equal to the stored attribute.

        Raises:
            KeyError: when ``other`` names an attribute this object lacks.
        """
        for key, value in other.items():
            current = self.__dict__[key]
            if isinstance(value, (list, set)):
                if current not in value:
                    return False
            elif current != value:
                return False
        return True

    def dump_ids(self) -> tuple[Optional[int], Optional[int], Optional[int], Optional[int]]:
        """Return ``(log_id, page_id, rev_id, message_display)`` as a tuple."""
        return self.log_id, self.page_id, self.rev_id, self.message_display
class DiscordMessage:
    """A class defining a typical Discord JSON representation of webhook payload.

    The payload lives in ``webhook_object``. For ``"embed"`` messages the
    embed currently being edited is ``self.embed`` (item assignment on the
    message writes into it); for ``"compact"`` messages the text is stored
    under the ``"content"`` key. ``length`` tracks the number of characters
    counted so far and is what ``len(message)`` returns.
    """

    def __init__(self, message_type: str, event_type: str, webhook_url: list[str], content=None):
        """Create a message of the given type.

        Args:
            message_type: ``"embed"`` or ``"compact"``.
            event_type: key looked up in ``settings["event_appearance"]``.
            webhook_url: accepted for interface compatibility; not used here.
            content: text of a compact message.
        """
        self.webhook_object = dict(allowed_mentions={"parse": []})
        self.length = 0
        self.metadata: Optional[DiscordMessageMetadata] = None
        self.wiki: Optional[Wiki] = None
        if message_type == "embed":
            self.__setup_embed()
        elif message_type == "compact":
            emoji = settings["event_appearance"].get(event_type, {}).get("emoji")
            if emoji:
                content = emoji + " " + content
            self.webhook_object["content"] = content
            self.length = len(content)
        # NOTE: this instance attribute shadows the message_type() method below.
        self.message_type = message_type
        self.event_type = event_type

    def __setitem__(self, key, value):
        """Set item is used only in embeds.

        Raises:
            TypeError: when the message has no embed (plain/compact message).
        """
        try:
            embed = self.embed
        except AttributeError:
            # A missing attribute raises AttributeError (not NameError).
            raise TypeError("Tried to assign a value when message type is plain message!") from None
        if key in ('title', 'description'):
            # Replace, rather than add to, the previously counted length.
            self.length += len(value) - len(embed.get(key, ""))
        embed[key] = value

    def __getitem__(self, item):
        """Return a value from the current embed."""
        return self.embed[item]

    def __repr__(self):
        """Return the Discord webhook object ready to be sent"""
        return json.dumps(self.webhook_object)

    def __len__(self):
        return self.length

    def __getstate__(self):
        """Return the instance state for pickling, leaving out ``wiki``.

        Built from a copy of ``__dict__`` so it does not rely on
        ``object.__getstate__`` (available on Python 3.11+ only).
        """
        state = self.__dict__.copy()
        state.pop("wiki", None)
        return state

    def json(self):
        """Return a small JSON-friendly summary: length and metadata."""
        return {
            "length": self.length,
            "metadata": self.metadata.json() if self.metadata is not None else None,
        }

    def matches(self, other: dict):
        """Return whether the metadata matches ``other``; False without metadata."""
        if self.metadata is None:
            return False
        return self.metadata.matches(other)

    def message_type(self):
        # Kept for compatibility: instances created through __init__ carry a
        # ``message_type`` attribute that takes precedence over this method.
        if "content" in self.webhook_object:
            return "compact"
        return "embed"

    def __setup_embed(self):
        """Start a fresh embed and register it in the payload's embed list."""
        self.embed = defaultdict(dict)
        self.webhook_object.setdefault("embeds", []).append(self.embed)
        self.embed["color"] = None

    def add_embed(self):
        """Finish the current embed and start a new one."""
        self.finish_embed()
        self.__setup_embed()

    def finish_embed(self):
        """Fill in color/icon defaults and shorten an over-long title.

        Does nothing for non-embed messages.
        """
        if self.message_type != "embed":
            return
        appearance = settings["event_appearance"].get(self.event_type, {})
        if self.embed["color"] is None:
            configured_color = appearance.get("color")
            if configured_color is None:
                self.embed["color"] = random.randrange(1, 16777215)
            else:
                self.embed["color"] = configured_color
        else:
            self.embed["color"] = math.floor(self.embed["color"])
        # Use .get() for the look-ups: plain indexing on the defaultdict would
        # insert empty {} placeholders ("author"/"title") into the payload.
        icon = appearance.get("icon")
        if icon and not self.embed.get("author", {}).get("icon_url"):
            self.embed["author"]["icon_url"] = icon
        title = self.embed.get("title")
        if isinstance(title, str) and len(title) > 254:
            # Item assignment keeps self.length in step with the shorter title.
            self["title"] = title[0:253] + "…"
        self.finish_embed_message()

    def finish_embed_message(self):
        """Make sure the current embed is part of the payload exactly once.

        Raises:
            EmbedListFull: when the embed list has no room left.
        """
        embeds = self.webhook_object.setdefault("embeds", [])
        # Identity check: the embed is normally registered when it is set up,
        # so appending it again would duplicate it in the payload.
        if any(existing is self.embed for existing in embeds):
            if len(embeds) > 10:
                raise EmbedListFull
            return
        if len(embeds) > 9:
            raise EmbedListFull
        embeds.append(self.embed)

    def set_author(self, name: str, url="", icon_url=""):
        """Set the embed author; only the name counts towards the length."""
        author = self.embed["author"]
        self.length += len(name) - len(author.get("name", ""))
        author["name"] = name
        author["url"] = url
        author["icon_url"] = icon_url

    def set_footer(self, text: str, icon_url=""):
        """Set the embed footer; only the text counts towards the length."""
        footer = self.embed["footer"]
        self.length += len(text) - len(footer.get("text", ""))
        footer["text"] = text
        footer["icon_url"] = icon_url

    def add_field(self, name, value, inline=False):
        """Append a field to the embed and count its name and value."""
        if "fields" not in self.embed:
            self.embed["fields"] = []
        self.length += len(name) + len(value)
        self.embed["fields"].append(dict(name=name, value=value, inline=inline))

    def set_avatar(self, url):
        self.webhook_object["avatar_url"] = url

    def set_name(self, name):
        self.webhook_object["username"] = name

    def set_link(self, link):
        self.embed["link"] = link

    def return_content(self):
        """Return the text content of a compact message."""
        return self.webhook_object["content"]
class MessageTooBig(BaseException):
    """Raised when a message does not fit into a stacked message."""
class StackedDiscordMessage:
    """Several DiscordMessage objects combined into one webhook payload.

    ``message_type`` is 0 for compact messages (contents joined with
    newlines) and 1 for embeds (one embed per contained message).
    ``length`` is the running character count used by the size checks.
    """

    def __init__(self, m_type: int, wiki: Wiki):
        self.message_list: list[DiscordMessage] = []
        self.length = 0
        self.message_type: int = m_type  # 0 for compact, 1 for embed
        self.discord_callback_message_id: int = -1
        self.wiki: Wiki = wiki
        self.webhook: Optional[str] = None

    def __len__(self):
        return self.length

    def __repr__(self):
        """Return the combined payload serialized as a JSON string."""
        message_structure = dict(allowed_mentions={"parse": []})
        if self.message_type == 0:
            message_structure["content"] = "\n".join(message.return_content() for message in self.message_list)
        elif self.message_type == 1:
            message_structure["embeds"] = [message.embed for message in self.message_list]
        if self.message_list:
            # use components from the first message
            first_payload = self.message_list[0].webhook_object
            if "components" in first_payload:
                message_structure["components"] = first_payload["components"]
        return json.dumps(message_structure)

    def __iter__(self):
        return iter(self.message_list)

    def __getstate__(self):
        """Return the instance state for pickling, leaving out ``wiki``.

        Built from a copy of ``__dict__`` so it does not rely on
        ``object.__getstate__`` (available on Python 3.11+ only).
        """
        state = self.__dict__.copy()
        state.pop("wiki", None)
        return state

    def is_empty(self):
        """Return True when no messages are stacked."""
        return not self.message_list

    def json(self) -> dict:
        """Return a JSON-friendly summary of the stack.

        Only the part of ``webhook`` before the first ``/`` is included.
        """
        return {
            "length": self.length,
            "message_type": self.message_type,
            "discord_callback_message_id": self.discord_callback_message_id,
            "webhook": self.webhook.split("/")[0] if self.webhook else None,
            "messages": [message.json() for message in self.message_list],
        }

    def check_for_length(self, message_length: int):
        """Return True when a message of ``message_length`` would NOT fit.

        Embed stacks allow 6000 characters in total and refuse additions once
        more than 9 messages are held; compact stacks allow 2000 characters.
        """
        projected_length = len(self) + message_length
        if self.message_type:
            return projected_length > 6000 or len(self.message_list) > 9
        return projected_length > 2000

    def filter(self, params: dict) -> list[tuple[int, DiscordMessage]]:
        """Filters messages by their metadata"""
        return [(num, message) for num, message in enumerate(self.message_list) if message.matches(params)]

    def delete_message_by_id(self, message_ids: list[int]):
        """Deletes messages with given IDS from the message_ids list

        The IDs are positions in ``message_list``. Duplicates are ignored and
        the tracked length is recomputed from the remaining messages so that
        later size checks do not work with a stale value.
        """
        # Pop from the highest index down so earlier positions stay valid.
        for message_id in sorted(set(message_ids), reverse=True):
            self.message_list.pop(message_id)
        self.length = sum(len(message) + (self.message_type == 0) for message in self.message_list)

    def add_message(self, message: DiscordMessage):
        """Append ``message`` to the stack.

        Raises:
            MessageTooBig: when the message would exceed the size limits.
        """
        if self.check_for_length(len(message)):
            raise MessageTooBig
        # Compact messages are joined with a newline, which counts as one character.
        self.length += len(message) + (self.message_type == 0)
        self.message_list.append(message)