Added buttons based on MarkusRost's contributions to RcGcDw project

This commit is contained in:
Frisk 2024-01-14 14:24:51 +01:00
parent daf4478ea2
commit cbcf3624e1
7 changed files with 110 additions and 17 deletions

View file

@ -16,3 +16,4 @@
#import extensions.hooks.example_hook #import extensions.hooks.example_hook
#import extensions.hooks.usertalk #import extensions.hooks.usertalk
#import extensions.hooks.edit_alerts #import extensions.hooks.edit_alerts
import extensions.hooks.buttons

View file

@ -0,0 +1,77 @@
# This file is part of Recent changes Goat compatible Discord webhook (RcGcDw).
#
# RcGcDw is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RcGcDw is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with RcGcDw. If not, see <http://www.gnu.org/licenses/>.
from typing import Optional
from src.api.context import Context
from src.discord.message import DiscordMessage, DiscordMessageMetadata
from src.api.hook import post_hook
# The webhook used for RcGcDb need to be controlled by a Discord application running https://github.com/Markus-Rost/rcgcdw-buttons
# You can use https://www.wikibot.de/interactions to create a Discord webhook to be used for RcGcDw that supports buttons.
# {
# "hooks": {
# "buttons": {
# "block": "Block user",
# "delete": "Delete",
# "filerevert": "Revert",
# "move": "Move back",
# "rollback": "Rollback",
# "undo": "Undo"
# }
# }
# }
def add_button(message: DiscordMessage, custom_id: str, label, style=2, emoji: Optional[dict] = None):
if len(custom_id) > 100 or not len(label):
return
if "components" not in message.webhook_object:
message.webhook_object["components"] = [{"type": 1, "components": []}]
if len(message.webhook_object["components"][-1]["components"]) >= 5:
message.webhook_object["components"].append({"type": 1, "components": []})
message.webhook_object["components"][-1]["components"].append(
{"type": 2, "custom_id": custom_id, "style": style, "label": label, "emoji": emoji})
@post_hook
def buttons_hook(message: DiscordMessage, metadata: DiscordMessageMetadata, context: Context, change: dict):
action_buttons = context.buttons or ""
if not len(action_buttons) or context.feed_type == "discussion":
return
BUTTON_PREFIX = context.client.WIKI_SCRIPT_PATH[len(context.client.WIKI_JUST_DOMAIN):]
if "block" in action_buttons and context.event != "suppressed":
add_button(message,
BUTTON_PREFIX + " block " + ("#" + str(change["userid"]) if change["userid"] else change["user"]),
context.gettext("Block user"), 4, {"id": None, "name": "🚧"})
if context.feed_type != "recentchanges":
return
if "delete" in action_buttons and context.event in ("new", "upload/upload"):
add_button(message, BUTTON_PREFIX + " delete " + str(change["pageid"]),
context.gettext("Delete"), 4, {"id": None, "name": "🗑️"})
# if "filerevert" in action_buttons and context.event in ("upload/overwrite", "upload/revert"):
# add_button(message, BUTTON_PREFIX + " file " + str(change["pageid"]) + " " + revision["archivename"].split("!")[0],
# action_buttons["filerevert"], 2, {"id": None, "name": "🔂"})
if "move" in action_buttons and context.event in ("move/move", "move/move_redir"):
add_button(message, BUTTON_PREFIX + " move " + str(change["pageid"]) + " " + change["title"],
context.gettext("Move back"), 2, {"id": None, "name": "🔂"})
if context.event != "edit":
return
if "rollback" in action_buttons:
add_button(message, BUTTON_PREFIX + " rollback " + str(change["pageid"]) + " " + (
"#" + str(change["userid"]) if change["userid"] else change["user"]),
context.gettext("Rollback"), 1, {"id": None, "name": "🔁"})
if "undo" in action_buttons:
add_button(message, BUTTON_PREFIX + " undo " + str(change["pageid"]) + " " + str(change["revid"]),
context.gettext("Undo"), 2, {"id": None, "name": "🔂"})

View file

@ -15,7 +15,7 @@
from __future__ import annotations from __future__ import annotations
import gettext import gettext
from typing import TYPE_CHECKING from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING: if TYPE_CHECKING:
from src.api.client import Client from src.api.client import Client
@ -24,7 +24,7 @@ if TYPE_CHECKING:
class Context: class Context:
"""Context object containing client and some metadata regarding specific formatter call, """Context object containing client and some metadata regarding specific formatter call,
they are mainly used as a bridge between part that fetches the changes and API's formatters""" they are mainly used as a bridge between part that fetches the changes and API's formatters"""
def __init__(self, message_type: str, feed_type: str, webhook_urls: list[str], client: Client, language: gettext.GNUTranslations, settings: dict): def __init__(self, message_type: str, feed_type: str, webhook_urls: list[str], client: Client, language: gettext.GNUTranslations, settings: dict, buttons: Optional[str]):
self.client = client self.client = client
self.webhook_url = webhook_urls self.webhook_url = webhook_urls
self.message_type = message_type self.message_type = message_type
@ -39,6 +39,7 @@ class Context:
self.pgettext = language.pgettext # Translation with context (ex. ctx.pgettext("From mediawiki module", "Blocked {} user")) self.pgettext = language.pgettext # Translation with context (ex. ctx.pgettext("From mediawiki module", "Blocked {} user"))
self.npgettext = language.npgettext # Plural translation with context (ex. ctx.npgettext("From mediawiki module", "Edited {} time", "Edited {} times", edit_amoint) self.npgettext = language.npgettext # Plural translation with context (ex. ctx.npgettext("From mediawiki module", "Edited {} time", "Edited {} times", edit_amoint)
self.settings = settings self.settings = settings
self.buttons = buttons
def set_categories(self, cats): def set_categories(self, cats):
self.categories = cats self.categories = cats

View file

@ -4,7 +4,7 @@ try: # load settings
with open("settings.json", encoding="utf8") as sfile: with open("settings.json", encoding="utf8") as sfile:
settings = json.load(sfile) settings = json.load(sfile)
if "user-agent" in settings["header"]: if "user-agent" in settings["header"]:
settings["header"]["user-agent"] = settings["header"]["user-agent"].format(version="1.9 Beta") # set the version in the useragent settings["header"]["user-agent"] = settings["header"]["user-agent"].format(version="1.9.1 Beta") # set the version in the useragent
except FileNotFoundError: except FileNotFoundError:
logging.critical("No config file could be found. Please make sure settings.json is in the directory.") logging.critical("No config file could be found. Please make sure settings.json is in the directory.")
sys.exit(1) sys.exit(1)

View file

@ -143,7 +143,7 @@ async def essential_feeds(change: dict, comment_pages: dict, wiki: Wiki, target:
comment_page["fullUrl"] = "/".join(wiki.script_url.split("/", 3)[:3]) + comment_page["relativeUrl"] comment_page["fullUrl"] = "/".join(wiki.script_url.split("/", 3)[:3]) + comment_page["relativeUrl"]
metadata = DiscordMessageMetadata("POST", rev_id=None, log_id=None, page_id=None) metadata = DiscordMessageMetadata("POST", rev_id=None, log_id=None, page_id=None)
context = Context("embed" if target[0].display > 0 else "compact", "recentchanges", target[1], wiki.client, context = Context("embed" if target[0].display > 0 else "compact", "recentchanges", target[1], wiki.client,
langs[target[0].lang]["formatters"], prepare_settings(target[0].display)) langs[target[0].lang]["formatters"], prepare_settings(target[0].display), "")
context.set_comment_page(comment_page) context.set_comment_page(comment_page)
discord_message: Optional[DiscordMessage] = None discord_message: Optional[DiscordMessage] = None
try: try:

View file

@ -8,6 +8,7 @@ import base64, re
import logging import logging
from typing import Callable from typing import Callable
from urllib.parse import urlparse, urlunparse from urllib.parse import urlparse, urlunparse
from src.config import settings
logger = logging.getLogger("rcgcdw.misc") logger = logging.getLogger("rcgcdw.misc")
@ -28,6 +29,17 @@ def get_domain(url: str) -> str:
return ".".join(urlunparse((*parsed_url[0:2], "", "", "", "")).split(".")[-2:]) # something like gamepedia.com, fandom.com return ".".join(urlunparse((*parsed_url[0:2], "", "", "", "")).split(".")[-2:]) # something like gamepedia.com, fandom.com
def run_hooks(hooks, *arguments):
for hook in hooks:
try:
hook(*arguments)
except:
if settings.get("error_tolerance", 1) > 0:
logger.exception("On running a pre hook, ignoring pre-hook")
else:
raise
class LinkParser(HTMLParser): class LinkParser(HTMLParser):
new_string = "" new_string = ""

View file

@ -8,12 +8,12 @@ import asyncio
import requests import requests
from src.api.util import default_message from src.api.util import default_message
from src.misc import prepare_settings from src.misc import prepare_settings, run_hooks
from src.discord.queue import messagequeue, QueueEntry from src.discord.queue import messagequeue, QueueEntry
from src.mw_messages import MWMessages from src.mw_messages import MWMessages
from src.exceptions import * from src.exceptions import *
from src.queue_handler import dbmanager from src.queue_handler import dbmanager
from src.api.hooks import formatter_hooks from src.api.hooks import formatter_hooks, pre_hooks, post_hooks
from src.api.client import Client from src.api.client import Client
from src.api.context import Context from src.api.context import Context
from src.discord.message import DiscordMessage, DiscordMessageMetadata, StackedDiscordMessage from src.discord.message import DiscordMessage, DiscordMessageMetadata, StackedDiscordMessage
@ -25,7 +25,7 @@ from bs4 import BeautifulSoup
from collections import OrderedDict, defaultdict, namedtuple from collections import OrderedDict, defaultdict, namedtuple
from typing import Union, Optional, TYPE_CHECKING, List from typing import Union, Optional, TYPE_CHECKING, List
Settings = namedtuple("Settings", ["lang", "display"]) Settings = namedtuple("Settings", ["lang", "display", "buttons"])
logger = logging.getLogger("rcgcdb.wiki") logger = logging.getLogger("rcgcdb.wiki")
# wiki_reamoval_reasons = {410: _("wiki deleted"), 404: _("wiki deleted"), 401: _("wiki inaccessible"), # wiki_reamoval_reasons = {410: _("wiki deleted"), 404: _("wiki deleted"), 401: _("wiki inaccessible"),
@ -174,16 +174,15 @@ class Wiki:
"""This function generates all possible varations of outputs that we need to generate messages for. """This function generates all possible varations of outputs that we need to generate messages for.
:returns defaultdict[namedtuple, list[str]] - where namedtuple is a named tuple with settings for given webhooks in list""" :returns defaultdict[namedtuple, list[str]] - where namedtuple is a named tuple with settings for given webhooks in list"""
Settings = namedtuple("Settings", ["lang", "display"])
target_settings: defaultdict[Settings, list[str]] = defaultdict(list) target_settings: defaultdict[Settings, list[str]] = defaultdict(list)
discussion_targets: defaultdict[Settings, list[str]] = defaultdict(list) discussion_targets: defaultdict[Settings, list[str]] = defaultdict(list)
async for webhook in dbmanager.fetch_rows("SELECT webhook, lang, display, rcid, postid FROM rcgcdb WHERE wiki = $1", self.script_url): async for webhook in dbmanager.fetch_rows("SELECT webhook, lang, display, rcid, postid, buttons FROM rcgcdb WHERE wiki = $1", self.script_url):
if webhook['rcid'] == -1 and webhook['postid'] == '-1': if webhook['rcid'] == -1 and webhook['postid'] == '-1':
await self.remove_wiki_from_db(4) await self.remove_wiki_from_db(4)
if webhook['rcid'] != -1: if webhook['rcid'] != -1:
target_settings[Settings(webhook["lang"], webhook["display"])].append(webhook["webhook"]) target_settings[Settings(webhook["lang"], webhook["display"], webhook["buttons"])].append(webhook["webhook"])
if webhook['postid'] != '-1': if webhook['postid'] != '-1':
discussion_targets[Settings(webhook["lang"], webhook["display"])].append(webhook["webhook"]) discussion_targets[Settings(webhook["lang"], webhook["display"], webhook["buttons"])].append(webhook["webhook"])
self.rc_targets = target_settings self.rc_targets = target_settings
self.discussion_targets = discussion_targets self.discussion_targets = discussion_targets
@ -314,14 +313,14 @@ class Wiki:
params = OrderedDict({"action": "query", "format": "json", "uselang": "content", "list": "tags|recentchanges", params = OrderedDict({"action": "query", "format": "json", "uselang": "content", "list": "tags|recentchanges",
"meta": "allmessages|siteinfo", "meta": "allmessages|siteinfo",
"utf8": 1, "tglimit": "max", "tgprop": "displayname", "utf8": 1, "tglimit": "max", "tgprop": "displayname",
"rcprop": "title|redirect|timestamp|ids|loginfo|parsedcomment|sizes|flags|tags|user", "rcprop": "title|redirect|timestamp|ids|loginfo|parsedcomment|sizes|flags|tags|user|userid",
"rclimit": amount, "rcshow": "!bot", "rctype": "edit|new|log|categorize", "rclimit": amount, "rcshow": "!bot", "rctype": "edit|new|log|categorize",
"ammessages": "recentchanges-page-added-to-category|recentchanges-page-removed-from-category|recentchanges-page-added-to-category-bundled|recentchanges-page-removed-from-category-bundled", "ammessages": "recentchanges-page-added-to-category|recentchanges-page-removed-from-category|recentchanges-page-added-to-category-bundled|recentchanges-page-removed-from-category-bundled",
"amenableparser": 1, "amincludelocal": 1, "siprop": "namespaces|general"}) "amenableparser": 1, "amincludelocal": 1, "siprop": "namespaces|general"})
else: else:
params = OrderedDict({"action": "query", "format": "json", "uselang": "content", "list": "tags|recentchanges", params = OrderedDict({"action": "query", "format": "json", "uselang": "content", "list": "tags|recentchanges",
"meta": "siteinfo", "utf8": 1, "rcshow": "!bot", "meta": "siteinfo", "utf8": 1, "rcshow": "!bot",
"rcprop": "title|redirect|timestamp|ids|loginfo|parsedcomment|sizes|flags|tags|user", "rcprop": "title|redirect|timestamp|ids|loginfo|parsedcomment|sizes|flags|tags|user|userid",
"rclimit": amount, "rctype": "edit|new|log|categorize", "siprop": "namespaces|general"}) "rclimit": amount, "rctype": "edit|new|log|categorize", "siprop": "namespaces|general"})
try: try:
response = await self.api_request(params=params) response = await self.api_request(params=params)
@ -450,16 +449,18 @@ def process_cachable(response: dict, wiki_object: Wiki) -> None:
wiki_object.recache_requested = False wiki_object.recache_requested = False
async def rc_processor(wiki: Wiki, change: dict, changed_categories: dict, display_options: namedtuple("Settings", ["lang", "display"]), webhooks: list) -> Optional[DiscordMessage]: async def rc_processor(wiki: Wiki, change: dict, changed_categories: dict, display_options: namedtuple("Settings", ["lang", "display", "buttons"]), webhooks: list) -> Optional[DiscordMessage]:
"""This function takes more vital information, communicates with a formatter and constructs DiscordMessage with it. """This function takes more vital information, communicates with a formatter and constructs DiscordMessage with it.
It creates DiscordMessageMetadata object, LinkParser and Context. Prepares a comment """ It creates DiscordMessageMetadata object, LinkParser and Context. Prepares a comment """
from src.misc import LinkParser from src.misc import LinkParser
LinkParser = LinkParser(wiki.client.WIKI_JUST_DOMAIN) LinkParser = LinkParser(wiki.client.WIKI_JUST_DOMAIN)
metadata = DiscordMessageMetadata("POST", rev_id=change.get("revid", None), log_id=change.get("logid", None), metadata = DiscordMessageMetadata("POST", rev_id=change.get("revid", None), log_id=change.get("logid", None),
page_id=change.get("pageid", None), message_display=display_options.display) page_id=change.get("pageid", None), message_display=display_options.display)
context = Context("embed" if display_options.display > 0 else "compact", "recentchanges", webhooks, wiki.client, langs[display_options.lang]["formatters"], prepare_settings(display_options.display)) context = Context("embed" if display_options.display > 0 else "compact", "recentchanges", webhooks, wiki.client,
langs[display_options.lang]["formatters"], prepare_settings(display_options.display), display_options.buttons)
if ("actionhidden" in change or "suppressed" in change) and "suppressed" not in settings["ignored"]: # if event is hidden using suppression if ("actionhidden" in change or "suppressed" in change) and "suppressed" not in settings["ignored"]: # if event is hidden using suppression
context.event = "suppressed" context.event = "suppressed"
run_hooks(pre_hooks, context, change)
try: try:
discord_message: Optional[DiscordMessage] = await asyncio.get_event_loop().run_in_executor( discord_message: Optional[DiscordMessage] = await asyncio.get_event_loop().run_in_executor(
None, functools.partial(default_message("suppressed", context.message_type, formatter_hooks), context, change)) None, functools.partial(default_message("suppressed", context.message_type, formatter_hooks), context, change))
@ -509,14 +510,14 @@ async def rc_processor(wiki: Wiki, change: dict, changed_categories: dict, displ
wiki.delete_messages(dict(page_id=change.get("pageid"))) wiki.delete_messages(dict(page_id=change.get("pageid")))
elif identification_string == "delete/event": elif identification_string == "delete/event":
logparams = change.get('logparams', {"ids": []}) logparams = change.get('logparams', {"ids": []})
if settings["appearance"]["mode"] == "embed": if context.message_type == "embed":
wiki.redact_messages(context, logparams.get("ids", []), "log_id", logparams.get("new", {})) wiki.redact_messages(context, logparams.get("ids", []), "log_id", logparams.get("new", {}))
else: else:
for logid in logparams.get("ids", []): for logid in logparams.get("ids", []):
wiki.delete_messages(dict(logid=logid)) wiki.delete_messages(dict(logid=logid))
elif identification_string == "delete/revision": elif identification_string == "delete/revision":
logparams = change.get('logparams', {"ids": []}) logparams = change.get('logparams', {"ids": []})
if settings["appearance"]["mode"] == "embed": if context.message_type == "embed":
wiki.redact_messages(context, logparams.get("ids", []), "rev_id", logparams.get("new", {})) wiki.redact_messages(context, logparams.get("ids", []), "rev_id", logparams.get("new", {}))
if display_options.display == 3: if display_options.display == 3:
wiki.redact_messages(context, wiki.find_middle_next(logparams.get("ids", []), change.get("pageid", -1)), "rev_id", wiki.redact_messages(context, wiki.find_middle_next(logparams.get("ids", []), change.get("pageid", -1)), "rev_id",
@ -524,6 +525,7 @@ async def rc_processor(wiki: Wiki, change: dict, changed_categories: dict, displ
else: else:
for revid in logparams.get("ids", []): for revid in logparams.get("ids", []):
wiki.delete_messages(dict(revid=revid)) wiki.delete_messages(dict(revid=revid))
run_hooks(post_hooks, discord_message, metadata, context, change)
if discord_message: # TODO How to react when none? (crash in formatter), probably bad handling atm if discord_message: # TODO How to react when none? (crash in formatter), probably bad handling atm
discord_message.finish_embed() discord_message.finish_embed()
discord_message.metadata = metadata discord_message.metadata = metadata