2022-07-26 13:48:44 +00:00
|
|
|
# This file is part of Recent changes Goat compatible Discord webhook (RcGcDw).
|
|
|
|
|
|
|
|
# RcGcDw is free software: you can redistribute it and/or modify
|
|
|
|
# it under the terms of the GNU General Public License as published by
|
|
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
|
|
# (at your option) any later version.
|
|
|
|
|
|
|
|
# RcGcDw is distributed in the hope that it will be useful,
|
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
# GNU General Public License for more details.
|
|
|
|
|
|
|
|
# You should have received a copy of the GNU General Public License
|
|
|
|
# along with RcGcDw. If not, see <http://www.gnu.org/licenses/>.
|
2022-10-03 13:34:36 +00:00
|
|
|
from __future__ import annotations
|
2024-07-04 17:03:51 +00:00
|
|
|
|
|
|
|
import datetime
|
2022-07-26 13:48:44 +00:00
|
|
|
import json
|
|
|
|
import math
|
|
|
|
import random
|
|
|
|
from collections import defaultdict
|
|
|
|
|
2022-08-16 10:50:49 +00:00
|
|
|
from typing import Optional, TYPE_CHECKING
|
|
|
|
|
2022-11-07 14:46:15 +00:00
|
|
|
from src.exceptions import EmbedListFull
|
|
|
|
|
2022-08-16 10:50:49 +00:00
|
|
|
if TYPE_CHECKING:
|
|
|
|
from wiki import Wiki
|
2024-07-04 17:03:51 +00:00
|
|
|
from domain import Domain
|
2022-08-09 10:57:40 +00:00
|
|
|
|
2022-08-05 10:24:55 +00:00
|
|
|
with open("src/api/template_settings.json", "r") as template_json:
|
|
|
|
settings: dict = json.load(template_json)
|
|
|
|
|
2022-08-09 10:57:40 +00:00
|
|
|
|
2022-08-05 10:24:55 +00:00
|
|
|
class DiscordMessageMetadata:
|
2024-07-04 17:03:51 +00:00
|
|
|
def __init__(self, method, log_id = None, page_id = None, rev_id = None, webhook_url = None, message_display = None,
|
|
|
|
time_of_change: Optional[datetime.datetime] = None, domain: Domain = None):
|
2022-09-18 14:38:19 +00:00
|
|
|
self.method = method # unused, remains for compatibility reasons
|
2022-08-05 10:24:55 +00:00
|
|
|
self.page_id = page_id
|
|
|
|
self.log_id = log_id
|
|
|
|
self.rev_id = rev_id
|
|
|
|
self.webhook_url = webhook_url
|
2023-11-25 14:11:11 +00:00
|
|
|
self.message_display = message_display
|
2024-07-04 20:09:23 +00:00
|
|
|
self.time_of_change = time_of_change
|
2024-07-04 17:03:51 +00:00
|
|
|
self.domain = domain
|
2022-08-05 10:24:55 +00:00
|
|
|
|
2024-07-16 18:51:45 +00:00
|
|
|
def __str__(self):
|
|
|
|
return f"<DiscordMessageMetadata page_id={self.page_id} log_id={self.log_id} rev_id={self.rev_id}>"
|
|
|
|
|
2024-07-20 23:57:01 +00:00
|
|
|
def json(self) -> dict:
|
|
|
|
dict_obj = {
|
|
|
|
"method": self.method,
|
|
|
|
"page_id": self.page_id,
|
|
|
|
"log_id": self.log_id,
|
|
|
|
"rev_id": self.rev_id,
|
|
|
|
"message_display": self.message_display,
|
|
|
|
"time_of_change": str(self.time_of_change)
|
|
|
|
}
|
|
|
|
return dict_obj
|
|
|
|
|
2022-08-05 10:24:55 +00:00
|
|
|
def matches(self, other: dict):
|
|
|
|
for key, value in other.items():
|
|
|
|
if self.__dict__[key] != value:
|
|
|
|
return False
|
|
|
|
return True
|
|
|
|
|
2023-11-25 14:11:11 +00:00
|
|
|
def dump_ids(self) -> (int, int, int, int):
|
|
|
|
return self.page_id, self.rev_id, self.log_id, self.message_display
|
2022-07-26 13:48:44 +00:00
|
|
|
|
|
|
|
|
|
|
|
class DiscordMessage:
|
|
|
|
"""A class defining a typical Discord JSON representation of webhook payload."""
|
2022-10-03 13:34:36 +00:00
|
|
|
def __init__(self, message_type: str, event_type: str, webhook_url: list[str], content=None):
|
2022-08-05 10:24:55 +00:00
|
|
|
self.webhook_object = dict(allowed_mentions={"parse": []})
|
|
|
|
self.length = 0
|
2022-08-09 10:57:40 +00:00
|
|
|
self.metadata: Optional[DiscordMessageMetadata] = None
|
2022-08-16 10:50:49 +00:00
|
|
|
self.wiki: Optional[Wiki] = None
|
2022-07-26 13:48:44 +00:00
|
|
|
|
|
|
|
if message_type == "embed":
|
|
|
|
self.__setup_embed()
|
|
|
|
elif message_type == "compact":
|
|
|
|
if settings["event_appearance"].get(event_type, {"emoji": None})["emoji"]:
|
|
|
|
content = settings["event_appearance"][event_type]["emoji"] + " " + content
|
|
|
|
self.webhook_object["content"] = content
|
2022-08-05 10:24:55 +00:00
|
|
|
self.length = len(content)
|
2022-07-26 13:48:44 +00:00
|
|
|
|
|
|
|
self.message_type = message_type
|
|
|
|
self.event_type = event_type
|
|
|
|
|
|
|
|
def __setitem__(self, key, value):
|
|
|
|
"""Set item is used only in embeds."""
|
|
|
|
try:
|
2022-08-05 10:24:55 +00:00
|
|
|
if key in ('title', 'description'):
|
|
|
|
self.length += len(value) - len(self.embed.get(key, ""))
|
2022-07-26 13:48:44 +00:00
|
|
|
self.embed[key] = value
|
|
|
|
except NameError:
|
|
|
|
raise TypeError("Tried to assign a value when message type is plain message!")
|
|
|
|
|
|
|
|
def __getitem__(self, item):
|
|
|
|
return self.embed[item]
|
|
|
|
|
|
|
|
def __repr__(self):
|
|
|
|
"""Return the Discord webhook object ready to be sent"""
|
|
|
|
return json.dumps(self.webhook_object)
|
|
|
|
|
2022-08-05 10:24:55 +00:00
|
|
|
def __len__(self):
|
|
|
|
return self.length
|
|
|
|
|
2024-07-20 23:57:01 +00:00
|
|
|
def json(self):
|
|
|
|
dict_obj = {
|
|
|
|
"length": self.length,
|
|
|
|
"metadata": self.metadata.json()
|
|
|
|
}
|
|
|
|
return dict_obj
|
|
|
|
|
2022-08-05 10:24:55 +00:00
|
|
|
def matches(self, other: dict):
|
|
|
|
return self.metadata.matches(other)
|
|
|
|
|
|
|
|
def message_type(self):
|
|
|
|
if "content" in self.webhook_object:
|
|
|
|
return "compact"
|
|
|
|
return "embed"
|
|
|
|
|
2022-07-26 13:48:44 +00:00
|
|
|
def __setup_embed(self):
|
|
|
|
self.embed = defaultdict(dict)
|
|
|
|
if "embeds" not in self.webhook_object:
|
|
|
|
self.webhook_object["embeds"] = [self.embed]
|
|
|
|
else:
|
|
|
|
self.webhook_object["embeds"].append(self.embed)
|
|
|
|
self.embed["color"] = None
|
|
|
|
|
|
|
|
def add_embed(self):
|
|
|
|
self.finish_embed()
|
|
|
|
self.__setup_embed()
|
|
|
|
|
|
|
|
def finish_embed(self):
|
|
|
|
if self.message_type != "embed":
|
|
|
|
return
|
|
|
|
if self.embed["color"] is None:
|
|
|
|
if settings["event_appearance"].get(self.event_type, {"color": None})["color"] is None:
|
|
|
|
self.embed["color"] = random.randrange(1, 16777215)
|
|
|
|
else:
|
|
|
|
self.embed["color"] = settings["event_appearance"][self.event_type]["color"]
|
|
|
|
else:
|
|
|
|
self.embed["color"] = math.floor(self.embed["color"])
|
|
|
|
if not self.embed["author"].get("icon_url", None) and settings["event_appearance"].get(self.event_type, {"icon": None})["icon"]:
|
|
|
|
self.embed["author"]["icon_url"] = settings["event_appearance"][self.event_type]["icon"]
|
|
|
|
if len(self.embed["title"]) > 254:
|
|
|
|
self.embed["title"] = self.embed["title"][0:253] + "…"
|
2022-08-05 10:24:55 +00:00
|
|
|
self.finish_embed_message()
|
|
|
|
|
|
|
|
def finish_embed_message(self):
|
|
|
|
if "embeds" not in self.webhook_object:
|
|
|
|
self.webhook_object["embeds"] = [self.embed]
|
|
|
|
else:
|
|
|
|
if len(self.webhook_object["embeds"]) > 9:
|
|
|
|
raise EmbedListFull
|
|
|
|
self.webhook_object["embeds"].append(self.embed)
|
2022-07-26 13:48:44 +00:00
|
|
|
|
2022-08-05 10:24:55 +00:00
|
|
|
def set_author(self, name: str, url="", icon_url=""):
|
|
|
|
self.length += len(name)
|
2022-07-26 13:48:44 +00:00
|
|
|
self.embed["author"]["name"] = name
|
|
|
|
self.embed["author"]["url"] = url
|
|
|
|
self.embed["author"]["icon_url"] = icon_url
|
|
|
|
|
2022-08-05 10:24:55 +00:00
|
|
|
def set_footer(self, text: str, icon_url=""):
|
|
|
|
self.length += len(text)
|
|
|
|
self.embed["footer"]["text"] = text
|
|
|
|
self.embed["footer"]["icon_url"] = icon_url
|
|
|
|
|
2022-07-26 13:48:44 +00:00
|
|
|
def add_field(self, name, value, inline=False):
|
|
|
|
if "fields" not in self.embed:
|
|
|
|
self.embed["fields"] = []
|
2022-08-05 10:24:55 +00:00
|
|
|
self.length += len(name) + len(value)
|
2022-07-26 13:48:44 +00:00
|
|
|
self.embed["fields"].append(dict(name=name, value=value, inline=inline))
|
|
|
|
|
|
|
|
def set_avatar(self, url):
|
|
|
|
self.webhook_object["avatar_url"] = url
|
|
|
|
|
|
|
|
def set_name(self, name):
|
|
|
|
self.webhook_object["username"] = name
|
|
|
|
|
|
|
|
def set_link(self, link):
|
|
|
|
self.embed["link"] = link
|
|
|
|
|
2022-08-05 10:24:55 +00:00
|
|
|
def return_content(self):
|
|
|
|
return self.webhook_object["content"]
|
|
|
|
|
2022-07-26 13:48:44 +00:00
|
|
|
|
2022-08-16 10:50:49 +00:00
|
|
|
class MessageTooBig(BaseException):
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
2022-08-05 10:24:55 +00:00
|
|
|
class StackedDiscordMessage():
|
2022-08-31 12:30:41 +00:00
|
|
|
def __init__(self, m_type: int, wiki: Wiki):
|
2022-08-05 10:24:55 +00:00
|
|
|
self.message_list: list[DiscordMessage] = []
|
|
|
|
self.length = 0
|
|
|
|
self.message_type: int = m_type # 0 for compact, 1 for embed
|
2022-09-18 14:38:19 +00:00
|
|
|
self.discord_callback_message_id: int = -1
|
2022-08-31 12:30:41 +00:00
|
|
|
self.wiki: Wiki = wiki
|
2022-09-18 14:38:19 +00:00
|
|
|
self.webhook: Optional[str] = None
|
2022-08-05 10:24:55 +00:00
|
|
|
|
|
|
|
def __len__(self):
|
|
|
|
return self.length
|
|
|
|
|
|
|
|
def __repr__(self):
|
|
|
|
message_structure = dict(allowed_mentions={"parse": []})
|
|
|
|
if self.message_type == 0:
|
|
|
|
message_structure["content"] = "\n".join([message.return_content() for message in self.message_list])
|
|
|
|
elif self.message_type == 1:
|
2022-10-29 15:04:25 +00:00
|
|
|
message_structure["embeds"] = [message.embed for message in self.message_list]
|
2024-07-04 17:03:51 +00:00
|
|
|
if self.message_list and "components" in self.message_list[0].webhook_object: # use components from the first message
|
2024-01-20 12:27:10 +00:00
|
|
|
message_structure["components"] = self.message_list[0].webhook_object["components"]
|
2022-08-05 10:24:55 +00:00
|
|
|
return json.dumps(message_structure)
|
|
|
|
|
2024-07-20 23:57:01 +00:00
|
|
|
def json(self) -> dict:
|
|
|
|
dict_obj = {
|
|
|
|
"length": self.length,
|
|
|
|
"message_type": self.message_type,
|
|
|
|
"discord_callback_message_id": self.discord_callback_message_id,
|
|
|
|
"webhook": self.webhook[-3:] if self.webhook else None,
|
|
|
|
"messages": [message.json() for message in self.message_list]
|
|
|
|
}
|
|
|
|
return dict_obj
|
|
|
|
|
2023-05-06 12:29:27 +00:00
|
|
|
def check_for_length(self, message_length: int):
|
|
|
|
if self.message_type:
|
|
|
|
return len(self) + message_length > 6000 or len(self.message_list) > 9
|
|
|
|
return (len(self) + message_length) > 2000
|
|
|
|
|
2022-08-05 10:24:55 +00:00
|
|
|
def filter(self, params: dict) -> list[tuple[int, DiscordMessage]]:
|
|
|
|
"""Filters messages by their metadata"""
|
2022-09-18 14:38:19 +00:00
|
|
|
return [(num, message) for num, message in enumerate(self.message_list) if message.matches(params)]
|
|
|
|
|
|
|
|
def delete_message_by_id(self, message_ids: list[int]):
|
|
|
|
"""Deletes messages with given IDS from the message_ids list"""
|
|
|
|
for message_id in sorted(message_ids, reverse=True):
|
|
|
|
self.message_list.pop(message_id)
|
2022-08-05 10:24:55 +00:00
|
|
|
|
|
|
|
def add_message(self, message: DiscordMessage):
|
2023-05-06 12:29:27 +00:00
|
|
|
if self.check_for_length(len(message)):
|
2022-08-05 10:24:55 +00:00
|
|
|
raise MessageTooBig
|
2023-05-06 12:29:27 +00:00
|
|
|
self.length += len(message) + (self.message_type == 0)
|
2022-08-05 10:24:55 +00:00
|
|
|
self.message_list.append(message)
|
|
|
|
# self._setup_embed()
|
|
|
|
# self.embed = message.embed
|
2024-07-04 20:09:23 +00:00
|
|
|
# self.finish_embed_message()
|