diff --git a/extensions/base/__init__.py b/extensions/base/__init__.py index e6ffd54..eb02987 100644 --- a/extensions/base/__init__.py +++ b/extensions/base/__init__.py @@ -23,4 +23,5 @@ import extensions.base.translate import extensions.base.discussions import extensions.base.curseprofile import extensions.base.interwiki -import extensions.base.renameuser \ No newline at end of file +import extensions.base.renameuser +import extensions.base.rcgcdb diff --git a/extensions/base/rcgcdb.py b/extensions/base/rcgcdb.py index 3c94bec..9f68cd7 100644 --- a/extensions/base/rcgcdb.py +++ b/extensions/base/rcgcdb.py @@ -1,4 +1,3 @@ -import logging import json from src.discord.message import DiscordMessage from src.api import formatter diff --git a/src/bot.py b/src/bot.py index cb0c802..4b0fa46 100644 --- a/src/bot.py +++ b/src/bot.py @@ -9,18 +9,14 @@ from collections import defaultdict, namedtuple from typing import Generator from contextlib import asynccontextmanager - +from src.discord.queue import messagequeue from src.argparser import command_line_args from src.config import settings from src.database import db_connection from src.exceptions import * -from src.misc import get_paths, get_domain -from src.msgqueue import messagequeue, send_to_discord -from src.queue_handler import DBHandler -from src.wiki import Wiki, process_cats, process_mwmsgs, essential_info, essential_feeds -from src.discord.discord import DiscordMessage, generic_msg_sender_exception_logger, stack_message_list +from src.queue_handler import UpdateDB +from src.wiki import Wiki, process_cats, essential_feeds from src.wiki_ratelimiter import RateLimiter -from src.irc_feed import AioIRCCat from src.domain_manager import domains @@ -74,122 +70,122 @@ async def message_sender(): shutdown(loop=asyncio.get_event_loop()) else: logger.exception("Exception on DC message sender") - await generic_msg_sender_exception_logger(traceback.format_exc(), "Message sender exception") + # await generic_msg_sender_exception_logger(traceback.format_exc(), "Message sender exception") # TODO -async def discussion_handler(): - await asyncio.sleep(3.0) # Make some time before IRC code is executed, happens only once and saves if inside - try: - while True: - async with db.pool().acquire() as connection: - async with connection.transaction(): - async for db_wiki in connection.cursor("SELECT DISTINCT wiki, rcid, postid FROM rcgcdw WHERE postid != '-1' OR postid IS NULL"): - try: - local_wiki = all_wikis[db_wiki["wiki"]] # set a reference to a wiki object from memory - except KeyError: - local_wiki = all_wikis[db_wiki["wiki"]] = Wiki() - local_wiki.rc_active = db_wiki["rcid"] - if db_wiki["wiki"] not in rcqueue.irc_mapping["fandom.com"].updated_discussions and \ - local_wiki.last_discussion_check+settings["irc_overtime"] > time.time(): # I swear if another wiki farm ever starts using Fandom discussions I'm gonna use explosion magic - continue - else: - try: - rcqueue.irc_mapping["fandom.com"].updated_discussions.remove(db_wiki["wiki"]) - except KeyError: - pass # to be expected - header = settings["header"] - header["Accept"] = "application/hal+json" - async with aiohttp.ClientSession(headers=header, - timeout=aiohttp.ClientTimeout(6.0)) as session: - try: - feeds_response = await local_wiki.fetch_feeds(db_wiki["wiki"], session) - except (WikiServerError, WikiError): - continue # ignore this wiki if it throws errors - try: - discussion_feed_resp = await feeds_response.json(encoding="UTF-8") - if "error" in discussion_feed_resp: - error = 
discussion_feed_resp["error"] - if error == "NotFoundException": # Discussions disabled - if db_wiki["rcid"] != -1: # RC feed is disabled - await connection.execute("UPDATE rcgcdw SET postid = $1 WHERE wiki = $2", "-1", db_wiki["wiki"]) - else: - await local_wiki.remove(db_wiki["wiki"], 1000) - continue - raise WikiError - discussion_feed = discussion_feed_resp["_embedded"]["doc:posts"] - discussion_feed.reverse() - except aiohttp.ContentTypeError: - logger.exception("Wiki seems to be resulting in non-json content.") - continue - except asyncio.TimeoutError: - logger.debug("Timeout on reading JSON of discussion post feeed.") - continue - except: - logger.exception("On loading json of response.") - continue - if db_wiki["postid"] is None: # new wiki, just get the last post to not spam the channel - if len(discussion_feed) > 0: - DBHandler.add(db_wiki["wiki"], discussion_feed[-1]["id"], True) - else: - DBHandler.add(db_wiki["wiki"], "0", True) - continue - comment_events = [] - targets = await generate_targets(db_wiki["wiki"], "AND NOT postid = '-1'") - for post in discussion_feed: - if post["_embedded"]["thread"][0]["containerType"] == "ARTICLE_COMMENT" and post["id"] > db_wiki["postid"]: - comment_events.append(post["forumId"]) - comment_pages: dict = {} - if comment_events: - try: - comment_pages = await local_wiki.safe_request( - "{wiki}wikia.php?controller=FeedsAndPosts&method=getArticleNamesAndUsernames&stablePageIds={pages}&format=json".format( - wiki=db_wiki["wiki"], pages=",".join(comment_events) - ), RateLimiter(), "articleNames") - except aiohttp.ClientResponseError: # Fandom can be funny sometimes... See #30 - comment_pages = None - except: - if command_line_args.debug: - logger.exception("Exception on Feeds article comment request") - shutdown(loop=asyncio.get_event_loop()) - else: - logger.exception("Exception on Feeds article comment request") - await generic_msg_sender_exception_logger(traceback.format_exc(), - "Exception on Feeds article comment request", - Post=str(post)[0:1000], Wiki=db_wiki["wiki"]) - message_list = defaultdict(list) - for post in discussion_feed: # Yeah, second loop since the comments require an extra request - if post["id"] > db_wiki["postid"]: - for target in targets.items(): - try: - message = await essential_feeds(post, comment_pages, db_wiki, target) - if message is not None: - message_list[target[0]].append(message) - except asyncio.CancelledError: - raise - except: - if command_line_args.debug: - logger.exception("Exception on Feeds formatter") - shutdown(loop=asyncio.get_event_loop()) - else: - logger.exception("Exception on Feeds formatter") - await generic_msg_sender_exception_logger(traceback.format_exc(), "Exception in feed formatter", Post=str(post)[0:1000], Wiki=db_wiki["wiki"]) - # Lets stack the messages - for messages in message_list.values(): - messages = stack_message_list(messages) - for message in messages: - await send_to_discord(message) - if discussion_feed: - DBHandler.add(db_wiki["wiki"], post["id"], True) - await asyncio.sleep(delay=2.0) # hardcoded really doesn't need much more - await asyncio.sleep(delay=1.0) # Avoid lock on no wikis - except asyncio.CancelledError: - pass - except: - if command_line_args.debug: - raise # reraise the issue - else: - logger.exception("Exception on Feeds formatter") - await generic_msg_sender_exception_logger(traceback.format_exc(), "Discussion handler task exception", Wiki=db_wiki["wiki"]) +# async def discussion_handler(): +# await asyncio.sleep(3.0) # Make some time before IRC code is 
executed, happens only once and saves if inside +# try: +# while True: +# async with db.pool().acquire() as connection: +# async with connection.transaction(): +# async for db_wiki in connection.cursor("SELECT DISTINCT wiki, rcid, postid FROM rcgcdw WHERE postid != '-1' OR postid IS NULL"): +# try: +# local_wiki = all_wikis[db_wiki["wiki"]] # set a reference to a wiki object from memory +# except KeyError: +# local_wiki = all_wikis[db_wiki["wiki"]] = Wiki() +# local_wiki.rc_active = db_wiki["rcid"] +# if db_wiki["wiki"] not in rcqueue.irc_mapping["fandom.com"].updated_discussions and \ +# local_wiki.last_discussion_check+settings["irc_overtime"] > time.time(): # I swear if another wiki farm ever starts using Fandom discussions I'm gonna use explosion magic +# continue +# else: +# try: +# rcqueue.irc_mapping["fandom.com"].updated_discussions.remove(db_wiki["wiki"]) +# except KeyError: +# pass # to be expected +# header = settings["header"] +# header["Accept"] = "application/hal+json" +# async with aiohttp.ClientSession(headers=header, +# timeout=aiohttp.ClientTimeout(6.0)) as session: +# try: +# feeds_response = await local_wiki.fetch_feeds(db_wiki["wiki"], session) +# except (WikiServerError, WikiError): +# continue # ignore this wiki if it throws errors +# try: +# discussion_feed_resp = await feeds_response.json(encoding="UTF-8") +# if "error" in discussion_feed_resp: +# error = discussion_feed_resp["error"] +# if error == "NotFoundException": # Discussions disabled +# if db_wiki["rcid"] != -1: # RC feed is disabled +# await connection.execute("UPDATE rcgcdw SET postid = $1 WHERE wiki = $2", "-1", db_wiki["wiki"]) +# else: +# await local_wiki.remove(db_wiki["wiki"], 1000) +# continue +# raise WikiError +# discussion_feed = discussion_feed_resp["_embedded"]["doc:posts"] +# discussion_feed.reverse() +# except aiohttp.ContentTypeError: +# logger.exception("Wiki seems to be resulting in non-json content.") +# continue +# except asyncio.TimeoutError: +# logger.debug("Timeout on reading JSON of discussion post feeed.") +# continue +# except: +# logger.exception("On loading json of response.") +# continue +# if db_wiki["postid"] is None: # new wiki, just get the last post to not spam the channel +# if len(discussion_feed) > 0: +# DBHandler.add(db_wiki["wiki"], discussion_feed[-1]["id"], True) +# else: +# DBHandler.add(db_wiki["wiki"], "0", True) +# continue +# comment_events = [] +# targets = await generate_targets(db_wiki["wiki"], "AND NOT postid = '-1'") +# for post in discussion_feed: +# if post["_embedded"]["thread"][0]["containerType"] == "ARTICLE_COMMENT" and post["id"] > db_wiki["postid"]: +# comment_events.append(post["forumId"]) +# comment_pages: dict = {} +# if comment_events: +# try: +# comment_pages = await local_wiki.safe_request( +# "{wiki}wikia.php?controller=FeedsAndPosts&method=getArticleNamesAndUsernames&stablePageIds={pages}&format=json".format( +# wiki=db_wiki["wiki"], pages=",".join(comment_events) +# ), RateLimiter(), "articleNames") +# except aiohttp.ClientResponseError: # Fandom can be funny sometimes... 
See #30 +# comment_pages = None +# except: +# if command_line_args.debug: +# logger.exception("Exception on Feeds article comment request") +# shutdown(loop=asyncio.get_event_loop()) +# else: +# logger.exception("Exception on Feeds article comment request") +# await generic_msg_sender_exception_logger(traceback.format_exc(), +# "Exception on Feeds article comment request", +# Post=str(post)[0:1000], Wiki=db_wiki["wiki"]) +# message_list = defaultdict(list) +# for post in discussion_feed: # Yeah, second loop since the comments require an extra request +# if post["id"] > db_wiki["postid"]: +# for target in targets.items(): +# try: +# message = await essential_feeds(post, comment_pages, db_wiki, target) +# if message is not None: +# message_list[target[0]].append(message) +# except asyncio.CancelledError: +# raise +# except: +# if command_line_args.debug: +# logger.exception("Exception on Feeds formatter") +# shutdown(loop=asyncio.get_event_loop()) +# else: +# logger.exception("Exception on Feeds formatter") +# await generic_msg_sender_exception_logger(traceback.format_exc(), "Exception in feed formatter", Post=str(post)[0:1000], Wiki=db_wiki["wiki"]) +# # Lets stack the messages +# for messages in message_list.values(): +# messages = stack_message_list(messages) +# for message in messages: +# await send_to_discord(message) +# if discussion_feed: +# DBHandler.add(db_wiki["wiki"], post["id"], True) +# await asyncio.sleep(delay=2.0) # hardcoded really doesn't need much more +# await asyncio.sleep(delay=1.0) # Avoid lock on no wikis +# except asyncio.CancelledError: +# pass +# except: +# if command_line_args.debug: +# raise # reraise the issue +# else: +# logger.exception("Exception on Feeds formatter") +# await generic_msg_sender_exception_logger(traceback.format_exc(), "Discussion handler task exception", Wiki=db_wiki["wiki"]) def shutdown(loop, signal=None): @@ -197,7 +193,7 @@ def shutdown(loop, signal=None): loop.remove_signal_handler(signal) if len(messagequeue) > 0: logger.warning("Some messages are still queued!") - for task in (main_tasks["wiki_scanner"], main_tasks["discussion_handler"], main_tasks["msg_queue_shield"], main_tasks["database_updates_shield"]): + for task in asyncio.all_tasks(loop): task.cancel() loop.run_until_complete(main_tasks["message_sender"]) loop.run_until_complete(main_tasks["database_updates"]) @@ -246,10 +242,10 @@ async def main_loop(): # loop.set_exception_handler(global_exception_handler) try: main_tasks = {"message_sender": asyncio.create_task(message_sender()), - "discussion_handler": asyncio.create_task(discussion_handler()), "database_updates": asyncio.create_task(DBHandler.update_db())} + "database_updates": asyncio.create_task(DBHandler.update_db())} # "discussion_handler": asyncio.create_task(discussion_handler()), main_tasks["msg_queue_shield"] = asyncio.shield(main_tasks["message_sender"]) main_tasks["database_updates_shield"] = asyncio.shield(main_tasks["database_updates"]) - await asyncio.gather(main_tasks["wiki_scanner"], main_tasks["discussion_handler"], main_tasks["message_sender"], main_tasks["database_updates"]) + await asyncio.gather(main_tasks["message_sender"], main_tasks["database_updates"]) except KeyboardInterrupt: shutdown(loop) except asyncio.CancelledError: diff --git a/src/discord/message.py b/src/discord/message.py index ac02025..6c4d246 100644 --- a/src/discord/message.py +++ b/src/discord/message.py @@ -29,7 +29,7 @@ with open("src/api/template_settings.json", "r") as template_json: class DiscordMessageMetadata: def 
__init__(self, method, log_id = None, page_id = None, rev_id = None, webhook_url = None): - self.method = method + self.method = method # unused, remains for compatibility reasons self.page_id = page_id self.log_id = log_id self.rev_id = rev_id @@ -172,8 +172,9 @@ class StackedDiscordMessage(): self.message_list: list[DiscordMessage] = [] self.length = 0 self.message_type: int = m_type # 0 for compact, 1 for embed - self.discord_callback_message_ids: list[int] = [] + self.discord_callback_message_id: int = -1 self.wiki: Wiki = wiki + self.webhook: Optional[str] = None def __len__(self): return self.length @@ -188,7 +189,12 @@ class StackedDiscordMessage(): def filter(self, params: dict) -> list[tuple[int, DiscordMessage]]: """Filters messages by their metadata""" - return [(num, message) for num, message in enumerate(self.message_list)] + return [(num, message) for num, message in enumerate(self.message_list) if message.matches(params)] + + def delete_message_by_id(self, message_ids: list[int]): + """Deletes messages with given IDS from the message_ids list""" + for message_id in sorted(message_ids, reverse=True): + self.message_list.pop(message_id) def add_message(self, message: DiscordMessage): if len(self) + len(message) > 6000 or len(self.message_list) > 9: diff --git a/src/discord/queue.py b/src/discord/queue.py index 1c2f6cc..fc3e3f2 100644 --- a/src/discord/queue.py +++ b/src/discord/queue.py @@ -38,11 +38,12 @@ logger = logging.getLogger("rcgcdw.discord.queue") class QueueEntry: - def __init__(self, discord_message, webhooks, wiki): + def __init__(self, discord_message, webhooks, wiki, method="POST"): self.discord_message: [DiscordMessage, StackedDiscordMessage] = discord_message self.webhooks: list[str] = webhooks self._sent_webhooks: set[str] = set() self.wiki: Wiki = wiki + self.method = method def check_sent_status(self, webhook: str) -> bool: """Checks sent status for given message, if True it means that the message has been sent before to given webhook, otherwise False.""" @@ -111,11 +112,17 @@ class MessageQueue: if self.compare_message_to_dict(item[1], properties): self._queue.pop(index) - async def pack_massages(self, messages: list[QueueEntry]) -> AsyncGenerator[tuple[StackedDiscordMessage, int]]: + async def pack_massages(self, messages: list[QueueEntry], current_pack=None) -> AsyncGenerator[tuple[StackedDiscordMessage, int, str]]: """Pack messages into StackedDiscordMessage. It's an async generator""" - current_pack = StackedDiscordMessage(0 if messages[0].discord_message.message_type == "compact" else 1, messages[0].wiki) # first message - index = -1 + # TODO Rebuild to support DELETE and PATCH messages for index, message in enumerate(messages): + if message.method == "POST": + if current_pack is None: + current_pack = StackedDiscordMessage(0 if message.discord_message.message_type == "compact" else 1, + message.wiki) + else: + # message.discord_message. # TODO Where do we store method? 
+ yield message.discord_message, index, message.method message = message.discord_message try: current_pack.add_message(message) @@ -123,18 +130,18 @@ class MessageQueue: yield current_pack, index-1 current_pack = StackedDiscordMessage(0 if message.message_type == "compact" else 1, message.wiki) # next messages current_pack.add_message(message) - yield current_pack, index + yield current_pack, index, "POST" async def send_msg_set(self, msg_set: tuple[str, list[QueueEntry]]): webhook_url, messages = msg_set # str("daosdkosakda/adkahfwegr34", list(DiscordMessage, DiscordMessage, DiscordMessage) - async for msg, index in self.pack_massages(messages): + async for msg, index, method in self.pack_massages(messages): client_error = False if self.global_rate_limit: return # if we are globally rate limited just wait for first gblocked request to finish # Verify that message hasn't been sent before # noinspection PyTypeChecker try: - status = await send_to_discord_webhook(msg, webhook_url) + status = await send_to_discord_webhook(msg, webhook_url, method) except aiohttp.ClientError: client_error = True except (aiohttp.ServerConnectionError, aiohttp.ServerTimeoutError): @@ -148,6 +155,7 @@ class MessageQueue: for queue_message in messages[max(index-len(msg.message_list), 0):index]: # mark messages as delivered queue_message.confirm_sent_status(webhook_url) if client_error is False: + msg.webhook = webhook_url msg.wiki.add_message(msg) async def resend_msgs(self): @@ -199,7 +207,7 @@ def handle_discord_http(code: int, formatted_embed: str, result: ClientResponse) raise aiohttp.ServerConnectionError() -async def send_to_discord_webhook(message: [StackedDiscordMessage, DiscordMessageMetadata], webhook_path: str): +async def send_to_discord_webhook(message: [StackedDiscordMessage, DiscordMessageMetadata], webhook_path: str, method: str): header = settings["header"] header['Content-Type'] = 'application/json' header['X-RateLimit-Precision'] = "millisecond" @@ -209,7 +217,7 @@ async def send_to_discord_webhook(message: [StackedDiscordMessage, DiscordMessag try: resp_json = await resp.json() # Add Discord Message ID which we can later use to delete/redact messages if we want - message.discord_callback_message_ids.append(resp_json["id"]) + message.discord_callback_message_id = resp_json["id"] except KeyError: raise aiohttp.ServerConnectionError(f"Could not get the ID from POST request with message data. Data: {await resp.text()}") except ContentTypeError: @@ -217,9 +225,9 @@ async def send_to_discord_webhook(message: [StackedDiscordMessage, DiscordMessag except ValueError: logger.exception(f"Could not decode JSON response from Discord. 
Response: {await resp.text()}]") return handle_discord_http(resp.status, repr(message), resp) - elif message.method == "DELETE": - async with session.request(method=message.method, url=f"https://discord.com/api/webhooks/{webhook_path}") as resp: - pass - elif message.method == "PATCH": - async with session.request(method=message.method, url=f"https://discord.com/api/webhooks/{webhook_path}", data=repr(message)) as resp: - pass + elif method == "DELETE": + async with session.request(method=method, url=f"https://discord.com/api/webhooks/{webhook_path}/messages/{message.discord_callback_message_id}") as resp: + return handle_discord_http(resp.status, repr(message), resp) + elif method == "PATCH": + async with session.request(method=method, url=f"https://discord.com/api/webhooks/{webhook_path}/messages/{message.discord_callback_message_id}", data=repr(message)) as resp: + return handle_discord_http(resp.status, repr(message), resp) diff --git a/src/discord/redaction.py b/src/discord/redaction.py deleted file mode 100644 index 8c70ae9..0000000 --- a/src/discord/redaction.py +++ /dev/null @@ -1,114 +0,0 @@ -# This file is part of Recent changes Goat compatible Discord webhook (RcGcDw). - -# RcGcDw is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# RcGcDw is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. - -# You should have received a copy of the GNU General Public License -# along with RcGcDw. If not, see . - -import logging -import json -from typing import List, Union - -from src.configloader import settings -from src.discord.message import DiscordMessageMetadata, DiscordMessageRaw -from src.discord.queue import send_to_discord, messagequeue -from src.fileio.database import db_cursor, db_connection -from src.i18n import redaction as redaction_translation - -logger = logging.getLogger("rcgcdw.discord.redaction") # TODO Figure out why does this logger do not work -_ = redaction_translation.gettext -#ngettext = redaction_translation.ngettext - - -def delete_messages(matching_data: dict): - """Delete messages that match given data""" - sql_conditions = "" - for key, value in matching_data.items(): - sql_conditions += "{} = ?
AND".format(key) - else: - sql_conditions = sql_conditions[0:-4] # remove last AND statement - to_delete = db_cursor.execute("SELECT msg_id FROM event WHERE {CON}".format(CON=sql_conditions), list(matching_data.values())) - if len(messagequeue) > 0: - messagequeue.delete_all_with_matching_metadata(**matching_data) - msg_to_remove = [] - logger.debug("Deleting messages for data: {}".format(matching_data)) - for message in to_delete: - webhook_url = "{main_webhook}/messages/{message_id}".format(main_webhook=settings["webhookURL"], message_id=message[0]) - msg_to_remove.append(message[0]) - logger.debug("Removing following message: {}".format(message[0])) - send_to_discord(None, DiscordMessageMetadata("DELETE", webhook_url=webhook_url)) - for msg in msg_to_remove: - db_cursor.execute("DELETE FROM messages WHERE message_id = ?", (msg,)) - db_connection.commit() - - -def redact_messages(ids, entry_type: int, to_censor: dict): # : Union[List[Union[str, int]], set[Union[int, str]]] - """Redact past Discord messages - - ids: list of ints - entry_type: int - 0 for revdel, 1 for logdel - to_censor: dict - logparams of message parts to censor""" - for event_id in ids: - if entry_type == 0: - message = db_cursor.execute("SELECT content, message_id FROM messages INNER JOIN event ON event.msg_id = messages.message_id WHERE event.revid = ?;", (event_id, )) - else: - message = db_cursor.execute( - "SELECT content, message_id FROM messages INNER JOIN event ON event.msg_id = messages.message_id WHERE event.logid = ?;", - (event_id,)) - if settings["appearance"]["mode"] == "embed": - if message is not None: - row = message.fetchone() - try: - message = json.loads(row[0]) - new_embed = message["embeds"][0] - except ValueError: - logger.error("Couldn't loads JSON for message data. What happened? Data: {}".format(row[0])) - return - except TypeError: - logger.error("Couldn't find entry in the database for RevDel to censor information. This is probably because the script has been recently restarted or cache cleared.") - return - if "user" in to_censor and "url" in new_embed["author"]: - new_embed["author"]["name"] = _("hidden") - new_embed["author"].pop("url") - if "action" in to_censor and "url" in new_embed: - new_embed["title"] = _("~~hidden~~") - new_embed.pop("url") - if "content" in to_censor and "fields" in new_embed: - new_embed.pop("fields") - if "comment" in to_censor: - new_embed["description"] = _("~~hidden~~") - message["embeds"][0] = new_embed - db_cursor.execute("UPDATE messages SET content = ? WHERE message_id = ?;", (json.dumps(message), row[1],)) - db_connection.commit() - logger.debug(message) - send_to_discord(DiscordMessageRaw(message, settings["webhookURL"]+"/messages/"+str(row[1])), DiscordMessageMetadata("PATCH")) - else: - logger.debug("Could not find message in the database.") - - -def find_middle_next(ids: List[str], pageid: int) -> set: - """To address #235 RcGcDw should now remove diffs in next revs relative to redacted revs to protect information in revs that revert revdeleted information. - - :arg ids - list - :arg pageid - int - - :return list""" - ids = [int(x) for x in ids] - result = set() - ids.sort() # Just to be sure, sort the list to make sure it's always sorted - messages = db_cursor.execute("SELECT revid FROM event WHERE pageid = ? AND revid >= ? 
ORDER BY revid", (pageid, ids[0],)) - all_in_page = [x[0] for x in messages.fetchall()] - for id in ids: - try: - result.add(all_in_page[all_in_page.index(id)+1]) - except (KeyError, ValueError): - logger.debug(f"Value {id} not in {all_in_page} or no value after that.") - return result - set(ids) diff --git a/src/formatters/discussions.py b/src/formatters/discussions.py deleted file mode 100644 index 156bc88..0000000 --- a/src/formatters/discussions.py +++ /dev/null @@ -1,274 +0,0 @@ -import datetime, logging -import json -from urllib.parse import quote_plus - -from src.config import settings -from src.misc import link_formatter, create_article_path, escape_formatting -from src.discord import DiscordMessage -from src.msgqueue import send_to_discord -from src.i18n import langs - - -logger = logging.getLogger("rcgcdw.discussion_formatters") - -async def feeds_compact_formatter(post_type, post, message_target, wiki, article_page=None) -> DiscordMessage: - """Compact formatter for Fandom discussions.""" - _ = langs[message_target[0][0]]["discussion_formatters"].gettext - message = None - author = _("unknown") # Fail safe - if post_type == "FORUM": - if post["createdBy"]["name"]: - author = post["createdBy"]["name"] - author_url = "<{url}f/u/{creatorId}>".format(url=wiki, creatorId=post["creatorId"]) - elif post["creatorIp"]: - author = post["creatorIp"][1:] - author_url = "<{url}wiki/Special:Contributions{creatorIp}>".format(url=wiki, creatorIp=post["creatorIp"]) - else: - if post["createdBy"]["name"]: - author = post["createdBy"]["name"] - author_url = link_formatter(create_article_path("User:{user}".format(user=author), wiki + "wiki/$1")) - else: - author_url = "<{url}f/u/{creatorId}>".format(url=wiki, creatorId=post["creatorId"]) - event_type = "discussion" - if post_type == "FORUM": - if not post["isReply"]: - thread_funnel = post.get("funnel") - msg_text = _("[{author}]({author_url}) created [{title}](<{url}f/p/{threadId}>) in {forumName}") - if thread_funnel == "POLL": - event_type = "discussion/forum/poll" - msg_text = _("[{author}]({author_url}) created a poll [{title}](<{url}f/p/{threadId}>) in {forumName}") - elif thread_funnel == "QUIZ": - event_type = "discussion/forum/quiz" - msg_text = _("[{author}]({author_url}) created a quiz [{title}](<{url}f/p/{threadId}>) in {forumName}") - elif thread_funnel == "TEXT": - event_type = "discussion/forum/post" - else: - logger.warning("No entry for {event} with params: {params}".format(event=thread_funnel, params=post)) - event_type = "unknown" - message = msg_text.format(author=author, author_url=author_url, title=escape_formatting(post["title"]), url=wiki, threadId=post["threadId"], forumName=post["forumName"]) - else: - event_type = "discussion/forum/reply" - message = _("[{author}]({author_url}) created a [reply](<{url}f/p/{threadId}/r/{postId}>) to [{title}](<{url}f/p/{threadId}>) in {forumName}").format(author=author, author_url=author_url, url=wiki, threadId=post["threadId"], postId=post["id"], title=escape_formatting(post["_embedded"]["thread"][0]["title"]), forumName=post["forumName"]) - elif post_type == "WALL": - user_wall = _("unknown") # Fail safe - if post["forumName"].endswith(' Message Wall'): - user_wall = post["forumName"][:-13] - if not post["isReply"]: - event_type = "discussion/wall/post" - message = _("[{author}]({author_url}) created [{title}](<{url}wiki/Message_Wall:{user_wall}?threadId={threadId}>) on [{user}'s Message Wall](<{url}wiki/Message_Wall:{user_wall}>)").format(author=author, author_url=author_url, 
title=escape_formatting(post["title"]), url=wiki, user=user_wall, user_wall=quote_plus(user_wall.replace(" ", "_")), threadId=post["threadId"]) - else: - event_type = "discussion/wall/reply" - message = _("[{author}]({author_url}) created a [reply](<{url}wiki/Message_Wall:{user_wall}?threadId={threadId}#{replyId}>) to [{title}](<{url}wiki/Message_Wall:{user_wall}?threadId={threadId}>) on [{user}'s Message Wall](<{url}wiki/Message_Wall:{user_wall}>)").format(author=author, author_url=author_url, url=wiki, title=escape_formatting(post["_embedded"]["thread"][0]["title"]), user=user_wall, user_wall=quote_plus(user_wall.replace(" ", "_")), threadId=post["threadId"], replyId=post["id"]) - elif post_type == "ARTICLE_COMMENT": - if article_page is None: - article_page = {"title": _("unknown"), "fullUrl": wiki} # No page known - article_page["fullUrl"] = article_page["fullUrl"].replace(")", "\)").replace("()", "\(") - if not post["isReply"]: - event_type = "discussion/comment/post" - message = _("[{author}]({author_url}) created a [comment](<{url}?commentId={commentId}>) on [{article}](<{url}>)").format(author=author, author_url=author_url, url=article_page["fullUrl"], article=article_page["title"], commentId=post["threadId"]) - else: - event_type = "discussion/comment/reply" - message = _("[{author}]({author_url}) created a [reply](<{url}?commentId={commentId}&replyId={replyId}>) to a [comment](<{url}?commentId={commentId}>) on [{article}](<{url}>)").format(author=author, author_url=author_url, url=article_page["fullUrl"], article=article_page["title"], commentId=post["threadId"], replyId=post["id"]) - else: - logger.warning("No entry for {event} with params: {params}".format(event=post_type, params=post)) - if not settings["support"]: - return - else: - message = _("Unknown event `{event}` by [{author}]({author_url}), report it on the [support server](<{support}>).").format(event=post_type, author=author, author_url=author_url, support=settings["support"]) - event_type = "unknown" - return DiscordMessage("compact", event_type, message_target[1], content=message, wiki=wiki) - - -async def feeds_embed_formatter(post_type, post, message_target, wiki, article_page=None) -> DiscordMessage: - """Embed formatter for Fandom discussions.""" - _ = langs[message_target[0][0]]["discussion_formatters"].gettext - embed = DiscordMessage("embed", "discussion", message_target[1], wiki=wiki) - author = _("unknown") # Fail safe - if post_type == "FORUM": - if post["createdBy"]["name"]: - author = post["createdBy"]["name"] - embed.set_author(author, "{url}f/u/{creatorId}".format(url=wiki, creatorId=post["creatorId"]), icon_url=post["createdBy"]["avatarUrl"]) - elif post["creatorIp"]: - author = post["creatorIp"][1:] - embed.set_author(author, "{url}wiki/Special:Contributions{creatorIp}".format(url=wiki, creatorIp=post["creatorIp"])) - else: - if post["createdBy"]["name"]: - author = post["createdBy"]["name"] - embed.set_author(author, "{url}wiki/User:{creator}".format(url=wiki, creator=author.replace(" ", "_")), icon_url=post["createdBy"]["avatarUrl"]) - else: - embed.set_author(author, "{url}f/u/{creatorId}".format(url=wiki, creatorId=post["creatorId"]), icon_url=post["createdBy"]["avatarUrl"]) - if message_target[0][1] == 3: - if post.get("jsonModel") is not None: - npost = DiscussionsFromHellParser(post, wiki) - embed["description"] = npost.parse() - if npost.image_last: - embed["image"]["url"] = npost.image_last - embed["description"] = embed["description"].replace(npost.image_last, "") - else: # Fallback when 
model is not available - embed["description"] = post.get("rawContent", "") - if post["forumName"] is not None: - embed.set_footer(post["forumName"].replace("_", " ")) - embed["timestamp"] = datetime.datetime.fromtimestamp(post["creationDate"]["epochSecond"], tz=datetime.timezone.utc).isoformat() - if post_type == "FORUM": - if not post["isReply"]: - embed["url"] = "{url}f/p/{threadId}".format(url=wiki, threadId=post["threadId"]) - embed["title"] = _("Created \"{title}\"").format(title=escape_formatting(post["title"])) - thread_funnel = post.get("funnel") - if thread_funnel == "POLL": - embed.event_type = "discussion/forum/poll" - embed["title"] = _("Created a poll \"{title}\"").format(title=escape_formatting(post["title"])) - if message_target[0][1] > 1: - poll = post["poll"] - image_type = False - if poll["answers"][0]["image"] is not None: - image_type = True - for num, option in enumerate(poll["answers"]): - embed.add_field(option["text"] if image_type is True else _("Option {}").format(num+1), - option["text"] if image_type is False else _("__[View image]({image_url})__").format(image_url=option["image"]["url"]), - inline=True) - elif thread_funnel == "QUIZ": - embed.event_type = "discussion/forum/quiz" - embed["title"] = _("Created a quiz \"{title}\"").format(title=escape_formatting(post["title"])) - if message_target[0][1] > 1: - quiz = post["_embedded"]["quizzes"][0] - embed["description"] = quiz["title"] - if quiz["image"] is not None: - embed["image"]["url"] = quiz["image"] - elif thread_funnel == "TEXT": - embed.event_type = "discussion/forum/post" - else: - logger.warning("No entry for {event} with params: {params}".format(event=thread_funnel, params=post)) - embed.event_type = "unknown" - if message_target[0][1] > 1 and post["_embedded"]["thread"][0]["tags"]: - tag_displayname = [] - for tag in post["_embedded"]["thread"][0]["tags"]: - tag_displayname.append("[{title}]({url})".format(title=tag["articleTitle"], url=create_article_path(tag["articleTitle"], wiki + "wiki/$1"))) - if len(", ".join(tag_displayname)) > 1000: - embed.add_field(_("Tags"), _("{} tags").format(len(post["_embedded"]["thread"][0]["tags"]))) - else: - embed.add_field(_("Tags"), ", ".join(tag_displayname)) - else: - embed.event_type = "discussion/forum/reply" - embed["title"] = _("Replied to \"{title}\"").format(title=escape_formatting(post["_embedded"]["thread"][0]["title"])) - embed["url"] = "{url}f/p/{threadId}/r/{postId}".format(url=wiki, threadId=post["threadId"], postId=post["id"]) - elif post_type == "WALL": - user_wall = _("unknown") # Fail safe - if post["forumName"].endswith(' Message Wall'): - user_wall = post["forumName"][:-13].replace("_", " ") - if not post["isReply"]: - embed.event_type = "discussion/wall/post" - embed["url"] = "{url}wiki/Message_Wall:{user_wall}?threadId={threadId}".format(url=wiki, user_wall=quote_plus(user_wall.replace(" ", "_")), threadId=post["threadId"]) - embed["title"] = _("Created \"{title}\" on {user}'s Message Wall").format(title=escape_formatting(post["title"]), user=user_wall) - else: - embed.event_type = "discussion/wall/reply" - embed["url"] = "{url}wiki/Message_Wall:{user_wall}?threadId={threadId}#{replyId}".format(url=wiki, user_wall=quote_plus(user_wall.replace(" ", "_")), threadId=post["threadId"], replyId=post["id"]) - embed["title"] = _("Replied to \"{title}\" on {user}'s Message Wall").format(title=escape_formatting(post["_embedded"]["thread"][0]["title"]), user=user_wall) - elif post_type == "ARTICLE_COMMENT": - if article_page is None: - article_page = 
{"title": _("unknown"), "fullUrl": wiki} # No page known - if not post["isReply"]: - embed.event_type = "discussion/comment/post" - embed["url"] = "{url}?commentId={commentId}".format(url=article_page["fullUrl"], commentId=post["threadId"]) - embed["title"] = _("Commented on {article}").format(article=article_page["title"]) - else: - embed.event_type = "discussion/comment/reply" - embed["url"] = "{url}?commentId={commentId}&replyId={replyId}".format(url=article_page["fullUrl"], commentId=post["threadId"], replyId=post["id"]) - embed["title"] = _("Replied to a comment on {article}").format(article=article_page["title"]) - embed.set_footer(article_page["title"]) - else: - logger.warning("No entry for {event} with params: {params}".format(event=post_type, params=post)) - embed["title"] = _("Unknown event `{event}`").format(event=post_type) - embed.event_type = "unknown" - if settings["support"]: - change_params = "[```json\n{params}\n```]({support})".format(params=json.dumps(post, indent=2), support=settings["support"]) - if len(change_params) > 1000: - embed.add_field(_("Report this on the support server"), settings["support"]) - else: - embed.add_field(_("Report this on the support server"), change_params) - embed.finish_embed() - return embed - - -class DiscussionsFromHellParser: - """This class converts fairly convoluted Fandom jsonModal of a discussion post into Markdown formatted usable thing. Takes string, returns string. - Kudos to MarkusRost for allowing me to implement this formatter based on his code in Wiki-Bot.""" - def __init__(self, post, wiki): - self.post = post - self.wiki = wiki - self.jsonModal = json.loads(post.get("jsonModel", "{}")) - self.markdown_text = "" - self.item_num = 1 - self.image_last = None - - def parse(self) -> str: - """Main parsing logic""" - self.parse_content(self.jsonModal["content"]) - if len(self.markdown_text) > 2000: - self.markdown_text = self.markdown_text[0:2000] + "…" - return self.markdown_text - - def parse_content(self, content, ctype=None): - self.image_last = None - for item in content: - if ctype == "bulletList": - self.markdown_text += "\t• " - if ctype == "orderedList": - self.markdown_text += "\t{num}. 
".format(num=self.item_num) - self.item_num += 1 - if item["type"] == "text": - if "marks" in item: - prefix, suffix = self.convert_marks(item["marks"]) - self.markdown_text = "{old}{pre}{text}{suf}".format(old=self.markdown_text, pre=prefix, text=escape_formatting(item["text"]), suf=suffix) - else: - if ctype == "code_block": - self.markdown_text += item["text"] # ignore formatting on preformatted text which cannot have additional formatting anyways - else: - self.markdown_text += escape_formatting(item["text"]) - elif item["type"] == "paragraph": - if "content" in item: - self.parse_content(item["content"], item["type"]) - self.markdown_text += "\n" - elif item["type"] == "openGraph": - if not item["attrs"].get("wasAddedWithInlineLink", False): - self.markdown_text = "{old}{link}\n".format(old=self.markdown_text, link=item["attrs"]["url"]) - elif item["type"] == "image": - try: - logger.debug(item["attrs"]["id"]) - if item["attrs"]["id"] is not None: - self.markdown_text = "{old}{img_url}\n".format(old=self.markdown_text, img_url=self.post["_embedded"]["contentImages"][int(item["attrs"]["id"])]["url"]) - self.image_last = self.post["_embedded"]["contentImages"][int(item["attrs"]["id"])]["url"] - except (IndexError, ValueError, TypeError): - logger.warning("Image {} not found.".format(item["attrs"]["id"])) - logger.debug(self.markdown_text) - elif item["type"] == "code_block": - self.markdown_text += "```\n" - if "content" in item: - self.parse_content(item["content"], item["type"]) - self.markdown_text += "\n```\n" - elif item["type"] == "bulletList": - if "content" in item: - self.parse_content(item["content"], item["type"]) - elif item["type"] == "orderedList": - self.item_num = 1 - if "content" in item: - self.parse_content(item["content"], item["type"]) - elif item["type"] == "listItem": - self.parse_content(item["content"], item["type"]) - - def convert_marks(self, marks): - prefix = "" - suffix = "" - for mark in marks: - if mark["type"] == "mention": - prefix += "[" - suffix = "]({wiki}f/u/{userid}){suffix}".format(wiki=self.wiki, userid=mark["attrs"]["userId"], suffix=suffix) - elif mark["type"] == "strong": - prefix += "**" - suffix = "**{suffix}".format(suffix=suffix) - elif mark["type"] == "link": - prefix += "[" - suffix = "]({link}){suffix}".format(link=mark["attrs"]["href"], suffix=suffix) - elif mark["type"] == "em": - prefix += "_" - suffix = "_" + suffix - return prefix, suffix \ No newline at end of file diff --git a/src/formatters/rc.py b/src/formatters/rc.py deleted file mode 100644 index 18a2334..0000000 --- a/src/formatters/rc.py +++ /dev/null @@ -1,1154 +0,0 @@ -import ipaddress -import math -import re -import time -import json -import logging -import datetime -from aiohttp import ClientResponseError -from src.config import settings -from src.misc import link_formatter, create_article_path, parse_link, profile_field_name, ContentParser -from src.discord import DiscordMessage -from src.i18n import langs - -from bs4 import BeautifulSoup - -logger = logging.getLogger("rcgcdw.rc_formatters") - -if 1 == 2: # additional translation strings in unreachable code - print(_("director"), _("bot"), _("editor"), _("directors"), _("sysop"), _("bureaucrat"), _("reviewer"), - _("autoreview"), _("autopatrol"), _("wiki_guardian"), ngettext("second", "seconds", 1), ngettext("minute", "minutes", 1), ngettext("hour", "hours", 1), ngettext("day", "days", 1), ngettext("week", "weeks", 1), ngettext("month", "months",1), ngettext("year", "years", 1), ngettext("millennium", "millennia", 1), 
ngettext("decade", "decades", 1), ngettext("century", "centuries", 1)) - -async def compact_formatter(action, change, parsed_comment, categories, recent_changes, message_target, paths, rate_limiter, - additional_data=None) -> DiscordMessage: - """Recent Changes compact formatter, part of RcGcDw""" - _ = langs[message_target[0][0]]["rc_formatters"].gettext - ngettext = langs[message_target[0][0]]["rc_formatters"].ngettext - if additional_data is None: - additional_data = {"namespaces": {}, "tags": {}} - WIKI_API_PATH = paths[0] - WIKI_SCRIPT_PATH = paths[1] - WIKI_ARTICLE_PATH = paths[2] - WIKI_JUST_DOMAIN = paths[3] - if action != "suppressed": - if "anon" in change: - author_url = link_formatter(create_article_path("Special:Contributions/{user}".format(user=change["user"]), WIKI_ARTICLE_PATH)) - else: - author_url = link_formatter(create_article_path("User:{user}".format(user=change["user"]), WIKI_ARTICLE_PATH)) - author = change["user"] - parsed_comment = "" if parsed_comment is None else " *("+parsed_comment+")*" - if action in ["edit", "new"]: - edit_link = link_formatter("{wiki}index.php?title={article}&curid={pageid}&diff={diff}&oldid={oldrev}".format( - wiki=WIKI_SCRIPT_PATH, pageid=change["pageid"], diff=change["revid"], oldrev=change["old_revid"], - article=change["title"])) - edit_size = change["newlen"] - change["oldlen"] - sign = "" - if edit_size > 0: - sign = "+" - bold = "" - if abs(edit_size) > 500: - bold = "**" - if action == "edit": - content = _("[{author}]({author_url}) edited [{article}]({edit_link}){comment} {bold}({sign}{edit_size}){bold}").format(author=author, author_url=author_url, article=change["title"], edit_link=edit_link, comment=parsed_comment, edit_size=edit_size, sign=sign, bold=bold) - else: - content = _("[{author}]({author_url}) created [{article}]({edit_link}){comment} {bold}({sign}{edit_size}){bold}").format(author=author, author_url=author_url, article=change["title"], edit_link=edit_link, comment=parsed_comment, edit_size=edit_size, sign=sign, bold=bold) - elif action =="upload/upload": - file_link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) uploaded [{file}]({file_link}){comment}").format(author=author, - author_url=author_url, - file=change["title"], - file_link=file_link, - comment=parsed_comment) - elif action == "upload/revert": - file_link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) reverted a version of [{file}]({file_link}){comment}").format( - author=author, author_url=author_url, file=change["title"], file_link=file_link, comment=parsed_comment) - elif action == "upload/overwrite": - file_link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) uploaded a new version of [{file}]({file_link}){comment}").format(author=author, author_url=author_url, file=change["title"], file_link=file_link, comment=parsed_comment) - elif action == "delete/delete": - page_link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) deleted [{page}]({page_link}){comment}").format(author=author, author_url=author_url, page=change["title"], page_link=page_link, - comment=parsed_comment) - elif action == "delete/delete_redir": - page_link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) deleted redirect by overwriting 
[{page}]({page_link}){comment}").format(author=author, author_url=author_url, page=change["title"], page_link=page_link, - comment=parsed_comment) - elif action == "move/move": - link = link_formatter(create_article_path(change["logparams"]['target_title'], WIKI_ARTICLE_PATH)) - redirect_status = _("without making a redirect") if "suppressredirect" in change["logparams"] else _("with a redirect") - content = _("[{author}]({author_url}) moved {redirect}*{article}* to [{target}]({target_url}) {made_a_redirect}{comment}").format(author=author, author_url=author_url, redirect="⤷ " if "redirect" in change else "", article=change["title"], - target=change["logparams"]['target_title'], target_url=link, comment=parsed_comment, made_a_redirect=redirect_status) - elif action == "move/move_redir": - link = link_formatter(create_article_path(change["logparams"]["target_title"], WIKI_ARTICLE_PATH)) - redirect_status = _("without making a redirect") if "suppressredirect" in change["logparams"] else _( - "with a redirect") - content = _("[{author}]({author_url}) moved {redirect}*{article}* over redirect to [{target}]({target_url}) {made_a_redirect}{comment}").format(author=author, author_url=author_url, redirect="⤷ " if "redirect" in change else "", article=change["title"], - target=change["logparams"]['target_title'], target_url=link, comment=parsed_comment, made_a_redirect=redirect_status) - elif action == "protect/move_prot": - link = link_formatter(create_article_path(change["logparams"]["oldtitle_title"], WIKI_ARTICLE_PATH)) - content = _( - "[{author}]({author_url}) moved protection settings from {redirect}*{article}* to [{target}]({target_url}){comment}").format(author=author, author_url=author_url, redirect="⤷ " if "redirect" in change else "", article=change["logparams"]["oldtitle_title"], - target=change["title"], target_url=link, comment=parsed_comment) - elif action == "block/block": - user = change["title"].split(':', 1)[1] - restriction_description = "" - try: - ipaddress.ip_address(user) - link = link_formatter(create_article_path("Special:Contributions/{user}".format(user=user), WIKI_ARTICLE_PATH)) - except ValueError: - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - if change["logparams"]["duration"] in ["infinite", "indefinite", "infinity", "never"]: - block_time = _("for infinity and beyond") - else: - english_length = re.sub(r"(\d+)", "", change["logparams"][ - "duration"]) # note that translation won't work for millenia and century yet - english_length_num = re.sub(r"(\D+)", "", change["logparams"]["duration"]) - try: - if "@" in english_length: - raise ValueError - english_length = english_length.rstrip("s").strip() - try: - block_time = _("for {num} {translated_length}").format(num=english_length_num, - translated_length=ngettext(english_length, - english_length + "s", - int(english_length_num))) - except ValueError: - logger.exception("Couldn't properly resolve block expiry.") - except (AttributeError, ValueError): - date_time_obj = datetime.datetime.strptime(change["logparams"]["expiry"], '%Y-%m-%dT%H:%M:%SZ') - block_time = _("until {}").format(date_time_obj.strftime("%Y-%m-%d %H:%M:%S UTC")) - if "sitewide" not in change["logparams"]: - if "restrictions" in change["logparams"]: - if "pages" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"]["pages"]: - restriction_description = _(" on pages: ") - for page in change["logparams"]["restrictions"]["pages"]: - restricted_pages = ["*{page}*".format(page=i["page_title"]) 
for i in change["logparams"]["restrictions"]["pages"]] - restriction_description = restriction_description + ", ".join(restricted_pages) - if "namespaces" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"]["namespaces"]: - namespaces = [] - if restriction_description: - restriction_description = restriction_description + _(" and namespaces: ") - else: - restriction_description = _(" on namespaces: ") - for namespace in change["logparams"]["restrictions"]["namespaces"]: - if str(namespace) in additional_data.namespaces: # if we have cached namespace name for given namespace number, add its name to the list - namespaces.append("*{ns}*".format(ns=additional_data.namespaces[str(namespace)]["*"])) - else: - namespaces.append("*{ns}*".format(ns=namespace)) - restriction_description = restriction_description + ", ".join(namespaces) - restriction_description = restriction_description + "." - if len(restriction_description) > 1020: - logger.debug(restriction_description) - restriction_description = restriction_description[:1020] + "…" - content = _( - "[{author}]({author_url}) blocked [{user}]({user_url}) {time}{restriction_desc}{comment}").format(author=author, author_url=author_url, user=user, time=block_time, user_url=link, restriction_desc=restriction_description, comment=parsed_comment) - elif action == "block/reblock": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - user = change["title"].split(':', 1)[1] - content = _("[{author}]({author_url}) changed block settings for [{blocked_user}]({user_url}){comment}").format(author=author, author_url=author_url, blocked_user=user, user_url=link, comment=parsed_comment) - elif action == "block/unblock": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - user = change["title"].split(':', 1)[1] - content = _("[{author}]({author_url}) unblocked [{blocked_user}]({user_url}){comment}").format(author=author, author_url=author_url, blocked_user=user, user_url=link, comment=parsed_comment) - elif action == "curseprofile/comment-created": - link = link_formatter(create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]), WIKI_ARTICLE_PATH)) - target_user = change["title"].split(':', 1)[1] - if target_user != author: - content = _("[{author}]({author_url}) left a [comment]({comment}) on {target}'s profile".format(author=author, author_url=author_url, comment=link, target=target_user)) - else: - content = _("[{author}]({author_url}) left a [comment]({comment}) on their own profile".format(author=author, author_url=author_url, comment=link)) - elif action == "curseprofile/comment-replied": - link = link_formatter(create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]), WIKI_ARTICLE_PATH)) - target_user = change["title"].split(':', 1)[1] - if target_user != author: - content = _( - "[{author}]({author_url}) replied to a [comment]({comment}) on {target}'s profile".format(author=author, - author_url=author_url, - comment=link, - target=target_user)) - else: - content = _( - "[{author}]({author_url}) replied to a [comment]({comment}) on their own profile".format(author=author, - comment=link, - author_url=author_url)) - elif action == "curseprofile/comment-edited": - link = link_formatter(create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]), WIKI_ARTICLE_PATH)) - target_user = change["title"].split(':', 
1)[1] - if target_user != author: - content = _( - "[{author}]({author_url}) edited a [comment]({comment}) on {target}'s profile".format(author=author, - author_url=author_url, - comment=link, - target=target_user)) - else: - content = _( - "[{author}]({author_url}) edited a [comment]({comment}) on their own profile".format(author=author, - comment=link, - author_url=author_url)) - elif action == "curseprofile/comment-purged": - target_user = change["title"].split(':', 1)[1] - if target_user != author: - content = _("[{author}]({author_url}) purged a comment on {target}'s profile".format(author=author, author_url=author_url,target=target_user)) - else: - content = _("[{author}]({author_url}) purged a comment on their own profile".format(author=author, author_url=author_url)) - elif action == "curseprofile/comment-deleted": - if "4:comment_id" in change["logparams"]: - link = link_formatter(create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]), WIKI_ARTICLE_PATH)) - else: - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - target_user = change["title"].split(':', 1)[1] - if target_user != author: - content = _("[{author}]({author_url}) deleted a [comment]({comment}) on {target}'s profile".format(author=author,author_url=author_url, comment=link, target=target_user)) - else: - content = _("[{author}]({author_url}) deleted a [comment]({comment}) on their own profile".format(author=author, author_url=author_url, comment=link)) - - elif action == "curseprofile/profile-edited": - target_user = change["title"].split(':', 1)[1] - link = link_formatter(create_article_path("UserProfile:{user}".format(user=target_user), WIKI_ARTICLE_PATH)) - if target_user != author: - content = _("[{author}]({author_url}) edited the {field} on [{target}]({target_url})'s profile. *({desc})*").format(author=author, - author_url=author_url, - target=target_user, - target_url=link, - field=profile_field_name(change["logparams"]['4:section'], False, message_target[0][0]), - desc=BeautifulSoup(change["parsedcomment"], "lxml").get_text()) - else: - content = _("[{author}]({author_url}) edited the {field} on [their own]({target_url}) profile. 
*({desc})*").format( - author=author, - author_url=author_url, - target_url=link, - field=profile_field_name(change["logparams"]['4:section'], False, message_target[0][0]), - desc=BeautifulSoup(change["parsedcomment"], "lxml").get_text()) - elif action in ("rights/rights", "rights/autopromote"): - link = link_formatter(create_article_path("User:{user}".format(user=change["title"].split(":")[1]), WIKI_ARTICLE_PATH)) - old_groups = [] - new_groups = [] - for name in change["logparams"]["oldgroups"]: - old_groups.append(_(name)) - for name in change["logparams"]["newgroups"]: - new_groups.append(_(name)) - if len(old_groups) == 0: - old_groups = [_("none")] - if len(new_groups) == 0: - new_groups = [_("none")] - - if action == "rights/rights": - content = _("[{author}]({author_url}) changed group membership for [{target}]({target_url}) from {old_groups} to {new_groups}{comment}").format(author=author, author_url=author_url, target=change["title"].split(":")[1], target_url=link, old_groups=", ".join(old_groups), new_groups=', '.join(new_groups), comment=parsed_comment) - else: - content = _("{author} autopromoted [{target}]({target_url}) from {old_groups} to {new_groups}{comment}").format( - author=_("System"), author_url=author_url, target=change["title"].split(":")[1], target_url=link, - old_groups=", ".join(old_groups), new_groups=', '.join(new_groups), - comment=parsed_comment) - elif action == "protect/protect": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) protected [{article}]({article_url}) with the following settings: {settings}{comment}").format(author=author, author_url=author_url, - article=change["title"], article_url=link, - settings=change["logparams"].get("description", "")+(_(" [cascading]") if "cascade" in change["logparams"] else ""), - comment=parsed_comment) - elif action == "protect/modify": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - content = _( - "[{author}]({author_url}) modified protection settings of [{article}]({article_url}) to: {settings}{comment}").format( - author=author, author_url=author_url, - article=change["title"], article_url=link, - settings=change["logparams"].get("description", "") + (_(" [cascading]") if "cascade" in change["logparams"] else ""), - comment=parsed_comment) - elif action == "protect/unprotect": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) removed protection from [{article}]({article_url}){comment}").format(author=author, author_url=author_url, article=change["title"], article_url=link, comment=parsed_comment) - elif action == "delete/revision": - amount = len(change["logparams"]["ids"]) - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - content = ngettext("[{author}]({author_url}) changed visibility of revision on page [{article}]({article_url}){comment}", - "[{author}]({author_url}) changed visibility of {amount} revisions on page [{article}]({article_url}){comment}", amount).format(author=author, author_url=author_url, - article=change["title"], article_url=link, amount=amount, comment=parsed_comment) - elif action == "import/upload": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - content = ngettext("[{author}]({author_url}) imported [{article}]({article_url}) with {count} revision{comment}", - "[{author}]({author_url}) imported [{article}]({article_url}) with {count} 
revisions{comment}", change["logparams"]["count"]).format( - author=author, author_url=author_url, article=change["title"], article_url=link, count=change["logparams"]["count"], comment=parsed_comment) - elif action == "delete/restore": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) restored [{article}]({article_url}){comment}").format(author=author, author_url=author_url, article=change["title"], article_url=link, comment=parsed_comment) - elif action == "delete/event": - content = _("[{author}]({author_url}) changed visibility of log events{comment}").format(author=author, author_url=author_url, comment=parsed_comment) - elif action == "import/interwiki": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - source_link = link_formatter(create_article_path(change["logparams"]["interwiki_title"], WIKI_ARTICLE_PATH)) - content = ngettext("[{author}]({author_url}) imported [{article}]({article_url}) with {count} revision from [{source}]({source_url}){comment}", - "[{author}]({author_url}) imported [{article}]({article_url}) with {count} revisions from [{source}]({source_url}){comment}", change["logparams"]["count"]).format( - author=author, author_url=author_url, article=change["title"], article_url=link, count=change["logparams"]["count"], source=change["logparams"]["interwiki_title"], source_url=source_link, comment=parsed_comment) - elif action == "abusefilter/modify": - link = link_formatter(create_article_path("Special:AbuseFilter/history/{number}/diff/prev/{historyid}".format(number=change["logparams"]['newId'], historyid=change["logparams"]["historyId"]), WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) edited abuse filter [number {number}]({filter_url})").format(author=author, author_url=author_url, number=change["logparams"]['newId'], filter_url=link) - elif action == "abusefilter/create": - link = link_formatter( - create_article_path("Special:AbuseFilter/{number}".format(number=change["logparams"]['newId']), WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) created abuse filter [number {number}]({filter_url})").format(author=author, author_url=author_url, number=change["logparams"]['newId'], filter_url=link) - elif action == "merge/merge": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - link_dest = link_formatter(create_article_path(change["logparams"]["dest_title"], WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) merged revision histories of [{article}]({article_url}) into [{dest}]({dest_url}){comment}").format(author=author, author_url=author_url, article=change["title"], article_url=link, dest_url=link_dest, - dest=change["logparams"]["dest_title"], comment=parsed_comment) - elif action == "newusers/autocreate": - content = _("Account [{author}]({author_url}) was created automatically").format(author=author, author_url=author_url) - elif action == "newusers/create": - content = _("Account [{author}]({author_url}) was created").format(author=author, author_url=author_url) - elif action == "newusers/create2": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - content = _("Account [{article}]({article_url}) was created by [{author}]({author_url}){comment}").format(article=change["title"], article_url=link, author=author, author_url=author_url, comment=parsed_comment) - elif action == "newusers/byemail": - link = link_formatter(create_article_path(change["title"], 
WIKI_ARTICLE_PATH)) - content = _("Account [{article}]({article_url}) was created by [{author}]({author_url}) and password was sent by email{comment}").format(article=change["title"], article_url=link, author=author, author_url=author_url, comment=parsed_comment) - elif action == "newusers/newusers": - content = _("Account [{author}]({author_url}) was created").format(author=author, author_url=author_url) - elif action == "interwiki/iw_add": - link = link_formatter(create_article_path("Special:Interwiki", WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) added an entry to the [interwiki table]({table_url}) pointing to {website} with {prefix} prefix").format(author=author, author_url=author_url, desc=parsed_comment, - prefix=change["logparams"]['0'], - website=change["logparams"]['1'], - table_url=link) - elif action == "interwiki/iw_edit": - link = link_formatter(create_article_path("Special:Interwiki", WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) edited an entry in [interwiki table]({table_url}) pointing to {website} with {prefix} prefix").format(author=author, author_url=author_url, desc=parsed_comment, - prefix=change["logparams"]['0'], - website=change["logparams"]['1'], - table_url=link) - elif action == "interwiki/iw_delete": - link = link_formatter(create_article_path("Special:Interwiki", WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) deleted an entry in [interwiki table]({table_url})").format(author=author, author_url=author_url, table_url=link) - elif action == "contentmodel/change": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) changed the content model of the page [{article}]({article_url}) from {old} to {new}{comment}").format(author=author, author_url=author_url, article=change["title"], article_url=link, old=change["logparams"]["oldmodel"], - new=change["logparams"]["newmodel"], comment=parsed_comment) - elif action == "contentmodel/new": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) created the page [{article}]({article_url}) using a non-default content model {new}{comment}").format(author=author, author_url=author_url, article=change["title"], article_url=link, new=change["logparams"]["newmodel"], comment=parsed_comment) - elif action == "sprite/sprite": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) edited the sprite for [{article}]({article_url})").format(author=author, author_url=author_url, article=change["title"], article_url=link) - elif action == "sprite/sheet": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) created the sprite sheet for [{article}]({article_url})").format(author=author, author_url=author_url, article=change["title"], article_url=link) - elif action == "sprite/slice": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) edited the slice for [{article}]({article_url})").format(author=author, author_url=author_url, article=change["title"], article_url=link) - elif action == "cargo/createtable": - table = parse_link(paths[3], change["logparams"]["0"]) - content = _("[{author}]({author_url}) created the Cargo table \"{table}\"").format(author=author, author_url=author_url, table=table) - elif action == "cargo/deletetable": - content = 
_("[{author}]({author_url}) deleted the Cargo table \"{table}\"").format(author=author, author_url=author_url, table=change["logparams"]["0"]) - elif action == "cargo/recreatetable": - table = parse_link(paths[3], change["logparams"]["0"]) - content = _("[{author}]({author_url}) recreated the Cargo table \"{table}\"").format(author=author, author_url=author_url, table=table) - elif action == "cargo/replacetable": - table = parse_link(paths[3], change["logparams"]["0"]) - content = _("[{author}]({author_url}) replaced the Cargo table \"{table}\"").format(author=author, author_url=author_url, table=table) - elif action == "managetags/create": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) created the [tag]({tag_url}) \"{tag}\"{comment}").format(author=author, author_url=author_url, tag=change["logparams"]["tag"], tag_url=link, comment=parsed_comment) - elif action == "managetags/delete": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - if change["logparams"]["count"] == 0: - content = _("[{author}]({author_url}) deleted the [tag]({tag_url}) \"{tag}\"{comment}").format(author=author, author_url=author_url, tag=change["logparams"]["tag"], tag_url=link, comment=parsed_comment) - else: - content = ngettext("[{author}]({author_url}) deleted the [tag]({tag_url}) \"{tag}\" and removed it from {count} revision or log entry{comment}", - "[{author}]({author_url}) deleted the [tag]({tag_url}) \"{tag}\" and removed it from {count} revisions and/or log entries{comment}", - change["logparams"]["count"]).format(author=author, author_url=author_url, tag=change["logparams"]["tag"], tag_url=link, count=change["logparams"]["count"], comment=parsed_comment) - elif action == "managetags/activate": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) activated the [tag]({tag_url}) \"{tag}\"{comment}").format(author=author, author_url=author_url, tag=change["logparams"]["tag"], tag_url=link, comment=parsed_comment) - elif action == "managetags/deactivate": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) deactivated the [tag]({tag_url}) \"{tag}\"{comment}").format(author=author, author_url=author_url, tag=change["logparams"]["tag"], tag_url=link, comment=parsed_comment) - elif action == "managewiki/settings": # Miraheze's ManageWiki extension https://github.com/miraheze/ManageWiki - content = _( - "[{author}]({author_url}) changed wiki settings{reason}".format(author=author, author_url=author_url, - reason=parsed_comment)) - elif action == "managewiki/delete": - content = _("[{author}]({author_url}) deleted a wiki *{wiki_name}*{comment}").format(author=author, author_url=author_url, - wiki_name=change["logparams"].get("wiki", _("Unknown")), comment=parsed_comment) - elif action == "managewiki/lock": - content = _("[{author}]({author_url}) locked a wiki *{wiki_name}*{comment}").format( - author=author, author_url=author_url, wiki_name=change["logparams"].get("wiki", _("Unknown")), comment=parsed_comment) - elif action == "managewiki/namespaces": - content = _("[{author}]({author_url}) modified a namespace *{namespace_name}* on *{wiki_name}*{comment}").format( - author=author, author_url=author_url, namespace_name=change["logparams"].get("namespace", _("Unknown")), - wiki_name=change["logparams"].get("wiki", _("Unknown")), comment=parsed_comment) - elif action == 
"managewiki/namespaces-delete": - content = _( - "[{author}]({author_url}) deleted namespace *{namespace_name}* on *{wiki_name}*{comment}").format( - author=author, author_url=author_url, - namespace_name=change["logparams"].get("namespace", _("Unknown")), - wiki_name=change["logparams"].get("wiki", _("Unknown")), comment=parsed_comment) - elif action == "managewiki/rights": - group_name = change["title"].split("/permissions/", 1)[1] - content = _("[{author}]({author_url}) modified user group *{group_name}*{comment}").format( - author=author, author_url=author_url, group_name=group_name, comment=parsed_comment - ) - elif action == "managewiki/undelete": - content = _("[{author}]({author_url}) undeleted a wiki *{wiki_name}*{comment}").format( - author=author, author_url=author_url, wiki_name=change["logparams"].get("wiki", _("Unknown")), comment=parsed_comment - ) - elif action == "managewiki/unlock": - content = _("[{author}]({author_url}) unlocked a wiki *{wiki_name}*{comment}").format( - author=author, author_url=author_url, wiki_name=change["logparams"].get("wiki", _("Unknown")), - comment=parsed_comment - ) - elif action == "datadump/generate": - content = _("[{author}]({author_url}) generated *{file}* dump{comment}").format( - author=author, author_url=author_url, file=change["logparams"]["filename"], - comment=parsed_comment - ) - elif action == "datadump/delete": - content = _("[{author}]({author_url}) deleted *{file}* dump{comment}").format( - author=author, author_url=author_url, file=change["logparams"]["filename"], - comment=parsed_comment - ) - elif action == "pagetranslation/mark": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - if "?" in link: - link = link + "&oldid={}".format(change["logparams"]["revision"]) - else: - link = link + "?oldid={}".format(change["logparams"]["revision"]) - link = link_formatter(link) - content = _("[{author}]({author_url}) marked [{article}]({article_url}) for translation{comment}").format( - author=author, author_url=author_url, - article=change["title"], article_url=link, - comment=parsed_comment - ) - elif action == "pagetranslation/unmark": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) removed [{article}]({article_url}) from the translation system{comment}").format( - author=author, author_url=author_url, - article=change["title"], article_url=link, - comment=parsed_comment - ) - elif action == "pagetranslation/moveok": - link = link_formatter(create_article_path(change["logparams"]["target"], WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) completed moving translation pages from *{article}* to [{target}]({target_url}){comment}").format( - author=author, author_url=author_url, - article=change["title"], target=change["logparams"]["target"], target_url=link, - comment=parsed_comment - ) - elif action == "pagetranslation/movenok": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - target_url = link_formatter(create_article_path(change["logparams"]["target"], WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) encountered a problem while moving [{article}]({article_url}) to [{target}]({target_url}){comment}").format( - author=author, author_url=author_url, - article=change["title"], article_url=link, - target=change["logparams"]["target"], target_url=target_url, - comment=parsed_comment - ) - elif action == "pagetranslation/deletefok": - link = link_formatter(create_article_path(change["title"], 
WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) completed deletion of translatable page [{article}]({article_url}){comment}").format( - author=author, author_url=author_url, - article=change["title"], article_url=link, - comment=parsed_comment - ) - elif action == "pagetranslation/deletefnok": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - target_url = link_formatter(create_article_path(change["logparams"]["target"], WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) failed to delete [{article}]({article_url}) which belongs to translatable page [{target}]({target_url}){comment}").format( - author=author, author_url=author_url, - article=change["title"], article_url=link, - target=change["logparams"]["target"], target_url=target_url, - comment=parsed_comment - ) - elif action == "pagetranslation/deletelok": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) completed deletion of translation page [{article}]({article_url}){comment}").format( - author=author, author_url=author_url, - article=change["title"], article_url=link, - comment=parsed_comment - ) - elif action == "pagetranslation/deletelnok": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - target_url = link_formatter(create_article_path(change["logparams"]["target"], WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) failed to delete [{article}]({article_url}) which belongs to translation page [{target}]({target_url}){comment}").format( - author=author, author_url=author_url, - article=change["title"], article_url=link, - target=change["logparams"]["target"], target_url=target_url, - comment=parsed_comment - ) - elif action == "pagetranslation/encourage": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) encouraged translation of [{article}]({article_url}){comment}").format( - author=author, author_url=author_url, - article=change["title"], article_url=link, - comment=parsed_comment - ) - elif action == "pagetranslation/discourage": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) discouraged translation of [{article}]({article_url}){comment}").format( - author=author, author_url=author_url, - article=change["title"], article_url=link, - comment=parsed_comment - ) - elif action == "pagetranslation/prioritylanguages": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - if "languages" in change["logparams"]: - languages = "`, `".join(change["logparams"]["languages"].split(",")) - if change["logparams"]["force"] == "on": - content = _("[{author}]({author_url}) limited languages for [{article}]({article_url}) to `{languages}`{comment}").format( - author=author, author_url=author_url, - article=change["title"], article_url=link, - languages=languages, comment=parsed_comment - ) - else: - content = _("[{author}]({author_url}) set the priority languages for [{article}]({article_url}) to `{languages}`{comment}").format( - author=author, author_url=author_url, - article=change["title"], article_url=link, - languages=languages, comment=parsed_comment - ) - else: - content = _("[{author}]({author_url}) removed priority languages from [{article}]({article_url}){comment}").format( - author=author, author_url=author_url, - article=change["title"], article_url=link, - comment=parsed_comment - ) - elif action == 
"pagetranslation/associate": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) added translatable page [{article}]({article_url}) to aggregate group \"{group}\"{comment}").format( - author=author, author_url=author_url, - article=change["title"], article_url=link, - group=change["logparams"]["aggregategroup"], comment=parsed_comment - ) - elif action == "pagetranslation/dissociate": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - content = _("[{author}]({author_url}) removed translatable page [{article}]({article_url}) from aggregate group \"{group}\"{comment}").format( - author=author, author_url=author_url, - article=change["title"], article_url=link, - group=change["logparams"]["aggregategroup"], comment=parsed_comment - ) - elif action == "translationreview/message": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - if "?" in link: - link = link + "&oldid={}".format(change["logparams"]["revision"]) - else: - link = link + "?oldid={}".format(change["logparams"]["revision"]) - link = link_formatter(link) - content = _("[{author}]({author_url}) reviewed translation [{article}]({article_url}){comment}").format( - author=author, author_url=author_url, - article=change["title"], article_url=link, - comment=parsed_comment - ) - elif action == "translationreview/group": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - if "old-state" in change["logparams"]: - content = _("[{author}]({author_url}) changed the state of `{language}` translations of [{article}]({article_url}) from `{old_state}` to `{new_state}`{comment}").format( - author=author, author_url=author_url, language=change["logparams"]["language"], - article=change["logparams"]["group-label"], article_url=link, - old_state=change["logparams"]["old-state"], new_state=change["logparams"]["new-state"], - comment=parsed_comment - ) - else: - content = _("[{author}]({author_url}) changed the state of `{language}` translations of [{article}]({article_url}) to `{new_state}`{comment}").format( - author=author, author_url=author_url, language=change["logparams"]["language"], - article=change["logparams"]["group-label"], article_url=link, - new_state=change["logparams"]["new-state"], comment=parsed_comment - ) - elif action == "pagelang/pagelang": - link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH)) - old_lang = "`{}`".format(change["logparams"]["oldlanguage"]) - if change["logparams"]["oldlanguage"][-5:] == "[def]": - old_lang = "`{}` {}".format(change["logparams"]["oldlanguage"][:-5], _("(default)")) - new_lang = "`{}`".format(change["logparams"]["newlanguage"]) - if change["logparams"]["newlanguage"][-5:] == "[def]": - new_lang = "`{}` {}".format(change["logparams"]["oldlanguage"][:-5], _("(default)")) - content = _("[{author}]({author_url}) changed the language of [{article}]({article_url}) from {old_lang} to {new_lang}{comment}").format( - author=author, author_url=author_url, - article=change["title"], article_url=link, - old_lang=old_lang, new_lang=new_lang, comment=parsed_comment - ) - elif action == "renameuser/renameuser": - link = link_formatter(create_article_path("User:"+change["logparams"]["newuser"], WIKI_ARTICLE_PATH)) - edits = change["logparams"]["edits"] - if edits > 0: - content = ngettext("[{author}]({author_url}) renamed user *{old_name}* with {edits} edit to [{new_name}]({link}){comment}", - "[{author}]({author_url}) renamed user *{old_name}* 
with {edits} edits to [{new_name}]({link}){comment}", edits).format( - author=author, author_url=author_url, old_name=change["logparams"]["olduser"], edits=edits, new_name=change["logparams"]["newuser"], link=link, comment=parsed_comment - ) - else: - content = _("[{author}]({author_url}) renamed user *{old_name}* to [{new_name}]({link}){comment}").format( - author=author, author_url=author_url, old_name=change["logparams"]["olduser"], new_name=change["logparams"]["newuser"], link=link, comment=parsed_comment - ) - elif action == "suppressed": - content = _("An action has been hidden by administration.") - else: - logger.warning("No entry for {event} with params: {params}".format(event=action, params=change)) - if not settings.get("support", None): - return - else: - content = _("Unknown event `{event}` by [{author}]({author_url}), report it on the [support server](<{support}>).").format(event=action, author=author, author_url=author_url, support=settings["support"]) - action = "unknown" - return DiscordMessage("compact", action, message_target[1], content=content, wiki=WIKI_SCRIPT_PATH) - - -async def embed_formatter(action, change, parsed_comment, categories, recent_changes, message_target, paths, rate_limiter, additional_data=None) -> DiscordMessage: - """Recent Changes embed formatter, part of RcGcDw""" - _ = langs[message_target[0][0]]["rc_formatters"].gettext - ngettext = langs[message_target[0][0]]["rc_formatters"].ngettext - if additional_data is None: - additional_data = {"namespaces": {}, "tags": {}} - WIKI_API_PATH = paths[0] - WIKI_SCRIPT_PATH = paths[1] - WIKI_ARTICLE_PATH = paths[2] - WIKI_JUST_DOMAIN = paths[3] - embed = DiscordMessage("embed", action, message_target[1], wiki=WIKI_SCRIPT_PATH) - if parsed_comment is None: - parsed_comment = _("No description provided") - if action != "suppressed": - if "anon" in change: - author_url = create_article_path("Special:Contributions/{user}".format(user=change["user"]), WIKI_ARTICLE_PATH) - else: - author_url = create_article_path("User:{}".format(change["user"]), WIKI_ARTICLE_PATH) - embed.set_author(change["user"], author_url) - if action in ("edit", "new"): # edit or new page - editsize = change["newlen"] - change["oldlen"] - if editsize > 0: - if editsize > 6032: - embed["color"] = 65280 - else: - embed["color"] = 35840 + (math.floor(editsize / 52)) * 256 - elif editsize < 0: - if editsize < -6032: - embed["color"] = 16711680 - else: - embed["color"] = 9175040 + (math.floor((editsize * -1) / 52)) * 65536 - elif editsize == 0: - embed["color"] = 8750469 - link = "{wiki}index.php?title={article}&curid={pageid}&diff={diff}&oldid={oldrev}".format( - wiki=WIKI_SCRIPT_PATH, pageid=change["pageid"], diff=change["revid"], oldrev=change["old_revid"], - article=change["title"].replace(" ", "_").replace("%", "%25").replace("\\", "%5C").replace("&", "%26")) - embed["title"] = "{redirect}{article} ({new}{minor}{bot}{space}{editsize})".format(redirect="⤷ " if "redirect" in change else "", article=change["title"], editsize="+" + str( - editsize) if editsize > 0 else editsize, new=_("(N!) 
") if action == "new" else "", - minor=_("m") if action == "edit" and "minor" in change else "", bot=_('b') if "bot" in change else "", space=" " if "bot" in change or (action == "edit" and "minor" in change) or action == "new" else "") - if message_target[0][1] == 3: - try: - if action == "new": - changed_content = await recent_changes.safe_request( - "{wiki}?action=compare&format=json&fromtext=&torev={diff}&topst=1&prop=diff".format( - wiki=WIKI_API_PATH, diff=change["revid"] - ), rate_limiter, "compare", "*") - else: - changed_content = await recent_changes.safe_request( - "{wiki}?action=compare&format=json&fromrev={oldrev}&torev={diff}&topst=1&prop=diff".format( - wiki=WIKI_API_PATH, diff=change["revid"],oldrev=change["old_revid"] - ), rate_limiter, "compare", "*") - except ClientResponseError: - changed_content = None - if changed_content: - EditDiff = ContentParser(message_target[0][0]) - EditDiff.feed(changed_content) - if EditDiff.small_prev_del: - if EditDiff.small_prev_del.replace("~~", "").isspace(): - EditDiff.small_prev_del = _('__Only whitespace__') - else: - EditDiff.small_prev_del = EditDiff.small_prev_del.replace("~~~~", "") - if EditDiff.small_prev_ins: - if EditDiff.small_prev_ins.replace("**", "").isspace(): - EditDiff.small_prev_ins = _('__Only whitespace__') - else: - EditDiff.small_prev_ins = EditDiff.small_prev_ins.replace("****", "") - logger.debug("Changed content: {}".format(EditDiff.small_prev_ins)) - if EditDiff.small_prev_del and not action == "new": - embed.add_field(_("Removed"), "{data}".format(data=EditDiff.small_prev_del), inline=True) - if EditDiff.small_prev_ins: - embed.add_field(_("Added"), "{data}".format(data=EditDiff.small_prev_ins), inline=True) - else: - logger.warning("Unable to download data on the edit content!") - elif action in ("upload/overwrite", "upload/upload", "upload/revert"): # sending files - license = None - try: - urls = await recent_changes.safe_request( - "{wiki}?action=query&format=json&prop=imageinfo&list=&meta=&titles={filename}&iiprop=timestamp%7Curl%7Carchivename&iilimit=5".format( - wiki=WIKI_API_PATH, filename=change["title"]), rate_limiter, "query", "pages") - except ClientResponseError: - # We could do this in safe_request but I don't know how that would affect other requests, - # prefer to have handling in here instead. When this happens, simply ignore the image preview - urls = None - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - additional_info_retrieved = False - if urls is not None: - logger.debug(urls) - if "-1" not in urls: # image still exists and not removed - try: - img_info = next(iter(urls.values()))["imageinfo"] - for num, revision in enumerate(img_info): - if revision["timestamp"] == change["logparams"]["img_timestamp"]: # find the correct revision corresponding for this log entry - image_direct_url = "{rev}?{cache}".format(rev=revision["url"], cache=int(time.time()*5)) # cachebusting - additional_info_retrieved = True - break - except KeyError: - logger.warning("Wiki did not respond with extended information about file. The preview will not be shown.") - else: - logger.warning("Request for additional image information have failed. 
The preview will not be shown.") - if action in ("upload/overwrite", "upload/revert"): - if additional_info_retrieved: - article_encoded = change["title"].replace(" ", "_").replace("%", "%25").replace("\\", "%5C").replace("&", "%26").replace(')', '\\)') - try: - revision = img_info[num+1] - except IndexError: - logger.exception("Could not analize the information about the image (does it have only one version when expected more in overwrite?) which resulted in no Options field: {}".format(img_info)) - else: - undolink = "{wiki}index.php?title={filename}&action=revert&oldimage={archiveid}".format( - wiki=WIKI_SCRIPT_PATH, filename=article_encoded, archiveid=revision["archivename"]) - embed.add_field(_("Options"), _("([preview]({link}) | [undo]({undolink}))").format( - link=image_direct_url, undolink=undolink)) - if message_target[0][1] > 1: - embed["image"]["url"] = image_direct_url - if action == "upload/overwrite": - embed["title"] = _("Uploaded a new version of {name}").format(name=change["title"]) - elif action == "upload/revert": - embed["title"] = _("Reverted a version of {name}").format(name=change["title"]) - else: - embed["title"] = _("Uploaded {name}").format(name=change["title"]) - if additional_info_retrieved: - embed.add_field(_("Options"), _("([preview]({link}))").format(link=image_direct_url)) - if message_target[0][1] > 1: - embed["image"]["url"] = image_direct_url - elif action == "delete/delete": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Deleted page {article}").format(article=change["title"]) - elif action == "delete/delete_redir": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Deleted redirect {article} by overwriting").format(article=change["title"]) - elif action == "move/move": - link = create_article_path(change["logparams"]['target_title'], WIKI_ARTICLE_PATH) - parsed_comment = "{supress}. 
{desc}".format(desc=parsed_comment, - supress=_("No redirect has been made") if "suppressredirect" in change["logparams"] else _( - "A redirect has been made")) - embed["title"] = _("Moved {redirect}{article} to {target}").format(redirect="⤷ " if "redirect" in change else "", article=change["title"], target=change["logparams"]['target_title']) - elif action == "move/move_redir": - link = create_article_path(change["logparams"]["target_title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Moved {redirect}{article} to {title} over redirect").format(redirect="⤷ " if "redirect" in change else "", article=change["title"], - title=change["logparams"]["target_title"]) - elif action == "protect/move_prot": - link = create_article_path(change["logparams"]["oldtitle_title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Moved protection settings from {redirect}{article} to {title}").format(redirect="⤷ " if "redirect" in change else "", article=change["logparams"]["oldtitle_title"], - title=change["title"]) - elif action == "block/block": - user = change["title"].split(':', 1)[1] - try: - ipaddress.ip_address(user) - link = create_article_path("Special:Contributions/{user}".format(user=user), WIKI_ARTICLE_PATH) - except ValueError: - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - if change["logparams"]["duration"] in ["infinite", "indefinite", "infinity", "never"]: - block_time = _("for infinity and beyond") - else: - english_length = re.sub(r"(\d+)", "", change["logparams"]["duration"]) # note that translation won't work for millenia and century yet - english_length_num = re.sub(r"(\D+)", "", change["logparams"]["duration"]) - try: - if "@" in english_length: - raise ValueError - english_length = english_length.rstrip("s").strip() - block_time = _("for {num} {translated_length}").format(num=english_length_num, translated_length=ngettext(english_length, english_length + "s", int(english_length_num))) - except (AttributeError, ValueError): - if "expiry" in change["logparams"]: - date_time_obj = datetime.datetime.strptime(change["logparams"]["expiry"], '%Y-%m-%dT%H:%M:%SZ') - block_time = _("until {}").format(date_time_obj.strftime("%Y-%m-%d %H:%M:%S UTC")) - else: - block_time = _("unknown expiry time") # THIS IS HERE JUST TEMPORARY AS A HOT FIX TO #157, will be changed with release of 1.13 - if "sitewide" not in change["logparams"]: - restriction_description = "" - if "restrictions" in change["logparams"]: - if "pages" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"]["pages"]: - restriction_description = _("Blocked from editing the following pages: ") - for page in change["logparams"]["restrictions"]["pages"]: - restricted_pages = ["*"+i["page_title"]+"*" for i in change["logparams"]["restrictions"]["pages"]] - restriction_description = restriction_description + ", ".join(restricted_pages) - if "namespaces" in change["logparams"]["restrictions"] and change["logparams"]["restrictions"]["namespaces"]: - namespaces = [] - if restriction_description: - restriction_description = restriction_description + _(" and namespaces: ") - else: - restriction_description = _("Blocked from editing pages on following namespaces: ") - for namespace in change["logparams"]["restrictions"]["namespaces"]: - if str(namespace) in additional_data.namespaces: # if we have cached namespace name for given namespace number, add its name to the list - namespaces.append("*{ns}*".format(ns=additional_data.namespaces[str(namespace)]["*"])) - else: - 
namespaces.append("*{ns}*".format(ns=namespace)) - restriction_description = restriction_description + ", ".join(namespaces) - restriction_description = restriction_description + "." - if len(restriction_description) > 1020: - logger.debug(restriction_description) - restriction_description = restriction_description[:1020]+"…" - embed.add_field(_("Partial block details"), restriction_description, inline=True) - embed["title"] = _("Blocked {blocked_user} {time}").format(blocked_user=user, time=block_time) - elif action == "block/reblock": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - user = change["title"].split(':', 1)[1] - embed["title"] = _("Changed block settings for {blocked_user}").format(blocked_user=user) - elif action == "block/unblock": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - user = change["title"].split(':', 1)[1] - embed["title"] = _("Unblocked {blocked_user}").format(blocked_user=user) - elif action == "curseprofile/comment-created": - if message_target[0][1] == 3: - parsed_comment = await recent_changes.pull_comment(change["logparams"]["4:comment_id"], WIKI_API_PATH, rate_limiter) - link = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]), WIKI_ARTICLE_PATH) - target_user = change["title"].split(':', 1)[1] - if target_user != change["user"]: - embed["title"] = _("Left a comment on {target}'s profile").format(target=target_user) - else: - embed["title"] = _("Left a comment on their own profile") - elif action == "curseprofile/comment-replied": - if message_target[0][1] == 3: - parsed_comment = await recent_changes.pull_comment(change["logparams"]["4:comment_id"], WIKI_API_PATH, rate_limiter) - link = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]), WIKI_ARTICLE_PATH) - target_user = change["title"].split(':', 1)[1] - if target_user != change["user"]: - embed["title"] = _("Replied to a comment on {target}'s profile").format(target=target_user) - else: - embed["title"] = _("Replied to a comment on their own profile") - elif action == "curseprofile/comment-edited": - if message_target[0][1] == 3: - parsed_comment = await recent_changes.pull_comment(change["logparams"]["4:comment_id"], WIKI_API_PATH, rate_limiter) - link = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]), WIKI_ARTICLE_PATH) - target_user = change["title"].split(':', 1)[1] - if target_user != change["user"]: - embed["title"] = _("Edited a comment on {target}'s profile").format(target=target_user) - else: - embed["title"] = _("Edited a comment on their own profile") - elif action == "curseprofile/profile-edited": - target_user = change["title"].split(':', 1)[1] - link = create_article_path("UserProfile:{target}".format(target=target_user), WIKI_ARTICLE_PATH) - if target_user != change["user"]: - embed["title"] = _("Edited {target}'s profile").format(target=target_user) - else: - embed["title"] = _("Edited their own profile") - if not change["parsedcomment"]: # If the field is empty - parsed_comment = _("Cleared the {field} field").format(field=profile_field_name(change["logparams"]['4:section'], True, message_target[0][0])) - else: - parsed_comment = _("{field} field changed to: {desc}").format(field=profile_field_name(change["logparams"]['4:section'], True, message_target[0][0]), desc=BeautifulSoup(change["parsedcomment"], "lxml").get_text()) - elif action == 
"curseprofile/comment-purged": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - target_user = change["title"].split(':', 1)[1] - if target_user != change["user"]: - embed["title"] = _("Purged a comment on {target}'s profile").format(target=target_user) - else: - embed["title"] = _("Purged a comment on their own profile") - elif action == "curseprofile/comment-deleted": - if "4:comment_id" in change["logparams"]: - link = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]), WIKI_ARTICLE_PATH) - else: - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - target_user = change["title"].split(':', 1)[1] - if target_user != change["user"]: - embed["title"] = _("Deleted a comment on {target}'s profile").format(target=target_user) - else: - embed["title"] = _("Deleted a comment on their own profile") - elif action in ("rights/rights", "rights/autopromote"): - link = create_article_path("User:{}".format(change["title"].split(":")[1]), WIKI_ARTICLE_PATH) - if action == "rights/rights": - embed["title"] = _("Changed group membership for {target}").format(target=change["title"].split(":")[1]) - else: - author_url = "" - embed.set_author(_("System"), author_url) - embed["title"] = _("{target} got autopromoted to a new usergroup").format( - target=change["title"].split(":")[1]) - if len(change["logparams"]["oldgroups"]) < len(change["logparams"]["newgroups"]): - embed["thumbnail"]["url"] = "https://i.imgur.com/WnGhF5g.gif" - old_groups = [] - new_groups = [] - for name in change["logparams"]["oldgroups"]: - old_groups.append(_(name)) - for name in change["logparams"]["newgroups"]: - new_groups.append(_(name)) - if len(old_groups) == 0: - old_groups = [_("none")] - if len(new_groups) == 0: - new_groups = [_("none")] - reason = ": {desc}".format(desc=parsed_comment) if parsed_comment != _("No description provided") else "" - parsed_comment = _("Groups changed from {old_groups} to {new_groups}{reason}").format( - old_groups=", ".join(old_groups), new_groups=', '.join(new_groups), reason=reason) - elif action == "protect/protect": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Protected {target}").format(target=change["title"]) - parsed_comment = "{settings}{cascade} | {reason}".format(settings=change["logparams"].get("description", ""), - cascade=_(" [cascading]") if "cascade" in change["logparams"] else "", - reason=parsed_comment) - elif action == "protect/modify": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Changed protection level for {article}").format(article=change["title"]) - parsed_comment = "{settings}{cascade} | {reason}".format(settings=change["logparams"].get("description", ""), - cascade=_(" [cascading]") if "cascade" in change["logparams"] else "", - reason=parsed_comment) - elif action == "protect/unprotect": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Removed protection from {article}").format(article=change["title"]) - elif action == "delete/revision": - amount = len(change["logparams"]["ids"]) - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = ngettext("Changed visibility of revision on page {article} ", - "Changed visibility of {amount} revisions on page {article} ", amount).format( - article=change["title"], amount=amount) - elif action == "import/upload": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = 
ngettext("Imported {article} with {count} revision", - "Imported {article} with {count} revisions", change["logparams"]["count"]).format( - article=change["title"], count=change["logparams"]["count"]) - elif action == "delete/restore": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Restored {article}").format(article=change["title"]) - elif action == "delete/event": - link = create_article_path("Special:RecentChanges", WIKI_ARTICLE_PATH) - embed["title"] = _("Changed visibility of log events") - elif action == "import/interwiki": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = ngettext("Imported {article} with {count} revision from \"{source}\"", - "Imported {article} with {count} revisions from \"{source}\"", change["logparams"]["count"]).format( - article=change["title"], count=change["logparams"]["count"], source=change["logparams"]["interwiki_title"]) - elif action == "abusefilter/modify": - link = create_article_path("Special:AbuseFilter/history/{number}/diff/prev/{historyid}".format(number=change["logparams"]['newId'], historyid=change["logparams"]["historyId"]), WIKI_ARTICLE_PATH) - embed["title"] = _("Edited abuse filter number {number}").format(number=change["logparams"]['newId']) - elif action == "abusefilter/create": - link = create_article_path("Special:AbuseFilter/{number}".format(number=change["logparams"]['newId']), WIKI_ARTICLE_PATH) - embed["title"] = _("Created abuse filter number {number}").format(number=change["logparams"]['newId']) - elif action == "merge/merge": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Merged revision histories of {article} into {dest}").format(article=change["title"], - dest=change["logparams"]["dest_title"]) - elif action == "newusers/autocreate": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Created account automatically") - elif action == "newusers/create": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Created account") - elif action == "newusers/create2": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Created account {article}").format(article=change["title"]) - elif action == "newusers/byemail": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Created account {article} and password was sent by email").format(article=change["title"]) - elif action == "newusers/newusers": - link = author_url - embed["title"] = _("Created account") - elif action == "interwiki/iw_add": - link = create_article_path("Special:Interwiki", WIKI_ARTICLE_PATH) - embed["title"] = _("Added an entry to the interwiki table") - parsed_comment = _("Prefix: {prefix}, website: {website} | {desc}").format(desc=parsed_comment, - prefix=change["logparams"]['0'], - website=change["logparams"]['1']) - elif action == "interwiki/iw_edit": - link = create_article_path("Special:Interwiki", WIKI_ARTICLE_PATH) - embed["title"] = _("Edited an entry in interwiki table") - parsed_comment = _("Prefix: {prefix}, website: {website} | {desc}").format(desc=parsed_comment, - prefix=change["logparams"]['0'], - website=change["logparams"]['1']) - elif action == "interwiki/iw_delete": - link = create_article_path("Special:Interwiki", WIKI_ARTICLE_PATH) - embed["title"] = _("Deleted an entry in interwiki table") - parsed_comment = _("Prefix: {prefix} | {desc}").format(desc=parsed_comment, prefix=change["logparams"]['0']) - elif 
action == "contentmodel/change": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Changed the content model of the page {article}").format(article=change["title"]) - parsed_comment = _("Model changed from {old} to {new}: {reason}").format(old=change["logparams"]["oldmodel"], - new=change["logparams"]["newmodel"], - reason=parsed_comment) - elif action == "contentmodel/new": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Created the page {article} using a non-default content model").format(article=change["title"]) - parsed_comment = _("Created with model {new}: {reason}").format(new=change["logparams"]["newmodel"], reason=parsed_comment) - elif action == "sprite/sprite": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Edited the sprite for {article}").format(article=change["title"]) - elif action == "sprite/sheet": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Created the sprite sheet for {article}").format(article=change["title"]) - elif action == "sprite/slice": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Edited the slice for {article}").format(article=change["title"]) - elif action == "cargo/createtable": - table = re.search(r"\[(.*?)\]\(<(.*?)>\)", parse_link(paths[3], change["logparams"]["0"])) - link = table.group(2) - embed["title"] = _("Created the Cargo table \"{table}\"").format(table=table.group(1)) - parsed_comment = None - elif action == "cargo/deletetable": - link = create_article_path("Special:CargoTables", WIKI_ARTICLE_PATH) - embed["title"] = _("Deleted the Cargo table \"{table}\"").format(table=change["logparams"]["0"]) - parsed_comment = None - elif action == "cargo/recreatetable": - table = re.search(r"\[(.*?)\]\(<(.*?)>\)", parse_link(paths[3], change["logparams"]["0"])) - link = table.group(2) - embed["title"] = _("Recreated the Cargo table \"{table}\"").format(table=table.group(1)) - parsed_comment = None - elif action == "cargo/replacetable": - table = re.search(r"\[(.*?)\]\(<(.*?)>\)", parse_link(paths[3], change["logparams"]["0"])) - link = table.group(2) - embed["title"] = _("Replaced the Cargo table \"{table}\"").format(table=table.group(1)) - parsed_comment = None - elif action == "managetags/create": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Created the tag \"{tag}\"").format(tag=change["logparams"]["tag"]) - elif action == "managetags/delete": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Deleted the tag \"{tag}\"").format(tag=change["logparams"]["tag"]) - if change["logparams"]["count"] > 0: - embed.add_field(_('Removed from'), ngettext("{} revision or log entry", "{} revisions and/or log entries", change["logparams"]["count"]).format(change["logparams"]["count"])) - elif action == "managetags/activate": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Activated the tag \"{tag}\"").format(tag=change["logparams"]["tag"]) - elif action == "managetags/deactivate": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Deactivated the tag \"{tag}\"").format(tag=change["logparams"]["tag"]) - elif action == "managewiki/settings": # Miraheze's ManageWiki extension https://github.com/miraheze/ManageWiki - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Changed wiki settings") - if 
change["logparams"].get("changes", ""): - embed.add_field("Setting", change["logparams"].get("changes")) - elif action == "managewiki/delete": - embed["title"] = _("Deleted a \"{wiki}\" wiki").format(wiki=change["logparams"].get("wiki", _("Unknown"))) - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - elif action == "managewiki/lock": - embed["title"] = _("Locked a \"{wiki}\" wiki").format(wiki=change["logparams"].get("wiki", _("Unknown"))) - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - elif action == "managewiki/namespaces": - embed["title"] = _("Modified \"{namespace_name}\" namespace").format(namespace_name=change["logparams"].get("namespace", _("Unknown"))) - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed.add_field(_('Wiki'), change["logparams"].get("wiki", _("Unknown"))) - elif action == "managewiki/namespaces-delete": - embed["title"] = _("Deleted a \"{namespace_name}\" namespace").format( - namespace_name=change["logparams"].get("namespace", _("Unknown"))) - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed.add_field(_('Wiki'), change["logparams"].get("wiki", _("Unknown"))) - elif action == "managewiki/rights": - group_name = change["title"].split("/permissions/", 1)[1] - embed["title"] = _("Modified \"{usergroup_name}\" usergroup").format(usergroup_name=group_name) - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - elif action == "managewiki/undelete": - embed["title"] = _("Undeleted a \"{wiki}\" wiki").format(wiki=change["logparams"].get("wiki", _("Unknown"))) - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - elif action == "managewiki/unlock": - embed["title"] = _("Unlocked a \"{wiki}\" wiki").format(wiki=change["logparams"].get("wiki", _("Unknown"))) - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - elif action == "datadump/generate": - embed["title"] = _("Generated {file} dump").format(file=change["logparams"]["filename"]) - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - elif action == "datadump/delete": - embed["title"] = _("Deleted {file} dump").format(file=change["logparams"]["filename"]) - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - elif action == "pagetranslation/mark": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - if "?" 
in link: - link = link + "&oldid={}".format(change["logparams"]["revision"]) - else: - link = link + "?oldid={}".format(change["logparams"]["revision"]) - embed["title"] = _("Marked \"{article}\" for translation").format(article=change["title"]) - elif action == "pagetranslation/unmark": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Removed \"{article}\" from the translation system").format(article=change["title"]) - elif action == "pagetranslation/moveok": - link = create_article_path(change["logparams"]["target"], WIKI_ARTICLE_PATH) - embed["title"] = _("Completed moving translation pages from \"{article}\" to \"{target}\"").format(article=change["title"], target=change["logparams"]["target"]) - elif action == "pagetranslation/movenok": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Encountered a problem while moving \"{article}\" to \"{target}\"").format(article=change["title"], target=change["logparams"]["target"]) - elif action == "pagetranslation/deletefok": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Completed deletion of translatable page \"{article}\"").format(article=change["title"]) - elif action == "pagetranslation/deletefnok": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Failed to delete \"{article}\" which belongs to translatable page \"{target}\"").format(article=change["title"], target=change["logparams"]["target"]) - elif action == "pagetranslation/deletelok": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Completed deletion of translation page \"{article}\"").format(article=change["title"]) - elif action == "pagetranslation/deletelnok": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Failed to delete \"{article}\" which belongs to translation page \"{target}\"").format(article=change["title"], target=change["logparams"]["target"]) - elif action == "pagetranslation/encourage": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Encouraged translation of \"{article}\"").format(article=change["title"]) - elif action == "pagetranslation/discourage": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Discouraged translation of \"{article}\"").format(article=change["title"]) - elif action == "pagetranslation/prioritylanguages": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - if "languages" in change["logparams"]: - languages = "`, `".join(change["logparams"]["languages"].split(",")) - if change["logparams"]["force"] == "on": - embed["title"] = _("Limited languages for \"{article}\" to `{languages}`").format(article=change["title"], languages=languages) - else: - embed["title"] = _("Priority languages for \"{article}\" set to `{languages}`").format(article=change["title"], languages=languages) - else: - embed["title"] = _("Removed priority languages from \"{article}\"").format(article=change["title"]) - elif action == "pagetranslation/associate": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Added translatable page \"{article}\" to aggregate group \"{group}\"").format(article=change["title"], group=change["logparams"]["aggregategroup"]) - elif action == "pagetranslation/dissociate": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Removed translatable page \"{article}\" 
from aggregate group \"{group}\"").format(article=change["title"], group=change["logparams"]["aggregategroup"]) - elif action == "translationreview/message": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - if "?" in link: - link = link + "&oldid={}".format(change["logparams"]["revision"]) - else: - link = link + "?oldid={}".format(change["logparams"]["revision"]) - embed["title"] = _("Reviewed translation \"{article}\"").format(article=change["title"]) - elif action == "translationreview/group": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - embed["title"] = _("Changed the state of `{language}` translations of \"{article}\"").format(language=change["logparams"]["language"], article=change["title"]) - if "old-state" in change["logparams"]: - embed.add_field(_("Old state"), change["logparams"]["old-state"], inline=True) - embed.add_field(_("New state"), change["logparams"]["new-state"], inline=True) - elif action == "pagelang/pagelang": - link = create_article_path(change["title"], WIKI_ARTICLE_PATH) - old_lang = "`{}`".format(change["logparams"]["oldlanguage"]) - if change["logparams"]["oldlanguage"][-5:] == "[def]": - old_lang = "`{}` {}".format(change["logparams"]["oldlanguage"][:-5], _("(default)")) - new_lang = "`{}`".format(change["logparams"]["newlanguage"]) - if change["logparams"]["newlanguage"][-5:] == "[def]": - new_lang = "`{}` {}".format(change["logparams"]["oldlanguage"][:-5], _("(default)")) - embed["title"] = _("Changed the language of \"{article}\"").format(article=change["title"]) - embed.add_field(_("Old language"), old_lang, inline=True) - embed.add_field(_("New language"), new_lang, inline=True) - elif action == "renameuser/renameuser": - edits = change["logparams"]["edits"] - if edits > 0: - embed["title"] = ngettext("Renamed user \"{old_name}\" with {edits} edit to \"{new_name}\"", "Renamed user \"{old_name}\" with {edits} edits to \"{new_name}\"", edits).format(old_name=change["logparams"]["olduser"], edits=edits, new_name=change["logparams"]["newuser"]) - else: - embed["title"] = _("Renamed user \"{old_name}\" to \"{new_name}\"").format(old_name=change["logparams"]["olduser"], new_name=change["logparams"]["newuser"]) - link = create_article_path("User:"+change["logparams"]["newuser"], WIKI_ARTICLE_PATH) - elif action == "suppressed": - link = create_article_path("", WIKI_ARTICLE_PATH) - embed.set_author(_("Unknown")) - else: - logger.warning("No entry for {event} with params: {params}".format(event=action, params=change)) - link = create_article_path("Special:RecentChanges", WIKI_ARTICLE_PATH) - embed["title"] = _("Unknown event `{event}`").format(event=action) - embed.event_type = "unknown" - if settings.get("support", None): - change_params = "[```json\n{params}\n```]({support})".format(params=json.dumps(change, indent=2), support=settings["support"]) - if len(change_params) > 1000: - embed.add_field(_("Report this on the support server"), settings["support"]) - else: - embed.add_field(_("Report this on the support server"), change_params) - embed["url"] = link - if parsed_comment is not None: - embed["description"] = parsed_comment - embed["timestamp"] = change["timestamp"] - if "tags" in change and change["tags"]: - tag_displayname = [] - for tag in change["tags"]: - if tag in additional_data["tags"]: - if additional_data["tags"][tag] is None: - continue # Ignore hidden tags - else: - tag_displayname.append(additional_data["tags"][tag]) - else: - tag_displayname.append(tag) - if tag_displayname: - embed.add_field(_("Tags"), 
", ".join(tag_displayname)) - if len(embed["title"]) > 254: - embed["title"] = embed["title"][0:253]+"…" - logger.debug("Current params in edit action: {}".format(change)) - if categories and not (len(categories["new"]) == 0 and len(categories["removed"]) == 0): - new_cat = (_("**Added**: ") + ", ".join(list(categories["new"])[0:16]) + ("\n" if len(categories["new"])<=15 else _(" and {} more\n").format(len(categories["new"])-15))) if categories["new"] else "" - del_cat = (_("**Removed**: ") + ", ".join(list(categories["removed"])[0:16]) + ("" if len(categories["removed"])<=15 else _(" and {} more").format(len(categories["removed"])-15))) if categories["removed"] else "" - embed.add_field(_("Changed categories"), new_cat + del_cat) - embed.finish_embed() - return embed diff --git a/src/queue_handler.py b/src/queue_handler.py index e76c882..7d4163e 100644 --- a/src/queue_handler.py +++ b/src/queue_handler.py @@ -1,18 +1,17 @@ import asyncio import collections import logging -from typing import Union +from typing import Union, Optional import asyncpg -from src.database import db - logger = logging.getLogger("rcgcdb.queue_handler") class UpdateDB: def __init__(self): self.updated: list[tuple[str, tuple[Union[str, int]]]] = [] + self.db: Optional[] = None def add(self, sql_expression): self.updated.append(sql_expression) @@ -21,7 +20,7 @@ class UpdateDB: self.updated.clear() async def fetch_rows(self, SQLstatement: str, args: Union[str, int]) -> collections.AsyncIterable: - async with db.pool().acquire() as connection: + async with self.db.pool().acquire() as connection: async with connection.transaction(): async for row in connection.cursor(SQLstatement, *args): yield row @@ -30,7 +29,7 @@ class UpdateDB: try: while True: if self.updated: - async with db.pool().acquire() as connection: + async with self.db.pool().acquire() as connection: async with connection.transaction(): for update in self.updated: await connection.execute(update[0], *update[1]) @@ -38,12 +37,12 @@ class UpdateDB: await asyncio.sleep(10.0) except asyncio.CancelledError: logger.info("Shutting down after updating DB with {} more entries...".format(len(self.updated))) - async with db.pool().acquire() as connection: + async with self.db.pool().acquire() as connection: async with connection.transaction(): for update in self.updated: await connection.execute(update[0], *update[1]) self.clear_list() - await db.shutdown_connection() + await self.db.shutdown_connection() -DBHandler = UpdateDB() +dbmanager = UpdateDB() diff --git a/src/wiki.py b/src/wiki.py index 603f356..403ae46 100644 --- a/src/wiki.py +++ b/src/wiki.py @@ -12,8 +12,7 @@ from api.util import default_message from src.discord.queue import messagequeue, QueueEntry from mw_messages import MWMessages from src.exceptions import * -from src.queue_handler import DBHandler -from src.formatters.discussions import feeds_embed_formatter, feeds_compact_formatter +from src.queue_handler import UpdateDB from src.api.hooks import formatter_hooks from src.api.client import Client from src.api.context import Context @@ -80,6 +79,63 @@ class Wiki: def set_domain(self, domain: Domain): self.domain = domain + # def find_middle_next(ids: List[str], pageid: int) -> set: # TODO Properly re-implement for RcGcDb + # """To address #235 RcGcDw should now remove diffs in next revs relative to redacted revs to protect information in revs that revert revdeleted information. 
+    #
+    #     :arg ids - list
+    #     :arg pageid - int
+    #
+    #     :return list"""
+    #     ids = [int(x) for x in ids]
+    #     result = set()
+    #     ids.sort() # Just to be sure, sort the list to make sure it's always sorted
+    #     messages = db_cursor.execute("SELECT revid FROM event WHERE pageid = ? AND revid >= ? ORDER BY revid",
+    #                                  (pageid, ids[0],))
+    #     all_in_page = [x[0] for x in messages.fetchall()]
+    #     for id in ids:
+    #         try:
+    #             result.add(all_in_page[all_in_page.index(id) + 1])
+    #         except (KeyError, ValueError):
+    #             logger.debug(f"Value {id} not in {all_in_page} or no value after that.")
+    #     return result - set(ids)
+
+    def search_message_history(self, params: dict) -> list[tuple[StackedDiscordMessage, list[int]]]:
+        """Search self.message_history for messages which match all properties in params and return them in a list"""
+        output = []
+        for message in self.message_history:
+            returned_matches_for_stacked = message.filter(params)
+            if returned_matches_for_stacked:
+                output.append((message, [x[0] for x in returned_matches_for_stacked]))
+        return output
+
+    def delete_messages(self, params: dict):
+        """Delete certain messages from message_history which DiscordMessageMetadata matches all properties in params"""
+        # Delete all messages with given IDs
+        for stacked_message, ids in self.search_message_history(params):
+            stacked_message.delete_message_by_id(ids)
+            # If all messages were removed, send a DELETE to Discord
+            if len(stacked_message.message_list) == 0:
+                messagequeue.add_message(QueueEntry(stacked_message, [stacked_message.webhook], self, method="DELETE"))
+            else:
+                messagequeue.add_message(QueueEntry(stacked_message, [stacked_message.webhook], self, method="PATCH"))
+
+    def redact_messages(self, ids: list[int], mode: str, censored_properties: dict):
+        # ids can refer to multiple events, and search does not support additive mode, so we have to loop it for all ids
+        for revlogid in ids:
+            for stacked_message, ids in self.search_message_history({mode: revlogid}): # This might not work depending on how Python handles it, but hey, learning experience
+                for message in [message for num, message in enumerate(stacked_message.message_list) if num in ids]:
+                    if "user" in censored_properties and "url" in message["author"]:
+                        message["author"]["name"] = _("hidden")
+                        message["author"].pop("url")
+                    if "action" in censored_properties and "url" in message:
+                        message["title"] = _("~~hidden~~")
+                        message["embed"].pop("url")
+                    if "content" in censored_properties and "fields" in message:
+                        message["embed"].pop("fields")
+                    if "comment" in censored_properties:
+                        message["description"] = _("~~hidden~~")
+                messagequeue.add_message(QueueEntry(stacked_message, [stacked_message.webhook], self, method="PATCH"))
+
     # async def downtime_controller(self, down, reason=None):
     #     if down:
     #         self.fail_times += 1
@@ -341,21 +397,21 @@ async def rc_processor(wiki: Wiki, change: dict, changed_categories: dict, displ
         else:
             raise
     if identification_string in ("delete/delete", "delete/delete_redir"): # TODO Move it into a hook?
-        delete_messages(dict(pageid=change.get("pageid")))
+        wiki.delete_messages(dict(pageid=change.get("pageid")))
     elif identification_string == "delete/event":
         logparams = change.get('logparams', {"ids": []})
         if settings["appearance"]["mode"] == "embed":
-            redact_messages(logparams.get("ids", []), 1, logparams.get("new", {}))
+            wiki.redact_messages(logparams.get("ids", []), "rev_id", logparams.get("new", {}))
         else:
             for logid in logparams.get("ids", []):
-                delete_messages(dict(logid=logid))
+                wiki.delete_messages(dict(logid=logid))
     elif identification_string == "delete/revision":
         logparams = change.get('logparams', {"ids": []})
         if settings["appearance"]["mode"] == "embed":
-            redact_messages(logparams.get("ids", []), 0, logparams.get("new", {}))
+            wiki.redact_messages(logparams.get("ids", []), "log_id", logparams.get("new", {}))
         else:
             for revid in logparams.get("ids", []):
-                delete_messages(dict(revid=revid))
+                wiki.delete_messages(dict(revid=revid))
     discord_message.finish_embed()
     if discord_message:
         discord_message.metadata = metadata
@@ -418,7 +474,7 @@ async def process_cats(event: dict, local_wiki: Wiki, categorize_events: dict):
     # mw_msgs[key] = msgs # it may be a little bit messy for sure, however I don't expect any reason to remove mw_msgs entries by one
     # local_wiki.mw_messages = key

-async def essential_feeds(change: dict, comment_pages: dict, db_wiki, target: tuple) -> discord.DiscordMessage:
+async def essential_feeds(change: dict, comment_pages: dict, db_wiki, target: tuple) -> DiscordMessage:
     """Prepares essential information for both embed and compact message format."""
     appearance_mode = feeds_embed_formatter if target[0][1] > 0 else feeds_compact_formatter
     identification_string = change["_embedded"]["thread"][0]["containerType"]
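
Editor's note (not part of the diff): the new Wiki.search_message_history / delete_messages / redact_messages methods above all rely on the same pattern of filtering stacked webhook payloads by message metadata and collecting the indices that matched. The sketch below is illustrative only; MessageMetadata, StackedMessage and the standalone search_message_history helper are hypothetical stand-ins, not RcGcDb's actual classes, and only mirror that matching pattern under those assumptions.

    # Minimal, self-contained sketch of metadata-based message-history search.
    # All names here are hypothetical stand-ins for illustration only.
    from dataclasses import dataclass, field


    @dataclass
    class MessageMetadata:
        # Hypothetical metadata container; stores the properties a search can match on
        matchable: dict = field(default_factory=dict)

        def matches(self, params: dict) -> bool:
            # A message matches only if every requested property is present and equal
            return all(self.matchable.get(key) == value for key, value in params.items())


    @dataclass
    class StackedMessage:
        # Hypothetical stand-in for a stacked webhook payload holding several per-event messages
        metadata_list: list[MessageMetadata] = field(default_factory=list)

        def filter(self, params: dict) -> list[tuple[int, MessageMetadata]]:
            # Return (index, metadata) pairs for every embedded message that matches all params
            return [(num, meta) for num, meta in enumerate(self.metadata_list) if meta.matches(params)]


    def search_message_history(history: list[StackedMessage], params: dict) -> list[tuple[StackedMessage, list[int]]]:
        # Same shape of result as the diff's Wiki.search_message_history:
        # each matching stacked message paired with the indices of its matching embedded messages
        output = []
        for message in history:
            matched = message.filter(params)
            if matched:
                output.append((message, [num for num, _meta in matched]))
        return output


    if __name__ == "__main__":
        history = [
            StackedMessage([MessageMetadata({"pageid": 1}), MessageMetadata({"pageid": 2})]),
            StackedMessage([MessageMetadata({"logid": 7})]),
        ]
        # Prints the first stacked message paired with [1], the index of the matching embedded message
        print(search_message_history(history, {"pageid": 2}))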