Added #131, may cause issues as the split has been done to many files and it scares me, will require work on i18n structure

This commit is contained in:
Frisk 2020-07-16 14:46:23 +02:00
parent 9946a0a7fa
commit a7565ec2d9
No known key found for this signature in database
GPG key ID: 213F7C15068AF8AC
10 changed files with 1382 additions and 1327 deletions

Binary file not shown.

View file

@ -8,7 +8,7 @@ msgstr ""
"Project-Id-Version: \n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2020-03-17 20:53+0100\n"
"PO-Revision-Date: 2020-03-18 13:51+0100\n"
"PO-Revision-Date: 2020-07-12 12:17+0200\n"
"Last-Translator: \n"
"Language-Team: \n"
"Language: uk\n"
@ -511,7 +511,7 @@ msgstr "__Тільки пробіли__"
#: rcgcdw.py:594
msgid "Removed"
msgstr "Вилучено"
msgstr "видалено"
#: rcgcdw.py:597
msgid "Added"
@ -858,7 +858,7 @@ msgstr " та ще {}\n"
#: rcgcdw.py:957
msgid "**Removed**: "
msgstr "**Вилучено**: "
msgstr "**видалено**: "
#: rcgcdw.py:957
msgid " and {} more"

View file

@ -0,0 +1,209 @@
import datetime, logging
import json
import gettext
from urllib.parse import quote_plus
from src.configloader import settings
from src.misc import DiscordMessage, send_to_discord, escape_formatting
from src.i18n import disc
_ = disc.gettext
discussion_logger = logging.getLogger("rcgcdw.discussion_formatter")
def embed_formatter(post, post_type):
"""Embed formatter for Fandom discussions."""
embed = DiscordMessage("embed", "discussion", settings["fandom_discussions"]["webhookURL"])
embed.set_author(post["createdBy"]["name"], "{wikiurl}f/u/{creatorId}".format(
wikiurl=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"]), icon_url=post["createdBy"]["avatarUrl"])
discussion_post_type = post["_embedded"]["thread"][0].get("containerType", "FORUM") # Can be FORUM, ARTICLE_COMMENT or WALL on UCP
if post_type == "TEXT":
if post["isReply"]:
if discussion_post_type == "FORUM":
embed.event_type = "discussion/forum/reply"
embed["title"] = _("Replied to \"{title}\"").format(title=post["_embedded"]["thread"][0]["title"])
embed["url"] = "{wikiurl}f/p/{threadId}/r/{postId}".format(
wikiurl=settings["fandom_discussions"]["wiki_url"], threadId=post["threadId"], postId=post["id"])
elif discussion_post_type == "ARTICLE_COMMENT":
discussion_logger.warning("Article comments are not yet implemented. For reasons see https://gitlab.com/piotrex43/RcGcDw/-/issues/126#note_366480037")
return
elif discussion_post_type == "WALL":
user_wall = _("unknown") # Fail safe
embed.event_type = "discussion/wall/reply"
if post["forumName"].endswith(' Message Wall'):
user_wall = post["forumName"][:-13]
embed["url"] = "{wikiurl}wiki/Message_Wall:{user_wall}?threadId={threadid}#{replyId}".format(wikiurl=settings["fandom_discussions"]["wiki_url"], user_wall=quote_plus(user_wall.replace(" ", "_")), threadid=post["threadId"], replyId=post["id"])
embed["title"] = _("Replied to \"{title}\" on {user}'s Message Wall").format(title=post["_embedded"]["thread"][0]["title"], user=user_wall)
else:
if discussion_post_type == "FORUM":
embed.event_type = "discussion/forum/post"
embed["title"] = _("Created \"{title}\"").format(title=post["title"])
embed["url"] = "{wikiurl}f/p/{threadId}".format(wikiurl=settings["fandom_discussions"]["wiki_url"],
threadId=post["threadId"])
elif discussion_post_type == "ARTICLE_COMMENT":
discussion_logger.warning("Article comments are not yet implemented. For reasons see https://gitlab.com/piotrex43/RcGcDw/-/issues/126#note_366480037")
return
elif discussion_post_type == "WALL":
user_wall = _("unknown") # Fail safe
embed.event_type = "discussion/wall/post"
if post["forumName"].endswith(' Message Wall'):
user_wall = post["forumName"][:-13]
embed["url"] = "{wikiurl}wiki/Message_Wall:{user_wall}?threadId={threadid}".format(
wikiurl=settings["fandom_discussions"]["wiki_url"], user_wall=quote_plus(user_wall.replace(" ", "_")),
threadid=post["threadId"])
embed["title"] = _("Created \"{title}\" on {user}'s Message Wall").format(title=post["_embedded"]["thread"][0]["title"], user=user_wall)
if settings["fandom_discussions"]["appearance"]["embed"]["show_content"]:
if post.get("jsonModel") is not None:
npost = DiscussionsFromHellParser(post)
embed["description"] = npost.parse()
if npost.image_last:
embed["image"]["url"] = npost.image_last
embed["description"] = embed["description"].replace(npost.image_last, "")
else: # Fallback when model is not available
embed["description"] = post.get("rawContent", "")
elif post_type == "POLL":
embed.event_type = "discussion/forum/poll"
poll = post["poll"]
embed["title"] = _("Created a poll titled \"{title}\"").format(title=poll["question"])
image_type = False
if poll["answers"][0]["image"] is not None:
image_type = True
for num, option in enumerate(poll["answers"]):
embed.add_field(option["text"] if image_type is True else _("Option {}").format(num+1),
option["text"] if image_type is False else _("__[View image]({image_url})__").format(image_url=option["image"]["url"]),
inline=True)
embed["footer"]["text"] = post["forumName"]
embed["timestamp"] = datetime.datetime.fromtimestamp(post["creationDate"]["epochSecond"], tz=datetime.timezone.utc).isoformat()
embed.finish_embed()
send_to_discord(embed)
def compact_formatter(post, post_type):
"""Compact formatter for Fandom discussions."""
message = None
discussion_post_type = post["_embedded"]["thread"][0].get("containerType",
"FORUM") # Can be FORUM, ARTICLE_COMMENT or WALL on UCP
if post_type == "TEXT":
if not post["isReply"]:
if discussion_post_type == "FORUM":
message = _("[{author}](<{url}f/u/{creatorId}>) created [{title}](<{url}f/p/{threadId}>) in {forumName}").format(
author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"], title=post["title"], threadId=post["threadId"], forumName=post["forumName"])
elif discussion_post_type == "ARTICLE_COMMENT":
discussion_logger.warning("Article comments are not yet implemented. For reasons see https://gitlab.com/piotrex43/RcGcDw/-/issues/126#note_366480037")
return
elif discussion_post_type == "WALL":
user_wall = _("unknown") # Fail safe
if post["forumName"].endswith(' Message Wall'):
user_wall = post["forumName"][:-13]
message = _("[{author}](<{url}f/u/{creatorId}>) created [{title}](<{wikiurl}wiki/Message_Wall:{user_wall}?threadId={threadid}>) on {user}'s Message Wall").format(
author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"], title=post["_embedded"]["thread"][0]["title"], user=user_wall,
wikiurl=settings["fandom_discussions"]["wiki_url"], user_wall=quote_plus(user_wall.replace(" ", "_")), threadid=post["threadId"]
)
else:
if discussion_post_type == "FORUM":
message = _("[{author}](<{url}f/u/{creatorId}>) created a [reply](<{url}f/p/{threadId}/r/{postId}>) to [{title}](<{url}f/p/{threadId}>) in {forumName}").format(
author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"], threadId=post["threadId"], postId=post["id"], title=post["_embedded"]["thread"][0]["title"], forumName=post["forumName"]
)
elif discussion_post_type == "ARTICLE_COMMENT":
discussion_logger.warning("Article comments are not yet implemented. For reasons see https://gitlab.com/piotrex43/RcGcDw/-/issues/126#note_366480037")
return
elif discussion_post_type == "WALL":
user_wall = _("unknown") # Fail safe
if post["forumName"].endswith(' Message Wall'):
user_wall = post["forumName"][:-13]
message = _(
"[{author}](<{url}f/u/{creatorId}>) replied to [{title}](<{wikiurl}wiki/Message_Wall:{user_wall}?threadId={threadid}#{replyId}>) on {user}'s Message Wall").format(
author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"], title=post["_embedded"]["thread"][0]["title"], user=user_wall,
wikiurl=settings["fandom_discussions"]["wiki_url"], user_wall=quote_plus(user_wall.replace(" ", "_")), threadid=post["threadId"], replyId=post["id"])
elif post_type == "POLL":
message = _(
"[{author}](<{url}f/u/{creatorId}>) created a poll [{title}](<{url}f/p/{threadId}>) in {forumName}").format(
author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"],
creatorId=post["creatorId"], title=post["title"], threadId=post["threadId"], forumName=post["forumName"])
send_to_discord(DiscordMessage("compact", "discussion", settings["fandom_discussions"]["webhookURL"], content=message))
class DiscussionsFromHellParser:
"""This class converts fairly convoluted Fandom jsonModal of a discussion post into Markdown formatted usable thing. Takes string, returns string.
Kudos to MarkusRost for allowing me to implement this formatter based on his code in Wiki-Bot."""
def __init__(self, post):
self.post = post
self.jsonModal = json.loads(post.get("jsonModel", "{}"))
self.markdown_text = ""
self.item_num = 1
self.image_last = None
def parse(self) -> str:
"""Main parsing logic"""
self.parse_content(self.jsonModal["content"])
if len(self.markdown_text) > 2000:
self.markdown_text = self.markdown_text[0:2000] + ""
return self.markdown_text
def parse_content(self, content, ctype=None):
self.image_last = None
for item in content:
if ctype == "bulletList":
self.markdown_text += "\t"
if ctype == "orderedList":
self.markdown_text += "\t{num}. ".format(num=self.item_num)
self.item_num += 1
if item["type"] == "text":
if "marks" in item:
prefix, suffix = self.convert_marks(item["marks"])
self.markdown_text = "{old}{pre}{text}{suf}".format(old=self.markdown_text, pre=prefix, text=escape_formatting(item["text"]), suf=suffix)
else:
if ctype == "code_block":
self.markdown_text += item["text"] # ignore formatting on preformatted text which cannot have additional formatting anyways
else:
self.markdown_text += escape_formatting(item["text"])
elif item["type"] == "paragraph":
if "content" in item:
self.parse_content(item["content"], item["type"])
self.markdown_text += "\n"
elif item["type"] == "openGraph":
if not item["attrs"]["wasAddedWithInlineLink"]:
self.markdown_text = "{old}{link}\n".format(old=self.markdown_text, link=item["attrs"]["url"])
elif item["type"] == "image":
try:
discussion_logger.debug(item["attrs"]["id"])
if item["attrs"]["id"] is not None:
self.markdown_text = "{old}{img_url}\n".format(old=self.markdown_text, img_url=self.post["_embedded"]["contentImages"][int(item["attrs"]["id"])]["url"])
self.image_last = self.post["_embedded"]["contentImages"][int(item["attrs"]["id"])]["url"]
except (IndexError, ValueError):
discussion_logger.warning("Image {} not found.".format(item["attrs"]["id"]))
discussion_logger.debug(self.markdown_text)
elif item["type"] == "code_block":
self.markdown_text += "```\n"
if "content" in item:
self.parse_content(item["content"], item["type"])
self.markdown_text += "\n```\n"
elif item["type"] == "bulletList":
if "content" in item:
self.parse_content(item["content"], item["type"])
elif item["type"] == "orderedList":
self.item_num = 1
if "content" in item:
self.parse_content(item["content"], item["type"])
elif item["type"] == "listItem":
self.parse_content(item["content"], item["type"])
def convert_marks(self, marks):
prefix = ""
suffix = ""
for mark in marks:
if mark["type"] == "mention":
prefix += "["
suffix = "]({wiki}f/u/{userid}){suffix}".format(wiki=settings["fandom_discussions"]["wiki_url"], userid=mark["attrs"]["userId"], suffix=suffix)
elif mark["type"] == "strong":
prefix += "**"
suffix = "**{suffix}".format(suffix=suffix)
elif mark["type"] == "link":
prefix += "["
suffix = "]({link}){suffix}".format(link=mark["attrs"]["href"], suffix=suffix)
elif mark["type"] == "em":
prefix += "_"
suffix = "_" + suffix
return prefix, suffix

View file

@ -16,11 +16,11 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging, gettext, schedule, requests, json, datetime
from collections import defaultdict
import logging, gettext, schedule, requests
from src.configloader import settings
from urllib.parse import quote_plus
from src.misc import datafile, send_to_discord, DiscordMessage, WIKI_SCRIPT_PATH, escape_formatting, messagequeue
from src.discussion_formatters import embed_formatter, compact_formatter
from src.misc import datafile, messagequeue
from src.session import session
# Initialize translation
@ -43,119 +43,6 @@ storage = datafile.data
fetch_url = "https://services.fandom.com/discussion/{wikiid}/posts?sortDirection=descending&sortKey=creation_date&limit={limit}".format(wikiid=settings["fandom_discussions"]["wiki_id"], limit=settings["fandom_discussions"]["limit"])
def embed_formatter(post, post_type):
"""Embed formatter for Fandom discussions."""
embed = DiscordMessage("embed", "discussion", settings["fandom_discussions"]["webhookURL"])
embed.set_author(post["createdBy"]["name"], "{wikiurl}f/u/{creatorId}".format(
wikiurl=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"]), icon_url=post["createdBy"]["avatarUrl"])
discussion_post_type = post["_embedded"]["thread"][0].get("containerType", "FORUM") # Can be FORUM, ARTICLE_COMMENT or WALL on UCP
if post_type == "TEXT":
if post["isReply"]:
if discussion_post_type == "FORUM":
embed.event_type = "discussion/forum/reply"
embed["title"] = _("Replied to \"{title}\"").format(title=post["_embedded"]["thread"][0]["title"])
embed["url"] = "{wikiurl}f/p/{threadId}/r/{postId}".format(
wikiurl=settings["fandom_discussions"]["wiki_url"], threadId=post["threadId"], postId=post["id"])
elif discussion_post_type == "ARTICLE_COMMENT":
discussion_logger.warning("Article comments are not yet implemented. For reasons see https://gitlab.com/piotrex43/RcGcDw/-/issues/126#note_366480037")
return
elif discussion_post_type == "WALL":
user_wall = _("unknown") # Fail safe
embed.event_type = "discussion/wall/reply"
if post["forumName"].endswith(' Message Wall'):
user_wall = post["forumName"][:-13]
embed["url"] = "{wikiurl}wiki/Message_Wall:{user_wall}?threadId={threadid}#{replyId}".format(wikiurl=settings["fandom_discussions"]["wiki_url"], user_wall=quote_plus(user_wall.replace(" ", "_")), threadid=post["threadId"], replyId=post["id"])
embed["title"] = _("Replied to \"{title}\" on {user}'s Message Wall").format(title=post["_embedded"]["thread"][0]["title"], user=user_wall)
else:
if discussion_post_type == "FORUM":
embed.event_type = "discussion/forum/post"
embed["title"] = _("Created \"{title}\"").format(title=post["title"])
embed["url"] = "{wikiurl}f/p/{threadId}".format(wikiurl=settings["fandom_discussions"]["wiki_url"],
threadId=post["threadId"])
elif discussion_post_type == "ARTICLE_COMMENT":
discussion_logger.warning("Article comments are not yet implemented. For reasons see https://gitlab.com/piotrex43/RcGcDw/-/issues/126#note_366480037")
return
elif discussion_post_type == "WALL":
user_wall = _("unknown") # Fail safe
embed.event_type = "discussion/wall/post"
if post["forumName"].endswith(' Message Wall'):
user_wall = post["forumName"][:-13]
embed["url"] = "{wikiurl}wiki/Message_Wall:{user_wall}?threadId={threadid}".format(
wikiurl=settings["fandom_discussions"]["wiki_url"], user_wall=quote_plus(user_wall.replace(" ", "_")),
threadid=post["threadId"])
embed["title"] = _("Created \"{title}\" on {user}'s Message Wall").format(title=post["_embedded"]["thread"][0]["title"], user=user_wall)
if settings["fandom_discussions"]["appearance"]["embed"]["show_content"]:
if post.get("jsonModel") is not None:
npost = DiscussionsFromHellParser(post)
embed["description"] = npost.parse()
if npost.image_last:
embed["image"]["url"] = npost.image_last
embed["description"] = embed["description"].replace(npost.image_last, "")
else: # Fallback when model is not available
embed["description"] = post.get("rawContent", "")
elif post_type == "POLL":
embed.event_type = "discussion/forum/poll"
poll = post["poll"]
embed["title"] = _("Created a poll titled \"{title}\"").format(title=poll["question"])
image_type = False
if poll["answers"][0]["image"] is not None:
image_type = True
for num, option in enumerate(poll["answers"]):
embed.add_field(option["text"] if image_type is True else _("Option {}").format(num+1),
option["text"] if image_type is False else _("__[View image]({image_url})__").format(image_url=option["image"]["url"]),
inline=True)
embed["footer"]["text"] = post["forumName"]
embed["timestamp"] = datetime.datetime.fromtimestamp(post["creationDate"]["epochSecond"], tz=datetime.timezone.utc).isoformat()
embed.finish_embed()
send_to_discord(embed)
def compact_formatter(post, post_type):
"""Compact formatter for Fandom discussions."""
message = None
discussion_post_type = post["_embedded"]["thread"][0].get("containerType",
"FORUM") # Can be FORUM, ARTICLE_COMMENT or WALL on UCP
if post_type == "TEXT":
if not post["isReply"]:
if discussion_post_type == "FORUM":
message = _("[{author}](<{url}f/u/{creatorId}>) created [{title}](<{url}f/p/{threadId}>) in {forumName}").format(
author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"], title=post["title"], threadId=post["threadId"], forumName=post["forumName"])
elif discussion_post_type == "ARTICLE_COMMENT":
discussion_logger.warning("Article comments are not yet implemented. For reasons see https://gitlab.com/piotrex43/RcGcDw/-/issues/126#note_366480037")
return
elif discussion_post_type == "WALL":
user_wall = _("unknown") # Fail safe
if post["forumName"].endswith(' Message Wall'):
user_wall = post["forumName"][:-13]
message = _("[{author}](<{url}f/u/{creatorId}>) created [{title}](<{wikiurl}wiki/Message_Wall:{user_wall}?threadId={threadid}>) on {user}'s Message Wall").format(
author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"], title=post["_embedded"]["thread"][0]["title"], user=user_wall,
wikiurl=settings["fandom_discussions"]["wiki_url"], user_wall=quote_plus(user_wall.replace(" ", "_")), threadid=post["threadId"]
)
else:
if discussion_post_type == "FORUM":
message = _("[{author}](<{url}f/u/{creatorId}>) created a [reply](<{url}f/p/{threadId}/r/{postId}>) to [{title}](<{url}f/p/{threadId}>) in {forumName}").format(
author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"], threadId=post["threadId"], postId=post["id"], title=post["_embedded"]["thread"][0]["title"], forumName=post["forumName"]
)
elif discussion_post_type == "ARTICLE_COMMENT":
discussion_logger.warning("Article comments are not yet implemented. For reasons see https://gitlab.com/piotrex43/RcGcDw/-/issues/126#note_366480037")
return
elif discussion_post_type == "WALL":
user_wall = _("unknown") # Fail safe
if post["forumName"].endswith(' Message Wall'):
user_wall = post["forumName"][:-13]
message = _(
"[{author}](<{url}f/u/{creatorId}>) replied to [{title}](<{wikiurl}wiki/Message_Wall:{user_wall}?threadId={threadid}#{replyId}>) on {user}'s Message Wall").format(
author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"], title=post["_embedded"]["thread"][0]["title"], user=user_wall,
wikiurl=settings["fandom_discussions"]["wiki_url"], user_wall=quote_plus(user_wall.replace(" ", "_")), threadid=post["threadId"], replyId=post["id"])
elif post_type == "POLL":
message = _(
"[{author}](<{url}f/u/{creatorId}>) created a poll [{title}](<{url}f/p/{threadId}>) in {forumName}").format(
author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"],
creatorId=post["creatorId"], title=post["title"], threadId=post["threadId"], forumName=post["forumName"])
send_to_discord(DiscordMessage("compact", "discussion", settings["fandom_discussions"]["webhookURL"], content=message))
def fetch_discussions():
messagequeue.resend_msgs()
request = safe_request(fetch_url)
@ -188,89 +75,6 @@ def parse_discussion_post(post):
else:
discussion_logger.warning("The type of {} is an unknown discussion post type. Please post an issue on the project page to have it added https://gitlab.com/piotrex43/RcGcDw/-/issues.")
class DiscussionsFromHellParser:
"""This class converts fairly convoluted Fandom jsonModal of a discussion post into Markdown formatted usable thing. Takes string, returns string.
Kudos to MarkusRost for allowing me to implement this formatter based on his code in Wiki-Bot."""
def __init__(self, post):
self.post = post
self.jsonModal = json.loads(post.get("jsonModel", "{}"))
self.markdown_text = ""
self.item_num = 1
self.image_last = None
def parse(self) -> str:
"""Main parsing logic"""
self.parse_content(self.jsonModal["content"])
if len(self.markdown_text) > 2000:
self.markdown_text = self.markdown_text[0:2000] + ""
return self.markdown_text
def parse_content(self, content, ctype=None):
self.image_last = None
for item in content:
if ctype == "bulletList":
self.markdown_text += "\t"
if ctype == "orderedList":
self.markdown_text += "\t{num}. ".format(num=self.item_num)
self.item_num += 1
if item["type"] == "text":
if "marks" in item:
prefix, suffix = self.convert_marks(item["marks"])
self.markdown_text = "{old}{pre}{text}{suf}".format(old=self.markdown_text, pre=prefix, text=escape_formatting(item["text"]), suf=suffix)
else:
if ctype == "code_block":
self.markdown_text += item["text"] # ignore formatting on preformatted text which cannot have additional formatting anyways
else:
self.markdown_text += escape_formatting(item["text"])
elif item["type"] == "paragraph":
if "content" in item:
self.parse_content(item["content"], item["type"])
self.markdown_text += "\n"
elif item["type"] == "openGraph":
if not item["attrs"]["wasAddedWithInlineLink"]:
self.markdown_text = "{old}{link}\n".format(old=self.markdown_text, link=item["attrs"]["url"])
elif item["type"] == "image":
try:
discussion_logger.debug(item["attrs"]["id"])
if item["attrs"]["id"] is not None:
self.markdown_text = "{old}{img_url}\n".format(old=self.markdown_text, img_url=self.post["_embedded"]["contentImages"][int(item["attrs"]["id"])]["url"])
self.image_last = self.post["_embedded"]["contentImages"][int(item["attrs"]["id"])]["url"]
except (IndexError, ValueError):
discussion_logger.warning("Image {} not found.".format(item["attrs"]["id"]))
discussion_logger.debug(self.markdown_text)
elif item["type"] == "code_block":
self.markdown_text += "```\n"
if "content" in item:
self.parse_content(item["content"], item["type"])
self.markdown_text += "\n```\n"
elif item["type"] == "bulletList":
if "content" in item:
self.parse_content(item["content"], item["type"])
elif item["type"] == "orderedList":
self.item_num = 1
if "content" in item:
self.parse_content(item["content"], item["type"])
elif item["type"] == "listItem":
self.parse_content(item["content"], item["type"])
def convert_marks(self, marks):
prefix = ""
suffix = ""
for mark in marks:
if mark["type"] == "mention":
prefix += "["
suffix = "]({wiki}f/u/{userid}){suffix}".format(wiki=settings["fandom_discussions"]["wiki_url"], userid=mark["attrs"]["userId"], suffix=suffix)
elif mark["type"] == "strong":
prefix += "**"
suffix = "**{suffix}".format(suffix=suffix)
elif mark["type"] == "link":
prefix += "["
suffix = "]({link}){suffix}".format(link=mark["attrs"]["href"], suffix=suffix)
elif mark["type"] == "em":
prefix += "_"
suffix = "_" + suffix
return prefix, suffix
def safe_request(url):
"""Function to assure safety of request, and do not crash the script on exceptions,"""

2
src/exceptions.py Normal file
View file

@ -0,0 +1,2 @@
class MWError(Exception):
pass

15
src/i18n.py Normal file
View file

@ -0,0 +1,15 @@
import gettext, sys, logging
from src.configloader import settings
logger = logging.getLogger("rcgcdw.i18n")
# Setup translation
try:
lang = gettext.translation('rcgcdw', localedir='locale', languages=[settings["lang"]])
disc = gettext.translation('discussions', localedir='locale', languages=[settings["lang"]])
except FileNotFoundError:
logger.critical("No language files have been found. Make sure locale folder is located in the directory.")
sys.exit(1)
lang.install()
ngettext = lang.ngettext

View file

@ -15,7 +15,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import json, logging, sys, re, time, random, math
from html.parser import HTMLParser
from urllib.parse import urlparse, urlunparse
@ -42,6 +42,8 @@ WIKI_ARTICLE_PATH: str = ""
WIKI_SCRIPT_PATH: str = ""
WIKI_JUST_DOMAIN: str = ""
profile_fields = {"profile-location": _("Location"), "profile-aboutme": _("About me"), "profile-link-google": _("Google link"), "profile-link-facebook":_("Facebook link"), "profile-link-twitter": _("Twitter link"), "profile-link-reddit": _("Reddit link"), "profile-link-twitch": _("Twitch link"), "profile-link-psn": _("PSN link"), "profile-link-vk": _("VK link"), "profile-link-xbl": _("XBL link"), "profile-link-steam": _("Steam link"), "profile-link-discord": _("Discord handle"), "profile-link-battlenet": _("Battle.net handle")}
class DataFile:
"""Data class which instance of is shared by multiple modules to remain consistent and do not cause too many IO operations."""
def __init__(self):
@ -414,3 +416,45 @@ class DiscordMessage():
def set_name(self, name):
self.webhook_object["username"] = name
def profile_field_name(name, embed):
try:
return profile_fields[name]
except KeyError:
if embed:
return _("Unknown")
else:
return _("unknown")
class LinkParser(HTMLParser):
new_string = ""
recent_href = ""
def handle_starttag(self, tag, attrs):
for attr in attrs:
if attr[0] == 'href':
self.recent_href = attr[1]
if self.recent_href.startswith("//"):
self.recent_href = "https:{rest}".format(rest=self.recent_href)
elif not self.recent_href.startswith("http"):
self.recent_href = WIKI_JUST_DOMAIN + self.recent_href
self.recent_href = self.recent_href.replace(")", "\\)")
elif attr[0] == 'data-uncrawlable-url':
self.recent_href = attr[1].encode('ascii')
self.recent_href = base64.b64decode(self.recent_href)
self.recent_href = WIKI_JUST_DOMAIN + self.recent_href.decode('ascii')
def handle_data(self, data):
if self.recent_href:
self.new_string = self.new_string + "[{}](<{}>)".format(data, self.recent_href)
self.recent_href = ""
else:
self.new_string = self.new_string + data
def handle_comment(self, data):
self.new_string = self.new_string + data
def handle_endtag(self, tag):
misc_logger.debug(self.new_string)

352
src/rc.py Normal file
View file

@ -0,0 +1,352 @@
import re
import sys
import time
import logging
import requests
from bs4 import BeautifulSoup
from src.configloader import settings
from src.misc import WIKI_SCRIPT_PATH, WIKI_API_PATH, messagequeue, datafile, send_simple, safe_read, LinkParser
from src.exceptions import MWError
from src.session import session
from src.rc_formatters import compact_formatter, embed_formatter
storage = datafile.data
logger = logging.getLogger("rcgcdw.rc")
supported_logs = ["protect/protect", "protect/modify", "protect/unprotect", "upload/overwrite", "upload/upload", "delete/delete", "delete/delete_redir", "delete/restore", "delete/revision", "delete/event", "import/upload", "import/interwiki", "merge/merge", "move/move", "move/move_redir", "protect/move_prot", "block/block", "block/unblock", "block/reblock", "rights/rights", "rights/autopromote", "abusefilter/modify", "abusefilter/create", "interwiki/iw_add", "interwiki/iw_edit", "interwiki/iw_delete", "curseprofile/comment-created", "curseprofile/comment-edited", "curseprofile/comment-deleted", "curseprofile/comment-purged", "curseprofile/profile-edited", "curseprofile/comment-replied", "contentmodel/change", "sprite/sprite", "sprite/sheet", "sprite/slice", "managetags/create", "managetags/delete", "managetags/activate", "managetags/deactivate", "tag/update", "cargo/createtable", "cargo/deletetable", "cargo/recreatetable", "cargo/replacetable", "upload/revert"]
# Set the proper formatter
if settings["appearance"]["mode"] == "embed":
appearance_mode = embed_formatter
elif settings["appearance"]["mode"] == "compact":
appearance_mode = compact_formatter
else:
logger.critical("Unknown formatter!")
sys.exit(1)
LinkParser = LinkParser()
class Recent_Changes_Class(object):
"""Store verious data and functions related to wiki and fetching of Recent Changes"""
def __init__(self):
self.ids = []
self.map_ips = {}
self.recent_id = 0
self.downtimecredibility = 0
self.last_downtime = 0
self.tags = {}
self.groups = {}
self.streak = -1
self.mw_messages = {}
self.namespaces = None
self.session = session
self.logged_in = False
if settings["limitrefetch"] != -1:
self.file_id = storage["rcid"]
else:
self.file_id = 999999999 # such value won't cause trouble, and it will make sure no refetch happen
@staticmethod
def handle_mw_errors(request):
if "errors" in request:
logger.error(request["errors"])
raise MWError
return request
def log_in(self):
# session.cookies.clear()
if '@' not in settings["wiki_bot_login"]:
logger.error(
"Please provide proper nickname for login from {wiki}Special:BotPasswords".format(
wiki=WIKI_SCRIPT_PATH))
return
if len(settings["wiki_bot_password"]) != 32:
logger.error(
"Password seems incorrect. It should be 32 characters long! Grab it from {wiki}Special:BotPasswords".format(
wiki=WIKI_SCRIPT_PATH))
return
logger.info("Trying to log in to {wiki}...".format(wiki=WIKI_SCRIPT_PATH))
try:
response = self.handle_mw_errors(
self.session.post(WIKI_API_PATH,
data={'action': 'query', 'format': 'json', 'utf8': '', 'meta': 'tokens',
'type': 'login'}))
response = self.handle_mw_errors(
self.session.post(WIKI_API_PATH,
data={'action': 'login', 'format': 'json', 'utf8': '',
'lgname': settings["wiki_bot_login"],
'lgpassword': settings["wiki_bot_password"],
'lgtoken': response.json()['query']['tokens']['logintoken']}))
except ValueError:
logger.error("Logging in have not succeeded")
return
except MWError:
logger.error("Logging in have not succeeded")
return
try:
if response.json()['login']['result'] == "Success":
logger.info("Successfully logged in")
self.logged_in = True
else:
logger.error("Logging in have not succeeded")
except:
logger.error("Logging in have not succeeded")
def add_cache(self, change):
self.ids.append(change["rcid"])
# self.recent_id = change["rcid"]
if len(self.ids) > settings["limitrefetch"] + 5:
self.ids.pop(0)
def fetch(self, amount=settings["limit"]):
messagequeue.resend_msgs()
last_check = self.fetch_changes(amount=amount)
# If the request succeeds the last_check will be the last rcid from recentchanges query
if last_check is not None:
self.recent_id = last_check
# Assigns self.recent_id the last rcid if request succeeded, otherwise set the id from the file
if settings["limitrefetch"] != -1 and self.recent_id != self.file_id and self.recent_id != 0: # if saving to database is disabled, don't save the recent_id
self.file_id = self.recent_id
storage["rcid"] = self.recent_id
datafile.save_datafile()
logger.debug("Most recent rcid is: {}".format(self.recent_id))
return self.recent_id
def fetch_changes(self, amount, clean=False):
"""Fetches the :amount: of changes from the wiki.
Returns None on error and int of rcid of latest change if succeeded"""
global logged_in
if len(self.ids) == 0:
logger.debug("ids is empty, triggering clean fetch")
clean = True
changes = self.safe_request(
"{wiki}?action=query&format=json&list=recentchanges{show_bots}&rcprop=title%7Credirect%7Ctimestamp%7Cids%7Cloginfo%7Cparsedcomment%7Csizes%7Cflags%7Ctags%7Cuser&rclimit={amount}&rctype=edit%7Cnew%7Clog%7Cexternal{categorize}".format(
wiki=WIKI_API_PATH, amount=amount, categorize="%7Ccategorize" if settings["show_added_categories"] else "", show_bots="&rcshow=!bot" if settings["show_bots"] is False else ""))
if changes:
try:
changes = changes.json()['query']['recentchanges']
changes.reverse()
except ValueError:
logger.warning("ValueError in fetching changes")
logger.warning("Changes URL:" + changes.url)
self.downtime_controller()
return None
except KeyError:
logger.warning("Wiki returned %s" % (changes.json()))
return None
else:
if self.downtimecredibility > 0:
self.downtimecredibility -= 1
if self.streak > -1:
self.streak += 1
if self.streak > 8:
self.streak = -1
send_simple("down_detector", _("Connection to {wiki} seems to be stable now.").format(wiki=settings["wikiname"]),
_("Connection status"), settings["avatars"]["connection_restored"])
# In the first for loop we analize the categorize events and figure if we will need more changes to fetch
# in order to cover all of the edits
categorize_events = {}
new_events = 0
for change in changes:
if not (change["rcid"] in self.ids or change["rcid"] < self.recent_id) and not clean:
new_events += 1
logger.debug(
"New event: {}".format(change["rcid"]))
if new_events == settings["limit"]:
if amount < 500:
# call the function again with max limit for more results, ignore the ones in this request
logger.debug("There were too many new events, requesting max amount of events from the wiki.")
return self.fetch(amount=5000 if self.logged_in else 500)
else:
logger.debug(
"There were too many new events, but the limit was high enough we don't care anymore about fetching them all.")
if change["type"] == "categorize":
if "commenthidden" not in change:
if len(recent_changes.mw_messages.keys()) > 0:
cat_title = change["title"].split(':', 1)[1]
# I so much hate this, blame Markus for making me do this
if change["revid"] not in categorize_events:
categorize_events[change["revid"]] = {"new": set(), "removed": set()}
comment_to_match = re.sub(r'<.*?a>', '', change["parsedcomment"])
if recent_changes.mw_messages["recentchanges-page-added-to-category"] in comment_to_match or recent_changes.mw_messages["recentchanges-page-added-to-category-bundled"] in comment_to_match:
categorize_events[change["revid"]]["new"].add(cat_title)
logger.debug("Matched {} to added category for {}".format(cat_title, change["revid"]))
elif recent_changes.mw_messages["recentchanges-page-removed-from-category"] in comment_to_match or recent_changes.mw_messages["recentchanges-page-removed-from-category-bundled"] in comment_to_match:
categorize_events[change["revid"]]["removed"].add(cat_title)
logger.debug("Matched {} to removed category for {}".format(cat_title, change["revid"]))
else:
logger.debug("Unknown match for category change with messages {}, {}, {}, {} and comment_to_match {}".format(recent_changes.mw_messages["recentchanges-page-added-to-category"], recent_changes.mw_messages["recentchanges-page-removed-from-category"], recent_changes.mw_messages["recentchanges-page-removed-from-category-bundled"], recent_changes.mw_messages["recentchanges-page-added-to-category-bundled"], comment_to_match))
else:
logger.warning("Init information not available, could not read category information. Please restart the bot.")
else:
logger.debug("Log entry got suppressed, ignoring entry.")
# if change["revid"] in categorize_events:
# categorize_events[change["revid"]].append(cat_title)
# else:
# logger.debug("New category '{}' for {}".format(cat_title, change["revid"]))
# categorize_events[change["revid"]] = {cat_title: }
for change in changes:
if change["rcid"] in self.ids or change["rcid"] < self.recent_id:
logger.debug("Change ({}) is in ids or is lower than recent_id {}".format(change["rcid"],
self.recent_id))
continue
logger.debug(self.ids)
logger.debug(self.recent_id)
self.add_cache(change)
if clean and not (self.recent_id == 0 and change["rcid"] > self.file_id):
logger.debug("Rejected {val}".format(val=change["rcid"]))
continue
essential_info(change, categorize_events.get(change.get("revid"), None))
return change["rcid"]
def safe_request(self, url):
try:
request = self.session.get(url, timeout=10, allow_redirects=False)
except requests.exceptions.Timeout:
logger.warning("Reached timeout error for request on link {url}".format(url=url))
self.downtime_controller()
return None
except requests.exceptions.ConnectionError:
logger.warning("Reached connection error for request on link {url}".format(url=url))
self.downtime_controller()
return None
except requests.exceptions.ChunkedEncodingError:
logger.warning("Detected faulty response from the web server for request on link {url}".format(url=url))
self.downtime_controller()
return None
else:
if 499 < request.status_code < 600:
self.downtime_controller()
return None
elif request.status_code == 302:
logger.critical("Redirect detected! Either the wiki given in the script settings (wiki field) is incorrect/the wiki got removed or Gamepedia is giving us the false value. Please provide the real URL to the wiki, current URL redirects to {}".format(request.next.url))
sys.exit(0)
return request
def check_connection(self, looped=False):
online = 0
for website in ["https://google.com", "https://instagram.com", "https://steamcommunity.com"]:
try:
requests.get(website, timeout=10)
online += 1
except requests.exceptions.ConnectionError:
pass
except requests.exceptions.Timeout:
pass
if online < 1:
logger.error("Failure when checking Internet connection at {time}".format(
time=time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime())))
self.downtimecredibility = 0
if not looped:
while 1: # recursed loop, check for connection (every 10 seconds) as long as three services are down, don't do anything else
if self.check_connection(looped=True):
recent_changes.fetch(amount=settings["limitrefetch"])
break
time.sleep(10)
return False
return True
def downtime_controller(self):
if not settings["show_updown_messages"]:
return
if self.streak > -1: # reset the streak of successful connections when bad one happens
self.streak = 0
if self.downtimecredibility < 60:
self.downtimecredibility += 15
else:
if (
time.time() - self.last_downtime) > 1800 and self.check_connection(): # check if last downtime happened within 30 minutes, if yes, don't send a message
send_simple("down_detector", _("{wiki} seems to be down or unreachable.").format(wiki=settings["wikiname"]),
_("Connection status"), settings["avatars"]["connection_failed"])
self.last_downtime = time.time()
self.streak = 0
def clear_cache(self):
self.map_ips = {}
def init_info(self):
startup_info = safe_read(self.safe_request(
"{wiki}?action=query&format=json&uselang=content&list=tags&meta=allmessages%7Csiteinfo&utf8=1&tglimit=max&tgprop=displayname&ammessages=recentchanges-page-added-to-category%7Crecentchanges-page-removed-from-category%7Crecentchanges-page-added-to-category-bundled%7Crecentchanges-page-removed-from-category-bundled&amenableparser=1&amincludelocal=1&siprop=namespaces".format(
wiki=WIKI_API_PATH)), "query")
if startup_info:
if "tags" in startup_info and "allmessages" in startup_info:
for tag in startup_info["tags"]:
try:
self.tags[tag["name"]] = (BeautifulSoup(tag["displayname"], "lxml")).get_text()
except KeyError:
self.tags[tag["name"]] = None # Tags with no display name are hidden and should not appear on RC as well
for message in startup_info["allmessages"]:
if not "missing" in message: # ignore missing strings
self.mw_messages[message["name"]] = message["*"]
else:
logging.warning("Could not fetch the MW message translation for: {}".format(message["name"]))
for key, message in self.mw_messages.items():
if key.startswith("recentchanges-page-"):
self.mw_messages[key] = re.sub(r'\[\[.*?\]\]', '', message)
self.namespaces = startup_info["namespaces"]
logger.info("Gathered information about the tags and interface messages.")
else:
logger.warning("Could not retrieve initial wiki information. Some features may not work correctly!")
logger.debug(startup_info)
else:
logger.error("Could not retrieve initial wiki information. Possibly internet connection issue?")
def pull_comment(self, comment_id):
try:
comment = self.handle_mw_errors(self.safe_request(
"{wiki}?action=comment&do=getRaw&comment_id={comment}&format=json".format(wiki=WIKI_API_PATH,
comment=comment_id)).json())[
"text"]
logger.debug("Got the following comment from the API: {}".format(comment))
except MWError:
pass
except (TypeError, AttributeError):
logger.exception("Could not resolve the comment text.")
except KeyError:
logger.exception("CurseProfile extension API did not respond with a valid comment content.")
else:
if len(comment) > 1000:
comment = comment[0:1000] + ""
return comment
return ""
recent_changes = Recent_Changes_Class()
def essential_info(change, changed_categories):
"""Prepares essential information for both embed and compact message format."""
logger.debug(change)
if ("actionhidden" in change or "suppressed" in change) and "suppressed" not in settings["ignored"]: # if event is hidden using suppression
appearance_mode("suppressed", change, "", changed_categories, recent_changes)
return
if "commenthidden" not in change:
LinkParser.feed(change["parsedcomment"])
parsed_comment = LinkParser.new_string
LinkParser.new_string = ""
parsed_comment = re.sub(r"(`|_|\*|~|{|}|\|\|)", "\\\\\\1", parsed_comment, 0)
else:
parsed_comment = _("~~hidden~~")
if not parsed_comment:
parsed_comment = None
if change["type"] in ["edit", "new"]:
logger.debug("List of categories in essential_info: {}".format(changed_categories))
if "userhidden" in change:
change["user"] = _("hidden")
identification_string = change["type"]
elif change["type"] == "log":
identification_string = "{logtype}/{logaction}".format(logtype=change["logtype"], logaction=change["logaction"])
if identification_string not in supported_logs:
logger.warning(
"This event is not implemented in the script. Please make an issue on the tracker attaching the following info: wiki url, time, and this information: {}".format(
change))
return
elif change["type"] == "categorize":
return
else:
logger.warning("This event is not implemented in the script. Please make an issue on the tracker attaching the following info: wiki url, time, and this information: {}".format(change))
return
if identification_string in settings["ignored"]:
return
appearance_mode(identification_string, change, parsed_comment, changed_categories, recent_changes)

742
src/rc_formatters.py Normal file
View file

@ -0,0 +1,742 @@
import ipaddress
import math
import re
import time
import logging
from urllib.parse import quote_plus
from bs4 import BeautifulSoup
from src.configloader import settings
from src.misc import link_formatter, create_article_path, WIKI_SCRIPT_PATH, send_to_discord, DiscordMessage, safe_read, \
WIKI_API_PATH, ContentParser, profile_field_name, LinkParser
from src.i18n import lang
#from src.rc import recent_changes, pull_comment
ngettext = lang.ngettext
logger = logging.getLogger("rcgcdw.rc_formatters")
#from src.rcgcdw import recent_changes, ngettext, logger, profile_field_name, LinkParser, pull_comment
LinkParser = LinkParser()
def compact_formatter(action, change, parsed_comment, categories, recent_changes):
if action != "suppressed":
author_url = link_formatter(create_article_path("User:{user}".format(user=change["user"])))
author = change["user"]
parsed_comment = "" if parsed_comment is None else " *("+parsed_comment+")*"
parsed_comment = re.sub(r"([^<]|\A)(http(s)://.*?)( |\Z)", "\\1<\\2>\\4", parsed_comment) # see #97
if action in ["edit", "new"]:
edit_link = link_formatter("{wiki}index.php?title={article}&curid={pageid}&diff={diff}&oldid={oldrev}".format(
wiki=WIKI_SCRIPT_PATH, pageid=change["pageid"], diff=change["revid"], oldrev=change["old_revid"],
article=change["title"]))
edit_size = change["newlen"] - change["oldlen"]
if edit_size > 0:
sign = "+"
else:
sign = ""
if change["title"].startswith("MediaWiki:Tag-"):
pass
if action == "edit":
content = _("[{author}]({author_url}) edited [{article}]({edit_link}){comment} ({sign}{edit_size})").format(author=author, author_url=author_url, article=change["title"], edit_link=edit_link, comment=parsed_comment, edit_size=edit_size, sign=sign)
else:
content = _("[{author}]({author_url}) created [{article}]({edit_link}){comment} ({sign}{edit_size})").format(author=author, author_url=author_url, article=change["title"], edit_link=edit_link, comment=parsed_comment, edit_size=edit_size, sign=sign)
elif action =="upload/upload":
file_link = link_formatter(create_article_path(change["title"]))
content = _("[{author}]({author_url}) uploaded [{file}]({file_link}){comment}").format(author=author,
author_url=author_url,
file=change["title"],
file_link=file_link,
comment=parsed_comment)
elif action == "upload/revert":
file_link = link_formatter(create_article_path(change["title"]))
content = _("[{author}]({author_url}) reverted a version of [{file}]({file_link}){comment}").format(
author=author, author_url=author_url, file=change["title"], file_link=file_link, comment=parsed_comment)
elif action == "upload/overwrite":
file_link = link_formatter(create_article_path(change["title"]))
content = _("[{author}]({author_url}) uploaded a new version of [{file}]({file_link}){comment}").format(author=author, author_url=author_url, file=change["title"], file_link=file_link, comment=parsed_comment)
elif action == "delete/delete":
page_link = link_formatter(create_article_path(change["title"]))
content = _("[{author}]({author_url}) deleted [{page}]({page_link}){comment}").format(author=author, author_url=author_url, page=change["title"], page_link=page_link,
comment=parsed_comment)
elif action == "delete/delete_redir":
page_link = link_formatter(create_article_path(change["title"]))
content = _("[{author}]({author_url}) deleted redirect by overwriting [{page}]({page_link}){comment}").format(author=author, author_url=author_url, page=change["title"], page_link=page_link,
comment=parsed_comment)
elif action == "move/move":
link = link_formatter(create_article_path(change["logparams"]['target_title']))
redirect_status = _("without making a redirect") if "suppressredirect" in change["logparams"] else _("with a redirect")
content = _("[{author}]({author_url}) moved {redirect}*{article}* to [{target}]({target_url}) {made_a_redirect}{comment}").format(author=author, author_url=author_url, redirect="" if "redirect" in change else "", article=change["title"],
target=change["logparams"]['target_title'], target_url=link, comment=parsed_comment, made_a_redirect=redirect_status)
elif action == "move/move_redir":
link = link_formatter(create_article_path(change["logparams"]["target_title"]))
redirect_status = _("without making a redirect") if "suppressredirect" in change["logparams"] else _(
"with a redirect")
content = _("[{author}]({author_url}) moved {redirect}*{article}* over redirect to [{target}]({target_url}) {made_a_redirect}{comment}").format(author=author, author_url=author_url, redirect="" if "redirect" in change else "", article=change["title"],
target=change["logparams"]['target_title'], target_url=link, comment=parsed_comment, made_a_redirect=redirect_status)
elif action == "protect/move_prot":
link = link_formatter(create_article_path(change["logparams"]["oldtitle_title"]))
content = _(
"[{author}]({author_url}) moved protection settings from {redirect}*{article}* to [{target}]({target_url}){comment}").format(author=author, author_url=author_url, redirect="" if "redirect" in change else "", article=change["logparams"]["oldtitle_title"],
target=change["title"], target_url=link, comment=parsed_comment)
elif action == "block/block":
user = change["title"].split(':')[1]
restriction_description = ""
try:
ipaddress.ip_address(user)
link = link_formatter(create_article_path("Special:Contributions/{user}".format(user=user)))
except ValueError:
link = link_formatter(create_article_path(change["title"]))
if change["logparams"]["duration"] == "infinite":
block_time = _("infinity and beyond")
else:
english_length = re.sub(r"(\d+)", "", change["logparams"][
"duration"]) # note that translation won't work for millenia and century yet
english_length_num = re.sub(r"(\D+)", "", change["logparams"]["duration"])
try:
english_length = english_length.rstrip("s").strip()
block_time = "{num} {translated_length}".format(num=english_length_num,
translated_length=ngettext(english_length,
english_length + "s",
int(english_length_num)))
except AttributeError:
logger.error("Could not strip s from the block event, seems like the regex didn't work?")
return
if "sitewide" not in change["logparams"]:
restriction_description = ""
if "pages" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"]["pages"]:
restriction_description = _(" on pages: ")
for page in change["logparams"]["restrictions"]["pages"]:
restricted_pages = ["*{page}*".format(page=i["page_title"]) for i in change["logparams"]["restrictions"]["pages"]]
restriction_description = restriction_description + ", ".join(restricted_pages)
if "namespaces" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"]["namespaces"]:
namespaces = []
if restriction_description:
restriction_description = restriction_description + _(" and namespaces: ")
else:
restriction_description = _(" on namespaces: ")
for namespace in change["logparams"]["restrictions"]["namespaces"]:
if str(namespace) in recent_changes.namespaces: # if we have cached namespace name for given namespace number, add its name to the list
namespaces.append("*{ns}*".format(ns=recent_changes.namespaces[str(namespace)]["*"]))
else:
namespaces.append("*{ns}*".format(ns=namespace))
restriction_description = restriction_description + ", ".join(namespaces)
restriction_description = restriction_description + "."
if len(restriction_description) > 1020:
logger.debug(restriction_description)
restriction_description = restriction_description[:1020] + ""
content = _(
"[{author}]({author_url}) blocked [{user}]({user_url}) for {time}{restriction_desc}{comment}").format(author=author, author_url=author_url, user=user, time=block_time, user_url=link, restriction_desc=restriction_description, comment=parsed_comment)
elif action == "block/reblock":
link = link_formatter(create_article_path(change["title"]))
user = change["title"].split(':')[1]
content = _("[{author}]({author_url}) changed block settings for [{blocked_user}]({user_url}){comment}").format(author=author, author_url=author_url, blocked_user=user, user_url=link, comment=parsed_comment)
elif action == "block/unblock":
link = link_formatter(create_article_path(change["title"]))
user = change["title"].split(':')[1]
content = _("[{author}]({author_url}) unblocked [{blocked_user}]({user_url}){comment}").format(author=author, author_url=author_url, blocked_user=user, user_url=link, comment=parsed_comment)
elif action == "curseprofile/comment-created":
link = link_formatter(create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"])))
content = _("[{author}]({author_url}) left a [comment]({comment}) on {target} profile").format(author=author, author_url=author_url, comment=link, target=change["title"].split(':')[1]+"'s" if change["title"].split(':')[1] != change["user"] else _("their own profile"))
elif action == "curseprofile/comment-replied":
link = link_formatter(create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"])))
content = _("[{author}]({author_url}) replied to a [comment]({comment}) on {target} profile").format(author=author,
author_url=author_url,
comment=link,
target=change["title"].split(':')[1] if change["title"].split(':')[1] !=change["user"] else _("their own"))
elif action == "curseprofile/comment-edited":
link = link_formatter(create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"])))
content = _("[{author}]({author_url}) edited a [comment]({comment}) on {target} profile").format(author=author,
author_url=author_url,
comment=link,
target=change["title"].split(':')[1] if change["title"].split(':')[1] !=change["user"] else _("their own"))
elif action == "curseprofile/comment-purged":
link = link_formatter(create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"])))
content = _("[{author}]({author_url}) purged a comment on {target} profile").format(author=author,
author_url=author_url,
target=
change["title"].split(':')[
1] if
change["title"].split(':')[
1] != change[
"user"] else _(
"their own"))
elif action == "curseprofile/comment-deleted":
content = _("[{author}]({author_url}) deleted a comment on {target} profile").format(author=author,
author_url=author_url,
target=change["title"].split(':')[1] if change["title"].split(':')[1] !=change["user"] else _("their own"))
elif action == "curseprofile/profile-edited":
link = link_formatter(create_article_path("UserProfile:{user}".format(user=change["title"].split(":")[1])))
target = _("[{target}]({target_url})'s").format(target=change["title"].split(':')[1], target_url=link) if change["title"].split(':')[1] != author else _("[their own]({target_url})").format(target_url=link)
content = _("[{author}]({author_url}) edited the {field} on {target} profile. *({desc})*").format(author=author,
author_url=author_url,
target=target,
field=profile_field_name(change["logparams"]['4:section'], False),
desc=BeautifulSoup(change["parsedcomment"], "lxml").get_text())
elif action in ("rights/rights", "rights/autopromote"):
link = link_formatter(create_article_path("User:{user}".format(user=change["title"].split(":")[1])))
old_groups = []
new_groups = []
for name in change["logparams"]["oldgroups"]:
old_groups.append(_(name))
for name in change["logparams"]["newgroups"]:
new_groups.append(_(name))
if len(old_groups) == 0:
old_groups = [_("none")]
if len(new_groups) == 0:
new_groups = [_("none")]
if action == "rights/rights":
content = "[{author}]({author_url}) changed group membership for [{target}]({target_url}) from {old_groups} to {new_groups}{comment}".format(author=author, author_url=author_url, target=change["title"].split(":")[1], target_url=link, old_groups=", ".join(old_groups), new_groups=', '.join(new_groups), comment=parsed_comment)
else:
content = "{author} autopromoted [{target}]({target_url}) from {old_groups} to {new_groups}{comment}".format(
author=_("System"), author_url=author_url, target=change["title"].split(":")[1], target_url=link,
old_groups=", ".join(old_groups), new_groups=', '.join(new_groups),
comment=parsed_comment)
elif action == "protect/protect":
link = link_formatter(create_article_path(change["title"]))
content = _("[{author}]({author_url}) protected [{article}]({article_url}) with the following settings: {settings}{comment}").format(author=author, author_url=author_url,
article=change["title"], article_url=link,
settings=change["logparams"]["description"]+_(" [cascading]") if "cascade" in change["logparams"] else "",
comment=parsed_comment)
elif action == "protect/modify":
link = link_formatter(create_article_path(change["title"]))
content = _(
"[{author}]({author_url}) modified protection settings of [{article}]({article_url}) to: {settings}{comment}").format(
author=author, author_url=author_url,
article=change["title"], article_url=link,
settings=change["logparams"]["description"] + _(" [cascading]") if "cascade" in change["logparams"] else "",
comment=parsed_comment)
elif action == "protect/unprotect":
link = link_formatter(create_article_path(change["title"]))
content = _("[{author}]({author_url}) removed protection from [{article}]({article_url}){comment}").format(author=author, author_url=author_url, article=change["title"], article_url=link, comment=parsed_comment)
elif action == "delete/revision":
amount = len(change["logparams"]["ids"])
link = link_formatter(create_article_path(change["title"]))
content = ngettext("[{author}]({author_url}) changed visibility of revision on page [{article}]({article_url}){comment}",
"[{author}]({author_url}) changed visibility of {amount} revisions on page [{article}]({article_url}){comment}", amount).format(author=author, author_url=author_url,
article=change["title"], article_url=link, amount=amount, comment=parsed_comment)
elif action == "import/upload":
link = link_formatter(create_article_path(change["title"]))
content = ngettext("[{author}]({author_url}) imported [{article}]({article_url}) with {count} revision{comment}",
"[{author}]({author_url}) imported [{article}]({article_url}) with {count} revisions{comment}", change["logparams"]["count"]).format(
author=author, author_url=author_url, article=change["title"], article_url=link, count=change["logparams"]["count"], comment=parsed_comment)
elif action == "delete/restore":
link = link_formatter(create_article_path(change["title"]))
content = _("[{author}]({author_url}) restored [{article}]({article_url}){comment}").format(author=author, author_url=author_url, article=change["title"], article_url=link, comment=parsed_comment)
elif action == "delete/event":
content = _("[{author}]({author_url}) changed visibility of log events{comment}").format(author=author, author_url=author_url, comment=parsed_comment)
elif action == "import/interwiki":
content = _("[{author}]({author_url}) imported interwiki{comment}").format(author=author, author_url=author_url, comment=parsed_comment)
elif action == "abusefilter/modify":
link = link_formatter(create_article_path("Special:AbuseFilter/history/{number}/diff/prev/{historyid}".format(number=change["logparams"]['newId'], historyid=change["logparams"]["historyId"])))
content = _("[{author}]({author_url}) edited abuse filter [number {number}]({filter_url})").format(author=author, author_url=author_url, number=change["logparams"]['newId'], filter_url=link)
elif action == "abusefilter/create":
link = link_formatter(
create_article_path("Special:AbuseFilter/{number}".format(number=change["logparams"]['newId'])))
content = _("[{author}]({author_url}) created abuse filter [number {number}]({filter_url})").format(author=author, author_url=author_url, number=change["logparams"]['newId'], filter_url=link)
elif action == "merge/merge":
link = link_formatter(create_article_path(change["title"]))
link_dest = link_formatter(create_article_path(change["logparams"]["dest_title"]))
content = _("[{author}]({author_url}) merged revision histories of [{article}]({article_url}) into [{dest}]({dest_url}){comment}").format(author=author, author_url=author_url, article=change["title"], article_url=link, dest_url=link_dest,
dest=change["logparams"]["dest_title"], comment=parsed_comment)
elif action == "interwiki/iw_add":
link = link_formatter(create_article_path("Special:Interwiki"))
content = _("[{author}]({author_url}) added an entry to the [interwiki table]({table_url}) pointing to {website} with {prefix} prefix").format(author=author, author_url=author_url, desc=parsed_comment,
prefix=change["logparams"]['0'],
website=change["logparams"]['1'],
table_url=link)
elif action == "interwiki/iw_edit":
link = link_formatter(create_article_path("Special:Interwiki"))
content = _("[{author}]({author_url}) edited an entry in [interwiki table]({table_url}) pointing to {website} with {prefix} prefix").format(author=author, author_url=author_url, desc=parsed_comment,
prefix=change["logparams"]['0'],
website=change["logparams"]['1'],
table_url=link)
elif action == "interwiki/iw_delete":
link = link_formatter(create_article_path("Special:Interwiki"))
content = _("[{author}]({author_url}) deleted an entry in [interwiki table]({table_url})").format(author=author, author_url=author_url, table_url=link)
elif action == "contentmodel/change":
link = link_formatter(create_article_path(change["title"]))
content = _("[{author}]({author_url}) changed the content model of the page [{article}]({article_url}) from {old} to {new}{comment}").format(author=author, author_url=author_url, article=change["title"], article_url=link, old=change["logparams"]["oldmodel"],
new=change["logparams"]["newmodel"], comment=parsed_comment)
elif action == "sprite/sprite":
link = link_formatter(create_article_path(change["title"]))
content = _("[{author}]({author_url}) edited the sprite for [{article}]({article_url})").format(author=author, author_url=author_url, article=change["title"], article_url=link)
elif action == "sprite/sheet":
link = link_formatter(create_article_path(change["title"]))
content = _("[{author}]({author_url}) created the sprite sheet for [{article}]({article_url})").format(author=author, author_url=author_url, article=change["title"], article_url=link)
elif action == "sprite/slice":
link = link_formatter(create_article_path(change["title"]))
content = _("[{author}]({author_url}) edited the slice for [{article}]({article_url})").format(author=author, author_url=author_url, article=change["title"], article_url=link)
elif action == "cargo/createtable":
LinkParser.feed(change["logparams"]["0"])
table = LinkParser.new_string
LinkParser.new_string = ""
content = _("[{author}]({author_url}) created the Cargo table \"{table}\"").format(author=author, author_url=author_url, table=table)
elif action == "cargo/deletetable":
content = _("[{author}]({author_url}) deleted the Cargo table \"{table}\"").format(author=author, author_url=author_url, table=change["logparams"]["0"])
elif action == "cargo/recreatetable":
LinkParser.feed(change["logparams"]["0"])
table = LinkParser.new_string
LinkParser.new_string = ""
content = _("[{author}]({author_url}) recreated the Cargo table \"{table}\"").format(author=author, author_url=author_url, table=table)
elif action == "cargo/replacetable":
LinkParser.feed(change["logparams"]["0"])
table = LinkParser.new_string
LinkParser.new_string = ""
content = _("[{author}]({author_url}) replaced the Cargo table \"{table}\"").format(author=author, author_url=author_url, table=table)
elif action == "managetags/create":
link = link_formatter(create_article_path("Special:Tags"))
content = _("[{author}]({author_url}) created a [tag]({tag_url}) \"{tag}\"").format(author=author, author_url=author_url, tag=change["logparams"]["tag"], tag_url=link)
recent_changes.init_info()
elif action == "managetags/delete":
link = link_formatter(create_article_path("Special:Tags"))
content = _("[{author}]({author_url}) deleted a [tag]({tag_url}) \"{tag}\"").format(author=author, author_url=author_url, tag=change["logparams"]["tag"], tag_url=link)
recent_changes.init_info()
elif action == "managetags/activate":
link = link_formatter(create_article_path("Special:Tags"))
content = _("[{author}]({author_url}) activated a [tag]({tag_url}) \"{tag}\"").format(author=author, author_url=author_url, tag=change["logparams"]["tag"], tag_url=link)
elif action == "managetags/deactivate":
link = link_formatter(create_article_path("Special:Tags"))
content = _("[{author}]({author_url}) deactivated a [tag]({tag_url}) \"{tag}\"").format(author=author, author_url=author_url, tag=change["logparams"]["tag"], tag_url=link)
elif action == "suppressed":
content = _("An action has been hidden by administration.")
else:
logger.warning("No entry for {event} with params: {params}".format(event=action, params=change))
return
send_to_discord(DiscordMessage("compact", action, settings["webhookURL"], content=content))
def embed_formatter(action, change, parsed_comment, categories, recent_changes):
embed = DiscordMessage("embed", action, settings["webhookURL"])
if parsed_comment is None:
parsed_comment = _("No description provided")
if action != "suppressed":
if "anon" in change:
author_url = create_article_path("Special:Contributions/{user}".format(user=change["user"].replace(" ", "_"))) # Replace here needed in case of #75
logger.debug("current user: {} with cache of IPs: {}".format(change["user"], recent_changes.map_ips.keys()))
if change["user"] not in list(recent_changes.map_ips.keys()):
contibs = safe_read(recent_changes.safe_request(
"{wiki}?action=query&format=json&list=usercontribs&uclimit=max&ucuser={user}&ucstart={timestamp}&ucprop=".format(
wiki=WIKI_API_PATH, user=change["user"], timestamp=change["timestamp"])), "query", "usercontribs")
if contibs is None:
logger.warning(
"WARNING: Something went wrong when checking amount of contributions for given IP address")
change["user"] = change["user"] + "(?)"
else:
recent_changes.map_ips[change["user"]] = len(contibs)
logger.debug("Current params user {} and state of map_ips {}".format(change["user"], recent_changes.map_ips))
change["user"] = "{author} ({contribs})".format(author=change["user"], contribs=len(contibs))
else:
logger.debug(
"Current params user {} and state of map_ips {}".format(change["user"], recent_changes.map_ips))
if action in ("edit", "new"):
recent_changes.map_ips[change["user"]] += 1
change["user"] = "{author} ({amount})".format(author=change["user"],
amount=recent_changes.map_ips[change["user"]])
else:
author_url = create_article_path("User:{}".format(change["user"].replace(" ", "_")))
embed.set_author(change["user"], author_url)
if action in ("edit", "new"): # edit or new page
editsize = change["newlen"] - change["oldlen"]
if editsize > 0:
if editsize > 6032:
embed["color"] = 65280
else:
embed["color"] = 35840 + (math.floor(editsize / 52)) * 256
elif editsize < 0:
if editsize < -6032:
embed["color"] = 16711680
else:
embed["color"] = 9175040 + (math.floor((editsize * -1) / 52)) * 65536
elif editsize == 0:
embed["color"] = 8750469
if change["title"].startswith("MediaWiki:Tag-"): # Refresh tag list when tag display name is edited
recent_changes.init_info()
link = "{wiki}index.php?title={article}&curid={pageid}&diff={diff}&oldid={oldrev}".format(
wiki=WIKI_SCRIPT_PATH, pageid=change["pageid"], diff=change["revid"], oldrev=change["old_revid"],
article=change["title"].replace(" ", "_"))
embed["title"] = "{redirect}{article} ({new}{minor}{bot}{space}{editsize})".format(redirect="" if "redirect" in change else "", article=change["title"], editsize="+" + str(
editsize) if editsize > 0 else editsize, new=_("(N!) ") if action == "new" else "",
minor=_("m") if action == "edit" and "minor" in change else "", bot=_('b') if "bot" in change else "", space=" " if "bot" in change or (action == "edit" and "minor" in change) or action == "new" else "")
if settings["appearance"]["embed"]["show_edit_changes"]:
if action == "new":
changed_content = safe_read(recent_changes.safe_request(
"{wiki}?action=compare&format=json&fromtext=&torev={diff}&topst=1&prop=diff".format(
wiki=WIKI_API_PATH, diff=change["revid"]
)), "compare", "*")
else:
changed_content = safe_read(recent_changes.safe_request(
"{wiki}?action=compare&format=json&fromrev={oldrev}&torev={diff}&topst=1&prop=diff".format(
wiki=WIKI_API_PATH, diff=change["revid"],oldrev=change["old_revid"]
)), "compare", "*")
if changed_content:
EditDiff = ContentParser()
EditDiff.feed(changed_content)
if EditDiff.small_prev_del:
if EditDiff.small_prev_del.replace("~~", "").isspace():
EditDiff.small_prev_del = _('__Only whitespace__')
else:
EditDiff.small_prev_del = EditDiff.small_prev_del.replace("~~~~", "")
if EditDiff.small_prev_ins:
if EditDiff.small_prev_ins.replace("**", "").isspace():
EditDiff.small_prev_ins = _('__Only whitespace__')
else:
EditDiff.small_prev_ins = EditDiff.small_prev_ins.replace("****", "")
logger.debug("Changed content: {}".format(EditDiff.small_prev_ins))
if EditDiff.small_prev_del and not action == "new":
embed.add_field(_("Removed"), "{data}".format(data=EditDiff.small_prev_del), inline=True)
if EditDiff.small_prev_ins:
embed.add_field(_("Added"), "{data}".format(data=EditDiff.small_prev_ins), inline=True)
else:
logger.warning("Unable to download data on the edit content!")
elif action in ("upload/overwrite", "upload/upload", "upload/revert"): # sending files
license = None
urls = safe_read(recent_changes.safe_request(
"{wiki}?action=query&format=json&prop=imageinfo&list=&meta=&titles={filename}&iiprop=timestamp%7Curl%7Carchivename&iilimit=5".format(
wiki=WIKI_API_PATH, filename=change["title"])), "query", "pages")
link = create_article_path(change["title"].replace(" ", "_"))
additional_info_retrieved = False
if urls is not None:
logger.debug(urls)
if "-1" not in urls: # image still exists and not removed
try:
img_info = next(iter(urls.values()))["imageinfo"]
for num, revision in enumerate(img_info):
if revision["timestamp"] == change["logparams"]["img_timestamp"]: # find the correct revision corresponding for this log entry
image_direct_url = "{rev}?{cache}".format(rev=revision["url"], cache=int(time.time()*5)) # cachebusting
additional_info_retrieved = True
break
except KeyError:
logger.warning("Wiki did not respond with extended information about file. The preview will not be shown.")
else:
logger.warning("Request for additional image information have failed. The preview will not be shown.")
if action in ("upload/overwrite", "upload/revert"):
if additional_info_retrieved:
article_encoded = change["title"].replace(" ", "_").replace(')', '\)')
try:
revision = img_info[num+1]
except IndexError:
logger.exception("Could not analize the information about the image (does it have only one version when expected more in overwrite?) which resulted in no Options field: {}".format(img_info))
else:
undolink = "{wiki}index.php?title={filename}&action=revert&oldimage={archiveid}".format(
wiki=WIKI_SCRIPT_PATH, filename=article_encoded, archiveid=revision["archivename"])
embed.add_field(_("Options"), _("([preview]({link}) | [undo]({undolink}))").format(
link=image_direct_url, undolink=undolink))
if settings["appearance"]["embed"]["embed_images"]:
embed["image"]["url"] = image_direct_url
if action == "upload/overwrite":
embed["title"] = _("Uploaded a new version of {name}").format(name=change["title"])
elif action == "upload/revert":
embed["title"] = _("Reverted a version of {name}").format(name=change["title"])
else:
embed["title"] = _("Uploaded {name}").format(name=change["title"])
if settings["license_detection"]:
article_content = safe_read(recent_changes.safe_request(
"{wiki}?action=query&format=json&prop=revisions&titles={article}&rvprop=content".format(
wiki=WIKI_API_PATH, article=quote_plus(change["title"], safe=''))), "query", "pages")
if article_content is None:
logger.warning("Something went wrong when getting license for the image")
return 0
if "-1" not in article_content:
content = list(article_content.values())[0]['revisions'][0]['*']
try:
matches = re.search(re.compile(settings["license_regex"], re.IGNORECASE), content)
if matches is not None:
license = matches.group("license")
else:
if re.search(re.compile(settings["license_regex_detect"], re.IGNORECASE), content) is None:
license = _("**No license!**")
else:
license = "?"
except IndexError:
logger.error(
"Given regex for the license detection is incorrect. It does not have a capturing group called \"license\" specified. Please fix license_regex value in the config!")
license = "?"
except re.error:
logger.error(
"Given regex for the license detection is incorrect. Please fix license_regex or license_regex_detect values in the config!")
license = "?"
if license is not None:
parsed_comment += _("\nLicense: {}").format(license)
if additional_info_retrieved:
embed.add_field(_("Options"), _("([preview]({link}))").format(link=image_direct_url))
if settings["appearance"]["embed"]["embed_images"]:
embed["image"]["url"] = image_direct_url
elif action == "delete/delete":
link = create_article_path(change["title"].replace(" ", "_"))
embed["title"] = _("Deleted page {article}").format(article=change["title"])
elif action == "delete/delete_redir":
link = create_article_path(change["title"].replace(" ", "_"))
embed["title"] = _("Deleted redirect {article} by overwriting").format(article=change["title"])
elif action == "move/move":
link = create_article_path(change["logparams"]['target_title'].replace(" ", "_"))
parsed_comment = "{supress}. {desc}".format(desc=parsed_comment,
supress=_("No redirect has been made") if "suppressredirect" in change["logparams"] else _(
"A redirect has been made"))
embed["title"] = _("Moved {redirect}{article} to {target}").format(redirect="" if "redirect" in change else "", article=change["title"], target=change["logparams"]['target_title'])
elif action == "move/move_redir":
link = create_article_path(change["logparams"]["target_title"].replace(" ", "_"))
embed["title"] = _("Moved {redirect}{article} to {title} over redirect").format(redirect="" if "redirect" in change else "", article=change["title"],
title=change["logparams"]["target_title"])
elif action == "protect/move_prot":
link = create_article_path(change["logparams"]["oldtitle_title"].replace(" ", "_"))
embed["title"] = _("Moved protection settings from {redirect}{article} to {title}").format(redirect="" if "redirect" in change else "", article=change["logparams"]["oldtitle_title"],
title=change["title"])
elif action == "block/block":
user = change["title"].split(':')[1]
try:
ipaddress.ip_address(user)
link = create_article_path("Special:Contributions/{user}".format(user=user))
except ValueError:
link = create_article_path(change["title"].replace(" ", "_").replace(')', '\)'))
if change["logparams"]["duration"] == "infinite":
block_time = _("infinity and beyond")
else:
english_length = re.sub(r"(\d+)", "", change["logparams"]["duration"]) #note that translation won't work for millenia and century yet
english_length_num = re.sub(r"(\D+)", "", change["logparams"]["duration"])
try:
english_length = english_length.rstrip("s").strip()
block_time = "{num} {translated_length}".format(num=english_length_num, translated_length=ngettext(english_length, english_length + "s", int(english_length_num)))
except AttributeError:
logger.error("Could not strip s from the block event, seems like the regex didn't work?")
return
if "sitewide" not in change["logparams"]:
restriction_description = ""
if "pages" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"]["pages"]:
restriction_description = _("Blocked from editing the following pages: ")
for page in change["logparams"]["restrictions"]["pages"]:
restricted_pages = ["*"+i["page_title"]+"*" for i in change["logparams"]["restrictions"]["pages"]]
restriction_description = restriction_description + ", ".join(restricted_pages)
if "namespaces" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"]["namespaces"]:
namespaces = []
if restriction_description:
restriction_description = restriction_description + _(" and namespaces: ")
else:
restriction_description = _("Blocked from editing pages on following namespaces: ")
for namespace in change["logparams"]["restrictions"]["namespaces"]:
if str(namespace) in recent_changes.namespaces: # if we have cached namespace name for given namespace number, add its name to the list
namespaces.append("*{ns}*".format(ns=recent_changes.namespaces[str(namespace)]["*"]))
else:
namespaces.append("*{ns}*".format(ns=namespace))
restriction_description = restriction_description + ", ".join(namespaces)
restriction_description = restriction_description + "."
if len(restriction_description) > 1020:
logger.debug(restriction_description)
restriction_description = restriction_description[:1020]+""
embed.add_field(_("Partial block details"), restriction_description, inline=True)
embed["title"] = _("Blocked {blocked_user} for {time}").format(blocked_user=user, time=block_time)
elif action == "block/reblock":
link = create_article_path(change["title"].replace(" ", "_").replace(')', '\)'))
user = change["title"].split(':')[1]
embed["title"] = _("Changed block settings for {blocked_user}").format(blocked_user=user)
elif action == "block/unblock":
link = create_article_path(change["title"].replace(" ", "_").replace(')', '\)'))
user = change["title"].split(':')[1]
embed["title"] = _("Unblocked {blocked_user}").format(blocked_user=user)
elif action == "curseprofile/comment-created":
if settings["appearance"]["embed"]["show_edit_changes"]:
parsed_comment = recent_changes.pull_comment(change["logparams"]["4:comment_id"])
link = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]))
embed["title"] = _("Left a comment on {target}'s profile").format(target=change["title"].split(':')[1]) if change["title"].split(':')[1] != \
change["user"] else _(
"Left a comment on their own profile")
elif action == "curseprofile/comment-replied":
if settings["appearance"]["embed"]["show_edit_changes"]:
parsed_comment = recent_changes.pull_comment(change["logparams"]["4:comment_id"])
link = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]))
embed["title"] = _("Replied to a comment on {target}'s profile").format(target=change["title"].split(':')[1]) if change["title"].split(':')[1] != \
change["user"] else _(
"Replied to a comment on their own profile")
elif action == "curseprofile/comment-edited":
if settings["appearance"]["embed"]["show_edit_changes"]:
parsed_comment = recent_changes.pull_comment(change["logparams"]["4:comment_id"])
link = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]))
embed["title"] = _("Edited a comment on {target}'s profile").format(target=change["title"].split(':')[1]) if change["title"].split(':')[1] != \
change["user"] else _(
"Edited a comment on their own profile")
elif action == "curseprofile/profile-edited":
link = create_article_path("UserProfile:{target}".format(target=change["title"].split(':')[1].replace(" ", "_").replace(')', '\)')))
embed["title"] = _("Edited {target}'s profile").format(target=change["title"].split(':')[1]) if change["user"] != change["title"].split(':')[1] else _("Edited their own profile")
if not change["parsedcomment"]: # If the field is empty
parsed_comment = _("Cleared the {field} field").format(field=profile_field_name(change["logparams"]['4:section'], True))
else:
parsed_comment = _("{field} field changed to: {desc}").format(field=profile_field_name(change["logparams"]['4:section'], True), desc=BeautifulSoup(change["parsedcomment"], "lxml").get_text())
elif action == "curseprofile/comment-purged":
link = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]))
embed["title"] = _("Purged a comment on {target}'s profile").format(target=change["title"].split(':')[1])
elif action == "curseprofile/comment-deleted":
if "4:comment_id" in change["logparams"]:
link = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]))
else:
link = create_article_path(change["title"])
embed["title"] = _("Deleted a comment on {target}'s profile").format(target=change["title"].split(':')[1])
elif action in ("rights/rights", "rights/autopromote"):
link = create_article_path("User:{}".format(change["title"].split(":")[1].replace(" ", "_")))
if action == "rights/rights":
embed["title"] = _("Changed group membership for {target}").format(target=change["title"].split(":")[1])
else:
change["user"] = _("System")
author_url = ""
embed["title"] = _("{target} got autopromoted to a new usergroup").format(
target=change["title"].split(":")[1])
if len(change["logparams"]["oldgroups"]) < len(change["logparams"]["newgroups"]):
embed["thumbnail"]["url"] = "https://i.imgur.com/WnGhF5g.gif"
old_groups = []
new_groups = []
for name in change["logparams"]["oldgroups"]:
old_groups.append(_(name))
for name in change["logparams"]["newgroups"]:
new_groups.append(_(name))
if len(old_groups) == 0:
old_groups = [_("none")]
if len(new_groups) == 0:
new_groups = [_("none")]
reason = ": {desc}".format(desc=parsed_comment) if parsed_comment != _("No description provided") else ""
parsed_comment = _("Groups changed from {old_groups} to {new_groups}{reason}").format(
old_groups=", ".join(old_groups), new_groups=', '.join(new_groups), reason=reason)
elif action == "protect/protect":
link = create_article_path(change["title"].replace(" ", "_"))
embed["title"] = _("Protected {target}").format(target=change["title"])
parsed_comment = "{settings}{cascade} | {reason}".format(settings=change["logparams"]["description"],
cascade=_(" [cascading]") if "cascade" in change["logparams"] else "",
reason=parsed_comment)
elif action == "protect/modify":
link = create_article_path(change["title"].replace(" ", "_"))
embed["title"] = _("Changed protection level for {article}").format(article=change["title"])
parsed_comment = "{settings}{cascade} | {reason}".format(settings=change["logparams"]["description"],
cascade=_(" [cascading]") if "cascade" in change["logparams"] else "",
reason=parsed_comment)
elif action == "protect/unprotect":
link = create_article_path(change["title"].replace(" ", "_"))
embed["title"] = _("Removed protection from {article}").format(article=change["title"])
elif action == "delete/revision":
amount = len(change["logparams"]["ids"])
link = create_article_path(change["title"].replace(" ", "_"))
embed["title"] = ngettext("Changed visibility of revision on page {article} ",
"Changed visibility of {amount} revisions on page {article} ", amount).format(
article=change["title"], amount=amount)
elif action == "import/upload":
link = create_article_path(change["title"].replace(" ", "_"))
embed["title"] = ngettext("Imported {article} with {count} revision",
"Imported {article} with {count} revisions", change["logparams"]["count"]).format(
article=change["title"], count=change["logparams"]["count"])
elif action == "delete/restore":
link = create_article_path(change["title"].replace(" ", "_"))
embed["title"] = _("Restored {article}").format(article=change["title"])
elif action == "delete/event":
link = create_article_path("Special:RecentChanges")
embed["title"] = _("Changed visibility of log events")
elif action == "import/interwiki":
link = create_article_path("Special:RecentChanges")
embed["title"] = _("Imported interwiki")
elif action == "abusefilter/modify":
link = create_article_path("Special:AbuseFilter/history/{number}/diff/prev/{historyid}".format(number=change["logparams"]['newId'], historyid=change["logparams"]["historyId"]))
embed["title"] = _("Edited abuse filter number {number}").format(number=change["logparams"]['newId'])
elif action == "abusefilter/create":
link = create_article_path("Special:AbuseFilter/{number}".format(number=change["logparams"]['newId']))
embed["title"] = _("Created abuse filter number {number}").format(number=change["logparams"]['newId'])
elif action == "merge/merge":
link = create_article_path(change["title"].replace(" ", "_"))
embed["title"] = _("Merged revision histories of {article} into {dest}").format(article=change["title"],
dest=change["logparams"]["dest_title"])
elif action == "interwiki/iw_add":
link = create_article_path("Special:Interwiki")
embed["title"] = _("Added an entry to the interwiki table")
parsed_comment = _("Prefix: {prefix}, website: {website} | {desc}").format(desc=parsed_comment,
prefix=change["logparams"]['0'],
website=change["logparams"]['1'])
elif action == "interwiki/iw_edit":
link = create_article_path("Special:Interwiki")
embed["title"] = _("Edited an entry in interwiki table")
parsed_comment = _("Prefix: {prefix}, website: {website} | {desc}").format(desc=parsed_comment,
prefix=change["logparams"]['0'],
website=change["logparams"]['1'])
elif action == "interwiki/iw_delete":
link = create_article_path("Special:Interwiki")
embed["title"] = _("Deleted an entry in interwiki table")
parsed_comment = _("Prefix: {prefix} | {desc}").format(desc=parsed_comment, prefix=change["logparams"]['0'])
elif action == "contentmodel/change":
link = create_article_path(change["title"].replace(" ", "_"))
embed["title"] = _("Changed the content model of the page {article}").format(article=change["title"])
parsed_comment = _("Model changed from {old} to {new}: {reason}").format(old=change["logparams"]["oldmodel"],
new=change["logparams"]["newmodel"],
reason=parsed_comment)
elif action == "sprite/sprite":
link = create_article_path(change["title"].replace(" ", "_"))
embed["title"] = _("Edited the sprite for {article}").format(article=change["title"])
elif action == "sprite/sheet":
link = create_article_path(change["title"].replace(" ", "_"))
embed["title"] = _("Created the sprite sheet for {article}").format(article=change["title"])
elif action == "sprite/slice":
link = create_article_path(change["title"].replace(" ", "_"))
embed["title"] = _("Edited the slice for {article}").format(article=change["title"])
elif action == "cargo/createtable":
LinkParser.feed(change["logparams"]["0"])
table = re.search(r"\[(.*?)\]\(<(.*?)>\)", LinkParser.new_string)
LinkParser.new_string = ""
link = table.group(2)
embed["title"] = _("Created the Cargo table \"{table}\"").format(table=table.group(1))
parsed_comment = None
elif action == "cargo/deletetable":
link = create_article_path("Special:CargoTables")
embed["title"] = _("Deleted the Cargo table \"{table}\"").format(table=change["logparams"]["0"])
parsed_comment = None
elif action == "cargo/recreatetable":
LinkParser.feed(change["logparams"]["0"])
table = re.search(r"\[(.*?)\]\(<(.*?)>\)", LinkParser.new_string)
LinkParser.new_string = ""
link = table.group(2)
embed["title"] = _("Recreated the Cargo table \"{table}\"").format(table=table.group(1))
parsed_comment = None
elif action == "cargo/replacetable":
LinkParser.feed(change["logparams"]["0"])
table = re.search(r"\[(.*?)\]\(<(.*?)>\)", LinkParser.new_string)
LinkParser.new_string = ""
link = table.group(2)
embed["title"] = _("Replaced the Cargo table \"{table}\"").format(table=table.group(1))
parsed_comment = None
elif action == "managetags/create":
link = create_article_path("Special:Tags")
embed["title"] = _("Created a tag \"{tag}\"").format(tag=change["logparams"]["tag"])
recent_changes.init_info()
elif action == "managetags/delete":
link = create_article_path("Special:Tags")
embed["title"] = _("Deleted a tag \"{tag}\"").format(tag=change["logparams"]["tag"])
recent_changes.init_info()
elif action == "managetags/activate":
link = create_article_path("Special:Tags")
embed["title"] = _("Activated a tag \"{tag}\"").format(tag=change["logparams"]["tag"])
elif action == "managetags/deactivate":
link = create_article_path("Special:Tags")
embed["title"] = _("Deactivated a tag \"{tag}\"").format(tag=change["logparams"]["tag"])
elif action == "suppressed":
link = create_article_path("")
embed["title"] = _("Action has been hidden by administration.")
embed["author"]["name"] = _("Unknown")
else:
logger.warning("No entry for {event} with params: {params}".format(event=action, params=change))
embed["author"]["icon_url"] = settings["appearance"]["embed"][action]["icon"]
embed["url"] = link
if parsed_comment is not None:
embed["description"] = parsed_comment
if settings["appearance"]["embed"]["show_footer"]:
embed["timestamp"] = change["timestamp"]
if "tags" in change and change["tags"]:
tag_displayname = []
for tag in change["tags"]:
if tag in recent_changes.tags:
if recent_changes.tags[tag] is None:
continue # Ignore hidden tags
else:
tag_displayname.append(recent_changes.tags[tag])
else:
tag_displayname.append(tag)
embed.add_field(_("Tags"), ", ".join(tag_displayname))
logger.debug("Current params in edit action: {}".format(change))
if categories is not None and not (len(categories["new"]) == 0 and len(categories["removed"]) == 0):
new_cat = (_("**Added**: ") + ", ".join(list(categories["new"])[0:16]) + ("\n" if len(categories["new"])<=15 else _(" and {} more\n").format(len(categories["new"])-15))) if categories["new"] else ""
del_cat = (_("**Removed**: ") + ", ".join(list(categories["removed"])[0:16]) + ("" if len(categories["removed"])<=15 else _(" and {} more").format(len(categories["removed"])-15))) if categories["removed"] else ""
embed.add_field(_("Changed categories"), new_cat + del_cat)
embed.finish_embed()
send_to_discord(embed)

File diff suppressed because it is too large Load diff