added few new formatters (WIP)

This commit is contained in:
Frisk 2021-05-02 16:26:48 +02:00
parent 29a241d228
commit d3115153df
No known key found for this signature in database
GPG key ID: 213F7C15068AF8AC
4 changed files with 139 additions and 106 deletions

View file

@ -12,11 +12,12 @@
#
# You should have received a copy of the GNU General Public License
# along with RcGcDw. If not, see <http://www.gnu.org/licenses/>.
import ipaddress
import logging
import math
import re
import time
import datetime
from src.discord.message import DiscordMessage
from src.api import formatter
from src.i18n import rc_formatters
@ -27,6 +28,7 @@ from src.configloader import settings
from src.exceptions import *
_ = rc_formatters.gettext
ngettext = rc_formatters.ngettext
logger = logging.getLogger("extensions.base")
@ -416,3 +418,133 @@ def compact_protect_unprotect(ctx, change):
author=author, author_url=author_url, article=sanitize_to_markdown(change["title"]), article_url=link, comment=ctx.parsedcomment)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# block/block
def block_expiry(change: dict) -> str:
if change["logparams"]["duration"] in ["infinite", "indefinite", "infinity", "never"]:
return _("for infinity and beyond")
else:
if "expiry" in change["logparams"]:
expiry_date_time_obj = datetime.datetime.strptime(change["logparams"]["expiry"], '%Y-%m-%dT%H:%M:%SZ')
timestamp_date_time_obj = datetime.datetime.strptime(change["timestamp"], '%Y-%m-%dT%H:%M:%SZ')
timedelta_for_expiry = expiry_date_time_obj - timestamp_date_time_obj
years, days, hours, minutes = timedelta_for_expiry.seconds // 31557600, \
timedelta_for_expiry.seconds % 31557600 // 86400, \
timedelta_for_expiry.seconds % 86400 // 3600, timedelta_for_expiry.seconds % 3600 // 60
if not any([years, days, hours, minutes]):
return _("less than a minute")
time_names = (ngettext("year", "years", years), ngettext("day", "days", days), ngettext("hour", "hours", hours), ngettext("minute", "minutes", minutes))
final_time = []
for num, timev in enumerate([years, days, hours, minutes]):
if timev:
final_time.append(_("{time_unit} {time_number}").format(time_unit=time_names[num], time_number=timev))
return ", ".join(final_time)
else:
return change["logparams"]["duration"] # Temporary? Should be rare? We will see in testing
@formatter.embed(event="block/block", mode="embed")
def embed_block_block(ctx, change):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
user = change["title"].split(':', 1)[1]
try:
ipaddress.ip_address(user)
embed["url"] = create_article_path("Special:Contributions/{user}".format(user=user))
except ValueError:
embed["url"] = create_article_path(sanitize_to_url(change["title"]))
if "sitewide" not in change["logparams"]:
restriction_description = ""
if "restrictions" in change["logparams"]:
if "pages" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"]["pages"]:
restriction_description = _("Blocked from editing the following pages: ")
for page in change["logparams"]["restrictions"]["pages"]:
restricted_pages = ["*"+i["page_title"]+"*" for i in change["logparams"]["restrictions"]["pages"]]
restriction_description = restriction_description + ", ".join(restricted_pages)
if "namespaces" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"]["namespaces"]:
namespaces = []
if restriction_description:
restriction_description = restriction_description + _(" and namespaces: ")
else:
restriction_description = _("Blocked from editing pages on following namespaces: ")
for namespace in change["logparams"]["restrictions"]["namespaces"]:
if str(namespace) in ctx.client.namespaces: # if we have cached namespace name for given namespace number, add its name to the list
namespaces.append("*{ns}*".format(ns=ctx.client.namespaces[str(namespace)]["*"]))
else:
namespaces.append("*{ns}*".format(ns=namespace))
restriction_description = restriction_description + ", ".join(namespaces)
restriction_description = restriction_description + "."
if len(restriction_description) > 1020:
logger.debug(restriction_description)
restriction_description = restriction_description[:1020]+""
embed.add_field(_("Partial block details"), restriction_description, inline=True)
block_flags = change["logparams"].get("flags")
if block_flags:
embed.add_field(_("Block flags"), ", ".join(block_flags)) # TODO Translate flags into MW messages, this requires making additional request in init_request since we want to get all messages with prefix (amprefix) block-log-flags- and that parameter is exclusive with ammessages
embed["title"] = _("Blocked {blocked_user} {time}").format(blocked_user=user, time=block_expiry(change))
@formatter.compact(event="block/block", mode="compact")
def compact_block_block(ctx, change):
user = change["title"].split(':', 1)[1]
restriction_description = ""
author, author_url = compact_author(ctx, change)
try:
ipaddress.ip_address(user)
link = clean_link(create_article_path("Special:Contributions/{user}".format(user=user)))
except ValueError:
link = clean_link(create_article_path(sanitize_to_url(change["title"])))
else:
if "sitewide" not in change["logparams"]:
if "restrictions" in change["logparams"]:
if "pages" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"]["pages"]:
restriction_description = _(" on pages: ")
for page in change["logparams"]["restrictions"]["pages"]:
restricted_pages = ["*{page}*".format(page=i["page_title"]) for i in
change["logparams"]["restrictions"]["pages"]]
restriction_description = restriction_description + ", ".join(restricted_pages)
if "namespaces" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"][
"namespaces"]:
namespaces = []
if restriction_description:
restriction_description = restriction_description + _(" and namespaces: ")
else:
restriction_description = _(" on namespaces: ")
for namespace in change["logparams"]["restrictions"]["namespaces"]:
if str(namespace) in ctx.client.namespaces: # if we have cached namespace name for given namespace number, add its name to the list
namespaces.append("*{ns}*".format(ns=ctx.client.namespaces[str(namespace)]["*"]))
else:
namespaces.append("*{ns}*".format(ns=namespace))
restriction_description = restriction_description + ", ".join(namespaces)
restriction_description = restriction_description + "."
if len(restriction_description) > 1020:
logger.debug(restriction_description)
restriction_description = restriction_description[:1020] + ""
content = _(
"[{author}]({author_url}) blocked [{user}]({user_url}) {time}{restriction_desc}{comment}").format(author=author,
author_url=author_url,
user=user,
time=block_expiry(change),
user_url=link,
restriction_desc=restriction_description,
comment=ctx.parsedcomment)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# block/reblock - Changing settings of a block
@formatter.embed(event="block/reblock", mode="embed")
def embed_block_reblock(ctx, change):
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
embed["url"] = create_article_path(sanitize_to_url(change["title"]))
user = change["title"].split(':', 1)[1]
embed["title"] = _("Changed block settings for {blocked_user}").format(blocked_user=sanitize_to_markdown(user))
return embed
@formatter.compact(event="block/reblock")
def compact_block_reblock(ctx, change):
author, author_url = compact_author(ctx, change)
link = clean_link(create_article_path(sanitize_to_url(change["title"])))
user = change["title"].split(':', 1)[1]
content = _("[{author}]({author_url}) changed block settings for [{blocked_user}]({user_url}){comment}").format(
author=author, author_url=author_url, blocked_user=user, user_url=link, comment=ctx.parsedcomment)

View file

@ -37,6 +37,7 @@ class Client:
self.WIKI_JUST_DOMAIN = src.misc.WIKI_JUST_DOMAIN
self.content_parser = src.misc.ContentParser
self.tags = self.__recent_changes.tags
self.namespaces = self.__recent_changes.namespaces
#self.make_api_request: src.rc.wiki.__recent_changes.api_request = self.__recent_changes.api_request
def refresh_internal_data(self):
@ -69,7 +70,6 @@ class Client:
def get_formatters(self):
return self._formatters
def get_ipmapper(self) -> dict:
"""Returns a dict mapping IPs with amount of their edits"""
return self.__recent_changes.map_ips

View file

@ -40,7 +40,7 @@ def clean_link(link: str) -> str:
return "<" + link.replace(" ", "_") + ">"
def sanitize_to_markdown(text: str):
def sanitize_to_markdown(text: str) -> str:
"""Sanitizes given text to escape markdown formatting. It is used in values that will be visible on Discord in messages"""
return re.sub(r"([`_*~:<>{}@|\\])", "\\\\\\1", text, 0).replace('//', "/\\/").replace('](', "]\\(")

View file

@ -163,59 +163,9 @@ def compact_formatter(action, change, parsed_comment, categories, recent_changes
elif action == "protect/move_prot":
elif action == "block/block":
user = change["title"].split(':', 1)[1]
restriction_description = ""
try:
ipaddress.ip_address(user)
link = link_formatter(create_article_path("Special:Contributions/{user}".format(user=user)))
except ValueError:
link = link_formatter(create_article_path(change["title"]))
if change["logparams"]["duration"] in ["infinite", "indefinite", "infinity", "never"]:
block_time = _("for infinity and beyond")
else:
english_length = re.sub(r"(\d+)", "", change["logparams"][
"duration"]) # note that translation won't work for millenia and century yet
english_length_num = re.sub(r"(\D+)", "", change["logparams"]["duration"])
try:
if "@" in english_length:
raise ValueError
english_length = english_length.rstrip("s").strip()
block_time = _("for {num} {translated_length}").format(num=english_length_num,
translated_length=ngettext(english_length,
english_length + "s",
int(english_length_num)))
except (AttributeError, ValueError):
date_time_obj = datetime.datetime.strptime(change["logparams"]["expiry"], '%Y-%m-%dT%H:%M:%SZ')
block_time = _("until {}").format(date_time_obj.strftime("%Y-%m-%d %H:%M:%S UTC"))
if "sitewide" not in change["logparams"]:
if "restrictions" in change["logparams"]:
if "pages" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"]["pages"]:
restriction_description = _(" on pages: ")
for page in change["logparams"]["restrictions"]["pages"]:
restricted_pages = ["*{page}*".format(page=i["page_title"]) for i in change["logparams"]["restrictions"]["pages"]]
restriction_description = restriction_description + ", ".join(restricted_pages)
if "namespaces" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"]["namespaces"]:
namespaces = []
if restriction_description:
restriction_description = restriction_description + _(" and namespaces: ")
else:
restriction_description = _(" on namespaces: ")
for namespace in change["logparams"]["restrictions"]["namespaces"]:
if str(namespace) in recent_changes.namespaces: # if we have cached namespace name for given namespace number, add its name to the list
namespaces.append("*{ns}*".format(ns=recent_changes.namespaces[str(namespace)]["*"]))
else:
namespaces.append("*{ns}*".format(ns=namespace))
restriction_description = restriction_description + ", ".join(namespaces)
restriction_description = restriction_description + "."
if len(restriction_description) > 1020:
logger.debug(restriction_description)
restriction_description = restriction_description[:1020] + ""
content = _(
"[{author}]({author_url}) blocked [{user}]({user_url}) {time}{restriction_desc}{comment}").format(author=author, author_url=author_url, user=user, time=block_time, user_url=link, restriction_desc=restriction_description, comment=parsed_comment)
elif action == "block/reblock":
link = link_formatter(create_article_path(change["title"]))
user = change["title"].split(':', 1)[1]
content = _("[{author}]({author_url}) changed block settings for [{blocked_user}]({user_url}){comment}").format(author=author, author_url=author_url, blocked_user=user, user_url=link, comment=parsed_comment)
elif action == "block/unblock":
link = link_formatter(create_article_path(change["title"]))
user = change["title"].split(':', 1)[1]
@ -702,58 +652,9 @@ def embed_formatter(action, change, parsed_comment, categories, recent_changes):
elif action == "protect/move_prot":
elif action == "block/block":
user = change["title"].split(':', 1)[1]
try:
ipaddress.ip_address(user)
link = create_article_path("Special:Contributions/{user}".format(user=user))
except ValueError:
link = create_article_path(change["title"])
if change["logparams"]["duration"] in ["infinite", "indefinite", "infinity", "never"]:
block_time = _("for infinity and beyond")
else:
english_length = re.sub(r"(\d+)", "", change["logparams"]["duration"]) # note that translation won't work for millenia and century yet
english_length_num = re.sub(r"(\D+)", "", change["logparams"]["duration"])
try:
if "@" in english_length:
raise ValueError
english_length = english_length.rstrip("s").strip()
block_time = _("for {num} {translated_length}").format(num=english_length_num, translated_length=ngettext(english_length, english_length + "s", int(english_length_num)))
except (AttributeError, ValueError):
if "expiry" in change["logparams"]:
date_time_obj = datetime.datetime.strptime(change["logparams"]["expiry"], '%Y-%m-%dT%H:%M:%SZ')
block_time = _("until {}").format(date_time_obj.strftime("%Y-%m-%d %H:%M:%S UTC"))
else:
block_time = _("unknown expiry time") # THIS IS HERE JUST TEMPORARY AS A HOT FIX TO #157, will be changed with release of 1.13
if "sitewide" not in change["logparams"]:
restriction_description = ""
if "restrictions" in change["logparams"]:
if "pages" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"]["pages"]:
restriction_description = _("Blocked from editing the following pages: ")
for page in change["logparams"]["restrictions"]["pages"]:
restricted_pages = ["*"+i["page_title"]+"*" for i in change["logparams"]["restrictions"]["pages"]]
restriction_description = restriction_description + ", ".join(restricted_pages)
if "namespaces" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"]["namespaces"]:
namespaces = []
if restriction_description:
restriction_description = restriction_description + _(" and namespaces: ")
else:
restriction_description = _("Blocked from editing pages on following namespaces: ")
for namespace in change["logparams"]["restrictions"]["namespaces"]:
if str(namespace) in recent_changes.namespaces: # if we have cached namespace name for given namespace number, add its name to the list
namespaces.append("*{ns}*".format(ns=recent_changes.namespaces[str(namespace)]["*"]))
else:
namespaces.append("*{ns}*".format(ns=namespace))
restriction_description = restriction_description + ", ".join(namespaces)
restriction_description = restriction_description + "."
if len(restriction_description) > 1020:
logger.debug(restriction_description)
restriction_description = restriction_description[:1020]+""
embed.add_field(_("Partial block details"), restriction_description, inline=True)
embed["title"] = _("Blocked {blocked_user} {time}").format(blocked_user=user, time=block_time)
elif action == "block/reblock":
link = create_article_path(change["title"])
user = change["title"].split(':', 1)[1]
embed["title"] = _("Changed block settings for {blocked_user}").format(blocked_user=user)
elif action == "block/unblock":
link = create_article_path(change["title"])
user = change["title"].split(':', 1)[1]