diff --git a/extensions/base/mediawiki.py b/extensions/base/mediawiki.py index 8fd166f..b669324 100644 --- a/extensions/base/mediawiki.py +++ b/extensions/base/mediawiki.py @@ -12,11 +12,12 @@ # # You should have received a copy of the GNU General Public License # along with RcGcDw. If not, see . - +import ipaddress import logging import math import re import time +import datetime from src.discord.message import DiscordMessage from src.api import formatter from src.i18n import rc_formatters @@ -27,6 +28,7 @@ from src.configloader import settings from src.exceptions import * _ = rc_formatters.gettext +ngettext = rc_formatters.ngettext logger = logging.getLogger("extensions.base") @@ -416,3 +418,133 @@ def compact_protect_unprotect(ctx, change): author=author, author_url=author_url, article=sanitize_to_markdown(change["title"]), article_url=link, comment=ctx.parsedcomment) return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content) +# block/block +def block_expiry(change: dict) -> str: + if change["logparams"]["duration"] in ["infinite", "indefinite", "infinity", "never"]: + return _("for infinity and beyond") + else: + if "expiry" in change["logparams"]: + expiry_date_time_obj = datetime.datetime.strptime(change["logparams"]["expiry"], '%Y-%m-%dT%H:%M:%SZ') + timestamp_date_time_obj = datetime.datetime.strptime(change["timestamp"], '%Y-%m-%dT%H:%M:%SZ') + timedelta_for_expiry = expiry_date_time_obj - timestamp_date_time_obj + years, days, hours, minutes = timedelta_for_expiry.seconds // 31557600, \ + timedelta_for_expiry.seconds % 31557600 // 86400, \ + timedelta_for_expiry.seconds % 86400 // 3600, timedelta_for_expiry.seconds % 3600 // 60 + if not any([years, days, hours, minutes]): + return _("less than a minute") + time_names = (ngettext("year", "years", years), ngettext("day", "days", days), ngettext("hour", "hours", hours), ngettext("minute", "minutes", minutes)) + final_time = [] + for num, timev in enumerate([years, days, hours, minutes]): + if timev: + final_time.append(_("{time_unit} {time_number}").format(time_unit=time_names[num], time_number=timev)) + return ", ".join(final_time) + else: + return change["logparams"]["duration"] # Temporary? Should be rare? We will see in testing + + +@formatter.embed(event="block/block", mode="embed") +def embed_block_block(ctx, change): + embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url) + embed_helper(ctx, embed, change) + user = change["title"].split(':', 1)[1] + try: + ipaddress.ip_address(user) + embed["url"] = create_article_path("Special:Contributions/{user}".format(user=user)) + except ValueError: + embed["url"] = create_article_path(sanitize_to_url(change["title"])) + if "sitewide" not in change["logparams"]: + restriction_description = "" + if "restrictions" in change["logparams"]: + if "pages" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"]["pages"]: + restriction_description = _("Blocked from editing the following pages: ") + for page in change["logparams"]["restrictions"]["pages"]: + restricted_pages = ["*"+i["page_title"]+"*" for i in change["logparams"]["restrictions"]["pages"]] + restriction_description = restriction_description + ", ".join(restricted_pages) + if "namespaces" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"]["namespaces"]: + namespaces = [] + if restriction_description: + restriction_description = restriction_description + _(" and namespaces: ") + else: + restriction_description = _("Blocked from editing pages on following namespaces: ") + for namespace in change["logparams"]["restrictions"]["namespaces"]: + if str(namespace) in ctx.client.namespaces: # if we have cached namespace name for given namespace number, add its name to the list + namespaces.append("*{ns}*".format(ns=ctx.client.namespaces[str(namespace)]["*"])) + else: + namespaces.append("*{ns}*".format(ns=namespace)) + restriction_description = restriction_description + ", ".join(namespaces) + restriction_description = restriction_description + "." + if len(restriction_description) > 1020: + logger.debug(restriction_description) + restriction_description = restriction_description[:1020]+"…" + embed.add_field(_("Partial block details"), restriction_description, inline=True) + block_flags = change["logparams"].get("flags") + if block_flags: + embed.add_field(_("Block flags"), ", ".join(block_flags)) # TODO Translate flags into MW messages, this requires making additional request in init_request since we want to get all messages with prefix (amprefix) block-log-flags- and that parameter is exclusive with ammessages + embed["title"] = _("Blocked {blocked_user} {time}").format(blocked_user=user, time=block_expiry(change)) + +@formatter.compact(event="block/block", mode="compact") +def compact_block_block(ctx, change): + user = change["title"].split(':', 1)[1] + restriction_description = "" + author, author_url = compact_author(ctx, change) + try: + ipaddress.ip_address(user) + link = clean_link(create_article_path("Special:Contributions/{user}".format(user=user))) + except ValueError: + link = clean_link(create_article_path(sanitize_to_url(change["title"]))) + else: + if "sitewide" not in change["logparams"]: + if "restrictions" in change["logparams"]: + if "pages" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"]["pages"]: + restriction_description = _(" on pages: ") + for page in change["logparams"]["restrictions"]["pages"]: + restricted_pages = ["*{page}*".format(page=i["page_title"]) for i in + change["logparams"]["restrictions"]["pages"]] + restriction_description = restriction_description + ", ".join(restricted_pages) + if "namespaces" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"][ + "namespaces"]: + namespaces = [] + if restriction_description: + restriction_description = restriction_description + _(" and namespaces: ") + else: + restriction_description = _(" on namespaces: ") + for namespace in change["logparams"]["restrictions"]["namespaces"]: + if str(namespace) in ctx.client.namespaces: # if we have cached namespace name for given namespace number, add its name to the list + namespaces.append("*{ns}*".format(ns=ctx.client.namespaces[str(namespace)]["*"])) + else: + namespaces.append("*{ns}*".format(ns=namespace)) + restriction_description = restriction_description + ", ".join(namespaces) + restriction_description = restriction_description + "." + if len(restriction_description) > 1020: + logger.debug(restriction_description) + restriction_description = restriction_description[:1020] + "…" + content = _( + "[{author}]({author_url}) blocked [{user}]({user_url}) {time}{restriction_desc}{comment}").format(author=author, + author_url=author_url, + user=user, + time=block_expiry(change), + user_url=link, + restriction_desc=restriction_description, + comment=ctx.parsedcomment) + return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content) + + +# block/reblock - Changing settings of a block +@formatter.embed(event="block/reblock", mode="embed") +def embed_block_reblock(ctx, change): + embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url) + embed_helper(ctx, embed, change) + embed["url"] = create_article_path(sanitize_to_url(change["title"])) + user = change["title"].split(':', 1)[1] + embed["title"] = _("Changed block settings for {blocked_user}").format(blocked_user=sanitize_to_markdown(user)) + return embed + + +@formatter.compact(event="block/reblock") +def compact_block_reblock(ctx, change): + author, author_url = compact_author(ctx, change) + link = clean_link(create_article_path(sanitize_to_url(change["title"]))) + user = change["title"].split(':', 1)[1] + content = _("[{author}]({author_url}) changed block settings for [{blocked_user}]({user_url}){comment}").format( + author=author, author_url=author_url, blocked_user=user, user_url=link, comment=ctx.parsedcomment) + diff --git a/src/api/client.py b/src/api/client.py index 30ab395..8e047ed 100644 --- a/src/api/client.py +++ b/src/api/client.py @@ -37,6 +37,7 @@ class Client: self.WIKI_JUST_DOMAIN = src.misc.WIKI_JUST_DOMAIN self.content_parser = src.misc.ContentParser self.tags = self.__recent_changes.tags + self.namespaces = self.__recent_changes.namespaces #self.make_api_request: src.rc.wiki.__recent_changes.api_request = self.__recent_changes.api_request def refresh_internal_data(self): @@ -69,7 +70,6 @@ class Client: def get_formatters(self): return self._formatters - def get_ipmapper(self) -> dict: """Returns a dict mapping IPs with amount of their edits""" return self.__recent_changes.map_ips \ No newline at end of file diff --git a/src/api/util.py b/src/api/util.py index 3a0651e..8eac730 100644 --- a/src/api/util.py +++ b/src/api/util.py @@ -40,7 +40,7 @@ def clean_link(link: str) -> str: return "<" + link.replace(" ", "_") + ">" -def sanitize_to_markdown(text: str): +def sanitize_to_markdown(text: str) -> str: """Sanitizes given text to escape markdown formatting. It is used in values that will be visible on Discord in messages""" return re.sub(r"([`_*~:<>{}@|\\])", "\\\\\\1", text, 0).replace('//', "/\\/").replace('](', "]\\(") diff --git a/src/rc_formatters.py b/src/rc_formatters.py index dd6a48d..665e971 100644 --- a/src/rc_formatters.py +++ b/src/rc_formatters.py @@ -163,59 +163,9 @@ def compact_formatter(action, change, parsed_comment, categories, recent_changes elif action == "protect/move_prot": elif action == "block/block": - user = change["title"].split(':', 1)[1] - restriction_description = "" - try: - ipaddress.ip_address(user) - link = link_formatter(create_article_path("Special:Contributions/{user}".format(user=user))) - except ValueError: - link = link_formatter(create_article_path(change["title"])) - if change["logparams"]["duration"] in ["infinite", "indefinite", "infinity", "never"]: - block_time = _("for infinity and beyond") - else: - english_length = re.sub(r"(\d+)", "", change["logparams"][ - "duration"]) # note that translation won't work for millenia and century yet - english_length_num = re.sub(r"(\D+)", "", change["logparams"]["duration"]) - try: - if "@" in english_length: - raise ValueError - english_length = english_length.rstrip("s").strip() - block_time = _("for {num} {translated_length}").format(num=english_length_num, - translated_length=ngettext(english_length, - english_length + "s", - int(english_length_num))) - except (AttributeError, ValueError): - date_time_obj = datetime.datetime.strptime(change["logparams"]["expiry"], '%Y-%m-%dT%H:%M:%SZ') - block_time = _("until {}").format(date_time_obj.strftime("%Y-%m-%d %H:%M:%S UTC")) - if "sitewide" not in change["logparams"]: - if "restrictions" in change["logparams"]: - if "pages" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"]["pages"]: - restriction_description = _(" on pages: ") - for page in change["logparams"]["restrictions"]["pages"]: - restricted_pages = ["*{page}*".format(page=i["page_title"]) for i in change["logparams"]["restrictions"]["pages"]] - restriction_description = restriction_description + ", ".join(restricted_pages) - if "namespaces" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"]["namespaces"]: - namespaces = [] - if restriction_description: - restriction_description = restriction_description + _(" and namespaces: ") - else: - restriction_description = _(" on namespaces: ") - for namespace in change["logparams"]["restrictions"]["namespaces"]: - if str(namespace) in recent_changes.namespaces: # if we have cached namespace name for given namespace number, add its name to the list - namespaces.append("*{ns}*".format(ns=recent_changes.namespaces[str(namespace)]["*"])) - else: - namespaces.append("*{ns}*".format(ns=namespace)) - restriction_description = restriction_description + ", ".join(namespaces) - restriction_description = restriction_description + "." - if len(restriction_description) > 1020: - logger.debug(restriction_description) - restriction_description = restriction_description[:1020] + "…" - content = _( - "[{author}]({author_url}) blocked [{user}]({user_url}) {time}{restriction_desc}{comment}").format(author=author, author_url=author_url, user=user, time=block_time, user_url=link, restriction_desc=restriction_description, comment=parsed_comment) + elif action == "block/reblock": - link = link_formatter(create_article_path(change["title"])) - user = change["title"].split(':', 1)[1] - content = _("[{author}]({author_url}) changed block settings for [{blocked_user}]({user_url}){comment}").format(author=author, author_url=author_url, blocked_user=user, user_url=link, comment=parsed_comment) + elif action == "block/unblock": link = link_formatter(create_article_path(change["title"])) user = change["title"].split(':', 1)[1] @@ -702,58 +652,9 @@ def embed_formatter(action, change, parsed_comment, categories, recent_changes): elif action == "protect/move_prot": elif action == "block/block": - user = change["title"].split(':', 1)[1] - try: - ipaddress.ip_address(user) - link = create_article_path("Special:Contributions/{user}".format(user=user)) - except ValueError: - link = create_article_path(change["title"]) - if change["logparams"]["duration"] in ["infinite", "indefinite", "infinity", "never"]: - block_time = _("for infinity and beyond") - else: - english_length = re.sub(r"(\d+)", "", change["logparams"]["duration"]) # note that translation won't work for millenia and century yet - english_length_num = re.sub(r"(\D+)", "", change["logparams"]["duration"]) - try: - if "@" in english_length: - raise ValueError - english_length = english_length.rstrip("s").strip() - block_time = _("for {num} {translated_length}").format(num=english_length_num, translated_length=ngettext(english_length, english_length + "s", int(english_length_num))) - except (AttributeError, ValueError): - if "expiry" in change["logparams"]: - date_time_obj = datetime.datetime.strptime(change["logparams"]["expiry"], '%Y-%m-%dT%H:%M:%SZ') - block_time = _("until {}").format(date_time_obj.strftime("%Y-%m-%d %H:%M:%S UTC")) - else: - block_time = _("unknown expiry time") # THIS IS HERE JUST TEMPORARY AS A HOT FIX TO #157, will be changed with release of 1.13 - if "sitewide" not in change["logparams"]: - restriction_description = "" - if "restrictions" in change["logparams"]: - if "pages" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"]["pages"]: - restriction_description = _("Blocked from editing the following pages: ") - for page in change["logparams"]["restrictions"]["pages"]: - restricted_pages = ["*"+i["page_title"]+"*" for i in change["logparams"]["restrictions"]["pages"]] - restriction_description = restriction_description + ", ".join(restricted_pages) - if "namespaces" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"]["namespaces"]: - namespaces = [] - if restriction_description: - restriction_description = restriction_description + _(" and namespaces: ") - else: - restriction_description = _("Blocked from editing pages on following namespaces: ") - for namespace in change["logparams"]["restrictions"]["namespaces"]: - if str(namespace) in recent_changes.namespaces: # if we have cached namespace name for given namespace number, add its name to the list - namespaces.append("*{ns}*".format(ns=recent_changes.namespaces[str(namespace)]["*"])) - else: - namespaces.append("*{ns}*".format(ns=namespace)) - restriction_description = restriction_description + ", ".join(namespaces) - restriction_description = restriction_description + "." - if len(restriction_description) > 1020: - logger.debug(restriction_description) - restriction_description = restriction_description[:1020]+"…" - embed.add_field(_("Partial block details"), restriction_description, inline=True) - embed["title"] = _("Blocked {blocked_user} {time}").format(blocked_user=user, time=block_time) + elif action == "block/reblock": - link = create_article_path(change["title"]) - user = change["title"].split(':', 1)[1] - embed["title"] = _("Changed block settings for {blocked_user}").format(blocked_user=user) + elif action == "block/unblock": link = create_article_path(change["title"]) user = change["title"].split(':', 1)[1]