diff --git a/locale/uk/LC_MESSAGES/rcgcdw.mo b/locale/uk/LC_MESSAGES/rcgcdw.mo index 262b3bf..8c9e243 100644 Binary files a/locale/uk/LC_MESSAGES/rcgcdw.mo and b/locale/uk/LC_MESSAGES/rcgcdw.mo differ diff --git a/locale/uk/LC_MESSAGES/rcgcdw.po b/locale/uk/LC_MESSAGES/rcgcdw.po index 753639c..bb44c2b 100644 --- a/locale/uk/LC_MESSAGES/rcgcdw.po +++ b/locale/uk/LC_MESSAGES/rcgcdw.po @@ -8,7 +8,7 @@ msgstr "" "Project-Id-Version: \n" "Report-Msgid-Bugs-To: \n" "POT-Creation-Date: 2020-03-17 20:53+0100\n" -"PO-Revision-Date: 2020-03-18 13:51+0100\n" +"PO-Revision-Date: 2020-07-12 12:17+0200\n" "Last-Translator: \n" "Language-Team: \n" "Language: uk\n" @@ -511,7 +511,7 @@ msgstr "__Тільки пробіли__" #: rcgcdw.py:594 msgid "Removed" -msgstr "Вилучено" +msgstr "видалено" #: rcgcdw.py:597 msgid "Added" @@ -858,7 +858,7 @@ msgstr " та ще {}\n" #: rcgcdw.py:957 msgid "**Removed**: " -msgstr "**Вилучено**: " +msgstr "**видалено**: " #: rcgcdw.py:957 msgid " and {} more" diff --git a/src/discussion_formatters.py b/src/discussion_formatters.py new file mode 100644 index 0000000..ace0cf2 --- /dev/null +++ b/src/discussion_formatters.py @@ -0,0 +1,209 @@ +import datetime, logging +import json +import gettext +from urllib.parse import quote_plus + +from src.configloader import settings +from src.misc import DiscordMessage, send_to_discord, escape_formatting +from src.i18n import disc + +_ = disc.gettext + + +discussion_logger = logging.getLogger("rcgcdw.discussion_formatter") + +def embed_formatter(post, post_type): + """Embed formatter for Fandom discussions.""" + embed = DiscordMessage("embed", "discussion", settings["fandom_discussions"]["webhookURL"]) + embed.set_author(post["createdBy"]["name"], "{wikiurl}f/u/{creatorId}".format( + wikiurl=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"]), icon_url=post["createdBy"]["avatarUrl"]) + discussion_post_type = post["_embedded"]["thread"][0].get("containerType", "FORUM") # Can be FORUM, ARTICLE_COMMENT or WALL on UCP + if post_type == "TEXT": + if post["isReply"]: + if discussion_post_type == "FORUM": + embed.event_type = "discussion/forum/reply" + embed["title"] = _("Replied to \"{title}\"").format(title=post["_embedded"]["thread"][0]["title"]) + embed["url"] = "{wikiurl}f/p/{threadId}/r/{postId}".format( + wikiurl=settings["fandom_discussions"]["wiki_url"], threadId=post["threadId"], postId=post["id"]) + elif discussion_post_type == "ARTICLE_COMMENT": + discussion_logger.warning("Article comments are not yet implemented. For reasons see https://gitlab.com/piotrex43/RcGcDw/-/issues/126#note_366480037") + return + elif discussion_post_type == "WALL": + user_wall = _("unknown") # Fail safe + embed.event_type = "discussion/wall/reply" + if post["forumName"].endswith(' Message Wall'): + user_wall = post["forumName"][:-13] + embed["url"] = "{wikiurl}wiki/Message_Wall:{user_wall}?threadId={threadid}#{replyId}".format(wikiurl=settings["fandom_discussions"]["wiki_url"], user_wall=quote_plus(user_wall.replace(" ", "_")), threadid=post["threadId"], replyId=post["id"]) + embed["title"] = _("Replied to \"{title}\" on {user}'s Message Wall").format(title=post["_embedded"]["thread"][0]["title"], user=user_wall) + else: + if discussion_post_type == "FORUM": + embed.event_type = "discussion/forum/post" + embed["title"] = _("Created \"{title}\"").format(title=post["title"]) + embed["url"] = "{wikiurl}f/p/{threadId}".format(wikiurl=settings["fandom_discussions"]["wiki_url"], + threadId=post["threadId"]) + elif discussion_post_type == "ARTICLE_COMMENT": + discussion_logger.warning("Article comments are not yet implemented. For reasons see https://gitlab.com/piotrex43/RcGcDw/-/issues/126#note_366480037") + return + elif discussion_post_type == "WALL": + user_wall = _("unknown") # Fail safe + embed.event_type = "discussion/wall/post" + if post["forumName"].endswith(' Message Wall'): + user_wall = post["forumName"][:-13] + embed["url"] = "{wikiurl}wiki/Message_Wall:{user_wall}?threadId={threadid}".format( + wikiurl=settings["fandom_discussions"]["wiki_url"], user_wall=quote_plus(user_wall.replace(" ", "_")), + threadid=post["threadId"]) + embed["title"] = _("Created \"{title}\" on {user}'s Message Wall").format(title=post["_embedded"]["thread"][0]["title"], user=user_wall) + if settings["fandom_discussions"]["appearance"]["embed"]["show_content"]: + if post.get("jsonModel") is not None: + npost = DiscussionsFromHellParser(post) + embed["description"] = npost.parse() + if npost.image_last: + embed["image"]["url"] = npost.image_last + embed["description"] = embed["description"].replace(npost.image_last, "") + else: # Fallback when model is not available + embed["description"] = post.get("rawContent", "") + elif post_type == "POLL": + embed.event_type = "discussion/forum/poll" + poll = post["poll"] + embed["title"] = _("Created a poll titled \"{title}\"").format(title=poll["question"]) + image_type = False + if poll["answers"][0]["image"] is not None: + image_type = True + for num, option in enumerate(poll["answers"]): + embed.add_field(option["text"] if image_type is True else _("Option {}").format(num+1), + option["text"] if image_type is False else _("__[View image]({image_url})__").format(image_url=option["image"]["url"]), + inline=True) + embed["footer"]["text"] = post["forumName"] + embed["timestamp"] = datetime.datetime.fromtimestamp(post["creationDate"]["epochSecond"], tz=datetime.timezone.utc).isoformat() + embed.finish_embed() + send_to_discord(embed) + + +def compact_formatter(post, post_type): + """Compact formatter for Fandom discussions.""" + message = None + discussion_post_type = post["_embedded"]["thread"][0].get("containerType", + "FORUM") # Can be FORUM, ARTICLE_COMMENT or WALL on UCP + if post_type == "TEXT": + if not post["isReply"]: + if discussion_post_type == "FORUM": + message = _("[{author}](<{url}f/u/{creatorId}>) created [{title}](<{url}f/p/{threadId}>) in {forumName}").format( + author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"], title=post["title"], threadId=post["threadId"], forumName=post["forumName"]) + elif discussion_post_type == "ARTICLE_COMMENT": + discussion_logger.warning("Article comments are not yet implemented. For reasons see https://gitlab.com/piotrex43/RcGcDw/-/issues/126#note_366480037") + return + elif discussion_post_type == "WALL": + user_wall = _("unknown") # Fail safe + if post["forumName"].endswith(' Message Wall'): + user_wall = post["forumName"][:-13] + message = _("[{author}](<{url}f/u/{creatorId}>) created [{title}](<{wikiurl}wiki/Message_Wall:{user_wall}?threadId={threadid}>) on {user}'s Message Wall").format( + author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"], title=post["_embedded"]["thread"][0]["title"], user=user_wall, + wikiurl=settings["fandom_discussions"]["wiki_url"], user_wall=quote_plus(user_wall.replace(" ", "_")), threadid=post["threadId"] + ) + else: + if discussion_post_type == "FORUM": + message = _("[{author}](<{url}f/u/{creatorId}>) created a [reply](<{url}f/p/{threadId}/r/{postId}>) to [{title}](<{url}f/p/{threadId}>) in {forumName}").format( + author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"], threadId=post["threadId"], postId=post["id"], title=post["_embedded"]["thread"][0]["title"], forumName=post["forumName"] + ) + elif discussion_post_type == "ARTICLE_COMMENT": + discussion_logger.warning("Article comments are not yet implemented. For reasons see https://gitlab.com/piotrex43/RcGcDw/-/issues/126#note_366480037") + return + elif discussion_post_type == "WALL": + user_wall = _("unknown") # Fail safe + if post["forumName"].endswith(' Message Wall'): + user_wall = post["forumName"][:-13] + message = _( + "[{author}](<{url}f/u/{creatorId}>) replied to [{title}](<{wikiurl}wiki/Message_Wall:{user_wall}?threadId={threadid}#{replyId}>) on {user}'s Message Wall").format( + author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"], title=post["_embedded"]["thread"][0]["title"], user=user_wall, + wikiurl=settings["fandom_discussions"]["wiki_url"], user_wall=quote_plus(user_wall.replace(" ", "_")), threadid=post["threadId"], replyId=post["id"]) + + elif post_type == "POLL": + message = _( + "[{author}](<{url}f/u/{creatorId}>) created a poll [{title}](<{url}f/p/{threadId}>) in {forumName}").format( + author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], + creatorId=post["creatorId"], title=post["title"], threadId=post["threadId"], forumName=post["forumName"]) + send_to_discord(DiscordMessage("compact", "discussion", settings["fandom_discussions"]["webhookURL"], content=message)) + + +class DiscussionsFromHellParser: + """This class converts fairly convoluted Fandom jsonModal of a discussion post into Markdown formatted usable thing. Takes string, returns string. + Kudos to MarkusRost for allowing me to implement this formatter based on his code in Wiki-Bot.""" + def __init__(self, post): + self.post = post + self.jsonModal = json.loads(post.get("jsonModel", "{}")) + self.markdown_text = "" + self.item_num = 1 + self.image_last = None + + def parse(self) -> str: + """Main parsing logic""" + self.parse_content(self.jsonModal["content"]) + if len(self.markdown_text) > 2000: + self.markdown_text = self.markdown_text[0:2000] + "…" + return self.markdown_text + + def parse_content(self, content, ctype=None): + self.image_last = None + for item in content: + if ctype == "bulletList": + self.markdown_text += "\t• " + if ctype == "orderedList": + self.markdown_text += "\t{num}. ".format(num=self.item_num) + self.item_num += 1 + if item["type"] == "text": + if "marks" in item: + prefix, suffix = self.convert_marks(item["marks"]) + self.markdown_text = "{old}{pre}{text}{suf}".format(old=self.markdown_text, pre=prefix, text=escape_formatting(item["text"]), suf=suffix) + else: + if ctype == "code_block": + self.markdown_text += item["text"] # ignore formatting on preformatted text which cannot have additional formatting anyways + else: + self.markdown_text += escape_formatting(item["text"]) + elif item["type"] == "paragraph": + if "content" in item: + self.parse_content(item["content"], item["type"]) + self.markdown_text += "\n" + elif item["type"] == "openGraph": + if not item["attrs"]["wasAddedWithInlineLink"]: + self.markdown_text = "{old}{link}\n".format(old=self.markdown_text, link=item["attrs"]["url"]) + elif item["type"] == "image": + try: + discussion_logger.debug(item["attrs"]["id"]) + if item["attrs"]["id"] is not None: + self.markdown_text = "{old}{img_url}\n".format(old=self.markdown_text, img_url=self.post["_embedded"]["contentImages"][int(item["attrs"]["id"])]["url"]) + self.image_last = self.post["_embedded"]["contentImages"][int(item["attrs"]["id"])]["url"] + except (IndexError, ValueError): + discussion_logger.warning("Image {} not found.".format(item["attrs"]["id"])) + discussion_logger.debug(self.markdown_text) + elif item["type"] == "code_block": + self.markdown_text += "```\n" + if "content" in item: + self.parse_content(item["content"], item["type"]) + self.markdown_text += "\n```\n" + elif item["type"] == "bulletList": + if "content" in item: + self.parse_content(item["content"], item["type"]) + elif item["type"] == "orderedList": + self.item_num = 1 + if "content" in item: + self.parse_content(item["content"], item["type"]) + elif item["type"] == "listItem": + self.parse_content(item["content"], item["type"]) + + def convert_marks(self, marks): + prefix = "" + suffix = "" + for mark in marks: + if mark["type"] == "mention": + prefix += "[" + suffix = "]({wiki}f/u/{userid}){suffix}".format(wiki=settings["fandom_discussions"]["wiki_url"], userid=mark["attrs"]["userId"], suffix=suffix) + elif mark["type"] == "strong": + prefix += "**" + suffix = "**{suffix}".format(suffix=suffix) + elif mark["type"] == "link": + prefix += "[" + suffix = "]({link}){suffix}".format(link=mark["attrs"]["href"], suffix=suffix) + elif mark["type"] == "em": + prefix += "_" + suffix = "_" + suffix + return prefix, suffix \ No newline at end of file diff --git a/src/discussions.py b/src/discussions.py index 8a390e1..ebc93ac 100644 --- a/src/discussions.py +++ b/src/discussions.py @@ -16,11 +16,11 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import logging, gettext, schedule, requests, json, datetime -from collections import defaultdict +import logging, gettext, schedule, requests from src.configloader import settings -from urllib.parse import quote_plus -from src.misc import datafile, send_to_discord, DiscordMessage, WIKI_SCRIPT_PATH, escape_formatting, messagequeue + +from src.discussion_formatters import embed_formatter, compact_formatter +from src.misc import datafile, messagequeue from src.session import session # Initialize translation @@ -43,119 +43,6 @@ storage = datafile.data fetch_url = "https://services.fandom.com/discussion/{wikiid}/posts?sortDirection=descending&sortKey=creation_date&limit={limit}".format(wikiid=settings["fandom_discussions"]["wiki_id"], limit=settings["fandom_discussions"]["limit"]) -def embed_formatter(post, post_type): - """Embed formatter for Fandom discussions.""" - embed = DiscordMessage("embed", "discussion", settings["fandom_discussions"]["webhookURL"]) - embed.set_author(post["createdBy"]["name"], "{wikiurl}f/u/{creatorId}".format( - wikiurl=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"]), icon_url=post["createdBy"]["avatarUrl"]) - discussion_post_type = post["_embedded"]["thread"][0].get("containerType", "FORUM") # Can be FORUM, ARTICLE_COMMENT or WALL on UCP - if post_type == "TEXT": - if post["isReply"]: - if discussion_post_type == "FORUM": - embed.event_type = "discussion/forum/reply" - embed["title"] = _("Replied to \"{title}\"").format(title=post["_embedded"]["thread"][0]["title"]) - embed["url"] = "{wikiurl}f/p/{threadId}/r/{postId}".format( - wikiurl=settings["fandom_discussions"]["wiki_url"], threadId=post["threadId"], postId=post["id"]) - elif discussion_post_type == "ARTICLE_COMMENT": - discussion_logger.warning("Article comments are not yet implemented. For reasons see https://gitlab.com/piotrex43/RcGcDw/-/issues/126#note_366480037") - return - elif discussion_post_type == "WALL": - user_wall = _("unknown") # Fail safe - embed.event_type = "discussion/wall/reply" - if post["forumName"].endswith(' Message Wall'): - user_wall = post["forumName"][:-13] - embed["url"] = "{wikiurl}wiki/Message_Wall:{user_wall}?threadId={threadid}#{replyId}".format(wikiurl=settings["fandom_discussions"]["wiki_url"], user_wall=quote_plus(user_wall.replace(" ", "_")), threadid=post["threadId"], replyId=post["id"]) - embed["title"] = _("Replied to \"{title}\" on {user}'s Message Wall").format(title=post["_embedded"]["thread"][0]["title"], user=user_wall) - else: - if discussion_post_type == "FORUM": - embed.event_type = "discussion/forum/post" - embed["title"] = _("Created \"{title}\"").format(title=post["title"]) - embed["url"] = "{wikiurl}f/p/{threadId}".format(wikiurl=settings["fandom_discussions"]["wiki_url"], - threadId=post["threadId"]) - elif discussion_post_type == "ARTICLE_COMMENT": - discussion_logger.warning("Article comments are not yet implemented. For reasons see https://gitlab.com/piotrex43/RcGcDw/-/issues/126#note_366480037") - return - elif discussion_post_type == "WALL": - user_wall = _("unknown") # Fail safe - embed.event_type = "discussion/wall/post" - if post["forumName"].endswith(' Message Wall'): - user_wall = post["forumName"][:-13] - embed["url"] = "{wikiurl}wiki/Message_Wall:{user_wall}?threadId={threadid}".format( - wikiurl=settings["fandom_discussions"]["wiki_url"], user_wall=quote_plus(user_wall.replace(" ", "_")), - threadid=post["threadId"]) - embed["title"] = _("Created \"{title}\" on {user}'s Message Wall").format(title=post["_embedded"]["thread"][0]["title"], user=user_wall) - if settings["fandom_discussions"]["appearance"]["embed"]["show_content"]: - if post.get("jsonModel") is not None: - npost = DiscussionsFromHellParser(post) - embed["description"] = npost.parse() - if npost.image_last: - embed["image"]["url"] = npost.image_last - embed["description"] = embed["description"].replace(npost.image_last, "") - else: # Fallback when model is not available - embed["description"] = post.get("rawContent", "") - elif post_type == "POLL": - embed.event_type = "discussion/forum/poll" - poll = post["poll"] - embed["title"] = _("Created a poll titled \"{title}\"").format(title=poll["question"]) - image_type = False - if poll["answers"][0]["image"] is not None: - image_type = True - for num, option in enumerate(poll["answers"]): - embed.add_field(option["text"] if image_type is True else _("Option {}").format(num+1), - option["text"] if image_type is False else _("__[View image]({image_url})__").format(image_url=option["image"]["url"]), - inline=True) - embed["footer"]["text"] = post["forumName"] - embed["timestamp"] = datetime.datetime.fromtimestamp(post["creationDate"]["epochSecond"], tz=datetime.timezone.utc).isoformat() - embed.finish_embed() - send_to_discord(embed) - - -def compact_formatter(post, post_type): - """Compact formatter for Fandom discussions.""" - message = None - discussion_post_type = post["_embedded"]["thread"][0].get("containerType", - "FORUM") # Can be FORUM, ARTICLE_COMMENT or WALL on UCP - if post_type == "TEXT": - if not post["isReply"]: - if discussion_post_type == "FORUM": - message = _("[{author}](<{url}f/u/{creatorId}>) created [{title}](<{url}f/p/{threadId}>) in {forumName}").format( - author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"], title=post["title"], threadId=post["threadId"], forumName=post["forumName"]) - elif discussion_post_type == "ARTICLE_COMMENT": - discussion_logger.warning("Article comments are not yet implemented. For reasons see https://gitlab.com/piotrex43/RcGcDw/-/issues/126#note_366480037") - return - elif discussion_post_type == "WALL": - user_wall = _("unknown") # Fail safe - if post["forumName"].endswith(' Message Wall'): - user_wall = post["forumName"][:-13] - message = _("[{author}](<{url}f/u/{creatorId}>) created [{title}](<{wikiurl}wiki/Message_Wall:{user_wall}?threadId={threadid}>) on {user}'s Message Wall").format( - author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"], title=post["_embedded"]["thread"][0]["title"], user=user_wall, - wikiurl=settings["fandom_discussions"]["wiki_url"], user_wall=quote_plus(user_wall.replace(" ", "_")), threadid=post["threadId"] - ) - else: - if discussion_post_type == "FORUM": - message = _("[{author}](<{url}f/u/{creatorId}>) created a [reply](<{url}f/p/{threadId}/r/{postId}>) to [{title}](<{url}f/p/{threadId}>) in {forumName}").format( - author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"], threadId=post["threadId"], postId=post["id"], title=post["_embedded"]["thread"][0]["title"], forumName=post["forumName"] - ) - elif discussion_post_type == "ARTICLE_COMMENT": - discussion_logger.warning("Article comments are not yet implemented. For reasons see https://gitlab.com/piotrex43/RcGcDw/-/issues/126#note_366480037") - return - elif discussion_post_type == "WALL": - user_wall = _("unknown") # Fail safe - if post["forumName"].endswith(' Message Wall'): - user_wall = post["forumName"][:-13] - message = _( - "[{author}](<{url}f/u/{creatorId}>) replied to [{title}](<{wikiurl}wiki/Message_Wall:{user_wall}?threadId={threadid}#{replyId}>) on {user}'s Message Wall").format( - author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"], title=post["_embedded"]["thread"][0]["title"], user=user_wall, - wikiurl=settings["fandom_discussions"]["wiki_url"], user_wall=quote_plus(user_wall.replace(" ", "_")), threadid=post["threadId"], replyId=post["id"]) - - elif post_type == "POLL": - message = _( - "[{author}](<{url}f/u/{creatorId}>) created a poll [{title}](<{url}f/p/{threadId}>) in {forumName}").format( - author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], - creatorId=post["creatorId"], title=post["title"], threadId=post["threadId"], forumName=post["forumName"]) - send_to_discord(DiscordMessage("compact", "discussion", settings["fandom_discussions"]["webhookURL"], content=message)) - - def fetch_discussions(): messagequeue.resend_msgs() request = safe_request(fetch_url) @@ -188,89 +75,6 @@ def parse_discussion_post(post): else: discussion_logger.warning("The type of {} is an unknown discussion post type. Please post an issue on the project page to have it added https://gitlab.com/piotrex43/RcGcDw/-/issues.") -class DiscussionsFromHellParser: - """This class converts fairly convoluted Fandom jsonModal of a discussion post into Markdown formatted usable thing. Takes string, returns string. - Kudos to MarkusRost for allowing me to implement this formatter based on his code in Wiki-Bot.""" - def __init__(self, post): - self.post = post - self.jsonModal = json.loads(post.get("jsonModel", "{}")) - self.markdown_text = "" - self.item_num = 1 - self.image_last = None - - def parse(self) -> str: - """Main parsing logic""" - self.parse_content(self.jsonModal["content"]) - if len(self.markdown_text) > 2000: - self.markdown_text = self.markdown_text[0:2000] + "…" - return self.markdown_text - - def parse_content(self, content, ctype=None): - self.image_last = None - for item in content: - if ctype == "bulletList": - self.markdown_text += "\t• " - if ctype == "orderedList": - self.markdown_text += "\t{num}. ".format(num=self.item_num) - self.item_num += 1 - if item["type"] == "text": - if "marks" in item: - prefix, suffix = self.convert_marks(item["marks"]) - self.markdown_text = "{old}{pre}{text}{suf}".format(old=self.markdown_text, pre=prefix, text=escape_formatting(item["text"]), suf=suffix) - else: - if ctype == "code_block": - self.markdown_text += item["text"] # ignore formatting on preformatted text which cannot have additional formatting anyways - else: - self.markdown_text += escape_formatting(item["text"]) - elif item["type"] == "paragraph": - if "content" in item: - self.parse_content(item["content"], item["type"]) - self.markdown_text += "\n" - elif item["type"] == "openGraph": - if not item["attrs"]["wasAddedWithInlineLink"]: - self.markdown_text = "{old}{link}\n".format(old=self.markdown_text, link=item["attrs"]["url"]) - elif item["type"] == "image": - try: - discussion_logger.debug(item["attrs"]["id"]) - if item["attrs"]["id"] is not None: - self.markdown_text = "{old}{img_url}\n".format(old=self.markdown_text, img_url=self.post["_embedded"]["contentImages"][int(item["attrs"]["id"])]["url"]) - self.image_last = self.post["_embedded"]["contentImages"][int(item["attrs"]["id"])]["url"] - except (IndexError, ValueError): - discussion_logger.warning("Image {} not found.".format(item["attrs"]["id"])) - discussion_logger.debug(self.markdown_text) - elif item["type"] == "code_block": - self.markdown_text += "```\n" - if "content" in item: - self.parse_content(item["content"], item["type"]) - self.markdown_text += "\n```\n" - elif item["type"] == "bulletList": - if "content" in item: - self.parse_content(item["content"], item["type"]) - elif item["type"] == "orderedList": - self.item_num = 1 - if "content" in item: - self.parse_content(item["content"], item["type"]) - elif item["type"] == "listItem": - self.parse_content(item["content"], item["type"]) - - def convert_marks(self, marks): - prefix = "" - suffix = "" - for mark in marks: - if mark["type"] == "mention": - prefix += "[" - suffix = "]({wiki}f/u/{userid}){suffix}".format(wiki=settings["fandom_discussions"]["wiki_url"], userid=mark["attrs"]["userId"], suffix=suffix) - elif mark["type"] == "strong": - prefix += "**" - suffix = "**{suffix}".format(suffix=suffix) - elif mark["type"] == "link": - prefix += "[" - suffix = "]({link}){suffix}".format(link=mark["attrs"]["href"], suffix=suffix) - elif mark["type"] == "em": - prefix += "_" - suffix = "_" + suffix - return prefix, suffix - def safe_request(url): """Function to assure safety of request, and do not crash the script on exceptions,""" diff --git a/src/exceptions.py b/src/exceptions.py new file mode 100644 index 0000000..1871d8c --- /dev/null +++ b/src/exceptions.py @@ -0,0 +1,2 @@ +class MWError(Exception): + pass \ No newline at end of file diff --git a/src/i18n.py b/src/i18n.py new file mode 100644 index 0000000..8cc6cad --- /dev/null +++ b/src/i18n.py @@ -0,0 +1,15 @@ +import gettext, sys, logging +from src.configloader import settings +logger = logging.getLogger("rcgcdw.i18n") + +# Setup translation + +try: + lang = gettext.translation('rcgcdw', localedir='locale', languages=[settings["lang"]]) + disc = gettext.translation('discussions', localedir='locale', languages=[settings["lang"]]) +except FileNotFoundError: + logger.critical("No language files have been found. Make sure locale folder is located in the directory.") + sys.exit(1) + +lang.install() +ngettext = lang.ngettext \ No newline at end of file diff --git a/src/misc.py b/src/misc.py index c0716e1..ee67882 100644 --- a/src/misc.py +++ b/src/misc.py @@ -15,7 +15,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . - +import base64 import json, logging, sys, re, time, random, math from html.parser import HTMLParser from urllib.parse import urlparse, urlunparse @@ -42,6 +42,8 @@ WIKI_ARTICLE_PATH: str = "" WIKI_SCRIPT_PATH: str = "" WIKI_JUST_DOMAIN: str = "" +profile_fields = {"profile-location": _("Location"), "profile-aboutme": _("About me"), "profile-link-google": _("Google link"), "profile-link-facebook":_("Facebook link"), "profile-link-twitter": _("Twitter link"), "profile-link-reddit": _("Reddit link"), "profile-link-twitch": _("Twitch link"), "profile-link-psn": _("PSN link"), "profile-link-vk": _("VK link"), "profile-link-xbl": _("XBL link"), "profile-link-steam": _("Steam link"), "profile-link-discord": _("Discord handle"), "profile-link-battlenet": _("Battle.net handle")} + class DataFile: """Data class which instance of is shared by multiple modules to remain consistent and do not cause too many IO operations.""" def __init__(self): @@ -413,4 +415,46 @@ class DiscordMessage(): self.webhook_object["avatar_url"] = url def set_name(self, name): - self.webhook_object["username"] = name \ No newline at end of file + self.webhook_object["username"] = name + + +def profile_field_name(name, embed): + try: + return profile_fields[name] + except KeyError: + if embed: + return _("Unknown") + else: + return _("unknown") + + +class LinkParser(HTMLParser): + new_string = "" + recent_href = "" + + def handle_starttag(self, tag, attrs): + for attr in attrs: + if attr[0] == 'href': + self.recent_href = attr[1] + if self.recent_href.startswith("//"): + self.recent_href = "https:{rest}".format(rest=self.recent_href) + elif not self.recent_href.startswith("http"): + self.recent_href = WIKI_JUST_DOMAIN + self.recent_href + self.recent_href = self.recent_href.replace(")", "\\)") + elif attr[0] == 'data-uncrawlable-url': + self.recent_href = attr[1].encode('ascii') + self.recent_href = base64.b64decode(self.recent_href) + self.recent_href = WIKI_JUST_DOMAIN + self.recent_href.decode('ascii') + + def handle_data(self, data): + if self.recent_href: + self.new_string = self.new_string + "[{}](<{}>)".format(data, self.recent_href) + self.recent_href = "" + else: + self.new_string = self.new_string + data + + def handle_comment(self, data): + self.new_string = self.new_string + data + + def handle_endtag(self, tag): + misc_logger.debug(self.new_string) \ No newline at end of file diff --git a/src/rc.py b/src/rc.py new file mode 100644 index 0000000..781a056 --- /dev/null +++ b/src/rc.py @@ -0,0 +1,352 @@ +import re +import sys +import time +import logging +import requests +from bs4 import BeautifulSoup + +from src.configloader import settings +from src.misc import WIKI_SCRIPT_PATH, WIKI_API_PATH, messagequeue, datafile, send_simple, safe_read, LinkParser +from src.exceptions import MWError +from src.session import session +from src.rc_formatters import compact_formatter, embed_formatter +storage = datafile.data + +logger = logging.getLogger("rcgcdw.rc") + +supported_logs = ["protect/protect", "protect/modify", "protect/unprotect", "upload/overwrite", "upload/upload", "delete/delete", "delete/delete_redir", "delete/restore", "delete/revision", "delete/event", "import/upload", "import/interwiki", "merge/merge", "move/move", "move/move_redir", "protect/move_prot", "block/block", "block/unblock", "block/reblock", "rights/rights", "rights/autopromote", "abusefilter/modify", "abusefilter/create", "interwiki/iw_add", "interwiki/iw_edit", "interwiki/iw_delete", "curseprofile/comment-created", "curseprofile/comment-edited", "curseprofile/comment-deleted", "curseprofile/comment-purged", "curseprofile/profile-edited", "curseprofile/comment-replied", "contentmodel/change", "sprite/sprite", "sprite/sheet", "sprite/slice", "managetags/create", "managetags/delete", "managetags/activate", "managetags/deactivate", "tag/update", "cargo/createtable", "cargo/deletetable", "cargo/recreatetable", "cargo/replacetable", "upload/revert"] + +# Set the proper formatter +if settings["appearance"]["mode"] == "embed": + appearance_mode = embed_formatter +elif settings["appearance"]["mode"] == "compact": + appearance_mode = compact_formatter +else: + logger.critical("Unknown formatter!") + sys.exit(1) + + +LinkParser = LinkParser() + +class Recent_Changes_Class(object): + """Store verious data and functions related to wiki and fetching of Recent Changes""" + def __init__(self): + self.ids = [] + self.map_ips = {} + self.recent_id = 0 + self.downtimecredibility = 0 + self.last_downtime = 0 + self.tags = {} + self.groups = {} + self.streak = -1 + self.mw_messages = {} + self.namespaces = None + self.session = session + self.logged_in = False + if settings["limitrefetch"] != -1: + self.file_id = storage["rcid"] + else: + self.file_id = 999999999 # such value won't cause trouble, and it will make sure no refetch happen + + @staticmethod + def handle_mw_errors(request): + if "errors" in request: + logger.error(request["errors"]) + raise MWError + return request + + def log_in(self): + # session.cookies.clear() + if '@' not in settings["wiki_bot_login"]: + logger.error( + "Please provide proper nickname for login from {wiki}Special:BotPasswords".format( + wiki=WIKI_SCRIPT_PATH)) + return + if len(settings["wiki_bot_password"]) != 32: + logger.error( + "Password seems incorrect. It should be 32 characters long! Grab it from {wiki}Special:BotPasswords".format( + wiki=WIKI_SCRIPT_PATH)) + return + logger.info("Trying to log in to {wiki}...".format(wiki=WIKI_SCRIPT_PATH)) + try: + response = self.handle_mw_errors( + self.session.post(WIKI_API_PATH, + data={'action': 'query', 'format': 'json', 'utf8': '', 'meta': 'tokens', + 'type': 'login'})) + response = self.handle_mw_errors( + self.session.post(WIKI_API_PATH, + data={'action': 'login', 'format': 'json', 'utf8': '', + 'lgname': settings["wiki_bot_login"], + 'lgpassword': settings["wiki_bot_password"], + 'lgtoken': response.json()['query']['tokens']['logintoken']})) + except ValueError: + logger.error("Logging in have not succeeded") + return + except MWError: + logger.error("Logging in have not succeeded") + return + try: + if response.json()['login']['result'] == "Success": + logger.info("Successfully logged in") + self.logged_in = True + else: + logger.error("Logging in have not succeeded") + except: + logger.error("Logging in have not succeeded") + + def add_cache(self, change): + self.ids.append(change["rcid"]) + # self.recent_id = change["rcid"] + if len(self.ids) > settings["limitrefetch"] + 5: + self.ids.pop(0) + + def fetch(self, amount=settings["limit"]): + messagequeue.resend_msgs() + last_check = self.fetch_changes(amount=amount) + # If the request succeeds the last_check will be the last rcid from recentchanges query + if last_check is not None: + self.recent_id = last_check + # Assigns self.recent_id the last rcid if request succeeded, otherwise set the id from the file + if settings["limitrefetch"] != -1 and self.recent_id != self.file_id and self.recent_id != 0: # if saving to database is disabled, don't save the recent_id + self.file_id = self.recent_id + storage["rcid"] = self.recent_id + datafile.save_datafile() + logger.debug("Most recent rcid is: {}".format(self.recent_id)) + return self.recent_id + + def fetch_changes(self, amount, clean=False): + """Fetches the :amount: of changes from the wiki. + Returns None on error and int of rcid of latest change if succeeded""" + global logged_in + if len(self.ids) == 0: + logger.debug("ids is empty, triggering clean fetch") + clean = True + changes = self.safe_request( + "{wiki}?action=query&format=json&list=recentchanges{show_bots}&rcprop=title%7Credirect%7Ctimestamp%7Cids%7Cloginfo%7Cparsedcomment%7Csizes%7Cflags%7Ctags%7Cuser&rclimit={amount}&rctype=edit%7Cnew%7Clog%7Cexternal{categorize}".format( + wiki=WIKI_API_PATH, amount=amount, categorize="%7Ccategorize" if settings["show_added_categories"] else "", show_bots="&rcshow=!bot" if settings["show_bots"] is False else "")) + if changes: + try: + changes = changes.json()['query']['recentchanges'] + changes.reverse() + except ValueError: + logger.warning("ValueError in fetching changes") + logger.warning("Changes URL:" + changes.url) + self.downtime_controller() + return None + except KeyError: + logger.warning("Wiki returned %s" % (changes.json())) + return None + else: + if self.downtimecredibility > 0: + self.downtimecredibility -= 1 + if self.streak > -1: + self.streak += 1 + if self.streak > 8: + self.streak = -1 + send_simple("down_detector", _("Connection to {wiki} seems to be stable now.").format(wiki=settings["wikiname"]), + _("Connection status"), settings["avatars"]["connection_restored"]) + # In the first for loop we analize the categorize events and figure if we will need more changes to fetch + # in order to cover all of the edits + categorize_events = {} + new_events = 0 + for change in changes: + if not (change["rcid"] in self.ids or change["rcid"] < self.recent_id) and not clean: + new_events += 1 + logger.debug( + "New event: {}".format(change["rcid"])) + if new_events == settings["limit"]: + if amount < 500: + # call the function again with max limit for more results, ignore the ones in this request + logger.debug("There were too many new events, requesting max amount of events from the wiki.") + return self.fetch(amount=5000 if self.logged_in else 500) + else: + logger.debug( + "There were too many new events, but the limit was high enough we don't care anymore about fetching them all.") + if change["type"] == "categorize": + if "commenthidden" not in change: + if len(recent_changes.mw_messages.keys()) > 0: + cat_title = change["title"].split(':', 1)[1] + # I so much hate this, blame Markus for making me do this + if change["revid"] not in categorize_events: + categorize_events[change["revid"]] = {"new": set(), "removed": set()} + comment_to_match = re.sub(r'<.*?a>', '', change["parsedcomment"]) + if recent_changes.mw_messages["recentchanges-page-added-to-category"] in comment_to_match or recent_changes.mw_messages["recentchanges-page-added-to-category-bundled"] in comment_to_match: + categorize_events[change["revid"]]["new"].add(cat_title) + logger.debug("Matched {} to added category for {}".format(cat_title, change["revid"])) + elif recent_changes.mw_messages["recentchanges-page-removed-from-category"] in comment_to_match or recent_changes.mw_messages["recentchanges-page-removed-from-category-bundled"] in comment_to_match: + categorize_events[change["revid"]]["removed"].add(cat_title) + logger.debug("Matched {} to removed category for {}".format(cat_title, change["revid"])) + else: + logger.debug("Unknown match for category change with messages {}, {}, {}, {} and comment_to_match {}".format(recent_changes.mw_messages["recentchanges-page-added-to-category"], recent_changes.mw_messages["recentchanges-page-removed-from-category"], recent_changes.mw_messages["recentchanges-page-removed-from-category-bundled"], recent_changes.mw_messages["recentchanges-page-added-to-category-bundled"], comment_to_match)) + else: + logger.warning("Init information not available, could not read category information. Please restart the bot.") + else: + logger.debug("Log entry got suppressed, ignoring entry.") + # if change["revid"] in categorize_events: + # categorize_events[change["revid"]].append(cat_title) + # else: + # logger.debug("New category '{}' for {}".format(cat_title, change["revid"])) + # categorize_events[change["revid"]] = {cat_title: } + for change in changes: + if change["rcid"] in self.ids or change["rcid"] < self.recent_id: + logger.debug("Change ({}) is in ids or is lower than recent_id {}".format(change["rcid"], + self.recent_id)) + continue + logger.debug(self.ids) + logger.debug(self.recent_id) + self.add_cache(change) + if clean and not (self.recent_id == 0 and change["rcid"] > self.file_id): + logger.debug("Rejected {val}".format(val=change["rcid"])) + continue + essential_info(change, categorize_events.get(change.get("revid"), None)) + return change["rcid"] + + def safe_request(self, url): + try: + request = self.session.get(url, timeout=10, allow_redirects=False) + except requests.exceptions.Timeout: + logger.warning("Reached timeout error for request on link {url}".format(url=url)) + self.downtime_controller() + return None + except requests.exceptions.ConnectionError: + logger.warning("Reached connection error for request on link {url}".format(url=url)) + self.downtime_controller() + return None + except requests.exceptions.ChunkedEncodingError: + logger.warning("Detected faulty response from the web server for request on link {url}".format(url=url)) + self.downtime_controller() + return None + else: + if 499 < request.status_code < 600: + self.downtime_controller() + return None + elif request.status_code == 302: + logger.critical("Redirect detected! Either the wiki given in the script settings (wiki field) is incorrect/the wiki got removed or Gamepedia is giving us the false value. Please provide the real URL to the wiki, current URL redirects to {}".format(request.next.url)) + sys.exit(0) + return request + + def check_connection(self, looped=False): + online = 0 + for website in ["https://google.com", "https://instagram.com", "https://steamcommunity.com"]: + try: + requests.get(website, timeout=10) + online += 1 + except requests.exceptions.ConnectionError: + pass + except requests.exceptions.Timeout: + pass + if online < 1: + logger.error("Failure when checking Internet connection at {time}".format( + time=time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime()))) + self.downtimecredibility = 0 + if not looped: + while 1: # recursed loop, check for connection (every 10 seconds) as long as three services are down, don't do anything else + if self.check_connection(looped=True): + recent_changes.fetch(amount=settings["limitrefetch"]) + break + time.sleep(10) + return False + return True + + def downtime_controller(self): + if not settings["show_updown_messages"]: + return + if self.streak > -1: # reset the streak of successful connections when bad one happens + self.streak = 0 + if self.downtimecredibility < 60: + self.downtimecredibility += 15 + else: + if ( + time.time() - self.last_downtime) > 1800 and self.check_connection(): # check if last downtime happened within 30 minutes, if yes, don't send a message + send_simple("down_detector", _("{wiki} seems to be down or unreachable.").format(wiki=settings["wikiname"]), + _("Connection status"), settings["avatars"]["connection_failed"]) + self.last_downtime = time.time() + self.streak = 0 + + def clear_cache(self): + self.map_ips = {} + + def init_info(self): + startup_info = safe_read(self.safe_request( + "{wiki}?action=query&format=json&uselang=content&list=tags&meta=allmessages%7Csiteinfo&utf8=1&tglimit=max&tgprop=displayname&ammessages=recentchanges-page-added-to-category%7Crecentchanges-page-removed-from-category%7Crecentchanges-page-added-to-category-bundled%7Crecentchanges-page-removed-from-category-bundled&amenableparser=1&amincludelocal=1&siprop=namespaces".format( + wiki=WIKI_API_PATH)), "query") + if startup_info: + if "tags" in startup_info and "allmessages" in startup_info: + for tag in startup_info["tags"]: + try: + self.tags[tag["name"]] = (BeautifulSoup(tag["displayname"], "lxml")).get_text() + except KeyError: + self.tags[tag["name"]] = None # Tags with no display name are hidden and should not appear on RC as well + for message in startup_info["allmessages"]: + if not "missing" in message: # ignore missing strings + self.mw_messages[message["name"]] = message["*"] + else: + logging.warning("Could not fetch the MW message translation for: {}".format(message["name"])) + for key, message in self.mw_messages.items(): + if key.startswith("recentchanges-page-"): + self.mw_messages[key] = re.sub(r'\[\[.*?\]\]', '', message) + self.namespaces = startup_info["namespaces"] + logger.info("Gathered information about the tags and interface messages.") + else: + logger.warning("Could not retrieve initial wiki information. Some features may not work correctly!") + logger.debug(startup_info) + else: + logger.error("Could not retrieve initial wiki information. Possibly internet connection issue?") + + def pull_comment(self, comment_id): + try: + comment = self.handle_mw_errors(self.safe_request( + "{wiki}?action=comment&do=getRaw&comment_id={comment}&format=json".format(wiki=WIKI_API_PATH, + comment=comment_id)).json())[ + "text"] + logger.debug("Got the following comment from the API: {}".format(comment)) + except MWError: + pass + except (TypeError, AttributeError): + logger.exception("Could not resolve the comment text.") + except KeyError: + logger.exception("CurseProfile extension API did not respond with a valid comment content.") + else: + if len(comment) > 1000: + comment = comment[0:1000] + "…" + return comment + return "" + + +recent_changes = Recent_Changes_Class() + +def essential_info(change, changed_categories): + """Prepares essential information for both embed and compact message format.""" + logger.debug(change) + if ("actionhidden" in change or "suppressed" in change) and "suppressed" not in settings["ignored"]: # if event is hidden using suppression + appearance_mode("suppressed", change, "", changed_categories, recent_changes) + return + if "commenthidden" not in change: + LinkParser.feed(change["parsedcomment"]) + parsed_comment = LinkParser.new_string + LinkParser.new_string = "" + parsed_comment = re.sub(r"(`|_|\*|~|{|}|\|\|)", "\\\\\\1", parsed_comment, 0) + else: + parsed_comment = _("~~hidden~~") + if not parsed_comment: + parsed_comment = None + if change["type"] in ["edit", "new"]: + logger.debug("List of categories in essential_info: {}".format(changed_categories)) + if "userhidden" in change: + change["user"] = _("hidden") + identification_string = change["type"] + elif change["type"] == "log": + identification_string = "{logtype}/{logaction}".format(logtype=change["logtype"], logaction=change["logaction"]) + if identification_string not in supported_logs: + logger.warning( + "This event is not implemented in the script. Please make an issue on the tracker attaching the following info: wiki url, time, and this information: {}".format( + change)) + return + elif change["type"] == "categorize": + return + else: + logger.warning("This event is not implemented in the script. Please make an issue on the tracker attaching the following info: wiki url, time, and this information: {}".format(change)) + return + if identification_string in settings["ignored"]: + return + appearance_mode(identification_string, change, parsed_comment, changed_categories, recent_changes) diff --git a/src/rc_formatters.py b/src/rc_formatters.py new file mode 100644 index 0000000..c52b2d3 --- /dev/null +++ b/src/rc_formatters.py @@ -0,0 +1,742 @@ +import ipaddress +import math +import re +import time +import logging +from urllib.parse import quote_plus + +from bs4 import BeautifulSoup + +from src.configloader import settings +from src.misc import link_formatter, create_article_path, WIKI_SCRIPT_PATH, send_to_discord, DiscordMessage, safe_read, \ + WIKI_API_PATH, ContentParser, profile_field_name, LinkParser +from src.i18n import lang +#from src.rc import recent_changes, pull_comment +ngettext = lang.ngettext + +logger = logging.getLogger("rcgcdw.rc_formatters") +#from src.rcgcdw import recent_changes, ngettext, logger, profile_field_name, LinkParser, pull_comment + +LinkParser = LinkParser() + +def compact_formatter(action, change, parsed_comment, categories, recent_changes): + if action != "suppressed": + author_url = link_formatter(create_article_path("User:{user}".format(user=change["user"]))) + author = change["user"] + parsed_comment = "" if parsed_comment is None else " *("+parsed_comment+")*" + parsed_comment = re.sub(r"([^<]|\A)(http(s)://.*?)( |\Z)", "\\1<\\2>\\4", parsed_comment) # see #97 + if action in ["edit", "new"]: + edit_link = link_formatter("{wiki}index.php?title={article}&curid={pageid}&diff={diff}&oldid={oldrev}".format( + wiki=WIKI_SCRIPT_PATH, pageid=change["pageid"], diff=change["revid"], oldrev=change["old_revid"], + article=change["title"])) + edit_size = change["newlen"] - change["oldlen"] + if edit_size > 0: + sign = "+" + else: + sign = "" + if change["title"].startswith("MediaWiki:Tag-"): + pass + if action == "edit": + content = _("[{author}]({author_url}) edited [{article}]({edit_link}){comment} ({sign}{edit_size})").format(author=author, author_url=author_url, article=change["title"], edit_link=edit_link, comment=parsed_comment, edit_size=edit_size, sign=sign) + else: + content = _("[{author}]({author_url}) created [{article}]({edit_link}){comment} ({sign}{edit_size})").format(author=author, author_url=author_url, article=change["title"], edit_link=edit_link, comment=parsed_comment, edit_size=edit_size, sign=sign) + elif action =="upload/upload": + file_link = link_formatter(create_article_path(change["title"])) + content = _("[{author}]({author_url}) uploaded [{file}]({file_link}){comment}").format(author=author, + author_url=author_url, + file=change["title"], + file_link=file_link, + comment=parsed_comment) + elif action == "upload/revert": + file_link = link_formatter(create_article_path(change["title"])) + content = _("[{author}]({author_url}) reverted a version of [{file}]({file_link}){comment}").format( + author=author, author_url=author_url, file=change["title"], file_link=file_link, comment=parsed_comment) + elif action == "upload/overwrite": + file_link = link_formatter(create_article_path(change["title"])) + content = _("[{author}]({author_url}) uploaded a new version of [{file}]({file_link}){comment}").format(author=author, author_url=author_url, file=change["title"], file_link=file_link, comment=parsed_comment) + elif action == "delete/delete": + page_link = link_formatter(create_article_path(change["title"])) + content = _("[{author}]({author_url}) deleted [{page}]({page_link}){comment}").format(author=author, author_url=author_url, page=change["title"], page_link=page_link, + comment=parsed_comment) + elif action == "delete/delete_redir": + page_link = link_formatter(create_article_path(change["title"])) + content = _("[{author}]({author_url}) deleted redirect by overwriting [{page}]({page_link}){comment}").format(author=author, author_url=author_url, page=change["title"], page_link=page_link, + comment=parsed_comment) + elif action == "move/move": + link = link_formatter(create_article_path(change["logparams"]['target_title'])) + redirect_status = _("without making a redirect") if "suppressredirect" in change["logparams"] else _("with a redirect") + content = _("[{author}]({author_url}) moved {redirect}*{article}* to [{target}]({target_url}) {made_a_redirect}{comment}").format(author=author, author_url=author_url, redirect="⤷ " if "redirect" in change else "", article=change["title"], + target=change["logparams"]['target_title'], target_url=link, comment=parsed_comment, made_a_redirect=redirect_status) + elif action == "move/move_redir": + link = link_formatter(create_article_path(change["logparams"]["target_title"])) + redirect_status = _("without making a redirect") if "suppressredirect" in change["logparams"] else _( + "with a redirect") + content = _("[{author}]({author_url}) moved {redirect}*{article}* over redirect to [{target}]({target_url}) {made_a_redirect}{comment}").format(author=author, author_url=author_url, redirect="⤷ " if "redirect" in change else "", article=change["title"], + target=change["logparams"]['target_title'], target_url=link, comment=parsed_comment, made_a_redirect=redirect_status) + elif action == "protect/move_prot": + link = link_formatter(create_article_path(change["logparams"]["oldtitle_title"])) + content = _( + "[{author}]({author_url}) moved protection settings from {redirect}*{article}* to [{target}]({target_url}){comment}").format(author=author, author_url=author_url, redirect="⤷ " if "redirect" in change else "", article=change["logparams"]["oldtitle_title"], + target=change["title"], target_url=link, comment=parsed_comment) + elif action == "block/block": + user = change["title"].split(':')[1] + restriction_description = "" + try: + ipaddress.ip_address(user) + link = link_formatter(create_article_path("Special:Contributions/{user}".format(user=user))) + except ValueError: + link = link_formatter(create_article_path(change["title"])) + if change["logparams"]["duration"] == "infinite": + block_time = _("infinity and beyond") + else: + english_length = re.sub(r"(\d+)", "", change["logparams"][ + "duration"]) # note that translation won't work for millenia and century yet + english_length_num = re.sub(r"(\D+)", "", change["logparams"]["duration"]) + try: + english_length = english_length.rstrip("s").strip() + block_time = "{num} {translated_length}".format(num=english_length_num, + translated_length=ngettext(english_length, + english_length + "s", + int(english_length_num))) + except AttributeError: + logger.error("Could not strip s from the block event, seems like the regex didn't work?") + return + if "sitewide" not in change["logparams"]: + restriction_description = "" + if "pages" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"]["pages"]: + restriction_description = _(" on pages: ") + for page in change["logparams"]["restrictions"]["pages"]: + restricted_pages = ["*{page}*".format(page=i["page_title"]) for i in change["logparams"]["restrictions"]["pages"]] + restriction_description = restriction_description + ", ".join(restricted_pages) + if "namespaces" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"]["namespaces"]: + namespaces = [] + if restriction_description: + restriction_description = restriction_description + _(" and namespaces: ") + else: + restriction_description = _(" on namespaces: ") + for namespace in change["logparams"]["restrictions"]["namespaces"]: + if str(namespace) in recent_changes.namespaces: # if we have cached namespace name for given namespace number, add its name to the list + namespaces.append("*{ns}*".format(ns=recent_changes.namespaces[str(namespace)]["*"])) + else: + namespaces.append("*{ns}*".format(ns=namespace)) + restriction_description = restriction_description + ", ".join(namespaces) + restriction_description = restriction_description + "." + if len(restriction_description) > 1020: + logger.debug(restriction_description) + restriction_description = restriction_description[:1020] + "…" + content = _( + "[{author}]({author_url}) blocked [{user}]({user_url}) for {time}{restriction_desc}{comment}").format(author=author, author_url=author_url, user=user, time=block_time, user_url=link, restriction_desc=restriction_description, comment=parsed_comment) + elif action == "block/reblock": + link = link_formatter(create_article_path(change["title"])) + user = change["title"].split(':')[1] + content = _("[{author}]({author_url}) changed block settings for [{blocked_user}]({user_url}){comment}").format(author=author, author_url=author_url, blocked_user=user, user_url=link, comment=parsed_comment) + elif action == "block/unblock": + link = link_formatter(create_article_path(change["title"])) + user = change["title"].split(':')[1] + content = _("[{author}]({author_url}) unblocked [{blocked_user}]({user_url}){comment}").format(author=author, author_url=author_url, blocked_user=user, user_url=link, comment=parsed_comment) + elif action == "curseprofile/comment-created": + link = link_formatter(create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]))) + content = _("[{author}]({author_url}) left a [comment]({comment}) on {target} profile").format(author=author, author_url=author_url, comment=link, target=change["title"].split(':')[1]+"'s" if change["title"].split(':')[1] != change["user"] else _("their own profile")) + elif action == "curseprofile/comment-replied": + link = link_formatter(create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]))) + content = _("[{author}]({author_url}) replied to a [comment]({comment}) on {target} profile").format(author=author, + author_url=author_url, + comment=link, + target=change["title"].split(':')[1] if change["title"].split(':')[1] !=change["user"] else _("their own")) + elif action == "curseprofile/comment-edited": + link = link_formatter(create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]))) + content = _("[{author}]({author_url}) edited a [comment]({comment}) on {target} profile").format(author=author, + author_url=author_url, + comment=link, + target=change["title"].split(':')[1] if change["title"].split(':')[1] !=change["user"] else _("their own")) + elif action == "curseprofile/comment-purged": + link = link_formatter(create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]))) + content = _("[{author}]({author_url}) purged a comment on {target} profile").format(author=author, + author_url=author_url, + target= + change["title"].split(':')[ + 1] if + change["title"].split(':')[ + 1] != change[ + "user"] else _( + "their own")) + elif action == "curseprofile/comment-deleted": + content = _("[{author}]({author_url}) deleted a comment on {target} profile").format(author=author, + author_url=author_url, + target=change["title"].split(':')[1] if change["title"].split(':')[1] !=change["user"] else _("their own")) + + elif action == "curseprofile/profile-edited": + link = link_formatter(create_article_path("UserProfile:{user}".format(user=change["title"].split(":")[1]))) + target = _("[{target}]({target_url})'s").format(target=change["title"].split(':')[1], target_url=link) if change["title"].split(':')[1] != author else _("[their own]({target_url})").format(target_url=link) + content = _("[{author}]({author_url}) edited the {field} on {target} profile. *({desc})*").format(author=author, + author_url=author_url, + target=target, + field=profile_field_name(change["logparams"]['4:section'], False), + desc=BeautifulSoup(change["parsedcomment"], "lxml").get_text()) + elif action in ("rights/rights", "rights/autopromote"): + link = link_formatter(create_article_path("User:{user}".format(user=change["title"].split(":")[1]))) + old_groups = [] + new_groups = [] + for name in change["logparams"]["oldgroups"]: + old_groups.append(_(name)) + for name in change["logparams"]["newgroups"]: + new_groups.append(_(name)) + if len(old_groups) == 0: + old_groups = [_("none")] + if len(new_groups) == 0: + new_groups = [_("none")] + + if action == "rights/rights": + content = "[{author}]({author_url}) changed group membership for [{target}]({target_url}) from {old_groups} to {new_groups}{comment}".format(author=author, author_url=author_url, target=change["title"].split(":")[1], target_url=link, old_groups=", ".join(old_groups), new_groups=', '.join(new_groups), comment=parsed_comment) + else: + content = "{author} autopromoted [{target}]({target_url}) from {old_groups} to {new_groups}{comment}".format( + author=_("System"), author_url=author_url, target=change["title"].split(":")[1], target_url=link, + old_groups=", ".join(old_groups), new_groups=', '.join(new_groups), + comment=parsed_comment) + elif action == "protect/protect": + link = link_formatter(create_article_path(change["title"])) + content = _("[{author}]({author_url}) protected [{article}]({article_url}) with the following settings: {settings}{comment}").format(author=author, author_url=author_url, + article=change["title"], article_url=link, + settings=change["logparams"]["description"]+_(" [cascading]") if "cascade" in change["logparams"] else "", + comment=parsed_comment) + elif action == "protect/modify": + link = link_formatter(create_article_path(change["title"])) + content = _( + "[{author}]({author_url}) modified protection settings of [{article}]({article_url}) to: {settings}{comment}").format( + author=author, author_url=author_url, + article=change["title"], article_url=link, + settings=change["logparams"]["description"] + _(" [cascading]") if "cascade" in change["logparams"] else "", + comment=parsed_comment) + elif action == "protect/unprotect": + link = link_formatter(create_article_path(change["title"])) + content = _("[{author}]({author_url}) removed protection from [{article}]({article_url}){comment}").format(author=author, author_url=author_url, article=change["title"], article_url=link, comment=parsed_comment) + elif action == "delete/revision": + amount = len(change["logparams"]["ids"]) + link = link_formatter(create_article_path(change["title"])) + content = ngettext("[{author}]({author_url}) changed visibility of revision on page [{article}]({article_url}){comment}", + "[{author}]({author_url}) changed visibility of {amount} revisions on page [{article}]({article_url}){comment}", amount).format(author=author, author_url=author_url, + article=change["title"], article_url=link, amount=amount, comment=parsed_comment) + elif action == "import/upload": + link = link_formatter(create_article_path(change["title"])) + content = ngettext("[{author}]({author_url}) imported [{article}]({article_url}) with {count} revision{comment}", + "[{author}]({author_url}) imported [{article}]({article_url}) with {count} revisions{comment}", change["logparams"]["count"]).format( + author=author, author_url=author_url, article=change["title"], article_url=link, count=change["logparams"]["count"], comment=parsed_comment) + elif action == "delete/restore": + link = link_formatter(create_article_path(change["title"])) + content = _("[{author}]({author_url}) restored [{article}]({article_url}){comment}").format(author=author, author_url=author_url, article=change["title"], article_url=link, comment=parsed_comment) + elif action == "delete/event": + content = _("[{author}]({author_url}) changed visibility of log events{comment}").format(author=author, author_url=author_url, comment=parsed_comment) + elif action == "import/interwiki": + content = _("[{author}]({author_url}) imported interwiki{comment}").format(author=author, author_url=author_url, comment=parsed_comment) + elif action == "abusefilter/modify": + link = link_formatter(create_article_path("Special:AbuseFilter/history/{number}/diff/prev/{historyid}".format(number=change["logparams"]['newId'], historyid=change["logparams"]["historyId"]))) + content = _("[{author}]({author_url}) edited abuse filter [number {number}]({filter_url})").format(author=author, author_url=author_url, number=change["logparams"]['newId'], filter_url=link) + elif action == "abusefilter/create": + link = link_formatter( + create_article_path("Special:AbuseFilter/{number}".format(number=change["logparams"]['newId']))) + content = _("[{author}]({author_url}) created abuse filter [number {number}]({filter_url})").format(author=author, author_url=author_url, number=change["logparams"]['newId'], filter_url=link) + elif action == "merge/merge": + link = link_formatter(create_article_path(change["title"])) + link_dest = link_formatter(create_article_path(change["logparams"]["dest_title"])) + content = _("[{author}]({author_url}) merged revision histories of [{article}]({article_url}) into [{dest}]({dest_url}){comment}").format(author=author, author_url=author_url, article=change["title"], article_url=link, dest_url=link_dest, + dest=change["logparams"]["dest_title"], comment=parsed_comment) + elif action == "interwiki/iw_add": + link = link_formatter(create_article_path("Special:Interwiki")) + content = _("[{author}]({author_url}) added an entry to the [interwiki table]({table_url}) pointing to {website} with {prefix} prefix").format(author=author, author_url=author_url, desc=parsed_comment, + prefix=change["logparams"]['0'], + website=change["logparams"]['1'], + table_url=link) + elif action == "interwiki/iw_edit": + link = link_formatter(create_article_path("Special:Interwiki")) + content = _("[{author}]({author_url}) edited an entry in [interwiki table]({table_url}) pointing to {website} with {prefix} prefix").format(author=author, author_url=author_url, desc=parsed_comment, + prefix=change["logparams"]['0'], + website=change["logparams"]['1'], + table_url=link) + elif action == "interwiki/iw_delete": + link = link_formatter(create_article_path("Special:Interwiki")) + content = _("[{author}]({author_url}) deleted an entry in [interwiki table]({table_url})").format(author=author, author_url=author_url, table_url=link) + elif action == "contentmodel/change": + link = link_formatter(create_article_path(change["title"])) + content = _("[{author}]({author_url}) changed the content model of the page [{article}]({article_url}) from {old} to {new}{comment}").format(author=author, author_url=author_url, article=change["title"], article_url=link, old=change["logparams"]["oldmodel"], + new=change["logparams"]["newmodel"], comment=parsed_comment) + elif action == "sprite/sprite": + link = link_formatter(create_article_path(change["title"])) + content = _("[{author}]({author_url}) edited the sprite for [{article}]({article_url})").format(author=author, author_url=author_url, article=change["title"], article_url=link) + elif action == "sprite/sheet": + link = link_formatter(create_article_path(change["title"])) + content = _("[{author}]({author_url}) created the sprite sheet for [{article}]({article_url})").format(author=author, author_url=author_url, article=change["title"], article_url=link) + elif action == "sprite/slice": + link = link_formatter(create_article_path(change["title"])) + content = _("[{author}]({author_url}) edited the slice for [{article}]({article_url})").format(author=author, author_url=author_url, article=change["title"], article_url=link) + elif action == "cargo/createtable": + LinkParser.feed(change["logparams"]["0"]) + table = LinkParser.new_string + LinkParser.new_string = "" + content = _("[{author}]({author_url}) created the Cargo table \"{table}\"").format(author=author, author_url=author_url, table=table) + elif action == "cargo/deletetable": + content = _("[{author}]({author_url}) deleted the Cargo table \"{table}\"").format(author=author, author_url=author_url, table=change["logparams"]["0"]) + elif action == "cargo/recreatetable": + LinkParser.feed(change["logparams"]["0"]) + table = LinkParser.new_string + LinkParser.new_string = "" + content = _("[{author}]({author_url}) recreated the Cargo table \"{table}\"").format(author=author, author_url=author_url, table=table) + elif action == "cargo/replacetable": + LinkParser.feed(change["logparams"]["0"]) + table = LinkParser.new_string + LinkParser.new_string = "" + content = _("[{author}]({author_url}) replaced the Cargo table \"{table}\"").format(author=author, author_url=author_url, table=table) + elif action == "managetags/create": + link = link_formatter(create_article_path("Special:Tags")) + content = _("[{author}]({author_url}) created a [tag]({tag_url}) \"{tag}\"").format(author=author, author_url=author_url, tag=change["logparams"]["tag"], tag_url=link) + recent_changes.init_info() + elif action == "managetags/delete": + link = link_formatter(create_article_path("Special:Tags")) + content = _("[{author}]({author_url}) deleted a [tag]({tag_url}) \"{tag}\"").format(author=author, author_url=author_url, tag=change["logparams"]["tag"], tag_url=link) + recent_changes.init_info() + elif action == "managetags/activate": + link = link_formatter(create_article_path("Special:Tags")) + content = _("[{author}]({author_url}) activated a [tag]({tag_url}) \"{tag}\"").format(author=author, author_url=author_url, tag=change["logparams"]["tag"], tag_url=link) + elif action == "managetags/deactivate": + link = link_formatter(create_article_path("Special:Tags")) + content = _("[{author}]({author_url}) deactivated a [tag]({tag_url}) \"{tag}\"").format(author=author, author_url=author_url, tag=change["logparams"]["tag"], tag_url=link) + elif action == "suppressed": + content = _("An action has been hidden by administration.") + else: + logger.warning("No entry for {event} with params: {params}".format(event=action, params=change)) + return + send_to_discord(DiscordMessage("compact", action, settings["webhookURL"], content=content)) + + +def embed_formatter(action, change, parsed_comment, categories, recent_changes): + embed = DiscordMessage("embed", action, settings["webhookURL"]) + if parsed_comment is None: + parsed_comment = _("No description provided") + if action != "suppressed": + if "anon" in change: + author_url = create_article_path("Special:Contributions/{user}".format(user=change["user"].replace(" ", "_"))) # Replace here needed in case of #75 + logger.debug("current user: {} with cache of IPs: {}".format(change["user"], recent_changes.map_ips.keys())) + if change["user"] not in list(recent_changes.map_ips.keys()): + contibs = safe_read(recent_changes.safe_request( + "{wiki}?action=query&format=json&list=usercontribs&uclimit=max&ucuser={user}&ucstart={timestamp}&ucprop=".format( + wiki=WIKI_API_PATH, user=change["user"], timestamp=change["timestamp"])), "query", "usercontribs") + if contibs is None: + logger.warning( + "WARNING: Something went wrong when checking amount of contributions for given IP address") + change["user"] = change["user"] + "(?)" + else: + recent_changes.map_ips[change["user"]] = len(contibs) + logger.debug("Current params user {} and state of map_ips {}".format(change["user"], recent_changes.map_ips)) + change["user"] = "{author} ({contribs})".format(author=change["user"], contribs=len(contibs)) + else: + logger.debug( + "Current params user {} and state of map_ips {}".format(change["user"], recent_changes.map_ips)) + if action in ("edit", "new"): + recent_changes.map_ips[change["user"]] += 1 + change["user"] = "{author} ({amount})".format(author=change["user"], + amount=recent_changes.map_ips[change["user"]]) + else: + author_url = create_article_path("User:{}".format(change["user"].replace(" ", "_"))) + embed.set_author(change["user"], author_url) + if action in ("edit", "new"): # edit or new page + editsize = change["newlen"] - change["oldlen"] + if editsize > 0: + if editsize > 6032: + embed["color"] = 65280 + else: + embed["color"] = 35840 + (math.floor(editsize / 52)) * 256 + elif editsize < 0: + if editsize < -6032: + embed["color"] = 16711680 + else: + embed["color"] = 9175040 + (math.floor((editsize * -1) / 52)) * 65536 + elif editsize == 0: + embed["color"] = 8750469 + if change["title"].startswith("MediaWiki:Tag-"): # Refresh tag list when tag display name is edited + recent_changes.init_info() + link = "{wiki}index.php?title={article}&curid={pageid}&diff={diff}&oldid={oldrev}".format( + wiki=WIKI_SCRIPT_PATH, pageid=change["pageid"], diff=change["revid"], oldrev=change["old_revid"], + article=change["title"].replace(" ", "_")) + embed["title"] = "{redirect}{article} ({new}{minor}{bot}{space}{editsize})".format(redirect="⤷ " if "redirect" in change else "", article=change["title"], editsize="+" + str( + editsize) if editsize > 0 else editsize, new=_("(N!) ") if action == "new" else "", + minor=_("m") if action == "edit" and "minor" in change else "", bot=_('b') if "bot" in change else "", space=" " if "bot" in change or (action == "edit" and "minor" in change) or action == "new" else "") + if settings["appearance"]["embed"]["show_edit_changes"]: + if action == "new": + changed_content = safe_read(recent_changes.safe_request( + "{wiki}?action=compare&format=json&fromtext=&torev={diff}&topst=1&prop=diff".format( + wiki=WIKI_API_PATH, diff=change["revid"] + )), "compare", "*") + else: + changed_content = safe_read(recent_changes.safe_request( + "{wiki}?action=compare&format=json&fromrev={oldrev}&torev={diff}&topst=1&prop=diff".format( + wiki=WIKI_API_PATH, diff=change["revid"],oldrev=change["old_revid"] + )), "compare", "*") + if changed_content: + EditDiff = ContentParser() + EditDiff.feed(changed_content) + if EditDiff.small_prev_del: + if EditDiff.small_prev_del.replace("~~", "").isspace(): + EditDiff.small_prev_del = _('__Only whitespace__') + else: + EditDiff.small_prev_del = EditDiff.small_prev_del.replace("~~~~", "") + if EditDiff.small_prev_ins: + if EditDiff.small_prev_ins.replace("**", "").isspace(): + EditDiff.small_prev_ins = _('__Only whitespace__') + else: + EditDiff.small_prev_ins = EditDiff.small_prev_ins.replace("****", "") + logger.debug("Changed content: {}".format(EditDiff.small_prev_ins)) + if EditDiff.small_prev_del and not action == "new": + embed.add_field(_("Removed"), "{data}".format(data=EditDiff.small_prev_del), inline=True) + if EditDiff.small_prev_ins: + embed.add_field(_("Added"), "{data}".format(data=EditDiff.small_prev_ins), inline=True) + else: + logger.warning("Unable to download data on the edit content!") + elif action in ("upload/overwrite", "upload/upload", "upload/revert"): # sending files + license = None + urls = safe_read(recent_changes.safe_request( + "{wiki}?action=query&format=json&prop=imageinfo&list=&meta=&titles={filename}&iiprop=timestamp%7Curl%7Carchivename&iilimit=5".format( + wiki=WIKI_API_PATH, filename=change["title"])), "query", "pages") + link = create_article_path(change["title"].replace(" ", "_")) + additional_info_retrieved = False + if urls is not None: + logger.debug(urls) + if "-1" not in urls: # image still exists and not removed + try: + img_info = next(iter(urls.values()))["imageinfo"] + for num, revision in enumerate(img_info): + if revision["timestamp"] == change["logparams"]["img_timestamp"]: # find the correct revision corresponding for this log entry + image_direct_url = "{rev}?{cache}".format(rev=revision["url"], cache=int(time.time()*5)) # cachebusting + additional_info_retrieved = True + break + except KeyError: + logger.warning("Wiki did not respond with extended information about file. The preview will not be shown.") + else: + logger.warning("Request for additional image information have failed. The preview will not be shown.") + if action in ("upload/overwrite", "upload/revert"): + if additional_info_retrieved: + article_encoded = change["title"].replace(" ", "_").replace(')', '\)') + try: + revision = img_info[num+1] + except IndexError: + logger.exception("Could not analize the information about the image (does it have only one version when expected more in overwrite?) which resulted in no Options field: {}".format(img_info)) + else: + undolink = "{wiki}index.php?title={filename}&action=revert&oldimage={archiveid}".format( + wiki=WIKI_SCRIPT_PATH, filename=article_encoded, archiveid=revision["archivename"]) + embed.add_field(_("Options"), _("([preview]({link}) | [undo]({undolink}))").format( + link=image_direct_url, undolink=undolink)) + if settings["appearance"]["embed"]["embed_images"]: + embed["image"]["url"] = image_direct_url + if action == "upload/overwrite": + embed["title"] = _("Uploaded a new version of {name}").format(name=change["title"]) + elif action == "upload/revert": + embed["title"] = _("Reverted a version of {name}").format(name=change["title"]) + else: + embed["title"] = _("Uploaded {name}").format(name=change["title"]) + if settings["license_detection"]: + article_content = safe_read(recent_changes.safe_request( + "{wiki}?action=query&format=json&prop=revisions&titles={article}&rvprop=content".format( + wiki=WIKI_API_PATH, article=quote_plus(change["title"], safe=''))), "query", "pages") + if article_content is None: + logger.warning("Something went wrong when getting license for the image") + return 0 + if "-1" not in article_content: + content = list(article_content.values())[0]['revisions'][0]['*'] + try: + matches = re.search(re.compile(settings["license_regex"], re.IGNORECASE), content) + if matches is not None: + license = matches.group("license") + else: + if re.search(re.compile(settings["license_regex_detect"], re.IGNORECASE), content) is None: + license = _("**No license!**") + else: + license = "?" + except IndexError: + logger.error( + "Given regex for the license detection is incorrect. It does not have a capturing group called \"license\" specified. Please fix license_regex value in the config!") + license = "?" + except re.error: + logger.error( + "Given regex for the license detection is incorrect. Please fix license_regex or license_regex_detect values in the config!") + license = "?" + if license is not None: + parsed_comment += _("\nLicense: {}").format(license) + if additional_info_retrieved: + embed.add_field(_("Options"), _("([preview]({link}))").format(link=image_direct_url)) + if settings["appearance"]["embed"]["embed_images"]: + embed["image"]["url"] = image_direct_url + elif action == "delete/delete": + link = create_article_path(change["title"].replace(" ", "_")) + embed["title"] = _("Deleted page {article}").format(article=change["title"]) + elif action == "delete/delete_redir": + link = create_article_path(change["title"].replace(" ", "_")) + embed["title"] = _("Deleted redirect {article} by overwriting").format(article=change["title"]) + elif action == "move/move": + link = create_article_path(change["logparams"]['target_title'].replace(" ", "_")) + parsed_comment = "{supress}. {desc}".format(desc=parsed_comment, + supress=_("No redirect has been made") if "suppressredirect" in change["logparams"] else _( + "A redirect has been made")) + embed["title"] = _("Moved {redirect}{article} to {target}").format(redirect="⤷ " if "redirect" in change else "", article=change["title"], target=change["logparams"]['target_title']) + elif action == "move/move_redir": + link = create_article_path(change["logparams"]["target_title"].replace(" ", "_")) + embed["title"] = _("Moved {redirect}{article} to {title} over redirect").format(redirect="⤷ " if "redirect" in change else "", article=change["title"], + title=change["logparams"]["target_title"]) + elif action == "protect/move_prot": + link = create_article_path(change["logparams"]["oldtitle_title"].replace(" ", "_")) + embed["title"] = _("Moved protection settings from {redirect}{article} to {title}").format(redirect="⤷ " if "redirect" in change else "", article=change["logparams"]["oldtitle_title"], + title=change["title"]) + elif action == "block/block": + user = change["title"].split(':')[1] + try: + ipaddress.ip_address(user) + link = create_article_path("Special:Contributions/{user}".format(user=user)) + except ValueError: + link = create_article_path(change["title"].replace(" ", "_").replace(')', '\)')) + if change["logparams"]["duration"] == "infinite": + block_time = _("infinity and beyond") + else: + english_length = re.sub(r"(\d+)", "", change["logparams"]["duration"]) #note that translation won't work for millenia and century yet + english_length_num = re.sub(r"(\D+)", "", change["logparams"]["duration"]) + try: + english_length = english_length.rstrip("s").strip() + block_time = "{num} {translated_length}".format(num=english_length_num, translated_length=ngettext(english_length, english_length + "s", int(english_length_num))) + except AttributeError: + logger.error("Could not strip s from the block event, seems like the regex didn't work?") + return + if "sitewide" not in change["logparams"]: + restriction_description = "" + if "pages" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"]["pages"]: + restriction_description = _("Blocked from editing the following pages: ") + for page in change["logparams"]["restrictions"]["pages"]: + restricted_pages = ["*"+i["page_title"]+"*" for i in change["logparams"]["restrictions"]["pages"]] + restriction_description = restriction_description + ", ".join(restricted_pages) + if "namespaces" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"]["namespaces"]: + namespaces = [] + if restriction_description: + restriction_description = restriction_description + _(" and namespaces: ") + else: + restriction_description = _("Blocked from editing pages on following namespaces: ") + for namespace in change["logparams"]["restrictions"]["namespaces"]: + if str(namespace) in recent_changes.namespaces: # if we have cached namespace name for given namespace number, add its name to the list + namespaces.append("*{ns}*".format(ns=recent_changes.namespaces[str(namespace)]["*"])) + else: + namespaces.append("*{ns}*".format(ns=namespace)) + restriction_description = restriction_description + ", ".join(namespaces) + restriction_description = restriction_description + "." + if len(restriction_description) > 1020: + logger.debug(restriction_description) + restriction_description = restriction_description[:1020]+"…" + embed.add_field(_("Partial block details"), restriction_description, inline=True) + embed["title"] = _("Blocked {blocked_user} for {time}").format(blocked_user=user, time=block_time) + elif action == "block/reblock": + link = create_article_path(change["title"].replace(" ", "_").replace(')', '\)')) + user = change["title"].split(':')[1] + embed["title"] = _("Changed block settings for {blocked_user}").format(blocked_user=user) + elif action == "block/unblock": + link = create_article_path(change["title"].replace(" ", "_").replace(')', '\)')) + user = change["title"].split(':')[1] + embed["title"] = _("Unblocked {blocked_user}").format(blocked_user=user) + elif action == "curseprofile/comment-created": + if settings["appearance"]["embed"]["show_edit_changes"]: + parsed_comment = recent_changes.pull_comment(change["logparams"]["4:comment_id"]) + link = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"])) + embed["title"] = _("Left a comment on {target}'s profile").format(target=change["title"].split(':')[1]) if change["title"].split(':')[1] != \ + change["user"] else _( + "Left a comment on their own profile") + elif action == "curseprofile/comment-replied": + if settings["appearance"]["embed"]["show_edit_changes"]: + parsed_comment = recent_changes.pull_comment(change["logparams"]["4:comment_id"]) + link = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"])) + embed["title"] = _("Replied to a comment on {target}'s profile").format(target=change["title"].split(':')[1]) if change["title"].split(':')[1] != \ + change["user"] else _( + "Replied to a comment on their own profile") + elif action == "curseprofile/comment-edited": + if settings["appearance"]["embed"]["show_edit_changes"]: + parsed_comment = recent_changes.pull_comment(change["logparams"]["4:comment_id"]) + link = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"])) + embed["title"] = _("Edited a comment on {target}'s profile").format(target=change["title"].split(':')[1]) if change["title"].split(':')[1] != \ + change["user"] else _( + "Edited a comment on their own profile") + elif action == "curseprofile/profile-edited": + link = create_article_path("UserProfile:{target}".format(target=change["title"].split(':')[1].replace(" ", "_").replace(')', '\)'))) + embed["title"] = _("Edited {target}'s profile").format(target=change["title"].split(':')[1]) if change["user"] != change["title"].split(':')[1] else _("Edited their own profile") + if not change["parsedcomment"]: # If the field is empty + parsed_comment = _("Cleared the {field} field").format(field=profile_field_name(change["logparams"]['4:section'], True)) + else: + parsed_comment = _("{field} field changed to: {desc}").format(field=profile_field_name(change["logparams"]['4:section'], True), desc=BeautifulSoup(change["parsedcomment"], "lxml").get_text()) + elif action == "curseprofile/comment-purged": + link = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"])) + embed["title"] = _("Purged a comment on {target}'s profile").format(target=change["title"].split(':')[1]) + elif action == "curseprofile/comment-deleted": + if "4:comment_id" in change["logparams"]: + link = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"])) + else: + link = create_article_path(change["title"]) + embed["title"] = _("Deleted a comment on {target}'s profile").format(target=change["title"].split(':')[1]) + elif action in ("rights/rights", "rights/autopromote"): + link = create_article_path("User:{}".format(change["title"].split(":")[1].replace(" ", "_"))) + if action == "rights/rights": + embed["title"] = _("Changed group membership for {target}").format(target=change["title"].split(":")[1]) + else: + change["user"] = _("System") + author_url = "" + embed["title"] = _("{target} got autopromoted to a new usergroup").format( + target=change["title"].split(":")[1]) + if len(change["logparams"]["oldgroups"]) < len(change["logparams"]["newgroups"]): + embed["thumbnail"]["url"] = "https://i.imgur.com/WnGhF5g.gif" + old_groups = [] + new_groups = [] + for name in change["logparams"]["oldgroups"]: + old_groups.append(_(name)) + for name in change["logparams"]["newgroups"]: + new_groups.append(_(name)) + if len(old_groups) == 0: + old_groups = [_("none")] + if len(new_groups) == 0: + new_groups = [_("none")] + reason = ": {desc}".format(desc=parsed_comment) if parsed_comment != _("No description provided") else "" + parsed_comment = _("Groups changed from {old_groups} to {new_groups}{reason}").format( + old_groups=", ".join(old_groups), new_groups=', '.join(new_groups), reason=reason) + elif action == "protect/protect": + link = create_article_path(change["title"].replace(" ", "_")) + embed["title"] = _("Protected {target}").format(target=change["title"]) + parsed_comment = "{settings}{cascade} | {reason}".format(settings=change["logparams"]["description"], + cascade=_(" [cascading]") if "cascade" in change["logparams"] else "", + reason=parsed_comment) + elif action == "protect/modify": + link = create_article_path(change["title"].replace(" ", "_")) + embed["title"] = _("Changed protection level for {article}").format(article=change["title"]) + parsed_comment = "{settings}{cascade} | {reason}".format(settings=change["logparams"]["description"], + cascade=_(" [cascading]") if "cascade" in change["logparams"] else "", + reason=parsed_comment) + elif action == "protect/unprotect": + link = create_article_path(change["title"].replace(" ", "_")) + embed["title"] = _("Removed protection from {article}").format(article=change["title"]) + elif action == "delete/revision": + amount = len(change["logparams"]["ids"]) + link = create_article_path(change["title"].replace(" ", "_")) + embed["title"] = ngettext("Changed visibility of revision on page {article} ", + "Changed visibility of {amount} revisions on page {article} ", amount).format( + article=change["title"], amount=amount) + elif action == "import/upload": + link = create_article_path(change["title"].replace(" ", "_")) + embed["title"] = ngettext("Imported {article} with {count} revision", + "Imported {article} with {count} revisions", change["logparams"]["count"]).format( + article=change["title"], count=change["logparams"]["count"]) + elif action == "delete/restore": + link = create_article_path(change["title"].replace(" ", "_")) + embed["title"] = _("Restored {article}").format(article=change["title"]) + elif action == "delete/event": + link = create_article_path("Special:RecentChanges") + embed["title"] = _("Changed visibility of log events") + elif action == "import/interwiki": + link = create_article_path("Special:RecentChanges") + embed["title"] = _("Imported interwiki") + elif action == "abusefilter/modify": + link = create_article_path("Special:AbuseFilter/history/{number}/diff/prev/{historyid}".format(number=change["logparams"]['newId'], historyid=change["logparams"]["historyId"])) + embed["title"] = _("Edited abuse filter number {number}").format(number=change["logparams"]['newId']) + elif action == "abusefilter/create": + link = create_article_path("Special:AbuseFilter/{number}".format(number=change["logparams"]['newId'])) + embed["title"] = _("Created abuse filter number {number}").format(number=change["logparams"]['newId']) + elif action == "merge/merge": + link = create_article_path(change["title"].replace(" ", "_")) + embed["title"] = _("Merged revision histories of {article} into {dest}").format(article=change["title"], + dest=change["logparams"]["dest_title"]) + elif action == "interwiki/iw_add": + link = create_article_path("Special:Interwiki") + embed["title"] = _("Added an entry to the interwiki table") + parsed_comment = _("Prefix: {prefix}, website: {website} | {desc}").format(desc=parsed_comment, + prefix=change["logparams"]['0'], + website=change["logparams"]['1']) + elif action == "interwiki/iw_edit": + link = create_article_path("Special:Interwiki") + embed["title"] = _("Edited an entry in interwiki table") + parsed_comment = _("Prefix: {prefix}, website: {website} | {desc}").format(desc=parsed_comment, + prefix=change["logparams"]['0'], + website=change["logparams"]['1']) + elif action == "interwiki/iw_delete": + link = create_article_path("Special:Interwiki") + embed["title"] = _("Deleted an entry in interwiki table") + parsed_comment = _("Prefix: {prefix} | {desc}").format(desc=parsed_comment, prefix=change["logparams"]['0']) + elif action == "contentmodel/change": + link = create_article_path(change["title"].replace(" ", "_")) + embed["title"] = _("Changed the content model of the page {article}").format(article=change["title"]) + parsed_comment = _("Model changed from {old} to {new}: {reason}").format(old=change["logparams"]["oldmodel"], + new=change["logparams"]["newmodel"], + reason=parsed_comment) + elif action == "sprite/sprite": + link = create_article_path(change["title"].replace(" ", "_")) + embed["title"] = _("Edited the sprite for {article}").format(article=change["title"]) + elif action == "sprite/sheet": + link = create_article_path(change["title"].replace(" ", "_")) + embed["title"] = _("Created the sprite sheet for {article}").format(article=change["title"]) + elif action == "sprite/slice": + link = create_article_path(change["title"].replace(" ", "_")) + embed["title"] = _("Edited the slice for {article}").format(article=change["title"]) + elif action == "cargo/createtable": + LinkParser.feed(change["logparams"]["0"]) + table = re.search(r"\[(.*?)\]\(<(.*?)>\)", LinkParser.new_string) + LinkParser.new_string = "" + link = table.group(2) + embed["title"] = _("Created the Cargo table \"{table}\"").format(table=table.group(1)) + parsed_comment = None + elif action == "cargo/deletetable": + link = create_article_path("Special:CargoTables") + embed["title"] = _("Deleted the Cargo table \"{table}\"").format(table=change["logparams"]["0"]) + parsed_comment = None + elif action == "cargo/recreatetable": + LinkParser.feed(change["logparams"]["0"]) + table = re.search(r"\[(.*?)\]\(<(.*?)>\)", LinkParser.new_string) + LinkParser.new_string = "" + link = table.group(2) + embed["title"] = _("Recreated the Cargo table \"{table}\"").format(table=table.group(1)) + parsed_comment = None + elif action == "cargo/replacetable": + LinkParser.feed(change["logparams"]["0"]) + table = re.search(r"\[(.*?)\]\(<(.*?)>\)", LinkParser.new_string) + LinkParser.new_string = "" + link = table.group(2) + embed["title"] = _("Replaced the Cargo table \"{table}\"").format(table=table.group(1)) + parsed_comment = None + elif action == "managetags/create": + link = create_article_path("Special:Tags") + embed["title"] = _("Created a tag \"{tag}\"").format(tag=change["logparams"]["tag"]) + recent_changes.init_info() + elif action == "managetags/delete": + link = create_article_path("Special:Tags") + embed["title"] = _("Deleted a tag \"{tag}\"").format(tag=change["logparams"]["tag"]) + recent_changes.init_info() + elif action == "managetags/activate": + link = create_article_path("Special:Tags") + embed["title"] = _("Activated a tag \"{tag}\"").format(tag=change["logparams"]["tag"]) + elif action == "managetags/deactivate": + link = create_article_path("Special:Tags") + embed["title"] = _("Deactivated a tag \"{tag}\"").format(tag=change["logparams"]["tag"]) + elif action == "suppressed": + link = create_article_path("") + embed["title"] = _("Action has been hidden by administration.") + embed["author"]["name"] = _("Unknown") + else: + logger.warning("No entry for {event} with params: {params}".format(event=action, params=change)) + embed["author"]["icon_url"] = settings["appearance"]["embed"][action]["icon"] + embed["url"] = link + if parsed_comment is not None: + embed["description"] = parsed_comment + if settings["appearance"]["embed"]["show_footer"]: + embed["timestamp"] = change["timestamp"] + if "tags" in change and change["tags"]: + tag_displayname = [] + for tag in change["tags"]: + if tag in recent_changes.tags: + if recent_changes.tags[tag] is None: + continue # Ignore hidden tags + else: + tag_displayname.append(recent_changes.tags[tag]) + else: + tag_displayname.append(tag) + embed.add_field(_("Tags"), ", ".join(tag_displayname)) + logger.debug("Current params in edit action: {}".format(change)) + if categories is not None and not (len(categories["new"]) == 0 and len(categories["removed"]) == 0): + new_cat = (_("**Added**: ") + ", ".join(list(categories["new"])[0:16]) + ("\n" if len(categories["new"])<=15 else _(" and {} more\n").format(len(categories["new"])-15))) if categories["new"] else "" + del_cat = (_("**Removed**: ") + ", ".join(list(categories["removed"])[0:16]) + ("" if len(categories["removed"])<=15 else _(" and {} more").format(len(categories["removed"])-15))) if categories["removed"] else "" + embed.add_field(_("Changed categories"), new_cat + del_cat) + embed.finish_embed() + send_to_discord(embed) \ No newline at end of file diff --git a/src/rcgcdw.py b/src/rcgcdw.py index 7d07d4d..78eaf27 100644 --- a/src/rcgcdw.py +++ b/src/rcgcdw.py @@ -20,18 +20,19 @@ # WARNING! SHITTY CODE AHEAD. ENTER ONLY IF YOU ARE SURE YOU CAN TAKE IT # You have been warned -import time, logging.config, json, requests, datetime, re, gettext, math, random, os.path, schedule, sys, ipaddress, base64 -from html.parser import HTMLParser +import time, logging.config, requests, datetime, gettext, math, os.path, schedule, sys import src.misc -from bs4 import BeautifulSoup from collections import defaultdict, Counter -from urllib.parse import quote_plus from src.configloader import settings -from src.misc import link_formatter, ContentParser, safe_read, add_to_dict, datafile, \ - WIKI_API_PATH, WIKI_SCRIPT_PATH, WIKI_JUST_DOMAIN, create_article_path, messagequeue, send_to_discord_webhook, \ - send_to_discord, DiscordMessage, send_simple -from src.session import session +from src.misc import add_to_dict, datafile, \ + WIKI_API_PATH, create_article_path, send_to_discord, \ + DiscordMessage +from src.rc import recent_changes +from src.exceptions import MWError +from src.i18n import ngettext, lang + +_ = lang.gettext if settings["fandom_discussions"]["enabled"]: import src.discussions @@ -44,17 +45,6 @@ logging.config.dictConfig(settings["logging"]) logger = logging.getLogger("rcgcdw") logger.debug("Current settings: {settings}".format(settings=settings)) -# Setup translation - -try: - lang = gettext.translation('rcgcdw', localedir='locale', languages=[settings["lang"]]) -except FileNotFoundError: - logger.critical("No language files have been found. Make sure locale folder is located in the directory.") - sys.exit(1) - -lang.install() -ngettext = lang.ngettext - storage = datafile.data # Remove previous data holding file if exists and limitfetch allows @@ -66,835 +56,6 @@ if settings["limitrefetch"] != -1 and os.path.exists("lastchange.txt") is True: datafile.save_datafile() os.remove("lastchange.txt") -# A few initial vars - -logged_in = False -supported_logs = ["protect/protect", "protect/modify", "protect/unprotect", "upload/overwrite", "upload/upload", "delete/delete", "delete/delete_redir", "delete/restore", "delete/revision", "delete/event", "import/upload", "import/interwiki", "merge/merge", "move/move", "move/move_redir", "protect/move_prot", "block/block", "block/unblock", "block/reblock", "rights/rights", "rights/autopromote", "abusefilter/modify", "abusefilter/create", "interwiki/iw_add", "interwiki/iw_edit", "interwiki/iw_delete", "curseprofile/comment-created", "curseprofile/comment-edited", "curseprofile/comment-deleted", "curseprofile/comment-purged", "curseprofile/profile-edited", "curseprofile/comment-replied", "contentmodel/change", "sprite/sprite", "sprite/sheet", "sprite/slice", "managetags/create", "managetags/delete", "managetags/activate", "managetags/deactivate", "tag/update", "cargo/createtable", "cargo/deletetable", "cargo/recreatetable", "cargo/replacetable", "upload/revert"] -profile_fields = {"profile-location": _("Location"), "profile-aboutme": _("About me"), "profile-link-google": _("Google link"), "profile-link-facebook":_("Facebook link"), "profile-link-twitter": _("Twitter link"), "profile-link-reddit": _("Reddit link"), "profile-link-twitch": _("Twitch link"), "profile-link-psn": _("PSN link"), "profile-link-vk": _("VK link"), "profile-link-xbl": _("XBL link"), "profile-link-steam": _("Steam link"), "profile-link-discord": _("Discord handle"), "profile-link-battlenet": _("Battle.net handle")} - - -class LinkParser(HTMLParser): - new_string = "" - recent_href = "" - - def handle_starttag(self, tag, attrs): - for attr in attrs: - if attr[0] == 'href': - self.recent_href = attr[1] - if self.recent_href.startswith("//"): - self.recent_href = "https:{rest}".format(rest=self.recent_href) - elif not self.recent_href.startswith("http"): - self.recent_href = WIKI_JUST_DOMAIN + self.recent_href - self.recent_href = self.recent_href.replace(")", "\\)") - elif attr[0] == 'data-uncrawlable-url': - self.recent_href = attr[1].encode('ascii') - self.recent_href = base64.b64decode(self.recent_href) - self.recent_href = WIKI_JUST_DOMAIN + self.recent_href.decode('ascii') - - def handle_data(self, data): - if self.recent_href: - self.new_string = self.new_string + "[{}](<{}>)".format(data, self.recent_href) - self.recent_href = "" - else: - self.new_string = self.new_string + data - - def handle_comment(self, data): - self.new_string = self.new_string + data - - def handle_endtag(self, tag): - logger.debug(self.new_string) - - -LinkParser = LinkParser() - -class MWError(Exception): - pass - -def profile_field_name(name, embed): - try: - return profile_fields[name] - except KeyError: - if embed: - return _("Unknown") - else: - return _("unknown") - - -def pull_comment(comment_id): - try: - comment = recent_changes.handle_mw_errors(recent_changes.safe_request("{wiki}?action=comment&do=getRaw&comment_id={comment}&format=json".format(wiki=WIKI_API_PATH, comment=comment_id)).json())["text"] - logger.debug("Got the following comment from the API: {}".format(comment)) - except MWError: - pass - except (TypeError, AttributeError): - logger.exception("Could not resolve the comment text.") - except KeyError: - logger.exception("CurseProfile extension API did not respond with a valid comment content.") - else: - if len(comment) > 1000: - comment = comment[0:1000] + "…" - return comment - return "" - - -def compact_formatter(action, change, parsed_comment, categories): - if action != "suppressed": - author_url = link_formatter(create_article_path("User:{user}".format(user=change["user"]))) - author = change["user"] - parsed_comment = "" if parsed_comment is None else " *("+parsed_comment+")*" - parsed_comment = re.sub(r"([^<]|\A)(http(s)://.*?)( |\Z)", "\\1<\\2>\\4", parsed_comment) # see #97 - if action in ["edit", "new"]: - edit_link = link_formatter("{wiki}index.php?title={article}&curid={pageid}&diff={diff}&oldid={oldrev}".format( - wiki=WIKI_SCRIPT_PATH, pageid=change["pageid"], diff=change["revid"], oldrev=change["old_revid"], - article=change["title"])) - edit_size = change["newlen"] - change["oldlen"] - if edit_size > 0: - sign = "+" - else: - sign = "" - if change["title"].startswith("MediaWiki:Tag-"): # Refresh tag list when tag display name is edited - recent_changes.init_info() - if action == "edit": - content = _("[{author}]({author_url}) edited [{article}]({edit_link}){comment} ({sign}{edit_size})").format(author=author, author_url=author_url, article=change["title"], edit_link=edit_link, comment=parsed_comment, edit_size=edit_size, sign=sign) - else: - content = _("[{author}]({author_url}) created [{article}]({edit_link}){comment} ({sign}{edit_size})").format(author=author, author_url=author_url, article=change["title"], edit_link=edit_link, comment=parsed_comment, edit_size=edit_size, sign=sign) - elif action =="upload/upload": - file_link = link_formatter(create_article_path(change["title"])) - content = _("[{author}]({author_url}) uploaded [{file}]({file_link}){comment}").format(author=author, - author_url=author_url, - file=change["title"], - file_link=file_link, - comment=parsed_comment) - elif action == "upload/revert": - file_link = link_formatter(create_article_path(change["title"])) - content = _("[{author}]({author_url}) reverted a version of [{file}]({file_link}){comment}").format( - author=author, author_url=author_url, file=change["title"], file_link=file_link, comment=parsed_comment) - elif action == "upload/overwrite": - file_link = link_formatter(create_article_path(change["title"])) - content = _("[{author}]({author_url}) uploaded a new version of [{file}]({file_link}){comment}").format(author=author, author_url=author_url, file=change["title"], file_link=file_link, comment=parsed_comment) - elif action == "delete/delete": - page_link = link_formatter(create_article_path(change["title"])) - content = _("[{author}]({author_url}) deleted [{page}]({page_link}){comment}").format(author=author, author_url=author_url, page=change["title"], page_link=page_link, - comment=parsed_comment) - elif action == "delete/delete_redir": - page_link = link_formatter(create_article_path(change["title"])) - content = _("[{author}]({author_url}) deleted redirect by overwriting [{page}]({page_link}){comment}").format(author=author, author_url=author_url, page=change["title"], page_link=page_link, - comment=parsed_comment) - elif action == "move/move": - link = link_formatter(create_article_path(change["logparams"]['target_title'])) - redirect_status = _("without making a redirect") if "suppressredirect" in change["logparams"] else _("with a redirect") - content = _("[{author}]({author_url}) moved {redirect}*{article}* to [{target}]({target_url}) {made_a_redirect}{comment}").format(author=author, author_url=author_url, redirect="⤷ " if "redirect" in change else "", article=change["title"], - target=change["logparams"]['target_title'], target_url=link, comment=parsed_comment, made_a_redirect=redirect_status) - elif action == "move/move_redir": - link = link_formatter(create_article_path(change["logparams"]["target_title"])) - redirect_status = _("without making a redirect") if "suppressredirect" in change["logparams"] else _( - "with a redirect") - content = _("[{author}]({author_url}) moved {redirect}*{article}* over redirect to [{target}]({target_url}) {made_a_redirect}{comment}").format(author=author, author_url=author_url, redirect="⤷ " if "redirect" in change else "", article=change["title"], - target=change["logparams"]['target_title'], target_url=link, comment=parsed_comment, made_a_redirect=redirect_status) - elif action == "protect/move_prot": - link = link_formatter(create_article_path(change["logparams"]["oldtitle_title"])) - content = _( - "[{author}]({author_url}) moved protection settings from {redirect}*{article}* to [{target}]({target_url}){comment}").format(author=author, author_url=author_url, redirect="⤷ " if "redirect" in change else "", article=change["logparams"]["oldtitle_title"], - target=change["title"], target_url=link, comment=parsed_comment) - elif action == "block/block": - user = change["title"].split(':')[1] - restriction_description = "" - try: - ipaddress.ip_address(user) - link = link_formatter(create_article_path("Special:Contributions/{user}".format(user=user))) - except ValueError: - link = link_formatter(create_article_path(change["title"])) - if change["logparams"]["duration"] == "infinite": - block_time = _("infinity and beyond") - else: - english_length = re.sub(r"(\d+)", "", change["logparams"][ - "duration"]) # note that translation won't work for millenia and century yet - english_length_num = re.sub(r"(\D+)", "", change["logparams"]["duration"]) - try: - english_length = english_length.rstrip("s").strip() - block_time = "{num} {translated_length}".format(num=english_length_num, - translated_length=ngettext(english_length, - english_length + "s", - int(english_length_num))) - except AttributeError: - logger.error("Could not strip s from the block event, seems like the regex didn't work?") - return - if "sitewide" not in change["logparams"]: - restriction_description = "" - if "pages" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"]["pages"]: - restriction_description = _(" on pages: ") - for page in change["logparams"]["restrictions"]["pages"]: - restricted_pages = ["*{page}*".format(page=i["page_title"]) for i in change["logparams"]["restrictions"]["pages"]] - restriction_description = restriction_description + ", ".join(restricted_pages) - if "namespaces" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"]["namespaces"]: - namespaces = [] - if restriction_description: - restriction_description = restriction_description + _(" and namespaces: ") - else: - restriction_description = _(" on namespaces: ") - for namespace in change["logparams"]["restrictions"]["namespaces"]: - if str(namespace) in recent_changes.namespaces: # if we have cached namespace name for given namespace number, add its name to the list - namespaces.append("*{ns}*".format(ns=recent_changes.namespaces[str(namespace)]["*"])) - else: - namespaces.append("*{ns}*".format(ns=namespace)) - restriction_description = restriction_description + ", ".join(namespaces) - restriction_description = restriction_description + "." - if len(restriction_description) > 1020: - logger.debug(restriction_description) - restriction_description = restriction_description[:1020] + "…" - content = _( - "[{author}]({author_url}) blocked [{user}]({user_url}) for {time}{restriction_desc}{comment}").format(author=author, author_url=author_url, user=user, time=block_time, user_url=link, restriction_desc=restriction_description, comment=parsed_comment) - elif action == "block/reblock": - link = link_formatter(create_article_path(change["title"])) - user = change["title"].split(':')[1] - content = _("[{author}]({author_url}) changed block settings for [{blocked_user}]({user_url}){comment}").format(author=author, author_url=author_url, blocked_user=user, user_url=link, comment=parsed_comment) - elif action == "block/unblock": - link = link_formatter(create_article_path(change["title"])) - user = change["title"].split(':')[1] - content = _("[{author}]({author_url}) unblocked [{blocked_user}]({user_url}){comment}").format(author=author, author_url=author_url, blocked_user=user, user_url=link, comment=parsed_comment) - elif action == "curseprofile/comment-created": - link = link_formatter(create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]))) - content = _("[{author}]({author_url}) left a [comment]({comment}) on {target} profile").format(author=author, author_url=author_url, comment=link, target=change["title"].split(':')[1]+"'s" if change["title"].split(':')[1] != change["user"] else _("their own profile")) - elif action == "curseprofile/comment-replied": - link = link_formatter(create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]))) - content = _("[{author}]({author_url}) replied to a [comment]({comment}) on {target} profile").format(author=author, - author_url=author_url, - comment=link, - target=change["title"].split(':')[1] if change["title"].split(':')[1] !=change["user"] else _("their own")) - elif action == "curseprofile/comment-edited": - link = link_formatter(create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]))) - content = _("[{author}]({author_url}) edited a [comment]({comment}) on {target} profile").format(author=author, - author_url=author_url, - comment=link, - target=change["title"].split(':')[1] if change["title"].split(':')[1] !=change["user"] else _("their own")) - elif action == "curseprofile/comment-purged": - link = link_formatter(create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]))) - content = _("[{author}]({author_url}) purged a comment on {target} profile").format(author=author, - author_url=author_url, - target= - change["title"].split(':')[ - 1] if - change["title"].split(':')[ - 1] != change[ - "user"] else _( - "their own")) - elif action == "curseprofile/comment-deleted": - content = _("[{author}]({author_url}) deleted a comment on {target} profile").format(author=author, - author_url=author_url, - target=change["title"].split(':')[1] if change["title"].split(':')[1] !=change["user"] else _("their own")) - - elif action == "curseprofile/profile-edited": - link = link_formatter(create_article_path("UserProfile:{user}".format(user=change["title"].split(":")[1]))) - target = _("[{target}]({target_url})'s").format(target=change["title"].split(':')[1], target_url=link) if change["title"].split(':')[1] != author else _("[their own]({target_url})").format(target_url=link) - content = _("[{author}]({author_url}) edited the {field} on {target} profile. *({desc})*").format(author=author, - author_url=author_url, - target=target, - field=profile_field_name(change["logparams"]['4:section'], False), - desc=BeautifulSoup(change["parsedcomment"], "lxml").get_text()) - elif action in ("rights/rights", "rights/autopromote"): - link = link_formatter(create_article_path("User:{user}".format(user=change["title"].split(":")[1]))) - old_groups = [] - new_groups = [] - for name in change["logparams"]["oldgroups"]: - old_groups.append(_(name)) - for name in change["logparams"]["newgroups"]: - new_groups.append(_(name)) - if len(old_groups) == 0: - old_groups = [_("none")] - if len(new_groups) == 0: - new_groups = [_("none")] - - if action == "rights/rights": - content = "[{author}]({author_url}) changed group membership for [{target}]({target_url}) from {old_groups} to {new_groups}{comment}".format(author=author, author_url=author_url, target=change["title"].split(":")[1], target_url=link, old_groups=", ".join(old_groups), new_groups=', '.join(new_groups), comment=parsed_comment) - else: - content = "{author} autopromoted [{target}]({target_url}) from {old_groups} to {new_groups}{comment}".format( - author=_("System"), author_url=author_url, target=change["title"].split(":")[1], target_url=link, - old_groups=", ".join(old_groups), new_groups=', '.join(new_groups), - comment=parsed_comment) - elif action == "protect/protect": - link = link_formatter(create_article_path(change["title"])) - content = _("[{author}]({author_url}) protected [{article}]({article_url}) with the following settings: {settings}{comment}").format(author=author, author_url=author_url, - article=change["title"], article_url=link, - settings=change["logparams"]["description"]+_(" [cascading]") if "cascade" in change["logparams"] else "", - comment=parsed_comment) - elif action == "protect/modify": - link = link_formatter(create_article_path(change["title"])) - content = _( - "[{author}]({author_url}) modified protection settings of [{article}]({article_url}) to: {settings}{comment}").format( - author=author, author_url=author_url, - article=change["title"], article_url=link, - settings=change["logparams"]["description"] + _(" [cascading]") if "cascade" in change["logparams"] else "", - comment=parsed_comment) - elif action == "protect/unprotect": - link = link_formatter(create_article_path(change["title"])) - content = _("[{author}]({author_url}) removed protection from [{article}]({article_url}){comment}").format(author=author, author_url=author_url, article=change["title"], article_url=link, comment=parsed_comment) - elif action == "delete/revision": - amount = len(change["logparams"]["ids"]) - link = link_formatter(create_article_path(change["title"])) - content = ngettext("[{author}]({author_url}) changed visibility of revision on page [{article}]({article_url}){comment}", - "[{author}]({author_url}) changed visibility of {amount} revisions on page [{article}]({article_url}){comment}", amount).format(author=author, author_url=author_url, - article=change["title"], article_url=link, amount=amount, comment=parsed_comment) - elif action == "import/upload": - link = link_formatter(create_article_path(change["title"])) - content = ngettext("[{author}]({author_url}) imported [{article}]({article_url}) with {count} revision{comment}", - "[{author}]({author_url}) imported [{article}]({article_url}) with {count} revisions{comment}", change["logparams"]["count"]).format( - author=author, author_url=author_url, article=change["title"], article_url=link, count=change["logparams"]["count"], comment=parsed_comment) - elif action == "delete/restore": - link = link_formatter(create_article_path(change["title"])) - content = _("[{author}]({author_url}) restored [{article}]({article_url}){comment}").format(author=author, author_url=author_url, article=change["title"], article_url=link, comment=parsed_comment) - elif action == "delete/event": - content = _("[{author}]({author_url}) changed visibility of log events{comment}").format(author=author, author_url=author_url, comment=parsed_comment) - elif action == "import/interwiki": - content = _("[{author}]({author_url}) imported interwiki{comment}").format(author=author, author_url=author_url, comment=parsed_comment) - elif action == "abusefilter/modify": - link = link_formatter(create_article_path("Special:AbuseFilter/history/{number}/diff/prev/{historyid}".format(number=change["logparams"]['newId'], historyid=change["logparams"]["historyId"]))) - content = _("[{author}]({author_url}) edited abuse filter [number {number}]({filter_url})").format(author=author, author_url=author_url, number=change["logparams"]['newId'], filter_url=link) - elif action == "abusefilter/create": - link = link_formatter( - create_article_path("Special:AbuseFilter/{number}".format(number=change["logparams"]['newId']))) - content = _("[{author}]({author_url}) created abuse filter [number {number}]({filter_url})").format(author=author, author_url=author_url, number=change["logparams"]['newId'], filter_url=link) - elif action == "merge/merge": - link = link_formatter(create_article_path(change["title"])) - link_dest = link_formatter(create_article_path(change["logparams"]["dest_title"])) - content = _("[{author}]({author_url}) merged revision histories of [{article}]({article_url}) into [{dest}]({dest_url}){comment}").format(author=author, author_url=author_url, article=change["title"], article_url=link, dest_url=link_dest, - dest=change["logparams"]["dest_title"], comment=parsed_comment) - elif action == "interwiki/iw_add": - link = link_formatter(create_article_path("Special:Interwiki")) - content = _("[{author}]({author_url}) added an entry to the [interwiki table]({table_url}) pointing to {website} with {prefix} prefix").format(author=author, author_url=author_url, desc=parsed_comment, - prefix=change["logparams"]['0'], - website=change["logparams"]['1'], - table_url=link) - elif action == "interwiki/iw_edit": - link = link_formatter(create_article_path("Special:Interwiki")) - content = _("[{author}]({author_url}) edited an entry in [interwiki table]({table_url}) pointing to {website} with {prefix} prefix").format(author=author, author_url=author_url, desc=parsed_comment, - prefix=change["logparams"]['0'], - website=change["logparams"]['1'], - table_url=link) - elif action == "interwiki/iw_delete": - link = link_formatter(create_article_path("Special:Interwiki")) - content = _("[{author}]({author_url}) deleted an entry in [interwiki table]({table_url})").format(author=author, author_url=author_url, table_url=link) - elif action == "contentmodel/change": - link = link_formatter(create_article_path(change["title"])) - content = _("[{author}]({author_url}) changed the content model of the page [{article}]({article_url}) from {old} to {new}{comment}").format(author=author, author_url=author_url, article=change["title"], article_url=link, old=change["logparams"]["oldmodel"], - new=change["logparams"]["newmodel"], comment=parsed_comment) - elif action == "sprite/sprite": - link = link_formatter(create_article_path(change["title"])) - content = _("[{author}]({author_url}) edited the sprite for [{article}]({article_url})").format(author=author, author_url=author_url, article=change["title"], article_url=link) - elif action == "sprite/sheet": - link = link_formatter(create_article_path(change["title"])) - content = _("[{author}]({author_url}) created the sprite sheet for [{article}]({article_url})").format(author=author, author_url=author_url, article=change["title"], article_url=link) - elif action == "sprite/slice": - link = link_formatter(create_article_path(change["title"])) - content = _("[{author}]({author_url}) edited the slice for [{article}]({article_url})").format(author=author, author_url=author_url, article=change["title"], article_url=link) - elif action == "cargo/createtable": - LinkParser.feed(change["logparams"]["0"]) - table = LinkParser.new_string - LinkParser.new_string = "" - content = _("[{author}]({author_url}) created the Cargo table \"{table}\"").format(author=author, author_url=author_url, table=table) - elif action == "cargo/deletetable": - content = _("[{author}]({author_url}) deleted the Cargo table \"{table}\"").format(author=author, author_url=author_url, table=change["logparams"]["0"]) - elif action == "cargo/recreatetable": - LinkParser.feed(change["logparams"]["0"]) - table = LinkParser.new_string - LinkParser.new_string = "" - content = _("[{author}]({author_url}) recreated the Cargo table \"{table}\"").format(author=author, author_url=author_url, table=table) - elif action == "cargo/replacetable": - LinkParser.feed(change["logparams"]["0"]) - table = LinkParser.new_string - LinkParser.new_string = "" - content = _("[{author}]({author_url}) replaced the Cargo table \"{table}\"").format(author=author, author_url=author_url, table=table) - elif action == "managetags/create": - link = link_formatter(create_article_path("Special:Tags")) - content = _("[{author}]({author_url}) created a [tag]({tag_url}) \"{tag}\"").format(author=author, author_url=author_url, tag=change["logparams"]["tag"], tag_url=link) - recent_changes.init_info() - elif action == "managetags/delete": - link = link_formatter(create_article_path("Special:Tags")) - content = _("[{author}]({author_url}) deleted a [tag]({tag_url}) \"{tag}\"").format(author=author, author_url=author_url, tag=change["logparams"]["tag"], tag_url=link) - recent_changes.init_info() - elif action == "managetags/activate": - link = link_formatter(create_article_path("Special:Tags")) - content = _("[{author}]({author_url}) activated a [tag]({tag_url}) \"{tag}\"").format(author=author, author_url=author_url, tag=change["logparams"]["tag"], tag_url=link) - elif action == "managetags/deactivate": - link = link_formatter(create_article_path("Special:Tags")) - content = _("[{author}]({author_url}) deactivated a [tag]({tag_url}) \"{tag}\"").format(author=author, author_url=author_url, tag=change["logparams"]["tag"], tag_url=link) - elif action == "suppressed": - content = _("An action has been hidden by administration.") - else: - logger.warning("No entry for {event} with params: {params}".format(event=action, params=change)) - return - send_to_discord(DiscordMessage("compact", action, settings["webhookURL"], content=content)) - - -def embed_formatter(action, change, parsed_comment, categories): - embed = DiscordMessage("embed", action, settings["webhookURL"]) - if parsed_comment is None: - parsed_comment = _("No description provided") - if action != "suppressed": - if "anon" in change: - author_url = create_article_path("Special:Contributions/{user}".format(user=change["user"].replace(" ", "_"))) # Replace here needed in case of #75 - logger.debug("current user: {} with cache of IPs: {}".format(change["user"], recent_changes.map_ips.keys())) - if change["user"] not in list(recent_changes.map_ips.keys()): - contibs = safe_read(recent_changes.safe_request( - "{wiki}?action=query&format=json&list=usercontribs&uclimit=max&ucuser={user}&ucstart={timestamp}&ucprop=".format( - wiki=WIKI_API_PATH, user=change["user"], timestamp=change["timestamp"])), "query", "usercontribs") - if contibs is None: - logger.warning( - "WARNING: Something went wrong when checking amount of contributions for given IP address") - change["user"] = change["user"] + "(?)" - else: - recent_changes.map_ips[change["user"]] = len(contibs) - logger.debug("Current params user {} and state of map_ips {}".format(change["user"], recent_changes.map_ips)) - change["user"] = "{author} ({contribs})".format(author=change["user"], contribs=len(contibs)) - else: - logger.debug( - "Current params user {} and state of map_ips {}".format(change["user"], recent_changes.map_ips)) - if action in ("edit", "new"): - recent_changes.map_ips[change["user"]] += 1 - change["user"] = "{author} ({amount})".format(author=change["user"], - amount=recent_changes.map_ips[change["user"]]) - else: - author_url = create_article_path("User:{}".format(change["user"].replace(" ", "_"))) - embed.set_author(change["user"], author_url) - if action in ("edit", "new"): # edit or new page - editsize = change["newlen"] - change["oldlen"] - if editsize > 0: - if editsize > 6032: - embed["color"] = 65280 - else: - embed["color"] = 35840 + (math.floor(editsize / 52)) * 256 - elif editsize < 0: - if editsize < -6032: - embed["color"] = 16711680 - else: - embed["color"] = 9175040 + (math.floor((editsize * -1) / 52)) * 65536 - elif editsize == 0: - embed["color"] = 8750469 - if change["title"].startswith("MediaWiki:Tag-"): # Refresh tag list when tag display name is edited - recent_changes.init_info() - link = "{wiki}index.php?title={article}&curid={pageid}&diff={diff}&oldid={oldrev}".format( - wiki=WIKI_SCRIPT_PATH, pageid=change["pageid"], diff=change["revid"], oldrev=change["old_revid"], - article=change["title"].replace(" ", "_")) - embed["title"] = "{redirect}{article} ({new}{minor}{bot}{space}{editsize})".format(redirect="⤷ " if "redirect" in change else "", article=change["title"], editsize="+" + str( - editsize) if editsize > 0 else editsize, new=_("(N!) ") if action == "new" else "", - minor=_("m") if action == "edit" and "minor" in change else "", bot=_('b') if "bot" in change else "", space=" " if "bot" in change or (action == "edit" and "minor" in change) or action == "new" else "") - if settings["appearance"]["embed"]["show_edit_changes"]: - if action == "new": - changed_content = safe_read(recent_changes.safe_request( - "{wiki}?action=compare&format=json&fromtext=&torev={diff}&topst=1&prop=diff".format( - wiki=WIKI_API_PATH, diff=change["revid"] - )), "compare", "*") - else: - changed_content = safe_read(recent_changes.safe_request( - "{wiki}?action=compare&format=json&fromrev={oldrev}&torev={diff}&topst=1&prop=diff".format( - wiki=WIKI_API_PATH, diff=change["revid"],oldrev=change["old_revid"] - )), "compare", "*") - if changed_content: - EditDiff = ContentParser() - EditDiff.feed(changed_content) - if EditDiff.small_prev_del: - if EditDiff.small_prev_del.replace("~~", "").isspace(): - EditDiff.small_prev_del = _('__Only whitespace__') - else: - EditDiff.small_prev_del = EditDiff.small_prev_del.replace("~~~~", "") - if EditDiff.small_prev_ins: - if EditDiff.small_prev_ins.replace("**", "").isspace(): - EditDiff.small_prev_ins = _('__Only whitespace__') - else: - EditDiff.small_prev_ins = EditDiff.small_prev_ins.replace("****", "") - logger.debug("Changed content: {}".format(EditDiff.small_prev_ins)) - if EditDiff.small_prev_del and not action == "new": - embed.add_field(_("Removed"), "{data}".format(data=EditDiff.small_prev_del), inline=True) - if EditDiff.small_prev_ins: - embed.add_field(_("Added"), "{data}".format(data=EditDiff.small_prev_ins), inline=True) - else: - logger.warning("Unable to download data on the edit content!") - elif action in ("upload/overwrite", "upload/upload", "upload/revert"): # sending files - license = None - urls = safe_read(recent_changes.safe_request( - "{wiki}?action=query&format=json&prop=imageinfo&list=&meta=&titles={filename}&iiprop=timestamp%7Curl%7Carchivename&iilimit=5".format( - wiki=WIKI_API_PATH, filename=change["title"])), "query", "pages") - link = create_article_path(change["title"].replace(" ", "_")) - additional_info_retrieved = False - if urls is not None: - logger.debug(urls) - if "-1" not in urls: # image still exists and not removed - try: - img_info = next(iter(urls.values()))["imageinfo"] - for num, revision in enumerate(img_info): - if revision["timestamp"] == change["logparams"]["img_timestamp"]: # find the correct revision corresponding for this log entry - image_direct_url = "{rev}?{cache}".format(rev=revision["url"], cache=int(time.time()*5)) # cachebusting - additional_info_retrieved = True - break - except KeyError: - logger.warning("Wiki did not respond with extended information about file. The preview will not be shown.") - else: - logger.warning("Request for additional image information have failed. The preview will not be shown.") - if action in ("upload/overwrite", "upload/revert"): - if additional_info_retrieved: - article_encoded = change["title"].replace(" ", "_").replace(')', '\)') - try: - revision = img_info[num+1] - except IndexError: - logger.exception("Could not analize the information about the image (does it have only one version when expected more in overwrite?) which resulted in no Options field: {}".format(img_info)) - else: - undolink = "{wiki}index.php?title={filename}&action=revert&oldimage={archiveid}".format( - wiki=WIKI_SCRIPT_PATH, filename=article_encoded, archiveid=revision["archivename"]) - embed.add_field(_("Options"), _("([preview]({link}) | [undo]({undolink}))").format( - link=image_direct_url, undolink=undolink)) - if settings["appearance"]["embed"]["embed_images"]: - embed["image"]["url"] = image_direct_url - if action == "upload/overwrite": - embed["title"] = _("Uploaded a new version of {name}").format(name=change["title"]) - elif action == "upload/revert": - embed["title"] = _("Reverted a version of {name}").format(name=change["title"]) - else: - embed["title"] = _("Uploaded {name}").format(name=change["title"]) - if settings["license_detection"]: - article_content = safe_read(recent_changes.safe_request( - "{wiki}?action=query&format=json&prop=revisions&titles={article}&rvprop=content".format( - wiki=WIKI_API_PATH, article=quote_plus(change["title"], safe=''))), "query", "pages") - if article_content is None: - logger.warning("Something went wrong when getting license for the image") - return 0 - if "-1" not in article_content: - content = list(article_content.values())[0]['revisions'][0]['*'] - try: - matches = re.search(re.compile(settings["license_regex"], re.IGNORECASE), content) - if matches is not None: - license = matches.group("license") - else: - if re.search(re.compile(settings["license_regex_detect"], re.IGNORECASE), content) is None: - license = _("**No license!**") - else: - license = "?" - except IndexError: - logger.error( - "Given regex for the license detection is incorrect. It does not have a capturing group called \"license\" specified. Please fix license_regex value in the config!") - license = "?" - except re.error: - logger.error( - "Given regex for the license detection is incorrect. Please fix license_regex or license_regex_detect values in the config!") - license = "?" - if license is not None: - parsed_comment += _("\nLicense: {}").format(license) - if additional_info_retrieved: - embed.add_field(_("Options"), _("([preview]({link}))").format(link=image_direct_url)) - if settings["appearance"]["embed"]["embed_images"]: - embed["image"]["url"] = image_direct_url - elif action == "delete/delete": - link = create_article_path(change["title"].replace(" ", "_")) - embed["title"] = _("Deleted page {article}").format(article=change["title"]) - elif action == "delete/delete_redir": - link = create_article_path(change["title"].replace(" ", "_")) - embed["title"] = _("Deleted redirect {article} by overwriting").format(article=change["title"]) - elif action == "move/move": - link = create_article_path(change["logparams"]['target_title'].replace(" ", "_")) - parsed_comment = "{supress}. {desc}".format(desc=parsed_comment, - supress=_("No redirect has been made") if "suppressredirect" in change["logparams"] else _( - "A redirect has been made")) - embed["title"] = _("Moved {redirect}{article} to {target}").format(redirect="⤷ " if "redirect" in change else "", article=change["title"], target=change["logparams"]['target_title']) - elif action == "move/move_redir": - link = create_article_path(change["logparams"]["target_title"].replace(" ", "_")) - embed["title"] = _("Moved {redirect}{article} to {title} over redirect").format(redirect="⤷ " if "redirect" in change else "", article=change["title"], - title=change["logparams"]["target_title"]) - elif action == "protect/move_prot": - link = create_article_path(change["logparams"]["oldtitle_title"].replace(" ", "_")) - embed["title"] = _("Moved protection settings from {redirect}{article} to {title}").format(redirect="⤷ " if "redirect" in change else "", article=change["logparams"]["oldtitle_title"], - title=change["title"]) - elif action == "block/block": - user = change["title"].split(':')[1] - try: - ipaddress.ip_address(user) - link = create_article_path("Special:Contributions/{user}".format(user=user)) - except ValueError: - link = create_article_path(change["title"].replace(" ", "_").replace(')', '\)')) - if change["logparams"]["duration"] == "infinite": - block_time = _("infinity and beyond") - else: - english_length = re.sub(r"(\d+)", "", change["logparams"]["duration"]) #note that translation won't work for millenia and century yet - english_length_num = re.sub(r"(\D+)", "", change["logparams"]["duration"]) - try: - english_length = english_length.rstrip("s").strip() - block_time = "{num} {translated_length}".format(num=english_length_num, translated_length=ngettext(english_length, english_length + "s", int(english_length_num))) - except AttributeError: - logger.error("Could not strip s from the block event, seems like the regex didn't work?") - return - if "sitewide" not in change["logparams"]: - restriction_description = "" - if "pages" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"]["pages"]: - restriction_description = _("Blocked from editing the following pages: ") - for page in change["logparams"]["restrictions"]["pages"]: - restricted_pages = ["*"+i["page_title"]+"*" for i in change["logparams"]["restrictions"]["pages"]] - restriction_description = restriction_description + ", ".join(restricted_pages) - if "namespaces" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"]["namespaces"]: - namespaces = [] - if restriction_description: - restriction_description = restriction_description + _(" and namespaces: ") - else: - restriction_description = _("Blocked from editing pages on following namespaces: ") - for namespace in change["logparams"]["restrictions"]["namespaces"]: - if str(namespace) in recent_changes.namespaces: # if we have cached namespace name for given namespace number, add its name to the list - namespaces.append("*{ns}*".format(ns=recent_changes.namespaces[str(namespace)]["*"])) - else: - namespaces.append("*{ns}*".format(ns=namespace)) - restriction_description = restriction_description + ", ".join(namespaces) - restriction_description = restriction_description + "." - if len(restriction_description) > 1020: - logger.debug(restriction_description) - restriction_description = restriction_description[:1020]+"…" - embed.add_field(_("Partial block details"), restriction_description, inline=True) - embed["title"] = _("Blocked {blocked_user} for {time}").format(blocked_user=user, time=block_time) - elif action == "block/reblock": - link = create_article_path(change["title"].replace(" ", "_").replace(')', '\)')) - user = change["title"].split(':')[1] - embed["title"] = _("Changed block settings for {blocked_user}").format(blocked_user=user) - elif action == "block/unblock": - link = create_article_path(change["title"].replace(" ", "_").replace(')', '\)')) - user = change["title"].split(':')[1] - embed["title"] = _("Unblocked {blocked_user}").format(blocked_user=user) - elif action == "curseprofile/comment-created": - if settings["appearance"]["embed"]["show_edit_changes"]: - parsed_comment = pull_comment(change["logparams"]["4:comment_id"]) - link = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"])) - embed["title"] = _("Left a comment on {target}'s profile").format(target=change["title"].split(':')[1]) if change["title"].split(':')[1] != \ - change["user"] else _( - "Left a comment on their own profile") - elif action == "curseprofile/comment-replied": - if settings["appearance"]["embed"]["show_edit_changes"]: - parsed_comment = pull_comment(change["logparams"]["4:comment_id"]) - link = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"])) - embed["title"] = _("Replied to a comment on {target}'s profile").format(target=change["title"].split(':')[1]) if change["title"].split(':')[1] != \ - change["user"] else _( - "Replied to a comment on their own profile") - elif action == "curseprofile/comment-edited": - if settings["appearance"]["embed"]["show_edit_changes"]: - parsed_comment = pull_comment(change["logparams"]["4:comment_id"]) - link = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"])) - embed["title"] = _("Edited a comment on {target}'s profile").format(target=change["title"].split(':')[1]) if change["title"].split(':')[1] != \ - change["user"] else _( - "Edited a comment on their own profile") - elif action == "curseprofile/profile-edited": - link = create_article_path("UserProfile:{target}".format(target=change["title"].split(':')[1].replace(" ", "_").replace(')', '\)'))) - embed["title"] = _("Edited {target}'s profile").format(target=change["title"].split(':')[1]) if change["user"] != change["title"].split(':')[1] else _("Edited their own profile") - if not change["parsedcomment"]: # If the field is empty - parsed_comment = _("Cleared the {field} field").format(field=profile_field_name(change["logparams"]['4:section'], True)) - else: - parsed_comment = _("{field} field changed to: {desc}").format(field=profile_field_name(change["logparams"]['4:section'], True), desc=BeautifulSoup(change["parsedcomment"], "lxml").get_text()) - elif action == "curseprofile/comment-purged": - link = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"])) - embed["title"] = _("Purged a comment on {target}'s profile").format(target=change["title"].split(':')[1]) - elif action == "curseprofile/comment-deleted": - if "4:comment_id" in change["logparams"]: - link = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"])) - else: - link = create_article_path(change["title"]) - embed["title"] = _("Deleted a comment on {target}'s profile").format(target=change["title"].split(':')[1]) - elif action in ("rights/rights", "rights/autopromote"): - link = create_article_path("User:{}".format(change["title"].split(":")[1].replace(" ", "_"))) - if action == "rights/rights": - embed["title"] = _("Changed group membership for {target}").format(target=change["title"].split(":")[1]) - else: - change["user"] = _("System") - author_url = "" - embed["title"] = _("{target} got autopromoted to a new usergroup").format( - target=change["title"].split(":")[1]) - if len(change["logparams"]["oldgroups"]) < len(change["logparams"]["newgroups"]): - embed["thumbnail"]["url"] = "https://i.imgur.com/WnGhF5g.gif" - old_groups = [] - new_groups = [] - for name in change["logparams"]["oldgroups"]: - old_groups.append(_(name)) - for name in change["logparams"]["newgroups"]: - new_groups.append(_(name)) - if len(old_groups) == 0: - old_groups = [_("none")] - if len(new_groups) == 0: - new_groups = [_("none")] - reason = ": {desc}".format(desc=parsed_comment) if parsed_comment != _("No description provided") else "" - parsed_comment = _("Groups changed from {old_groups} to {new_groups}{reason}").format( - old_groups=", ".join(old_groups), new_groups=', '.join(new_groups), reason=reason) - elif action == "protect/protect": - link = create_article_path(change["title"].replace(" ", "_")) - embed["title"] = _("Protected {target}").format(target=change["title"]) - parsed_comment = "{settings}{cascade} | {reason}".format(settings=change["logparams"]["description"], - cascade=_(" [cascading]") if "cascade" in change["logparams"] else "", - reason=parsed_comment) - elif action == "protect/modify": - link = create_article_path(change["title"].replace(" ", "_")) - embed["title"] = _("Changed protection level for {article}").format(article=change["title"]) - parsed_comment = "{settings}{cascade} | {reason}".format(settings=change["logparams"]["description"], - cascade=_(" [cascading]") if "cascade" in change["logparams"] else "", - reason=parsed_comment) - elif action == "protect/unprotect": - link = create_article_path(change["title"].replace(" ", "_")) - embed["title"] = _("Removed protection from {article}").format(article=change["title"]) - elif action == "delete/revision": - amount = len(change["logparams"]["ids"]) - link = create_article_path(change["title"].replace(" ", "_")) - embed["title"] = ngettext("Changed visibility of revision on page {article} ", - "Changed visibility of {amount} revisions on page {article} ", amount).format( - article=change["title"], amount=amount) - elif action == "import/upload": - link = create_article_path(change["title"].replace(" ", "_")) - embed["title"] = ngettext("Imported {article} with {count} revision", - "Imported {article} with {count} revisions", change["logparams"]["count"]).format( - article=change["title"], count=change["logparams"]["count"]) - elif action == "delete/restore": - link = create_article_path(change["title"].replace(" ", "_")) - embed["title"] = _("Restored {article}").format(article=change["title"]) - elif action == "delete/event": - link = create_article_path("Special:RecentChanges") - embed["title"] = _("Changed visibility of log events") - elif action == "import/interwiki": - link = create_article_path("Special:RecentChanges") - embed["title"] = _("Imported interwiki") - elif action == "abusefilter/modify": - link = create_article_path("Special:AbuseFilter/history/{number}/diff/prev/{historyid}".format(number=change["logparams"]['newId'], historyid=change["logparams"]["historyId"])) - embed["title"] = _("Edited abuse filter number {number}").format(number=change["logparams"]['newId']) - elif action == "abusefilter/create": - link = create_article_path("Special:AbuseFilter/{number}".format(number=change["logparams"]['newId'])) - embed["title"] = _("Created abuse filter number {number}").format(number=change["logparams"]['newId']) - elif action == "merge/merge": - link = create_article_path(change["title"].replace(" ", "_")) - embed["title"] = _("Merged revision histories of {article} into {dest}").format(article=change["title"], - dest=change["logparams"]["dest_title"]) - elif action == "interwiki/iw_add": - link = create_article_path("Special:Interwiki") - embed["title"] = _("Added an entry to the interwiki table") - parsed_comment = _("Prefix: {prefix}, website: {website} | {desc}").format(desc=parsed_comment, - prefix=change["logparams"]['0'], - website=change["logparams"]['1']) - elif action == "interwiki/iw_edit": - link = create_article_path("Special:Interwiki") - embed["title"] = _("Edited an entry in interwiki table") - parsed_comment = _("Prefix: {prefix}, website: {website} | {desc}").format(desc=parsed_comment, - prefix=change["logparams"]['0'], - website=change["logparams"]['1']) - elif action == "interwiki/iw_delete": - link = create_article_path("Special:Interwiki") - embed["title"] = _("Deleted an entry in interwiki table") - parsed_comment = _("Prefix: {prefix} | {desc}").format(desc=parsed_comment, prefix=change["logparams"]['0']) - elif action == "contentmodel/change": - link = create_article_path(change["title"].replace(" ", "_")) - embed["title"] = _("Changed the content model of the page {article}").format(article=change["title"]) - parsed_comment = _("Model changed from {old} to {new}: {reason}").format(old=change["logparams"]["oldmodel"], - new=change["logparams"]["newmodel"], - reason=parsed_comment) - elif action == "sprite/sprite": - link = create_article_path(change["title"].replace(" ", "_")) - embed["title"] = _("Edited the sprite for {article}").format(article=change["title"]) - elif action == "sprite/sheet": - link = create_article_path(change["title"].replace(" ", "_")) - embed["title"] = _("Created the sprite sheet for {article}").format(article=change["title"]) - elif action == "sprite/slice": - link = create_article_path(change["title"].replace(" ", "_")) - embed["title"] = _("Edited the slice for {article}").format(article=change["title"]) - elif action == "cargo/createtable": - LinkParser.feed(change["logparams"]["0"]) - table = re.search(r"\[(.*?)\]\(<(.*?)>\)", LinkParser.new_string) - LinkParser.new_string = "" - link = table.group(2) - embed["title"] = _("Created the Cargo table \"{table}\"").format(table=table.group(1)) - parsed_comment = None - elif action == "cargo/deletetable": - link = create_article_path("Special:CargoTables") - embed["title"] = _("Deleted the Cargo table \"{table}\"").format(table=change["logparams"]["0"]) - parsed_comment = None - elif action == "cargo/recreatetable": - LinkParser.feed(change["logparams"]["0"]) - table = re.search(r"\[(.*?)\]\(<(.*?)>\)", LinkParser.new_string) - LinkParser.new_string = "" - link = table.group(2) - embed["title"] = _("Recreated the Cargo table \"{table}\"").format(table=table.group(1)) - parsed_comment = None - elif action == "cargo/replacetable": - LinkParser.feed(change["logparams"]["0"]) - table = re.search(r"\[(.*?)\]\(<(.*?)>\)", LinkParser.new_string) - LinkParser.new_string = "" - link = table.group(2) - embed["title"] = _("Replaced the Cargo table \"{table}\"").format(table=table.group(1)) - parsed_comment = None - elif action == "managetags/create": - link = create_article_path("Special:Tags") - embed["title"] = _("Created a tag \"{tag}\"").format(tag=change["logparams"]["tag"]) - recent_changes.init_info() - elif action == "managetags/delete": - link = create_article_path("Special:Tags") - embed["title"] = _("Deleted a tag \"{tag}\"").format(tag=change["logparams"]["tag"]) - recent_changes.init_info() - elif action == "managetags/activate": - link = create_article_path("Special:Tags") - embed["title"] = _("Activated a tag \"{tag}\"").format(tag=change["logparams"]["tag"]) - elif action == "managetags/deactivate": - link = create_article_path("Special:Tags") - embed["title"] = _("Deactivated a tag \"{tag}\"").format(tag=change["logparams"]["tag"]) - elif action == "suppressed": - link = create_article_path("") - embed["title"] = _("Action has been hidden by administration.") - embed["author"]["name"] = _("Unknown") - else: - logger.warning("No entry for {event} with params: {params}".format(event=action, params=change)) - embed["author"]["icon_url"] = settings["appearance"]["embed"][action]["icon"] - embed["url"] = link - if parsed_comment is not None: - embed["description"] = parsed_comment - if settings["appearance"]["embed"]["show_footer"]: - embed["timestamp"] = change["timestamp"] - if "tags" in change and change["tags"]: - tag_displayname = [] - for tag in change["tags"]: - if tag in recent_changes.tags: - if recent_changes.tags[tag] is None: - continue # Ignore hidden tags - else: - tag_displayname.append(recent_changes.tags[tag]) - else: - tag_displayname.append(tag) - embed.add_field(_("Tags"), ", ".join(tag_displayname)) - logger.debug("Current params in edit action: {}".format(change)) - if categories is not None and not (len(categories["new"]) == 0 and len(categories["removed"]) == 0): - new_cat = (_("**Added**: ") + ", ".join(list(categories["new"])[0:16]) + ("\n" if len(categories["new"])<=15 else _(" and {} more\n").format(len(categories["new"])-15))) if categories["new"] else "" - del_cat = (_("**Removed**: ") + ", ".join(list(categories["removed"])[0:16]) + ("" if len(categories["removed"])<=15 else _(" and {} more").format(len(categories["removed"])-15))) if categories["removed"] else "" - embed.add_field(_("Changed categories"), new_cat + del_cat) - embed.finish_embed() - send_to_discord(embed) - - -def essential_info(change, changed_categories): - """Prepares essential information for both embed and compact message format.""" - logger.debug(change) - if ("actionhidden" in change or "suppressed" in change) and "suppressed" not in settings["ignored"]: # if event is hidden using suppression - appearance_mode("suppressed", change, "", changed_categories) - return - if "commenthidden" not in change: - LinkParser.feed(change["parsedcomment"]) - parsed_comment = LinkParser.new_string - LinkParser.new_string = "" - parsed_comment = re.sub(r"(`|_|\*|~|{|}|\|\|)", "\\\\\\1", parsed_comment, 0) - else: - parsed_comment = _("~~hidden~~") - if not parsed_comment: - parsed_comment = None - if change["type"] in ["edit", "new"]: - logger.debug("List of categories in essential_info: {}".format(changed_categories)) - if "userhidden" in change: - change["user"] = _("hidden") - identification_string = change["type"] - elif change["type"] == "log": - identification_string = "{logtype}/{logaction}".format(logtype=change["logtype"], logaction=change["logaction"]) - if identification_string not in supported_logs: - logger.warning( - "This event is not implemented in the script. Please make an issue on the tracker attaching the following info: wiki url, time, and this information: {}".format( - change)) - return - elif change["type"] == "categorize": - return - else: - logger.warning("This event is not implemented in the script. Please make an issue on the tracker attaching the following info: wiki url, time, and this information: {}".format(change)) - return - if identification_string in settings["ignored"]: - return - appearance_mode(identification_string, change, parsed_comment, changed_categories) def day_overview_request(): logger.info("Fetching daily overview... This may take up to 30 seconds!") @@ -1044,280 +205,6 @@ def day_overview(): logger.debug("function requesting changes for day overview returned with error code") -class Recent_Changes_Class(object): - def __init__(self): - self.ids = [] - self.map_ips = {} - self.recent_id = 0 - self.downtimecredibility = 0 - self.last_downtime = 0 - self.tags = {} - self.groups = {} - self.streak = -1 - self.mw_messages = {} - self.namespaces = None - self.session = session - if settings["limitrefetch"] != -1: - self.file_id = storage["rcid"] - else: - self.file_id = 999999999 # such value won't cause trouble, and it will make sure no refetch happen - - @staticmethod - def handle_mw_errors(request): - if "errors" in request: - logger.error(request["errors"]) - raise MWError - return request - - def log_in(self): - global logged_in - # session.cookies.clear() - if '@' not in settings["wiki_bot_login"]: - logger.error( - "Please provide proper nickname for login from {wiki}Special:BotPasswords".format( - wiki=WIKI_SCRIPT_PATH)) - return - if len(settings["wiki_bot_password"]) != 32: - logger.error( - "Password seems incorrect. It should be 32 characters long! Grab it from {wiki}Special:BotPasswords".format( - wiki=WIKI_SCRIPT_PATH)) - return - logger.info("Trying to log in to {wiki}...".format(wiki=WIKI_SCRIPT_PATH)) - try: - response = self.handle_mw_errors( - self.session.post(WIKI_API_PATH, - data={'action': 'query', 'format': 'json', 'utf8': '', 'meta': 'tokens', - 'type': 'login'})) - response = self.handle_mw_errors( - self.session.post(WIKI_API_PATH, - data={'action': 'login', 'format': 'json', 'utf8': '', - 'lgname': settings["wiki_bot_login"], - 'lgpassword': settings["wiki_bot_password"], - 'lgtoken': response.json()['query']['tokens']['logintoken']})) - except ValueError: - logger.error("Logging in have not succeeded") - return - except MWError: - logger.error("Logging in have not succeeded") - return - try: - if response.json()['login']['result'] == "Success": - logger.info("Successfully logged in") - logged_in = True - else: - logger.error("Logging in have not succeeded") - except: - logger.error("Logging in have not succeeded") - - def add_cache(self, change): - self.ids.append(change["rcid"]) - # self.recent_id = change["rcid"] - if len(self.ids) > settings["limitrefetch"] + 5: - self.ids.pop(0) - - def fetch(self, amount=settings["limit"]): - messagequeue.resend_msgs() - last_check = self.fetch_changes(amount=amount) - # If the request succeeds the last_check will be the last rcid from recentchanges query - if last_check is not None: - self.recent_id = last_check - # Assigns self.recent_id the last rcid if request succeeded, otherwise set the id from the file - if settings["limitrefetch"] != -1 and self.recent_id != self.file_id and self.recent_id != 0: # if saving to database is disabled, don't save the recent_id - self.file_id = self.recent_id - storage["rcid"] = self.recent_id - datafile.save_datafile() - logger.debug("Most recent rcid is: {}".format(self.recent_id)) - return self.recent_id - - def fetch_changes(self, amount, clean=False): - """Fetches the :amount: of changes from the wiki. - Returns None on error and int of rcid of latest change if succeeded""" - global logged_in - if len(self.ids) == 0: - logger.debug("ids is empty, triggering clean fetch") - clean = True - changes = self.safe_request( - "{wiki}?action=query&format=json&list=recentchanges{show_bots}&rcprop=title%7Credirect%7Ctimestamp%7Cids%7Cloginfo%7Cparsedcomment%7Csizes%7Cflags%7Ctags%7Cuser&rclimit={amount}&rctype=edit%7Cnew%7Clog%7Cexternal{categorize}".format( - wiki=WIKI_API_PATH, amount=amount, categorize="%7Ccategorize" if settings["show_added_categories"] else "", show_bots="&rcshow=!bot" if settings["show_bots"] is False else "")) - if changes: - try: - changes = changes.json()['query']['recentchanges'] - changes.reverse() - except ValueError: - logger.warning("ValueError in fetching changes") - logger.warning("Changes URL:" + changes.url) - self.downtime_controller() - return None - except KeyError: - logger.warning("Wiki returned %s" % (changes.json())) - return None - else: - if self.downtimecredibility > 0: - self.downtimecredibility -= 1 - if self.streak > -1: - self.streak += 1 - if self.streak > 8: - self.streak = -1 - send_simple("down_detector", _("Connection to {wiki} seems to be stable now.").format(wiki=settings["wikiname"]), - _("Connection status"), settings["avatars"]["connection_restored"]) - # In the first for loop we analize the categorize events and figure if we will need more changes to fetch - # in order to cover all of the edits - categorize_events = {} - new_events = 0 - for change in changes: - if not (change["rcid"] in self.ids or change["rcid"] < self.recent_id) and not clean: - new_events += 1 - logger.debug( - "New event: {}".format(change["rcid"])) - if new_events == settings["limit"]: - if amount < 500: - # call the function again with max limit for more results, ignore the ones in this request - logger.debug("There were too many new events, requesting max amount of events from the wiki.") - return self.fetch(amount=5000 if logged_in else 500) - else: - logger.debug( - "There were too many new events, but the limit was high enough we don't care anymore about fetching them all.") - if change["type"] == "categorize": - if "commenthidden" not in change: - if len(recent_changes.mw_messages.keys()) > 0: - cat_title = change["title"].split(':', 1)[1] - # I so much hate this, blame Markus for making me do this - if change["revid"] not in categorize_events: - categorize_events[change["revid"]] = {"new": set(), "removed": set()} - comment_to_match = re.sub(r'<.*?a>', '', change["parsedcomment"]) - if recent_changes.mw_messages["recentchanges-page-added-to-category"] in comment_to_match or recent_changes.mw_messages["recentchanges-page-added-to-category-bundled"] in comment_to_match: - categorize_events[change["revid"]]["new"].add(cat_title) - logger.debug("Matched {} to added category for {}".format(cat_title, change["revid"])) - elif recent_changes.mw_messages["recentchanges-page-removed-from-category"] in comment_to_match or recent_changes.mw_messages["recentchanges-page-removed-from-category-bundled"] in comment_to_match: - categorize_events[change["revid"]]["removed"].add(cat_title) - logger.debug("Matched {} to removed category for {}".format(cat_title, change["revid"])) - else: - logger.debug("Unknown match for category change with messages {}, {}, {}, {} and comment_to_match {}".format(recent_changes.mw_messages["recentchanges-page-added-to-category"], recent_changes.mw_messages["recentchanges-page-removed-from-category"], recent_changes.mw_messages["recentchanges-page-removed-from-category-bundled"], recent_changes.mw_messages["recentchanges-page-added-to-category-bundled"], comment_to_match)) - else: - logger.warning("Init information not available, could not read category information. Please restart the bot.") - else: - logger.debug("Log entry got suppressed, ignoring entry.") - # if change["revid"] in categorize_events: - # categorize_events[change["revid"]].append(cat_title) - # else: - # logger.debug("New category '{}' for {}".format(cat_title, change["revid"])) - # categorize_events[change["revid"]] = {cat_title: } - for change in changes: - if change["rcid"] in self.ids or change["rcid"] < self.recent_id: - logger.debug("Change ({}) is in ids or is lower than recent_id {}".format(change["rcid"], - self.recent_id)) - continue - logger.debug(self.ids) - logger.debug(self.recent_id) - self.add_cache(change) - if clean and not (self.recent_id == 0 and change["rcid"] > self.file_id): - logger.debug("Rejected {val}".format(val=change["rcid"])) - continue - essential_info(change, categorize_events.get(change.get("revid"), None)) - return change["rcid"] - - def safe_request(self, url): - try: - request = self.session.get(url, timeout=10, allow_redirects=False) - except requests.exceptions.Timeout: - logger.warning("Reached timeout error for request on link {url}".format(url=url)) - self.downtime_controller() - return None - except requests.exceptions.ConnectionError: - logger.warning("Reached connection error for request on link {url}".format(url=url)) - self.downtime_controller() - return None - except requests.exceptions.ChunkedEncodingError: - logger.warning("Detected faulty response from the web server for request on link {url}".format(url=url)) - self.downtime_controller() - return None - else: - if 499 < request.status_code < 600: - self.downtime_controller() - return None - elif request.status_code == 302: - logger.critical("Redirect detected! Either the wiki given in the script settings (wiki field) is incorrect/the wiki got removed or Gamepedia is giving us the false value. Please provide the real URL to the wiki, current URL redirects to {}".format(request.next.url)) - sys.exit(0) - return request - - def check_connection(self, looped=False): - online = 0 - for website in ["https://google.com", "https://instagram.com", "https://steamcommunity.com"]: - try: - requests.get(website, timeout=10) - online += 1 - except requests.exceptions.ConnectionError: - pass - except requests.exceptions.Timeout: - pass - if online < 1: - logger.error("Failure when checking Internet connection at {time}".format( - time=time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime()))) - self.downtimecredibility = 0 - if not looped: - while 1: # recursed loop, check for connection (every 10 seconds) as long as three services are down, don't do anything else - if self.check_connection(looped=True): - recent_changes.fetch(amount=settings["limitrefetch"]) - break - time.sleep(10) - return False - return True - - def downtime_controller(self): - if not settings["show_updown_messages"]: - return - if self.streak > -1: # reset the streak of successful connections when bad one happens - self.streak = 0 - if self.downtimecredibility < 60: - self.downtimecredibility += 15 - else: - if ( - time.time() - self.last_downtime) > 1800 and self.check_connection(): # check if last downtime happened within 30 minutes, if yes, don't send a message - send_simple("down_detector", _("{wiki} seems to be down or unreachable.").format(wiki=settings["wikiname"]), - _("Connection status"), settings["avatars"]["connection_failed"]) - self.last_downtime = time.time() - self.streak = 0 - - def clear_cache(self): - self.map_ips = {} - - def init_info(self): - startup_info = safe_read(self.safe_request( - "{wiki}?action=query&format=json&uselang=content&list=tags&meta=allmessages%7Csiteinfo&utf8=1&tglimit=max&tgprop=displayname&ammessages=recentchanges-page-added-to-category%7Crecentchanges-page-removed-from-category%7Crecentchanges-page-added-to-category-bundled%7Crecentchanges-page-removed-from-category-bundled&amenableparser=1&amincludelocal=1&siprop=namespaces".format( - wiki=WIKI_API_PATH)), "query") - if startup_info: - if "tags" in startup_info and "allmessages" in startup_info: - for tag in startup_info["tags"]: - try: - self.tags[tag["name"]] = (BeautifulSoup(tag["displayname"], "lxml")).get_text() - except KeyError: - self.tags[tag["name"]] = None # Tags with no display name are hidden and should not appear on RC as well - for message in startup_info["allmessages"]: - if not "missing" in message: # ignore missing strings - self.mw_messages[message["name"]] = message["*"] - else: - logging.warning("Could not fetch the MW message translation for: {}".format(message["name"])) - for key, message in self.mw_messages.items(): - if key.startswith("recentchanges-page-"): - self.mw_messages[key] = re.sub(r'\[\[.*?\]\]', '', message) - self.namespaces = startup_info["namespaces"] - logger.info("Gathered information about the tags and interface messages.") - else: - logger.warning("Could not retrieve initial wiki information. Some features may not work correctly!") - logger.debug(startup_info) - else: - logger.error("Could not retrieve initial wiki information. Possibly internet connection issue?") - - -recent_changes = Recent_Changes_Class() -# Set the proper formatter -if settings["appearance"]["mode"] == "embed": - appearance_mode = embed_formatter -elif settings["appearance"]["mode"] == "compact": - appearance_mode = compact_formatter -else: - logger.critical("Unknown formatter!") - sys.exit(1) # Log in and download wiki information try: