Added RcGcDw code to RcGcDb

This commit is contained in:
Frisk 2021-07-08 13:33:10 +02:00
parent f399276a2d
commit f72c21faf9
No known key found for this signature in database
GPG key ID: 213F7C15068AF8AC
24 changed files with 3674 additions and 2 deletions

17
extensions/__init__.py Normal file
View file

@ -0,0 +1,17 @@
# This file is part of Recent changes Goat compatible Discord webhook (RcGcDw).
#
# RcGcDw is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RcGcDw is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with RcGcDw. If not, see <http://www.gnu.org/licenses/>.
import extensions.base
import extensions.hooks

View file

@ -0,0 +1,26 @@
# This file is part of Recent changes Goat compatible Discord bot (RcGcDb).
#
# RcGcDb is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RcGcDw is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with RcGcDb. If not, see <http://www.gnu.org/licenses/>.
import extensions.base.mediawiki
import extensions.base.abusefilter
import extensions.base.managewiki
import extensions.base.cargo
import extensions.base.datadump
import extensions.base.sprite
import extensions.base.translate
import extensions.base.discussions
import extensions.base.curseprofile
import extensions.base.interwiki
import extensions.base.renameuser

View file

@ -0,0 +1,125 @@
# This file is part of Recent changes Goat compatible Discord bot (RcGcDb).
#
# RcGcDb is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RcGcDw is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with RcGcDb. If not, see <http://www.gnu.org/licenses/>.
import ipaddress
import logging
from src.discord.message import DiscordMessage
from src.api import formatter
from src.i18n import formatters_i18n
from src.api.context import Context
from src.api.util import embed_helper, sanitize_to_url, parse_mediawiki_changes, clean_link, compact_author, \
create_article_path, sanitize_to_markdown
from src.configloader import settings
_ = formatters_i18n.gettext
ngettext = formatters_i18n.ngettext
abusefilter_results = {"": _("None"), "warn": _("Warning issued"), "block": _("**Blocked user**"), "tag": _("Tagged the edit"), "disallow": _("Disallowed the action"), "rangeblock": _("**IP range blocked**"), "throttle": _("Throttled actions"), "blockautopromote": _("Removed autoconfirmed group"), "degroup": _("**Removed from privileged groups**")}
abusefilter_actions = {"edit": _("Edit"), "upload": _("Upload"), "move": _("Move"), "stashupload": _("Stash upload"), "delete": _("Deletion"), "createaccount": _("Account creation"), "autocreateaccount": _("Auto account creation")}
logger = logging.getLogger("extensions.base")
# AbuseFilter - https://www.mediawiki.org/wiki/Special:MyLanguage/Extension:AbuseFilter
# Processing Abuselog LOG events, separate from RC logs
def abuse_filter_format_user(change):
author = change["user"]
if settings.get("hide_ips", False):
try:
ipaddress.ip_address(change["user"])
except ValueError:
pass
else:
author = _("Unregistered user")
return author
@formatter.embed(event="abuselog")
def embed_abuselog(ctx: Context, change: dict):
action = "abuselog/{}".format(change["result"])
embed = DiscordMessage(ctx.message_type, action, ctx.webhook_url)
author = abuse_filter_format_user(change)
embed["title"] = _("{user} triggered \"{abuse_filter}\"").format(user=author, abuse_filter=sanitize_to_markdown(change["filter"]))
embed.add_field(_("Performed"), abusefilter_actions.get(change["action"], _("Unknown")))
embed.add_field(_("Action taken"), abusefilter_results.get(change["result"], _("Unknown")))
embed.add_field(_("Title"), sanitize_to_markdown(change.get("title", _("Unknown"))))
return embed
@formatter.compact(event="abuselog")
def compact_abuselog(ctx: Context, change: dict):
action = "abuselog/{}".format(change["result"])
author_url = clean_link(create_article_path("User:{user}".format(user=change["user"])))
author = abuse_filter_format_user(change)
message = _("[{author}]({author_url}) triggered *{abuse_filter}*, performing the action \"{action}\" on *[{target}]({target_url})* - action taken: {result}.").format(
author=author, author_url=author_url, abuse_filter=sanitize_to_markdown(change["filter"]),
action=abusefilter_actions.get(change["action"], _("Unknown")), target=change.get("title", _("Unknown")),
target_url=clean_link(create_article_path(sanitize_to_url(change.get("title", _("Unknown"))))),
result=abusefilter_results.get(change["result"], _("Unknown")))
return DiscordMessage(ctx.message_type, action, ctx.webhook_url, content=message)
# abusefilter/modify - AbuseFilter filter modification
@formatter.embed(event="abusefilter/modify")
def embed_abuselog_modify(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["url"] = create_article_path(
"Special:AbuseFilter/history/{number}/diff/prev/{historyid}".format(number=change["logparams"]['newId'],
historyid=change["logparams"]["historyId"]))
embed["title"] = _("Edited abuse filter number {number}").format(number=change["logparams"]['newId'])
return embed
@formatter.compact(event="abusefilter/modify")
def compact_abuselog_modify(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
link = clean_link(create_article_path(
"Special:AbuseFilter/history/{number}/diff/prev/{historyid}".format(number=change["logparams"]['newId'],
historyid=change["logparams"][
"historyId"])))
content = _("[{author}]({author_url}) edited abuse filter [number {number}]({filter_url})").format(author=author,
author_url=author_url,
number=change[
"logparams"][
'newId'],
filter_url=link)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# abusefilter/create - AbuseFilter filter creation
@formatter.embed(event="abusefilter/create")
def embed_abuselog_create(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["url"] = create_article_path("Special:AbuseFilter/{number}".format(number=change["logparams"]['newId']))
embed["title"] = _("Created abuse filter number {number}").format(number=change["logparams"]['newId'])
return embed
@formatter.compact(event="abusefilter/create")
def compact_abuselog_create(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
link = clean_link(
create_article_path("Special:AbuseFilter/{number}".format(number=change["logparams"]['newId'])))
content = _("[{author}]({author_url}) created abuse filter [number {number}]({filter_url})").format(author=author,
author_url=author_url,
number=change[
"logparams"][
'newId'],
filter_url=link)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)

115
extensions/base/cargo.py Normal file
View file

@ -0,0 +1,115 @@
# This file is part of Recent changes Goat compatible Discord bot (RcGcDb).
#
# RcGcDb is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RcGcDw is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with RcGcDb. If not, see <http://www.gnu.org/licenses/>.
import logging
import re
from src.discord.message import DiscordMessage
from src.api import formatter
from src.i18n import formatters_i18n
from src.api.context import Context
from src.api.util import embed_helper, compact_author, create_article_path, sanitize_to_markdown
_ = formatters_i18n.gettext
ngettext = formatters_i18n.ngettext
# Cargo - https://www.mediawiki.org/wiki/Extension:Cargo
# cargo/createtable - Creation of Cargo table
@formatter.embed(event="cargo/createtable")
def embed_cargo_createtable(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
table = re.search(r"\[(.*?)]\(<(.*?)>\)", ctx.client.parse_links(change["logparams"]["0"]))
embed["url"] = table.group(2)
embed["title"] = _("Created the Cargo table \"{table}\"").format(table=table.group(1))
return embed
@formatter.compact(event="cargo/createtable")
def compact_cargo_createtable(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
table = re.search(r"\[(.*?)]\(<(.*?)>\)", ctx.client.parse_links(change["logparams"]["0"]))
content = _("[{author}]({author_url}) created the Cargo table \"{table}\"").format(author=author,
author_url=author_url,
table=table)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# cargo/recreatetable - Recreating a Cargo table
@formatter.embed(event="cargo/recreatetable")
def embed_cargo_recreatetable(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
table = re.search(r"\[(.*?)]\(<(.*?)>\)", ctx.client.parse_links(change["logparams"]["0"]))
embed["url"] = table.group(2)
embed["title"] = _("Recreated the Cargo table \"{table}\"").format(table=table.group(1))
return embed
@formatter.compact(event="cargo/recreatetable")
def compact_cargo_recreatetable(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
table = re.search(r"\[(.*?)]\(<(.*?)>\)", ctx.client.parse_links(change["logparams"]["0"]))
content = _("[{author}]({author_url}) recreated the Cargo table \"{table}\"").format(author=author,
author_url=author_url,
table=table)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# cargo/replacetable - Replacing a Cargo table
@formatter.embed(event="cargo/replacetable")
def embed_cargo_replacetable(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
table = re.search(r"\[(.*?)]\(<(.*?)>\)", ctx.client.parse_links(change["logparams"]["0"]))
embed["url"] = table.group(2)
embed["title"] = _("Replaced the Cargo table \"{table}\"").format(table=table.group(1))
return embed
@formatter.compact(event="cargo/replacetable")
def compact_cargo_replacetable(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
table = re.search(r"\[(.*?)]\(<(.*?)>\)", ctx.client.parse_links(change["logparams"]["0"]))
content = _("[{author}]({author_url}) replaced the Cargo table \"{table}\"").format(author=author,
author_url=author_url,
table=table)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# cargo/deletetable - Deleting a table in Cargo
@formatter.embed(event="cargo/deletetable")
def embed_cargo_deletetable(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["url"] = create_article_path("Special:CargoTables")
embed["title"] = _("Deleted the Cargo table \"{table}\"").format(table=sanitize_to_markdown(change["logparams"]["0"]))
return embed
@formatter.compact(event="cargo/deletetable")
def compact_cargo_deletetable(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
content = _("[{author}]({author_url}) deleted the Cargo table \"{table}\"").format(author=author,
author_url=author_url,
table=sanitize_to_markdown(change["logparams"]["0"]))
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)

View file

@ -0,0 +1,233 @@
# This file is part of Recent changes Goat compatible Discord bot (RcGcDb).
#
# RcGcDb is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RcGcDw is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with RcGcDb. If not, see <http://www.gnu.org/licenses/>.
import logging
from src.configloader import settings
from src.discord.message import DiscordMessage
from src.api import formatter
from src.i18n import formatters_i18n
from src.api.context import Context
from src.api.util import embed_helper, clean_link, compact_author, create_article_path, sanitize_to_markdown, sanitize_to_url
from src.misc import profile_field_name
_ = formatters_i18n.gettext
ngettext = formatters_i18n.ngettext
# CurseProfile - https://help.fandom.com/wiki/Extension:CurseProfile
# curseprofile/profile-edited - Editing user profile
@formatter.embed(event="curseprofile/profile-edited")
def embed_curseprofile_profile_edited(ctx: Context, change: dict) -> DiscordMessage:
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
target_user = change["title"].split(':', 1)[1]
if target_user != change["user"]:
embed["title"] = _("Edited {target}'s profile").format(target=sanitize_to_markdown(target_user))
else:
embed["title"] = _("Edited their own profile")
if ctx.parsedcomment is None: # If the field is empty
embed["description"] = _("Cleared the {field} field").format(field=profile_field_name(change["logparams"]['4:section'], True))
else:
embed["description"] = _("{field} field changed to: {desc}").format(field=profile_field_name(change["logparams"]['4:section'], True), desc=ctx.parsedcomment)
embed["url"] = create_article_path("UserProfile:" + sanitize_to_url(target_user))
return embed
@formatter.compact(event="curseprofile/profile-edited")
def compact_curseprofile_profile_edited(ctx: Context, change: dict) -> DiscordMessage:
author, author_url = compact_author(ctx, change)
target_user = change["title"].split(':', 1)[1]
link = clean_link(create_article_path("UserProfile:" + sanitize_to_url(target_user)))
if target_user != author:
if ctx.parsedcomment is None: # If the field is empty
edit_clear_message = _("[{author}]({author_url}) cleared the {field} on [{target}]({target_url})'s profile.")
else:
edit_clear_message = _("[{author}]({author_url}) edited the {field} on [{target}]({target_url})'s profile. *({desc})*")
content = edit_clear_message.format(author=author, author_url=author_url, target=sanitize_to_markdown(target_user), target_url=link,
field=profile_field_name(change["logparams"]['4:section'], False), desc=ctx.parsedcomment)
else:
if ctx.parsedcomment is None: # If the field is empty
edit_clear_message = _("[{author}]({author_url}) cleared the {field} on [their own]({target_url}) profile.")
else:
edit_clear_message = _("[{author}]({author_url}) edited the {field} on [their own]({target_url}) profile. *({desc})*")
content = edit_clear_message.format(author=author, author_url=author_url, target_url=link,
field=profile_field_name(change["logparams"]['4:section'], False), desc=ctx.parsedcomment)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# curseprofile/comment-created - Creating comment on user profile
@formatter.embed(event="curseprofile/comment-created")
def embed_curseprofile_comment_created(ctx: Context, change: dict) -> DiscordMessage:
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
target_user = change["title"].split(':', 1)[1]
if target_user != change["user"]:
embed["title"] = _("Left a comment on {target}'s profile").format(target=sanitize_to_markdown(target_user))
else:
embed["title"] = _("Left a comment on their own profile")
if settings["appearance"]["embed"]["show_edit_changes"]:
embed["description"] = ctx.client.pull_curseprofile_comment(change["logparams"]["4:comment_id"])
embed["url"] = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]))
return embed
@formatter.compact(event="curseprofile/comment-created")
def compact_curseprofile_comment_created(ctx: Context, change: dict) -> DiscordMessage:
author, author_url = compact_author(ctx, change)
target_user = change["title"].split(':', 1)[1]
link = clean_link(create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"])))
if target_user != author:
content = _("[{author}]({author_url}) left a [comment]({comment}) on {target}'s profile.").format(
author=author, author_url=author_url, comment=link, target=sanitize_to_markdown(target_user))
else:
content = _("[{author}]({author_url}) left a [comment]({comment}) on their own profile.").format(author=author, author_url=author_url, comment=link)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# curseprofile/comment-edited - Editing comment on user profile
@formatter.embed(event="curseprofile/comment-edited")
def embed_curseprofile_comment_edited(ctx: Context, change: dict) -> DiscordMessage:
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
target_user = change["title"].split(':', 1)[1]
if target_user != change["user"]:
embed["title"] = _("Edited a comment on {target}'s profile").format(target=sanitize_to_markdown(target_user))
else:
embed["title"] = _("Edited a comment on their own profile")
if settings["appearance"]["embed"]["show_edit_changes"]:
embed["description"] = ctx.client.pull_curseprofile_comment(change["logparams"]["4:comment_id"])
embed["url"] = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]))
return embed
@formatter.compact(event="curseprofile/comment-edited")
def compact_curseprofile_comment_edited(ctx: Context, change: dict) -> DiscordMessage:
author, author_url = compact_author(ctx, change)
target_user = change["title"].split(':', 1)[1]
link = clean_link(create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"])))
if target_user != author:
content = _("[{author}]({author_url}) edited a [comment]({comment}) on {target}'s profile.").format(
author=author, author_url=author_url, comment=link, target=sanitize_to_markdown(target_user))
else:
content = _("[{author}]({author_url}) edited a [comment]({comment}) on their own profile.").format(author=author, author_url=author_url, comment=link)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# curseprofile/comment-replied - Replying to comment on user profile
@formatter.embed(event="curseprofile/comment-replied")
def embed_curseprofile_comment_replied(ctx: Context, change: dict) -> DiscordMessage:
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
target_user = change["title"].split(':', 1)[1]
if target_user != change["user"]:
embed["title"] = _("Replied to a comment on {target}'s profile").format(target=sanitize_to_markdown(target_user))
else:
embed["title"] = _("Replied to a comment on their own profile")
if settings["appearance"]["embed"]["show_edit_changes"]:
embed["description"] = ctx.client.pull_curseprofile_comment(change["logparams"]["4:comment_id"])
embed["url"] = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]))
return embed
@formatter.compact(event="curseprofile/comment-replied")
def compact_curseprofile_comment_replied(ctx: Context, change: dict) -> DiscordMessage:
author, author_url = compact_author(ctx, change)
target_user = change["title"].split(':', 1)[1]
link = clean_link(create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"])))
if target_user != author:
content = _("[{author}]({author_url}) replied to a [comment]({comment}) on {target}'s profile.").format(
author=author, author_url=author_url, comment=link, target=sanitize_to_markdown(target_user))
else:
content = _("[{author}]({author_url}) replied to a [comment]({comment}) on their own profile.").format(author=author, author_url=author_url, comment=link)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# curseprofile/comment-deleted - Deleting comment on user profile
@formatter.embed(event="curseprofile/comment-deleted")
def embed_curseprofile_comment_deleted(ctx: Context, change: dict) -> DiscordMessage:
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
target_user = change["title"].split(':', 1)[1]
if target_user != change["user"]:
embed["title"] = _("Deleted a comment on {target}'s profile").format(target=sanitize_to_markdown(target_user))
else:
embed["title"] = _("Deleted a comment on their own profile")
if ctx.parsedcomment is not None:
embed["description"] = ctx.parsedcomment
if "4:comment_id" in change["logparams"]:
embed["url"] = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]))
else:
embed["url"] = create_article_path("UserProfile:" + sanitize_to_url(target_user))
return embed
@formatter.compact(event="curseprofile/comment-deleted")
def compact_curseprofile_comment_deleted(ctx: Context, change: dict) -> DiscordMessage:
author, author_url = compact_author(ctx, change)
target_user = change["title"].split(':', 1)[1]
if "4:comment_id" in change["logparams"]:
link = clean_link(create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"])))
else:
link = clean_link(create_article_path("UserProfile:" + sanitize_to_url(target_user)))
parsed_comment = "" if ctx.parsedcomment is None else " *(" + ctx.parsedcomment + ")*"
if target_user != author:
content = _("[{author}]({author_url}) deleted a [comment]({comment}) on {target}'s profile.{reason}").format(
author=author, author_url=author_url, comment=link, target=sanitize_to_markdown(target_user), reason=parsed_comment)
else:
content = _("[{author}]({author_url}) deleted a [comment]({comment}) on their own profile.{reason}").format(
author=author, author_url=author_url, comment=link, reason=parsed_comment)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# curseprofile/comment-purged - Purging comment on user profile
@formatter.embed(event="curseprofile/comment-purged")
def embed_curseprofile_comment_purged(ctx: Context, change: dict) -> DiscordMessage:
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
target_user = change["title"].split(':', 1)[1]
if target_user != change["user"]:
embed["title"] = _("Purged a comment on {target}'s profile").format(target=sanitize_to_markdown(target_user))
else:
embed["title"] = _("Purged a comment on their own profile")
if ctx.parsedcomment is not None:
embed["description"] = ctx.parsedcomment
embed["url"] = create_article_path("UserProfile:" + sanitize_to_url(target_user))
return embed
@formatter.compact(event="curseprofile/comment-purged")
def compact_curseprofile_comment_purged(ctx: Context, change: dict) -> DiscordMessage:
author, author_url = compact_author(ctx, change)
target_user = change["title"].split(':', 1)[1]
link = clean_link(create_article_path("UserProfile:" + sanitize_to_url(target_user)))
parsed_comment = "" if ctx.parsedcomment is None else " *(" + ctx.parsedcomment + ")*"
if target_user != author:
content = _("[{author}]({author_url}) purged a comment on [{target}]({link})'s profile.{reason}").format(
author=author, author_url=author_url, link=link, target=sanitize_to_markdown(target_user), reason=parsed_comment)
else:
content = _("[{author}]({author_url}) purged a comment on [their own]({link}) profile.{reason}").format(author=author, author_url=author_url, link=link)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content, reason=parsed_comment)

View file

@ -0,0 +1,71 @@
# This file is part of Recent changes Goat compatible Discord bot (RcGcDb).
#
# RcGcDb is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RcGcDw is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with RcGcDb. If not, see <http://www.gnu.org/licenses/>.
import logging
from src.discord.message import DiscordMessage
from src.api import formatter
from src.i18n import formatters_i18n
from src.api.context import Context
from src.api.util import embed_helper, compact_author, create_article_path, sanitize_to_markdown, sanitize_to_url, compact_summary
_ = formatters_i18n.gettext
ngettext = formatters_i18n.ngettext
# DataDumps - https://www.mediawiki.org/wiki/Extension:DataDump
# datadump/generate - Generating a dump of wiki
@formatter.embed(event="datadump/generate")
def embed_datadump_generate(ctx: Context, change: dict) -> DiscordMessage:
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["title"] = _("Generated {file} dump").format(file=change["logparams"]["filename"])
embed["url"] = create_article_path(sanitize_to_url(change["title"]))
return embed
@formatter.compact(event="mdatadump/generate")
def compact_datadump_generate(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
parsed_comment = compact_summary(ctx)
content = _("[{author}]({author_url}) generated *{file}* dump{comment}").format(
author=author, author_url=author_url, file=sanitize_to_markdown(change["logparams"]["filename"]),
comment=parsed_comment
)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# datadump/delete - Deleting a dump of a wiki
@formatter.embed(event="datadump/delete")
def embed_datadump_delete(ctx: Context, change: dict) -> DiscordMessage:
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["title"] = _("Deleted {file} dump").format(file=sanitize_to_markdown(change["logparams"]["filename"]))
embed["url"] = create_article_path(sanitize_to_url(change["title"]))
return embed
@formatter.compact(event="mdatadump/delete")
def compact_datadump_delete(ctx: Context, change: dict) -> DiscordMessage:
author, author_url = compact_author(ctx, change)
parsed_comment = compact_summary(ctx)
content = _("[{author}]({author_url}) deleted *{file}* dump{comment}").format(
author=author, author_url=author_url, file=sanitize_to_markdown(change["logparams"]["filename"]),
comment=parsed_comment
)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)

View file

@ -0,0 +1,368 @@
# This file is part of Recent changes Goat compatible Discord bot (RcGcDb).
#
# RcGcDb is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RcGcDw is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with RcGcDb. If not, see <http://www.gnu.org/licenses/>.
# Discussions - Custom Fandom technology which apparently doesn't have any documentation or homepage, not even open-source, go figure
import json
import datetime, logging
import gettext
from urllib.parse import quote_plus
from src.configloader import settings
from src.api.util import create_article_path, clean_link, sanitize_to_markdown
from src.api.context import Context
from src.discord.queue import send_to_discord
from src.discord.message import DiscordMessage, DiscordMessageMetadata
from src.api import formatter
from src.i18n import formatters_i18n
_ = formatters_i18n.gettext
logger = logging.getLogger("rcgcdw.discussion_formatter")
class DiscussionsFromHellParser:
"""This class converts fairly convoluted Fandom jsonModal of a discussion post into Markdown formatted usable thing.
Takes string, returns string. Kudos to MarkusRost for allowing me to implement this formatter based on his code in Wiki-Bot."""
def __init__(self, post):
self.post = post
self.jsonModal = json.loads(post.get("jsonModel", "{}"))
self.markdown_text = ""
self.item_num = 1
self.image_last = None
def parse(self) -> str:
"""Main parsing logic"""
self.parse_content(self.jsonModal["content"])
if len(self.markdown_text) > 2000:
self.markdown_text = self.markdown_text[0:2000] + ""
return self.markdown_text
def parse_content(self, content, ctype=None):
self.image_last = None
for item in content:
if ctype == "bulletList":
self.markdown_text += "\t"
if ctype == "orderedList":
self.markdown_text += "\t{num}. ".format(num=self.item_num)
self.item_num += 1
if item["type"] == "text":
if "marks" in item:
prefix, suffix = self.convert_marks(item["marks"])
self.markdown_text = "{old}{pre}{text}{suf}".format(old=self.markdown_text, pre=prefix,
text=sanitize_to_markdown(item["text"]),
suf=suffix)
else:
if ctype == "code_block":
self.markdown_text += item["text"] # ignore formatting on preformatted text which cannot have additional formatting anyways
else:
self.markdown_text += sanitize_to_markdown(item["text"])
elif item["type"] == "paragraph":
if "content" in item:
self.parse_content(item["content"], item["type"])
self.markdown_text += "\n"
elif item["type"] == "openGraph":
if not item["attrs"]["wasAddedWithInlineLink"]:
self.markdown_text = "{old}{link}\n".format(old=self.markdown_text, link=item["attrs"]["url"])
elif item["type"] == "image":
try:
logger.debug(item["attrs"]["id"])
if item["attrs"]["id"] is not None:
self.markdown_text = "{old}{img_url}\n".format(old=self.markdown_text, img_url=
self.post["_embedded"]["contentImages"][int(item["attrs"]["id"])]["url"])
self.image_last = self.post["_embedded"]["contentImages"][int(item["attrs"]["id"])]["url"]
except (IndexError, ValueError):
logger.warning("Image {} not found.".format(item["attrs"]["id"]))
logger.debug(self.markdown_text)
elif item["type"] == "code_block":
self.markdown_text += "```\n"
if "content" in item:
self.parse_content(item["content"], item["type"])
self.markdown_text += "\n```\n"
elif item["type"] == "bulletList":
if "content" in item:
self.parse_content(item["content"], item["type"])
elif item["type"] == "orderedList":
self.item_num = 1
if "content" in item:
self.parse_content(item["content"], item["type"])
elif item["type"] == "listItem":
self.parse_content(item["content"], item["type"])
@staticmethod
def convert_marks(marks):
prefix = ""
suffix = ""
for mark in marks:
if mark["type"] == "mention":
prefix += "["
suffix = "]({wiki}f/u/{userid}){suffix}".format(wiki=settings["fandom_discussions"]["wiki_url"],
userid=mark["attrs"]["userId"], suffix=suffix)
elif mark["type"] == "strong":
prefix += "**"
suffix = "**{suffix}".format(suffix=suffix)
elif mark["type"] == "link":
prefix += "["
suffix = "]({link}){suffix}".format(link=mark["attrs"]["href"], suffix=suffix)
elif mark["type"] == "em":
prefix += "_"
suffix = "_" + suffix
return prefix, suffix
def common_discussions(post: dict, embed: DiscordMessage):
"""A method to setup embeds with common info shared between all types of discussion posts"""
if settings["fandom_discussions"]["appearance"]["embed"]["show_content"]:
if post.get("jsonModel") is not None:
npost = DiscussionsFromHellParser(post)
embed["description"] = npost.parse()
if npost.image_last:
embed["image"]["url"] = npost.image_last
embed["description"] = embed["description"].replace(npost.image_last, "")
else: # Fallback when model is not available
embed["description"] = post.get("rawContent", "")
embed["footer"]["text"] = post["forumName"]
embed["timestamp"] = datetime.datetime.fromtimestamp(post["creationDate"]["epochSecond"],
tz=datetime.timezone.utc).isoformat()
# discussion/forum - Discussions on the "forum" available via "Discuss" button
@formatter.embed(event="discussion/forum")
def embed_discussion_forum(ctx: Context, post: dict):
embed = DiscordMessage("embed", "discussion", settings["fandom_discussions"]["webhookURL"])
common_discussions(post, embed)
author = _("unknown") # Fail safe
if post["createdBy"]["name"]:
author = post["createdBy"]["name"]
embed.set_author(author, "{url}f/u/{creatorId}".format(url=settings["fandom_discussions"]["wiki_url"],
creatorId=post["creatorId"]),
icon_url=post["createdBy"]["avatarUrl"])
if not post["isReply"]:
embed["url"] = "{url}f/p/{threadId}".format(url=settings["fandom_discussions"]["wiki_url"],
threadId=post["threadId"])
embed["title"] = _("Created \"{title}\"").format(title=post["title"])
thread_funnel = post.get("funnel")
if thread_funnel == "POLL":
embed.event_type = "discussion/forum/poll"
embed["title"] = _("Created a poll \"{title}\"").format(title=post["title"])
if settings["fandom_discussions"]["appearance"]["embed"]["show_content"]:
poll = post["poll"]
image_type = False
if poll["answers"][0]["image"] is not None:
image_type = True
for num, option in enumerate(poll["answers"]):
embed.add_field(option["text"] if image_type is True else _("Option {}").format(num + 1),
option["text"] if image_type is False else _(
"__[View image]({image_url})__").format(image_url=option["image"]["url"]),
inline=True)
elif thread_funnel == "QUIZ":
embed.event_type = "discussion/forum/quiz"
embed["title"] = _("Created a quiz \"{title}\"").format(title=post["title"])
if settings["fandom_discussions"]["appearance"]["embed"]["show_content"]:
quiz = post["_embedded"]["quizzes"][0]
embed["description"] = quiz["title"]
if quiz["image"] is not None:
embed["image"]["url"] = quiz["image"]
elif thread_funnel == "TEXT":
embed.event_type = "discussion/forum/post"
else:
logger.warning(
"The type of {} is an unknown discussion post type. Please post an issue on the project page to have it added https://gitlab.com/piotrex43/RcGcDw/-/issues.".format(
thread_funnel))
embed.event_type = "unknown"
if post["_embedded"]["thread"][0]["tags"]:
tag_displayname = []
for tag in post["_embedded"]["thread"][0]["tags"]:
tag_displayname.append("[{title}]({url})".format(title=tag["articleTitle"], url=create_article_path(
quote_plus(tag["articleTitle"].replace(" ", "_"), "/:?=&"))))
if len(", ".join(tag_displayname)) > 1000:
embed.add_field(formatters_i18n.pgettext("Fandom discussions Tags/Forums", "Tags"), formatters_i18n.pgettext("Fandom discussions amount of Tags/Forums", "{} tags").format(len(post["_embedded"]["thread"][0]["tags"])))
else:
embed.add_field(formatters_i18n.pgettext("Fandom discussions Tags/Forums", "Tags"), ", ".join(tag_displayname))
else:
embed.event_type = "discussion/forum/reply"
embed["title"] = _("Replied to \"{title}\"").format(title=post["_embedded"]["thread"][0]["title"])
embed["url"] = "{url}f/p/{threadId}/r/{postId}".format(url=settings["fandom_discussions"]["wiki_url"],
threadId=post["threadId"], postId=post["id"])
return embed
@formatter.compact(event="discussion/forum")
def compact_discussion_forum(ctx: Context, post: dict):
message = None
author = _("unknown") # Fail safe
if post["createdBy"]["name"]:
author = post["createdBy"]["name"]
author_url = "<{url}f/u/{creatorId}>".format(url=settings["fandom_discussions"]["wiki_url"],
creatorId=post["creatorId"])
if not post["isReply"]:
thread_funnel = post.get("funnel")
msg_text = _("[{author}]({author_url}) created [{title}](<{url}f/p/{threadId}>) in {forumName}")
if thread_funnel == "POLL":
event_type = "discussion/forum/poll"
msg_text = _("[{author}]({author_url}) created a poll [{title}](<{url}f/p/{threadId}>) in {forumName}")
elif thread_funnel == "QUIZ":
event_type = "discussion/forum/quiz"
msg_text = _("[{author}]({author_url}) created a quiz [{title}](<{url}f/p/{threadId}>) in {forumName}")
elif thread_funnel == "TEXT":
event_type = "discussion/forum/post"
else:
logger.warning(
"The type of {} is an unknown discussion post type. Please post an issue on the project page to have it added https://gitlab.com/piotrex43/RcGcDw/-/issues.".format(
thread_funnel))
event_type = "unknown"
message = msg_text.format(author=author, author_url=author_url, title=post["title"],
url=settings["fandom_discussions"]["wiki_url"], threadId=post["threadId"],
forumName=post["forumName"])
else:
event_type = "discussion/forum/reply"
message = _(
"[{author}]({author_url}) created a [reply](<{url}f/p/{threadId}/r/{postId}>) to [{title}](<{url}f/p/{threadId}>) in {forumName}").format(
author=author, author_url=author_url, url=settings["fandom_discussions"]["wiki_url"],
threadId=post["threadId"], postId=post["id"], title=post["_embedded"]["thread"][0]["title"],
forumName=post["forumName"])
return DiscordMessage("compact", event_type, ctx.webhook_url, content=message)
# discussion/wall - Wall posts/replies
def compact_author_discussions(post: dict):
"""A common function for a few discussion related foramtters, it's formatting author's name and URL to their profile"""
author = _("unknown") # Fail safe
if post["creatorIp"]:
author = post["creatorIp"][1:] if settings.get("hide_ips", False) is False else _("Unregistered user")
author_url = "<{url}wiki/Special:Contributions{creatorIp}>".format(url=settings["fandom_discussions"]["wiki_url"],
creatorIp=post["creatorIp"])
else:
if post["createdBy"]["name"]:
author = post["createdBy"]["name"]
author_url = clean_link(create_article_path("User:{user}".format(user=author)))
else:
author_url = "<{url}f/u/{creatorId}>".format(url=settings["fandom_discussions"]["wiki_url"],
creatorId=post["creatorId"])
return author, author_url
def embed_author_discussions(post: dict, embed: DiscordMessage):
author = _("unknown") # Fail safe
if post["creatorIp"]:
author = post["creatorIp"][1:]
embed.set_author(author if settings.get("hide_ips", False) is False else _("Unregistered user"),
"{url}wiki/Special:Contributions{creatorIp}".format(
url=settings["fandom_discussions"]["wiki_url"], creatorIp=post["creatorIp"]))
else:
if post["createdBy"]["name"]:
author = post["createdBy"]["name"]
embed.set_author(author, "{url}wiki/User:{creator}".format(url=settings["fandom_discussions"]["wiki_url"],
creator=author.replace(" ", "_")),
icon_url=post["createdBy"]["avatarUrl"])
else:
embed.set_author(author, "{url}f/u/{creatorId}".format(url=settings["fandom_discussions"]["wiki_url"],
creatorId=post["creatorId"]),
icon_url=post["createdBy"]["avatarUrl"])
@formatter.embed(event="discussion/wall")
def embed_discussion_wall(ctx: Context, post: dict):
embed = DiscordMessage("embed", "discussion", settings["fandom_discussions"]["webhookURL"])
common_discussions(post, embed)
embed_author_discussions(post, embed)
user_wall = _("unknown") # Fail safe
if post["forumName"].endswith(' Message Wall'):
user_wall = post["forumName"][:-13]
if not post["isReply"]:
embed.event_type = "discussion/wall/post"
embed["url"] = "{url}wiki/Message_Wall:{user_wall}?threadId={threadId}".format(
url=settings["fandom_discussions"]["wiki_url"], user_wall=quote_plus(user_wall.replace(" ", "_")),
threadId=post["threadId"])
embed["title"] = _("Created \"{title}\" on {user}'s Message Wall").format(title=post["title"], user=user_wall)
else:
embed.event_type = "discussion/wall/reply"
embed["url"] = "{url}wiki/Message_Wall:{user_wall}?threadId={threadId}#{replyId}".format(
url=settings["fandom_discussions"]["wiki_url"], user_wall=quote_plus(user_wall.replace(" ", "_")),
threadId=post["threadId"], replyId=post["id"])
embed["title"] = _("Replied to \"{title}\" on {user}'s Message Wall").format(
title=post["_embedded"]["thread"][0]["title"], user=user_wall)
return embed
@formatter.compact(event="discussion/wall")
def compact_discussion_wall(ctx: Context, post: dict):
author, author_url = compact_author_discussions(post)
user_wall = _("unknown") # Fail safe
if post["forumName"].endswith(' Message Wall'):
user_wall = post["forumName"][:-13]
if not post["isReply"]:
event_type = "discussion/wall/post"
message = _(
"[{author}]({author_url}) created [{title}](<{url}wiki/Message_Wall:{user_wall}?threadId={threadId}>) on [{user}'s Message Wall](<{url}wiki/Message_Wall:{user_wall}>)").format(
author=author, author_url=author_url, title=post["title"], url=settings["fandom_discussions"]["wiki_url"],
user=user_wall, user_wall=quote_plus(user_wall.replace(" ", "_")), threadId=post["threadId"])
else:
event_type = "discussion/wall/reply"
message = _(
"[{author}]({author_url}) created a [reply](<{url}wiki/Message_Wall:{user_wall}?threadId={threadId}#{replyId}>) to [{title}](<{url}wiki/Message_Wall:{user_wall}?threadId={threadId}>) on [{user}'s Message Wall](<{url}wiki/Message_Wall:{user_wall}>)").format(
author=author, author_url=author_url, url=settings["fandom_discussions"]["wiki_url"],
title=post["_embedded"]["thread"][0]["title"], user=user_wall,
user_wall=quote_plus(user_wall.replace(" ", "_")), threadId=post["threadId"], replyId=post["id"])
return DiscordMessage("compact", event_type, ctx.webhook_url, content=message)
# discussion/article_comment - Article comments
@formatter.embed(event="discussion/article_comment")
def embed_discussion_article_comment(ctx: Context, post: dict):
embed = DiscordMessage("embed", "discussion", settings["fandom_discussions"]["webhookURL"])
common_discussions(post, embed)
embed_author_discussions(post, embed)
article_paths = ctx.comment_page
if article_paths is None:
article_page = {"title": _("unknown"), "fullUrl": settings["fandom_discussions"]["wiki_url"]} # No page known
if not post["isReply"]:
embed.event_type = "discussion/comment/post"
embed["url"] = "{url}?commentId={commentId}".format(url=article_paths["fullUrl"], commentId=post["threadId"])
embed["title"] = _("Commented on {article}").format(article=article_paths["title"])
else:
embed.event_type = "discussion/comment/reply"
embed["url"] = "{url}?commentId={commentId}&replyId={replyId}".format(url=article_paths["fullUrl"],
commentId=post["threadId"],
replyId=post["id"])
embed["title"] = _("Replied to a comment on {article}").format(article=article_paths["title"])
embed["footer"]["text"] = article_paths["title"]
return embed
@formatter.compact(event="discussion/article_comment")
def compact_discussion_article_comment(ctx: Context, post: dict):
author, author_url = compact_author_discussions(post)
article_paths = ctx.comment_page
if article_paths is None:
article_paths = {"title": _("unknown"), "fullUrl": settings["fandom_discussions"]["wiki_url"]} # No page known
article_paths["fullUrl"] = article_paths["fullUrl"].replace(")", "\)").replace("()", "\(")
if not post["isReply"]:
event_type = "discussion/comment/post"
message = _(
"[{author}]({author_url}) created a [comment](<{url}?commentId={commentId}>) on [{article}](<{url}>)").format(
author=author, author_url=author_url, url=article_paths["fullUrl"], article=article_paths["title"],
commentId=post["threadId"])
else:
event_type = "discussion/comment/reply"
message = _(
"[{author}]({author_url}) created a [reply](<{url}?commentId={commentId}&replyId={replyId}>) to a [comment](<{url}?commentId={commentId}>) on [{article}](<{url}>)").format(
author=author, author_url=author_url, url=article_paths["fullUrl"], article=article_paths["title"],
commentId=post["threadId"], replyId=post["id"])
return DiscordMessage("compact", event_type, ctx.webhook_url, content=message)

View file

@ -0,0 +1,108 @@
# This file is part of Recent changes Goat compatible Discord bot (RcGcDb).
#
# RcGcDb is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RcGcDw is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with RcGcDb. If not, see <http://www.gnu.org/licenses/>.
import logging
from src.discord.message import DiscordMessage
from src.api import formatter
from src.i18n import formatters_i18n
from src.api.context import Context
from src.api.util import embed_helper, clean_link, compact_author, create_article_path, sanitize_to_url, compact_summary
_ = formatters_i18n.gettext
ngettext = formatters_i18n.ngettext
# Interwiki - https://www.mediawiki.org/wiki/Extension:Interwiki
# interwiki/iw_add - Added entry to interwiki table
@formatter.embed(event="interwiki/iw_add", mode="embed")
def embed_interwiki_iw_add(ctx: Context, change: dict) -> DiscordMessage:
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change, set_desc=False)
embed["url"] = create_article_path("Special:Interwiki")
embed["title"] = _("Added an entry to the interwiki table")
embed["description"] = _("Prefix: {prefix}, website: {website} | {desc}").format(desc=ctx.parsedcomment,
prefix=change["logparams"]['0'],
website=change["logparams"]['1'])
return embed
@formatter.compact(event="interwiki/iw_add")
def compact_interwiki_iw_add(ctx: Context, change: dict) -> DiscordMessage:
author, author_url = compact_author(ctx, change)
link = clean_link(create_article_path("Special:Interwiki"))
parsed_comment = compact_summary(ctx)
content = _(
"[{author}]({author_url}) added an entry to the [interwiki table]({table_url}) pointing to {website} with {prefix} prefix").format(
author=author, author_url=author_url, desc=parsed_comment, prefix=change["logparams"]['0'],
website=change["logparams"]['1'], table_url=link)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# interwiki/iw_edit - Editing interwiki entry
@formatter.embed(event="interwiki/iw_edit", mode="embed")
def embed_interwiki_iw_edit(ctx: Context, change: dict) -> DiscordMessage:
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change, set_desc=False)
embed["url"] = create_article_path("Special:Interwiki")
embed["title"] = _("Edited an entry in interwiki table")
embed["description"] = _("Prefix: {prefix}, website: {website} | {desc}").format(desc=ctx.parsedcomment,
prefix=change["logparams"]['0'],
website=change["logparams"]['1'])
return embed
@formatter.compact(event="interwiki/iw_edit")
def compact_interwiki_iw_edit(ctx: Context, change: dict) -> DiscordMessage:
author, author_url = compact_author(ctx, change)
link = clean_link(create_article_path("Special:Interwiki"))
parsed_comment = compact_summary(ctx)
content = _(
"[{author}]({author_url}) edited an entry in [interwiki table]({table_url}) pointing to {website} with {prefix} prefix").format(
author=author, author_url=author_url, desc=parsed_comment, prefix=change["logparams"]['0'],
website=change["logparams"]['1'], table_url=link)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# interwiki/iw_delete - Deleting interwiki entry
@formatter.embed(event="interwiki/iw_delete", mode="embed")
def embed_interwiki_iw_delete(ctx: Context, change: dict) -> DiscordMessage:
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change, set_desc=False)
embed["url"] = create_article_path("Special:Interwiki")
embed["title"] = _("Deleted an entry in interwiki table")
embed["description"] = _("Prefix: {prefix} | {desc}").format(desc=ctx.parsedcomment,
prefix=change["logparams"]['0'])
return embed
@formatter.compact(event="interwiki/iw_delete")
def compact_interwiki_iw_delete(ctx: Context, change: dict) -> DiscordMessage:
author, author_url = compact_author(ctx, change)
link = clean_link(create_article_path("Special:Interwiki"))
parsed_comment = compact_summary(ctx)
content = _("[{author}]({author_url}) deleted an entry in [interwiki table]({table_url}){desc}").format(
author=author,
author_url=author_url,
table_url=link,
desc=parsed_comment)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)

View file

@ -0,0 +1,229 @@
# This file is part of Recent changes Goat compatible Discord bot (RcGcDb).
#
# RcGcDb is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RcGcDw is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with RcGcDb. If not, see <http://www.gnu.org/licenses/>.
from src.discord.message import DiscordMessage
from src.api import formatter
from src.i18n import formatters_i18n
from src.api.context import Context
from src.api.util import embed_helper, compact_author, create_article_path, sanitize_to_markdown, sanitize_to_url, compact_summary
_ = formatters_i18n.gettext
ngettext = formatters_i18n.ngettext
# ManageWiki - https://www.mediawiki.org/wiki/Special:MyLanguage/Extension:ManageWiki
# managewiki/settings - Changing wiki settings
@formatter.embed(event="managewiki/settings")
def embed_managewiki_settings(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["url"] = create_article_path(sanitize_to_url(change["title"]))
embed["title"] = _("Changed wiki settings")
if change["logparams"].get("changes", ""):
embed.add_field("Setting", sanitize_to_markdown(change["logparams"].get("changes")))
return embed
@formatter.compact(event="managewiki/settings")
def compact_managewiki_settings(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
parsed_comment = compact_summary(ctx)
content = _("[{author}]({author_url}) changed wiki settings{reason}".format(author=author, author_url=author_url, reason=parsed_comment))
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# managewiki/delete - Deleting a wiki
@formatter.embed(event="managewiki/delete")
def embed_managewiki_delete(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["url"] = create_article_path(sanitize_to_url(change["title"]))
embed["title"] = _("Deleted a \"{wiki}\" wiki").format(wiki=change["logparams"].get("wiki", _("Unknown")))
return embed
@formatter.compact(event="managewiki/delete")
def compact_managewiki_delete(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
parsed_comment = compact_summary(ctx)
content = _("[{author}]({author_url}) deleted a wiki *{wiki_name}*{comment}").format(author=author,
author_url=author_url,
wiki_name=change[
"logparams"].get("wiki",
_("Unknown")),
comment=parsed_comment)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# managewiki/delete-group - Deleting a group
@formatter.embed(event="managewiki/delete-group")
def embed_managewiki_delete_group(ctx: Context, change: dict) -> DiscordMessage:
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["url"] = create_article_path(sanitize_to_url(change["title"]))
group = change["title"].split("/")[-1]
embed["title"] = _("Deleted a \"{group}\" user group").format(wiki=group)
return embed
@formatter.compact(event="managewiki/delete-group")
def compact_managewiki_delete_group(ctx: Context, change: dict) -> DiscordMessage:
author, author_url = compact_author(ctx, change)
parsed_comment = compact_summary(ctx)
group = change["title"].split("/")[-1]
content = _("[{author}]({author_url}) deleted a usergroup *{group}*{comment}").format(author=author,
author_url=author_url,
group=group,
comment=parsed_comment)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# managewiki/lock - Locking a wiki
@formatter.embed(event="managewiki/lock")
def embed_managewiki_lock(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["url"] = create_article_path(sanitize_to_url(change["title"]))
embed["title"] = _("Locked a \"{wiki}\" wiki").format(wiki=change["logparams"].get("wiki", _("Unknown")))
return embed
@formatter.compact(event="managewiki/lock")
def compact_managewiki_lock(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
parsed_comment = compact_summary(ctx)
content = _("[{author}]({author_url}) locked a wiki *{wiki_name}*{comment}").format(
author=author, author_url=author_url, wiki_name=change["logparams"].get("wiki", _("Unknown")),
comment=parsed_comment)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# managewiki/namespaces - Modirying a wiki namespace
@formatter.embed(event="managewiki/namespaces")
def embed_managewiki_namespaces(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["url"] = create_article_path(sanitize_to_url(change["title"]))
embed["title"] = _("Modified \"{namespace_name}\" namespace").format(
namespace_name=change["logparams"].get("namespace", _("Unknown")))
embed.add_field(_('Wiki'), change["logparams"].get("wiki", _("Unknown")))
return embed
@formatter.compact(event="managewiki/namespaces")
def compact_managewiki_namespaces(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
parsed_comment = compact_summary(ctx)
content = _("[{author}]({author_url}) modified namespace *{namespace_name}* on *{wiki_name}*{comment}").format(
author=author, author_url=author_url, namespace_name=change["logparams"].get("namespace", _("Unknown")),
wiki_name=change["logparams"].get("wiki", _("Unknown")), comment=parsed_comment)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# managewiki/namespaces-delete - Deleteing a namespace
@formatter.embed(event="managewiki/namespaces-delete")
def embed_managewiki_namespaces_delete(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["url"] = create_article_path(sanitize_to_url(change["title"]))
embed["title"] = _("Deleted a \"{namespace_name}\" namespace").format(
namespace_name=change["logparams"].get("namespace", _("Unknown")))
embed.add_field(_('Wiki'), change["logparams"].get("wiki", _("Unknown")))
return embed
@formatter.compact(event="managewiki/namespaces-delete")
def compact_managewiki_namespaces_delete(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
parsed_comment = compact_summary(ctx)
content = _(
"[{author}]({author_url}) deleted a namespace *{namespace_name}* on *{wiki_name}*{comment}").format(
author=author, author_url=author_url,
namespace_name=change["logparams"].get("namespace", _("Unknown")),
wiki_name=change["logparams"].get("wiki", _("Unknown")), comment=parsed_comment)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# managewiki/rights - Modifying user groups
@formatter.embed(event="managewiki/rights")
def embed_managewiki_rights(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["url"] = create_article_path(sanitize_to_url(change["title"]))
group_name = change["title"].split("/permissions/", 1)[1]
embed["title"] = _("Modified \"{usergroup_name}\" usergroup").format(usergroup_name=group_name)
return embed
@formatter.compact(event="managewiki/rights")
def compact_managewiki_rights(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
parsed_comment = compact_summary(ctx)
group_name = change["title"].split("/permissions/", 1)[1]
content = _("[{author}]({author_url}) modified user group *{group_name}*{comment}").format(
author=author, author_url=author_url, group_name=group_name, comment=parsed_comment
)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# managewiki/undelete - Restoring a wiki
@formatter.embed(event="managewiki/undelete")
def embed_managewiki_undelete(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["url"] = create_article_path(sanitize_to_url(change["title"]))
embed["title"] = _("Undeleted a \"{wiki}\" wiki").format(wiki=change["logparams"].get("wiki", _("Unknown")))
return embed
@formatter.compact(event="managewiki/undelete")
def compact_managewiki_undelete(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
parsed_comment = compact_summary(ctx)
content = _("[{author}]({author_url}) undeleted a wiki *{wiki_name}*{comment}").format(
author=author, author_url=author_url, wiki_name=change["logparams"].get("wiki", _("Unknown")),
comment=parsed_comment
)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# managewiki/unlock - Unlocking a wiki
@formatter.embed(event="managewiki/unlock")
def embed_managewiki_unlock(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["url"] = create_article_path(sanitize_to_url(change["title"]))
embed["title"] = _("Unlocked a \"{wiki}\" wiki").format(wiki=change["logparams"].get("wiki", _("Unknown")))
return embed
@formatter.compact(event="managewiki/unlock")
def compact_managewiki_unlock(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
parsed_comment = compact_summary(ctx)
content = _("[{author}]({author_url}) unlocked a wiki *{wiki_name}*{comment}").format(
author=author, author_url=author_url, wiki_name=change["logparams"].get("wiki", _("Unknown")),
comment=parsed_comment
)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)

1190
extensions/base/mediawiki.py Normal file

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,70 @@
# This file is part of Recent changes Goat compatible Discord bot (RcGcDb).
#
# RcGcDb is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RcGcDw is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with RcGcDb. If not, see <http://www.gnu.org/licenses/>.
import logging
from src.discord.message import DiscordMessage
from src.api import formatter
from src.i18n import formatters_i18n
from src.api.context import Context
from src.api.util import embed_helper, compact_summary, clean_link, compact_author, create_article_path, sanitize_to_markdown, sanitize_to_url
_ = formatters_i18n.gettext
ngettext = formatters_i18n.ngettext
# Renameuser - https://www.mediawiki.org/wiki/Extension:Renameuser
# renameuser/renameuser - Renaming a user
@formatter.embed(event="renameuser/renameuser")
def embed_renameuser_renameuser(ctx: Context, change: dict) -> DiscordMessage:
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
edits = change["logparams"]["edits"]
if edits > 0:
embed["title"] = ngettext("Renamed user \"{old_name}\" with {edits} edit to \"{new_name}\"",
"Renamed user \"{old_name}\" with {edits} edits to \"{new_name}\"", edits).format(
old_name=sanitize_to_markdown(change["logparams"]["olduser"]), edits=edits,
new_name=sanitize_to_markdown(change["logparams"]["newuser"]))
else:
embed["title"] = _("Renamed user \"{old_name}\" to \"{new_name}\"").format(
old_name=sanitize_to_markdown(change["logparams"]["olduser"]),
new_name=sanitize_to_markdown(change["logparams"]["newuser"]))
embed["url"] = create_article_path("User:" + sanitize_to_url(change["logparams"]["newuser"]))
return embed
@formatter.compact(event="renameuser/renameuser")
def compact_renameuser_renameuser(ctx: Context, change: dict) -> DiscordMessage:
author, author_url = compact_author(ctx, change)
link = clean_link(create_article_path("User:" + sanitize_to_url(change["logparams"]["newuser"])))
edits = change["logparams"]["edits"]
parsed_comment = compact_summary(ctx)
if edits > 0:
content = ngettext(
"[{author}]({author_url}) renamed user *{old_name}* with {edits} edit to [{new_name}]({link}){comment}",
"[{author}]({author_url}) renamed user *{old_name}* with {edits} edits to [{new_name}]({link}){comment}",
edits).format(
author=author, author_url=author_url, old_name=sanitize_to_markdown(change["logparams"]["olduser"]),
edits=edits,
new_name=sanitize_to_markdown(change["logparams"]["newuser"]), link=link, comment=parsed_comment
)
else:
content = _("[{author}]({author_url}) renamed user *{old_name}* to [{new_name}]({link}){comment}").format(
author=author, author_url=author_url, old_name=sanitize_to_markdown(change["logparams"]["olduser"]),
new_name=sanitize_to_markdown(change["logparams"]["newuser"]), link=link, comment=parsed_comment
)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)

91
extensions/base/sprite.py Normal file
View file

@ -0,0 +1,91 @@
# This file is part of Recent changes Goat compatible Discord bot (RcGcDb).
#
# RcGcDb is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RcGcDw is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with RcGcDb. If not, see <http://www.gnu.org/licenses/>.
import logging
from src.discord.message import DiscordMessage
from src.api import formatter
from src.i18n import formatters_i18n
from src.api.context import Context
from src.api.util import embed_helper, compact_author, create_article_path, sanitize_to_markdown, sanitize_to_url, \
clean_link
_ = formatters_i18n.gettext
ngettext = formatters_i18n.ngettext
# SpriteSheet - https://www.mediawiki.org/wiki/Extension:SpriteSheet
# sprite/sprite - Editing a sprite
@formatter.embed(event="sprite/sprite")
def embed_sprite_sprite(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["url"] = create_article_path(sanitize_to_url(change["title"]))
embed["title"] = _("Edited the sprite for {article}").format(article=sanitize_to_markdown(change["title"]))
return embed
@formatter.compact(event="sprite/sprite")
def compact_sprite_sprite(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
link = clean_link(create_article_path(sanitize_to_url(change["title"])))
content = _("[{author}]({author_url}) edited the sprite for [{article}]({article_url})").format(author=author,
author_url=author_url,
article=sanitize_to_markdown(change[
"title"]),
article_url=link)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# sprite/sheet - Creating a sprite sheet
@formatter.embed(event="sprite/sheet")
def embed_sprite_sheet(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["url"] = create_article_path(sanitize_to_url(change["title"]))
embed["title"] = _("Created the sprite sheet for {article}").format(article=sanitize_to_markdown(change["title"]))
return embed
@formatter.compact(event="sprite/sheet")
def compact_sprite_sheet(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
link = clean_link(create_article_path(sanitize_to_url(change["title"])))
content = _("[{author}]({author_url}) created the sprite sheet for [{article}]({article_url})").format(author=author, author_url=author_url, article=sanitize_to_markdown(change["title"]), article_url=link)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# sprite/slice - Editing a slice
@formatter.embed(event="sprite/slice")
def embed_sprite_slice(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["url"] = create_article_path(sanitize_to_url(change["title"]))
embed["title"] = _("Edited the slice for {article}").format(article=sanitize_to_markdown(change["title"]))
return embed
@formatter.compact(event="sprite/slice")
def compact_sprite_slice(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
link = clean_link(create_article_path(sanitize_to_url(change["title"])))
content = _("[{author}]({author_url}) edited the slice for [{article}]({article_url})").format(author=author,
author_url=author_url,
article=sanitize_to_markdown(change[
"title"]),
article_url=link)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)

View file

@ -0,0 +1,482 @@
# This file is part of Recent changes Goat compatible Discord bot (RcGcDb).
#
# RcGcDb is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RcGcDw is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with RcGcDb. If not, see <http://www.gnu.org/licenses/>.
import logging
from src.discord.message import DiscordMessage
from src.api import formatter
from src.i18n import formatters_i18n
from src.api.context import Context
from src.api.util import embed_helper, compact_author, create_article_path, sanitize_to_markdown, sanitize_to_url, \
clean_link, compact_summary
_ = formatters_i18n.gettext
ngettext = formatters_i18n.ngettext
# I cried when I realized I have to migrate Translate extension logs, but this way I atone for my countless sins
# Translate - https://www.mediawiki.org/wiki/Extension:Translate
# pagetranslation/mark - Marking a page for translation
@formatter.embed(event="pagetranslation/mark")
def embed_pagetranslation_mark(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
link = create_article_path(sanitize_to_url(change["title"]))
if "?" in link:
embed["url"] = link + "&oldid={}".format(change["logparams"]["revision"])
else:
embed["url"] = link + "?oldid={}".format(change["logparams"]["revision"])
embed["title"] = _("Marked \"{article}\" for translation").format(article=sanitize_to_markdown(change["title"]))
return embed
@formatter.compact(event="pagetranslation/mark")
def compact_pagetranslation_mark(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
link = create_article_path(sanitize_to_url(change["title"]))
if "?" in link:
link = link + "&oldid={}".format(change["logparams"]["revision"])
else:
link = link + "?oldid={}".format(change["logparams"]["revision"])
link = clean_link(link)
parsed_comment = compact_summary(ctx)
content = _("[{author}]({author_url}) marked [{article}]({article_url}) for translation{comment}").format(
author=author, author_url=author_url,
article=sanitize_to_markdown(change["title"]), article_url=link,
comment=parsed_comment
)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# pagetranslation/unmark - Removing a page from translation system
@formatter.embed(event="pagetranslation/unmark")
def embed_pagetranslation_unmark(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["url"] = create_article_path(sanitize_to_url(change["title"]))
embed["title"] = _("Removed \"{article}\" from the translation system").format(article=sanitize_to_markdown(change["title"]))
return embed
@formatter.compact(event="pagetranslation/unmark")
def compact_pagetranslation_unmark(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
parsed_comment = compact_summary(ctx)
link = clean_link(create_article_path(sanitize_to_url(change["title"])))
content = _(
"[{author}]({author_url}) removed [{article}]({article_url}) from the translation system{comment}").format(
author=author, author_url=author_url,
article=sanitize_to_markdown(change["title"]), article_url=link,
comment=parsed_comment
)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# pagetranslation/moveok - Completed moving translation page
@formatter.embed(event="pagetranslation/moveok")
def embed_pagetranslation_moveok(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["url"] = create_article_path(sanitize_to_url(change["logparams"]["target"]))
embed["title"] = _("Completed moving translation pages from \"{article}\" to \"{target}\"").format(
article=sanitize_to_markdown(change["title"]), target=sanitize_to_markdown(change["logparams"]["target"]))
return embed
@formatter.compact(event="pagetranslation/moveok")
def compact_pagetranslation_moveok(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
parsed_comment = compact_summary(ctx)
link = clean_link(create_article_path(sanitize_to_url(change["logparams"]["target"])))
content = _(
"[{author}]({author_url}) completed moving translation pages from *{article}* to [{target}]({target_url}){comment}").format(
author=author, author_url=author_url,
article=sanitize_to_markdown(change["title"]), target=sanitize_to_markdown(change["logparams"]["target"]),
target_url=link, comment=parsed_comment
)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# pagetranslation/movenok - Failed while moving translation page
@formatter.embed(event="pagetranslation/movenok")
def embed_pagetranslation_movenok(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["url"] = create_article_path(sanitize_to_url(change["title"]))
embed["title"] = _("Encountered a problem while moving \"{article}\" to \"{target}\"").format(
article=sanitize_to_markdown(change["title"]), target=sanitize_to_markdown(change["logparams"]["target"]))
return embed
@formatter.compact(event="pagetranslation/movenok")
def compact_pagetranslation_movenok(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
parsed_comment = compact_summary(ctx)
link = clean_link(create_article_path(sanitize_to_url(change["title"])))
target_url = clean_link(create_article_path(sanitize_to_url(change["logparams"]["target"])))
content = _(
"[{author}]({author_url}) encountered a problem while moving [{article}]({article_url}) to [{target}]({target_url}){comment}").format(
author=author, author_url=author_url,
article=sanitize_to_markdown(change["title"]), article_url=link,
target=sanitize_to_markdown(change["logparams"]["target"]), target_url=target_url,
comment=parsed_comment
)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# pagetranslation/deletefnok - Failure in deletion of translatable page
@formatter.embed(event="pagetranslation/deletefnok")
def embed_pagetranslation_deletefnok(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["url"] = create_article_path(sanitize_to_url(change["title"]))
embed["title"] = _("Failed to delete \"{article}\" which belongs to translatable page \"{target}\"").format(
article=sanitize_to_markdown(change["title"]), target=sanitize_to_markdown(change["logparams"]["target"]))
return embed
@formatter.compact(event="pagetranslation/deletefnok")
def compact_pagetranslation_deletefnok(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
parsed_comment = compact_summary(ctx)
link = clean_link(create_article_path(sanitize_to_url(change["title"])))
target_url = clean_link(create_article_path(sanitize_to_url(change["logparams"]["target"])))
content = _(
"[{author}]({author_url}) failed to delete [{article}]({article_url}) which belongs to translatable page [{target}]({target_url}){comment}").format(
author=author, author_url=author_url,
article=sanitize_to_markdown(change["title"]), article_url=link,
target=sanitize_to_markdown(change["logparams"]["target"]), target_url=target_url,
comment=parsed_comment
)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# pagetranslation/deletelok - Completion in deleting a page?
@formatter.embed(event="pagetranslation/deletelok")
def embed_pagetranslation_deletelok(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["url"] = create_article_path(sanitize_to_url(change["title"]))
embed["title"] = _("Completed deletion of translation page \"{article}\"").format(
article=sanitize_to_markdown(change["title"]))
return embed
@formatter.compact(event="pagetranslation/deletelok")
def compact_pagetranslation_deletelok(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
parsed_comment = compact_summary(ctx)
link = clean_link(create_article_path(sanitize_to_url(change["title"])))
content = _(
"[{author}]({author_url}) completed deletion of translation page [{article}]({article_url}){comment}").format(
author=author, author_url=author_url,
article=sanitize_to_markdown(change["title"]), article_url=link,
comment=parsed_comment
)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# pagetranslation/deletelnok - Failure in deletion of article belonging to a translation page
@formatter.embed(event="pagetranslation/deletelnok")
def embed_pagetranslation_deletelnok(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["url"] = create_article_path(sanitize_to_url(change["title"]))
embed["title"] = _("Failed to delete \"{article}\" which belongs to translation page \"{target}\"").format(
article=sanitize_to_markdown(change["title"]), target=sanitize_to_markdown(change["logparams"]["target"]))
return embed
@formatter.compact(event="pagetranslation/deletelnok")
def compact_pagetranslation_deletelnok(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
parsed_comment = compact_summary(ctx)
link = clean_link(create_article_path(sanitize_to_url(change["title"])))
target_url = clean_link(create_article_path(sanitize_to_url(change["logparams"]["target"])))
content = _(
"[{author}]({author_url}) failed to delete [{article}]({article_url}) which belongs to translation page [{target}]({target_url}){comment}").format(
author=author, author_url=author_url,
article=sanitize_to_markdown(change["title"]), article_url=link,
target=sanitize_to_markdown(change["logparams"]["target"]), target_url=target_url,
comment=parsed_comment
)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# pagetranslation/encourage - Encouraging to translate an article
@formatter.embed(event="pagetranslation/encourage")
def embed_pagetranslation_encourage(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["url"] = create_article_path(sanitize_to_url(change["title"]))
embed["title"] = _("Encouraged translation of \"{article}\"").format(article=sanitize_to_markdown(change["title"]))
return embed
@formatter.compact(event="pagetranslation/encourage")
def compact_pagetranslation_encourage(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
parsed_comment = compact_summary(ctx)
link = clean_link(create_article_path(sanitize_to_url(change["title"])))
content = _("[{author}]({author_url}) encouraged translation of [{article}]({article_url}){comment}").format(
author=author, author_url=author_url,
article=sanitize_to_markdown(change["title"]), article_url=link,
comment=parsed_comment
)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# pagetranslation/discourage - Discouraging to translate an article
@formatter.embed(event="pagetranslation/discourage")
def embed_pagetranslation_discourage(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["url"] = create_article_path(sanitize_to_url(change["title"]))
embed["title"] = _("Discouraged translation of \"{article}\"").format(article=sanitize_to_markdown(change["title"]))
return embed
@formatter.compact(event="pagetranslation/discourage")
def compact_pagetranslation_discourage(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
parsed_comment = compact_summary(ctx)
link = clean_link(create_article_path(sanitize_to_url(change["title"])))
content = _("[{author}]({author_url}) discouraged translation of [{article}]({article_url}){comment}").format(
author=author, author_url=author_url,
article=sanitize_to_markdown(change["title"]), article_url=link,
comment=parsed_comment
)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# pagetranslation/prioritylanguages - Changing the priority of translations?
@formatter.embed(event="pagetranslation/prioritylanguages")
def embed_pagetranslation_prioritylanguages(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["url"] = create_article_path(sanitize_to_url(change["title"]))
if "languages" in change["logparams"]:
languages = "`, `".join(change["logparams"]["languages"].split(","))
if change["logparams"]["force"] == "on":
embed["title"] = _("Limited languages for \"{article}\" to `{languages}`").format(article=sanitize_to_markdown(change["title"]),
languages=languages)
else:
embed["title"] = _("Priority languages for \"{article}\" set to `{languages}`").format(
article=sanitize_to_markdown(change["title"]), languages=languages)
else:
embed["title"] = _("Removed priority languages from \"{article}\"").format(article=sanitize_to_markdown(change["title"]))
return embed
@formatter.compact(event="pagetranslation/prioritylanguages")
def compact_pagetranslation_prioritylanguages(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
parsed_comment = compact_summary(ctx)
link = clean_link(create_article_path(sanitize_to_url(change["title"])))
if "languages" in change["logparams"]:
languages = "`, `".join(change["logparams"]["languages"].split(","))
if change["logparams"]["force"] == "on":
content = _(
"[{author}]({author_url}) limited languages for [{article}]({article_url}) to `{languages}`{comment}").format(
author=author, author_url=author_url,
article=sanitize_to_markdown(change["title"]), article_url=link,
languages=languages, comment=parsed_comment
)
else:
content = _(
"[{author}]({author_url}) set the priority languages for [{article}]({article_url}) to `{languages}`{comment}").format(
author=author, author_url=author_url,
article=sanitize_to_markdown(change["title"]), article_url=link,
languages=languages, comment=parsed_comment
)
else:
content = _(
"[{author}]({author_url}) removed priority languages from [{article}]({article_url}){comment}").format(
author=author, author_url=author_url,
article=sanitize_to_markdown(change["title"]), article_url=link,
comment=parsed_comment
)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# pagetranslation/associate - Adding an article to translation group
@formatter.embed(event="pagetranslation/associate")
def embed_pagetranslation_associate(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["url"] = create_article_path(sanitize_to_url(change["title"]))
embed["title"] = _("Added translatable page \"{article}\" to aggregate group \"{group}\"").format(
article=sanitize_to_markdown(change["title"]), group=change["logparams"]["aggregategroup"])
return embed
@formatter.compact(event="pagetranslation/associate")
def compact_pagetranslation_associate(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
parsed_comment = compact_summary(ctx)
link = clean_link(create_article_path(sanitize_to_url(change["title"])))
content = _(
"[{author}]({author_url}) added translatable page [{article}]({article_url}) to aggregate group \"{group}\"{comment}").format(
author=author, author_url=author_url,
article=sanitize_to_markdown(change["title"]), article_url=link,
group=change["logparams"]["aggregategroup"], comment=parsed_comment
)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# pagetranslation/dissociate - Removing an article from translation group
@formatter.embed(event="pagetranslation/dissociate")
def embed_pagetranslation_dissociate(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["url"] = create_article_path(sanitize_to_url(change["title"]))
embed["title"] = _("Removed translatable page \"{article}\" from aggregate group \"{group}\"").format(
article=sanitize_to_markdown(change["title"]), group=change["logparams"]["aggregategroup"])
return embed
@formatter.compact(event="pagetranslation/dissociate")
def compact_pagetranslation_dissociate(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
parsed_comment = compact_summary(ctx)
link = clean_link(create_article_path(sanitize_to_url(change["title"])))
content = _(
"[{author}]({author_url}) removed translatable page [{article}]({article_url}) from aggregate group \"{group}\"{comment}").format(
author=author, author_url=author_url,
article=sanitize_to_markdown(change["title"]), article_url=link,
group=change["logparams"]["aggregategroup"], comment=parsed_comment
)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# translationreview/message - Reviewing translation
@formatter.embed(event="translationreview/message")
def embed_translationreview_message(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
link = create_article_path(sanitize_to_url(change["title"]))
if "?" in link:
embed["url"] = link + "&oldid={}".format(change["logparams"]["revision"])
else:
embed["url"] = link + "?oldid={}".format(change["logparams"]["revision"])
embed["title"] = _("Reviewed translation \"{article}\"").format(article=sanitize_to_markdown(change["title"]))
return embed
@formatter.compact(event="translationreview/message")
def compact_translationreview_message(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
parsed_comment = compact_summary(ctx)
link = create_article_path(sanitize_to_url(change["title"]))
if "?" in link:
link = link + "&oldid={}".format(change["logparams"]["revision"])
else:
link = link + "?oldid={}".format(change["logparams"]["revision"])
link = clean_link(link)
content = _("[{author}]({author_url}) reviewed translation [{article}]({article_url}){comment}").format(
author=author, author_url=author_url,
article=sanitize_to_markdown(change["title"]), article_url=link,
comment=parsed_comment
)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# translationreview/group - Changing of state for group translation?
@formatter.embed(event="translationreview/group")
def embed_translationreview_group(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["url"] = create_article_path(sanitize_to_url(change["title"]))
embed["title"] = _("Changed the state of `{language}` translations of \"{article}\"").format(
language=change["logparams"]["language"], article=sanitize_to_markdown(change["title"]))
if "old-state" in change["logparams"]:
embed.add_field(_("Old state"), change["logparams"]["old-state"], inline=True)
embed.add_field(_("New state"), change["logparams"]["new-state"], inline=True)
return embed
@formatter.compact(event="translationreview/group")
def compact_translationreview_group(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
parsed_comment = compact_summary(ctx)
link = clean_link(create_article_path(sanitize_to_url(change["title"])))
if "old-state" in change["logparams"]:
content = _(
"[{author}]({author_url}) changed the state of `{language}` translations of [{article}]({article_url}) from `{old_state}` to `{new_state}`{comment}").format(
author=author, author_url=author_url, language=change["logparams"]["language"],
article=sanitize_to_markdown(change["logparams"]["group-label"]), article_url=link,
old_state=change["logparams"]["old-state"], new_state=change["logparams"]["new-state"],
comment=parsed_comment
)
else:
content = _(
"[{author}]({author_url}) changed the state of `{language}` translations of [{article}]({article_url}) to `{new_state}`{comment}").format(
author=author, author_url=author_url, language=change["logparams"]["language"],
article=sanitize_to_markdown(change["logparams"]["group-label"]), article_url=link,
new_state=change["logparams"]["new-state"], comment=parsed_comment
)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# pagelang/pagelang - Changing the language of a page
def get_languages(change):
old_lang = "`{}`".format(change["logparams"]["oldlanguage"])
if change["logparams"]["oldlanguage"][-5:] == "[def]":
old_lang = "`{}` {}".format(change["logparams"]["oldlanguage"][:-5], _("(default)"))
new_lang = "`{}`".format(change["logparams"]["newlanguage"])
if change["logparams"]["newlanguage"][-5:] == "[def]":
new_lang = "`{}` {}".format(change["logparams"]["oldlanguage"][:-5], _("(default)"))
return old_lang, new_lang
@formatter.embed(event="pagelang/pagelang")
def embed_pagelang_pagelang(ctx: Context, change: dict):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["url"] = create_article_path(sanitize_to_url(change["title"]))
old_lang, new_lang = get_languages(change)
embed["title"] = _("Changed the language of \"{article}\"").format(article=sanitize_to_markdown(change["title"]))
embed.add_field(_("Old language"), old_lang, inline=True)
embed.add_field(_("New language"), new_lang, inline=True)
return embed
@formatter.compact(event="pagelang/pagelang")
def compact_pagelang_pagelang(ctx: Context, change: dict):
author, author_url = compact_author(ctx, change)
parsed_comment = compact_summary(ctx)
link = clean_link(create_article_path(sanitize_to_url(change["title"])))
old_lang, new_lang = get_languages(change)
content = _(
"[{author}]({author_url}) changed the language of [{article}]({article_url}) from {old_lang} to {new_lang}{comment}").format(
author=author, author_url=author_url,
article=sanitize_to_markdown(change["title"]), article_url=link,
old_lang=old_lang, new_lang=new_lang, comment=parsed_comment
)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)

View file

@ -0,0 +1,15 @@
# This file is part of Recent changes Goat compatible Discord webhook (RcGcDw).
#
# RcGcDw is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RcGcDw is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with RcGcDw. If not, see <http://www.gnu.org/licenses/>.

0
src/api/__init__.py Normal file
View file

96
src/api/client.py Normal file
View file

@ -0,0 +1,96 @@
# This file is part of Recent changes Goat compatible Discord bot (RcGcDb).
#
# RcGcDb is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RcGcDw is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with RcGcDb. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations
import src.misc
from typing import Union
from collections import OrderedDict
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from src.wiki import Wiki
class Client:
"""
A client for interacting with RcGcDw when creating formatters or hooks.
"""
def __init__(self, hooks, wiki):
self._formatters = hooks
self.__recent_changes: Wiki = wiki
self.WIKI_API_PATH: str = src.misc.WIKI_API_PATH
self.WIKI_ARTICLE_PATH: str = src.misc.WIKI_ARTICLE_PATH
self.WIKI_SCRIPT_PATH: str = src.misc.WIKI_SCRIPT_PATH
self.WIKI_JUST_DOMAIN: str = src.misc.WIKI_JUST_DOMAIN
self.content_parser = src.misc.ContentParser
self.tags = self.__recent_changes.tags
self.LinkParser: type(src.misc.LinkParser) = src.misc.LinkParser
#self.make_api_request: src.rc.wiki.__recent_changes.api_request = self.__recent_changes.api_request
def refresh_internal_data(self):
"""Refreshes internal storage data for wiki tags and MediaWiki messages."""
self.__recent_changes.init_info()
@property
def namespaces(self) -> dict:
"""Return a dict of namespaces, if None return empty dict"""
if self.__recent_changes.namespaces is not None:
return self.__recent_changes.namespaces
else:
return dict()
def parse_links(self, summary: str):
link_parser = self.LinkParser()
link_parser.feed(summary)
return link_parser.new_string
def pull_curseprofile_comment(self, comment_id) -> Optional[str]:
"""Pulls a CurseProfile comment for current wiki set in the settings and with comment_id passed as an argument.
Returns:
String if comment was possible to be fetched
None if not
"""
return self.__recent_changes.pull_comment(comment_id)
def make_api_request(self, params: Union[str, OrderedDict], *json_path: str, timeout: int = 10, allow_redirects: bool = False):
"""Method to GET request data from the wiki's API with error handling including recognition of MediaWiki errors.
Parameters:
params (str, OrderedDict): a string or collections.OrderedDict object containing query parameters
json_path (str): *args taking strings as values. After request is parsed as json it will extract data from given json path
timeout (int, float) (default=10): int or float limiting time required for receiving a full response from a server before returning TimeoutError
allow_redirects (bool) (default=False): switches whether the request should follow redirects or not
Returns:
request_content (dict): a dict resulting from json extraction of HTTP GET request with given json_path
OR
One of the following exceptions:
ServerError: When connection with the wiki failed due to server error
ClientError: When connection with the wiki failed due to client error
KeyError: When json_path contained keys that weren't found in response JSON response
BadRequest: When params argument is of wrong type
MediaWikiError: When MediaWiki returns an error
"""
return self.__recent_changes.api_request(params, *json_path, timeout=timeout, allow_redirects=allow_redirects)
def get_formatters(self):
return self._formatters
def get_ipmapper(self) -> dict:
"""Returns a dict mapping IPs with amount of their edits"""
return self.__recent_changes.map_ips

40
src/api/context.py Normal file
View file

@ -0,0 +1,40 @@
# This file is part of Recent changes Goat compatible Discord bot (RcGcDb).
#
# RcGcDb is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RcGcDw is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with RcGcDb. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from src.api.client import Client
class Context:
"""Context object containing client and some metadata regarding specific formatter call"""
def __init__(self, message_type: str, webhook_url: str, client: Client):
self.client = client
self.webhook_url = webhook_url
self.message_type = message_type
self.categories = None
self.parsedcomment = None
self.event = None
self.comment_page = None
def set_categories(self, cats):
self.categories = cats
def set_parsedcomment(self, parsedcomment: str):
self.parsedcomment = parsedcomment
def set_comment_page(self, page):
self.comment_page = page

76
src/api/formatter.py Normal file
View file

@ -0,0 +1,76 @@
# This file is part of Recent changes Goat compatible Discord bot (RcGcDb).
#
# RcGcDb is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RcGcDw is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with RcGcDb. If not, see <http://www.gnu.org/licenses/>.
import src.api.hooks
import logging
from src.configloader import settings
from src.exceptions import FormatterBreaksAPISpec
from src.discord.message import DiscordMessage
from typing import Optional, Callable
logger = logging.getLogger("src.api.formatter")
def _register_formatter(func, kwargs, formatter_type: str, action_type=None):
"""
Registers a formatter inside of src.rcgcdw.formatter_hooks
"""
try:
_, action = func.__name__.split("_", 1)
etype = func.__module__
action_type = f"{etype}/{action}"
except ValueError:
raise
action_type = kwargs.get("event", action_type)
if action_type is None:
raise FormatterBreaksAPISpec("event type")
for act in [action_type] + kwargs.get("aliases", []): # Make action_type string a list and merge with aliases
if act in src.api.hooks.formatter_hooks[formatter_type]:
logger.warning(f"Action {act} is already defined inside of "
f"{src.api.hooks.formatter_hooks[formatter_type][act].__module__}! "
f"Overwriting it with one from {func.__module__}")
src.api.hooks.formatter_hooks[formatter_type][act] = func
def embed(**kwargs):
"""
Decorator to register a formatter are return a function
:key event: Event string
:key mode: Discord Message mode
:key aliases: Allows to register multiple events under same function
:return:
"""
def decorator_cont(func: Callable[[dict], DiscordMessage]):
_register_formatter(func, kwargs, "embed")
return func
return decorator_cont
def compact(**kwargs):
"""
Decorator to register a formatter are return a function
:key event: Event string
:key mode: Discord Message mode
:key aliases: Allows to register multiple events under same function
:return:
"""
def decorator_cont(func: Callable[[dict], DiscordMessage]):
_register_formatter(func, kwargs, "compact")
return func
return decorator_cont

36
src/api/hook.py Normal file
View file

@ -0,0 +1,36 @@
# This file is part of Recent changes Goat compatible Discord bot (RcGcDb).
#
# RcGcDb is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RcGcDw is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with RcGcDb. If not, see <http://www.gnu.org/licenses/>.
import src.api.hooks
def pre_hook(func):
"""
Decorator to register a pre hook and return a function
:return: func
"""
src.api.hooks.pre_hooks.append(func)
return func
def post_hook(func):
"""
Decorator to register a post hook and return a function
:return: func
"""
src.api.hooks.post_hooks.append(func)
return func

20
src/api/hooks.py Normal file
View file

@ -0,0 +1,20 @@
# This file is part of Recent changes Goat compatible Discord bot (RcGcDb).
#
# RcGcDb is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RcGcDw is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with RcGcDb. If not, see <http://www.gnu.org/licenses/>.
# Made just to avoid circular imports
from typing import Callable
formatter_hooks: dict[str, dict[str, Callable]] = {"embed": {}, "compact": {}}
pre_hooks = []
post_hooks = []

170
src/api/util.py Normal file
View file

@ -0,0 +1,170 @@
# This file is part of Recent changes Goat compatible Discord bot (RcGcDb).
#
# RcGcDb is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RcGcDw is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with RcGcDb. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations
import re
from urllib.parse import quote
from typing import Optional, Callable, TYPE_CHECKING
from src.exceptions import ServerError, MediaWikiError
from src.discord.message import DiscordMessage
from src.configloader import settings
import src.misc
import logging
from src.i18n import formatters_i18n
_ = formatters_i18n.gettext
if TYPE_CHECKING:
from src.api.context import Context
logger = logging.getLogger("src.api.util")
def default_message(event: str, formatter_hooks: dict) -> Callable:
"""Returns a method of a formatter responsible for the event or None if such does not exist."""
return formatter_hooks.get(event, formatter_hooks.get("generic", formatter_hooks["no_formatter"]))
def clean_link(link: str) -> str:
"""Adds <> around the link to prevent its embedding"""
return "<" + link.replace(" ", "_") + ">"
def sanitize_to_markdown(text: str) -> str:
"""Sanitizes given text to escape markdown formatting. It is used in values that will be visible on Discord in messages"""
return re.sub(r"([`_*~:<>{}@|\\])", "\\\\\\1", text).replace('//', "/\\/").replace('](', "]\\(")
def sanitize_to_url(text: str) -> str: # TODO ) replaces needed?
"""Formats a string in a way where it can be safely added to a URL without breaking MediaWiki URL schema"""
return quote(text, " /:").replace(' ', "_").replace(")", "%29")
def parse_mediawiki_changes(ctx: Context, content: str, embed: DiscordMessage) -> None:
"""Parses MediaWiki changes and adds them to embed as fields "Added" and "Removed" """
edit_diff = ctx.client.content_parser()
edit_diff.feed(content)
if edit_diff.small_prev_del:
if edit_diff.small_prev_del.replace("~~", "").isspace():
edit_diff.small_prev_del = _('__Only whitespace__')
else:
edit_diff.small_prev_del = edit_diff.small_prev_del.replace("~~~~", "")
if edit_diff.small_prev_ins:
if edit_diff.small_prev_ins.replace("**", "").isspace():
edit_diff.small_prev_ins = _('__Only whitespace__')
else:
edit_diff.small_prev_ins = edit_diff.small_prev_ins.replace("****", "")
logger.debug("Changed content: {}".format(edit_diff.small_prev_ins))
if edit_diff.small_prev_del and not ctx.event == "new":
embed.add_field(_("Removed"), "{data}".format(data=edit_diff.small_prev_del), inline=True)
if edit_diff.small_prev_ins:
embed.add_field(_("Added"), "{data}".format(data=edit_diff.small_prev_ins), inline=True)
def create_article_path(article: str) -> str:
"""Takes the string and creates an URL with it as the article name"""
return src.misc.WIKI_ARTICLE_PATH.replace("$1", article)
def compact_summary(ctx: Context) -> str:
"""Creates a comment for compact formatters"""
if ctx.parsedcomment:
return " *({})*".format(ctx.parsedcomment)
return ""
def compact_author(ctx: Context, change: dict) -> (Optional[str], Optional[str]):
"""Returns link to the author and the author itself respecting the settings"""
author, author_url = None, None
if ctx.event != "suppressed":
author_url = clean_link(create_article_path("User:{user}".format(user=sanitize_to_url(change["user"]))))
if "anon" in change:
if settings.get("hide_ips", False):
author = _("Unregistered user")
else:
author = change["user"]
else:
author = change["user"]
return author, author_url
def embed_helper(ctx: Context, message: DiscordMessage, change: dict, set_user=True, set_edit_meta=True, set_desc=True) -> None:
"""Helps in preparing common edit/log fields for events. Passed arguments automatically become saturated with needed data.
All automatic setups can be disabled by setting relevant variable to False
Currently handles:
setting usernames (handles according to settings, specific options set in the settings: hide_ips)
adding category fields (if there are any specified categories in the edit)
adding tags (if the log is tagged anyhow)
setting default description (to ctx.parsedcomment)"""
if set_user:
author = None
if "anon" in change:
author_url = create_article_path("Special:Contributions/{user}".format(user=sanitize_to_url(change["user"])))
ip_mapper = ctx.client.get_ipmapper()
logger.debug("current user: {} with cache of IPs: {}".format(change["user"], ip_mapper.keys()))
if change["user"] not in list(ip_mapper.keys()):
try:
contibs = ctx.client.make_api_request(
"?action=query&format=json&list=usercontribs&uclimit=max&ucuser={user}&ucstart={timestamp}&ucprop=".format(
user=sanitize_to_url(change["user"]), timestamp=change["timestamp"]), "query",
"usercontribs")
except (ServerError, MediaWikiError):
logger.warning("WARNING: Something went wrong when checking amount of contributions for given IP address")
if settings.get("hide_ips", False):
author = _("Unregistered user")
else:
author = change["user"] + "(?)"
else:
ip_mapper[change["user"]] = len(contibs)
logger.debug("Current params user {} and state of map_ips {}".format(change["user"], ip_mapper))
if settings.get("hide_ips", False):
author = _("Unregistered user")
else:
author = "{author} ({contribs})".format(author=change["user"], contribs=len(contibs))
else:
logger.debug("Current params user {} and state of map_ips {}".format(change["user"], ip_mapper))
if ctx.event in ("edit", "new"):
ip_mapper[change["user"]] += 1
author = "{author} ({amount})".format(
author=change["user"] if settings.get("hide_ips", False) is False else _("Unregistered user"),
amount=ip_mapper[change["user"]])
else:
author_url = create_article_path("User:{}".format(sanitize_to_url(change["user"])))
author = change["user"]
message.set_author(author, author_url)
if set_edit_meta:
if settings["appearance"]["embed"]["show_footer"]:
message["timestamp"] = change["timestamp"]
if "tags" in change and change["tags"]:
tag_displayname = []
for tag in change["tags"]:
if tag in ctx.client.tags:
if ctx.client.tags[tag] is None:
continue # Ignore hidden tags
else:
tag_displayname.append(ctx.client.tags[tag])
else:
tag_displayname.append(tag)
message.add_field(formatters_i18n.pgettext("recent changes Tags", "Tags"), ", ".join(tag_displayname))
if ctx.categories is not None and not (len(ctx.categories["new"]) == 0 and len(ctx.categories["removed"]) == 0):
new_cat = (_("**Added**: ") + ", ".join(list(ctx.categories["new"])[0:16]) + (
"\n" if len(ctx.categories["new"]) <= 15 else _(" and {} more\n").format(
len(ctx.categories["new"]) - 15))) if ctx.categories["new"] else ""
del_cat = (_("**Removed**: ") + ", ".join(list(ctx.categories["removed"])[0:16]) + (
"" if len(ctx.categories["removed"]) <= 15 else _(" and {} more").format(
len(ctx.categories["removed"]) - 15))) if ctx.categories["removed"] else ""
message.add_field(_("Changed categories"), new_cat + del_cat)
if set_desc:
message["description"] = ctx.parsedcomment

View file

@ -131,6 +131,7 @@ class DiscordMessage:
def set_name(self, name):
self.webhook_object["username"] = name
def stack_message_list(messages: list) -> list:
if len(messages) > 1:
if messages[0].message_type() == "embed":
@ -254,3 +255,16 @@ async def handle_discord_http(code: int, formatted_embed: str, result: aiohttp.C
return 3
else:
return 4
class DiscordMessageMetadata:
def __init__(self, method, log_id = None, page_id = None, rev_id = None, webhook_url = None, new_data = None):
self.method = method
self.page_id = page_id
self.log_id = log_id
self.rev_id = rev_id
self.webhook_url = webhook_url
self.new_data = new_data
def dump_ids(self) -> (int, int, int):
return self.page_id, self.rev_id, self.log_id

View file

@ -19,16 +19,19 @@ class Redis:
async with async_timeout.timeout(1):
message = await self.pub_connection.get_message(ignore_subscribe_messages=True)
if message is not None:
print(f"(Reader) Message Received: {message}")
logger.debug(f"(Reader) Message Received: {message}")
await asyncio.sleep(1.0)
except asyncio.TimeoutError: # TODO Better handler
pass
except aioredis.exceptions.ConnectionError:
pass
async def connect(self):
self.pub_connection = await aioredis.create_connection("redis://" + settings["redis_host"], encoding="UTF-8")
self.stat_connection = await aioredis.create_connection("redis://" + settings["redis_host"], encoding="UTF-8")
self.stat_connection = await aioredis.from_url("redis://" + settings["redis_host"], encoding="UTF-8")
async def pubsub(self):
self.pub_connection = self.stat_connection.pubsub()
await self.pub_connection.subscribe("rcgcdb_updates")
asyncio.create_task(self.reader())

View file

@ -5,6 +5,7 @@ from dataclasses import dataclass
import re
import logging, aiohttp
from api.util import default_message
from mw_messages import MWMessages
from src.exceptions import *
from src.database import db
@ -230,8 +231,84 @@ class Wiki:
if highest_id is None or change["rcid"] > highest_id: # make sure that the highest_rc is really highest rcid but do allow other entries with potentially lesser rcids come after without breaking the cycle
highest_id = change["rcid"]
for combination, webhooks in targets.items():
message = await rc_processor(self, change, categorize_events, )
async def rc_processor(wiki: Wiki, change: dict, changed_categories: dict, display_options: namedtuple("Settings", ["lang", "display"]), webhooks: list):
from src.misc import LinkParser
LinkParser = LinkParser()
metadata = src.discord.DiscordMessageMetadata("POST", rev_id=change.get("revid", None), log_id=change.get("logid", None),
page_id=change.get("pageid", None))
context = Context(display_options, webhooks, client)
if ("actionhidden" in change or "suppressed" in change) and "suppressed" not in settings[
"ignored"]: # if event is hidden using suppression
context.event = "suppressed"
try:
discord_message: Optional[src.discord.DiscordMessage] = default_message("suppressed", formatter_hooks)(context, change)
except NoFormatter:
return
except:
if settings.get("error_tolerance", 1) > 0:
discord_message: Optional[src.discord.DiscordMessage] = None # It's handled by send_to_discord, we still want other code to run
else:
raise
else:
if "commenthidden" not in change:
LinkParser.feed(change.get("parsedcomment", ""))
parsed_comment = LinkParser.new_string
else:
parsed_comment = _("~~hidden~~")
if not parsed_comment and context.message_type == "embed" and settings["appearance"].get("embed", {}).get(
"show_no_description_provided", True):
parsed_comment = _("No description provided")
context.set_parsedcomment(parsed_comment)
if "userhidden" in change:
change["user"] = _("hidden")
if change.get("ns", -1) in settings.get("ignored_namespaces", ()):
return
if change["type"] in ["edit", "new"]:
logger.debug("List of categories in essential_info: {}".format(changed_categories))
identification_string = change["type"]
context.set_categories(changed_categories)
elif change["type"] == "categorize":
return
elif change["type"] == "log":
identification_string = "{logtype}/{logaction}".format(logtype=change["logtype"],
logaction=change["logaction"])
else:
identification_string = change.get("type", "unknown") # If event doesn't have a type
if identification_string in settings["ignored"]:
return
context.event = identification_string
try:
discord_message: Optional[src.discord.DiscordMessage] = default_message(identification_string, formatter_hooks)(context,
change)
except:
if settings.get("error_tolerance", 1) > 0:
discord_message: Optional[
src.discord.DiscordMessage] = None # It's handled by send_to_discord, we still want other code to run
else:
raise
if identification_string in (
"delete/delete", "delete/delete_redir") and AUTO_SUPPRESSION_ENABLED: # TODO Move it into a hook?
delete_messages(dict(pageid=change.get("pageid")))
elif identification_string == "delete/event" and AUTO_SUPPRESSION_ENABLED:
logparams = change.get('logparams', {"ids": []})
if settings["appearance"]["mode"] == "embed":
redact_messages(logparams.get("ids", []), 1, logparams.get("new", {}))
else:
for logid in logparams.get("ids", []):
delete_messages(dict(logid=logid))
elif identification_string == "delete/revision" and AUTO_SUPPRESSION_ENABLED:
logparams = change.get('logparams', {"ids": []})
if settings["appearance"]["mode"] == "embed":
redact_messages(logparams.get("ids", []), 0, logparams.get("new", {}))
else:
for revid in logparams.get("ids", []):
delete_messages(dict(revid=revid))
discord_message.finish_embed()
return discord_message, metadata
@dataclass
class Wiki_old: