diff --git a/src/bot.py b/src/bot.py index 6993d59..2809711 100644 --- a/src/bot.py +++ b/src/bot.py @@ -16,7 +16,6 @@ from src.misc import get_paths from src.msgqueue import messagequeue from src.queue_handler import DBHandler from src.wiki import Wiki, process_cats, process_mwmsgs, essential_info, essential_feeds -from src.formatters.discussions import embed_formatter, compact_formatter from src.discord import DiscordMessage, formatter_exception_logger, msg_sender_exception_logger logging.config.dictConfig(settings["logging"]) diff --git a/src/formatters/discussions.py b/src/formatters/discussions.py index e0dd68e..2de2f1e 100644 --- a/src/formatters/discussions.py +++ b/src/formatters/discussions.py @@ -3,133 +3,135 @@ import json from urllib.parse import quote_plus from src.config import settings -from src.misc import send_to_discord, escape_formatting -from discord import DiscordMessage +from src.misc import link_formatter, create_article_path, escape_formatting +from src.discord import DiscordMessage +from src.msgqueue import send_to_discord -discussion_logger = logging.getLogger("rcgcdw.discussion_formatter") +logger = logging.getLogger("rcgcdw.discussion_formatters") -def feeds_embed_formatter(post_type, post, message_target, wiki, _): - """Embed formatter for Fandom discussions.""" - embed = DiscordMessage("embed", "discussion", settings["fandom_discussions"]["webhookURL"]) - embed.set_author(post["createdBy"]["name"], "{wikiurl}f/u/{creatorId}".format( - wikiurl=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"]), icon_url=post["createdBy"]["avatarUrl"]) - discussion_post_type = post["_embedded"]["thread"][0].get("containerType", "FORUM") # Can be FORUM, ARTICLE_COMMENT or WALL on UCP - if post_type == "TEXT": - if post["isReply"]: - if discussion_post_type == "FORUM": - embed.event_type = "discussion/forum/reply" - embed["title"] = _("Replied to \"{title}\"").format(title=post["_embedded"]["thread"][0]["title"]) - embed["url"] = "{wikiurl}f/p/{threadId}/r/{postId}".format( - wikiurl=settings["fandom_discussions"]["wiki_url"], threadId=post["threadId"], postId=post["id"]) - elif discussion_post_type == "ARTICLE_COMMENT": - discussion_logger.warning("Article comments are not yet implemented. For reasons see https://gitlab.com/piotrex43/RcGcDw/-/issues/126#note_366480037") - return - elif discussion_post_type == "WALL": - user_wall = _("unknown") # Fail safe - embed.event_type = "discussion/wall/reply" - if post["forumName"].endswith(' Message Wall'): - user_wall = post["forumName"][:-13] - embed["url"] = "{wikiurl}wiki/Message_Wall:{user_wall}?threadId={threadid}#{replyId}".format(wikiurl=settings["fandom_discussions"]["wiki_url"], user_wall=quote_plus(user_wall.replace(" ", "_")), threadid=post["threadId"], replyId=post["id"]) - embed["title"] = _("Replied to \"{title}\" on {user}'s Message Wall").format(title=post["_embedded"]["thread"][0]["title"], user=user_wall) - else: - if discussion_post_type == "FORUM": - embed.event_type = "discussion/forum/post" - embed["title"] = _("Created \"{title}\"").format(title=post["title"]) - embed["url"] = "{wikiurl}f/p/{threadId}".format(wikiurl=settings["fandom_discussions"]["wiki_url"], - threadId=post["threadId"]) - elif discussion_post_type == "ARTICLE_COMMENT": - discussion_logger.warning("Article comments are not yet implemented. For reasons see https://gitlab.com/piotrex43/RcGcDw/-/issues/126#note_366480037") - return - elif discussion_post_type == "WALL": - user_wall = _("unknown") # Fail safe - embed.event_type = "discussion/wall/post" - if post["forumName"].endswith(' Message Wall'): - user_wall = post["forumName"][:-13] - embed["url"] = "{wikiurl}wiki/Message_Wall:{user_wall}?threadId={threadid}".format( - wikiurl=settings["fandom_discussions"]["wiki_url"], user_wall=quote_plus(user_wall.replace(" ", "_")), - threadid=post["threadId"]) - embed["title"] = _("Created \"{title}\" on {user}'s Message Wall").format(title=post["_embedded"]["thread"][0]["title"], user=user_wall) - if settings["fandom_discussions"]["appearance"]["embed"]["show_content"]: - if post.get("jsonModel") is not None: - npost = DiscussionsFromHellParser(post) - embed["description"] = npost.parse() - if npost.image_last: - embed["image"]["url"] = npost.image_last - embed["description"] = embed["description"].replace(npost.image_last, "") - else: # Fallback when model is not available - embed["description"] = post.get("rawContent", "") - elif post_type == "POLL": - embed.event_type = "discussion/forum/poll" - poll = post["poll"] - embed["title"] = _("Created a poll titled \"{title}\"").format(title=poll["question"]) - image_type = False - if poll["answers"][0]["image"] is not None: - image_type = True - for num, option in enumerate(poll["answers"]): - embed.add_field(option["text"] if image_type is True else _("Option {}").format(num+1), - option["text"] if image_type is False else _("__[View image]({image_url})__").format(image_url=option["image"]["url"]), - inline=True) - else: - # UNKNOWN EVENT - embed["footer"]["text"] = post["forumName"] - embed["timestamp"] = datetime.datetime.fromtimestamp(post["creationDate"]["epochSecond"], tz=datetime.timezone.utc).isoformat() - embed.finish_embed() - send_to_discord(embed) - - -def feeds_compact_formatter(post_type, post, message_target, wiki, _): +async def feeds_compact_formatter(post_type, post, message_target, wiki, _): """Compact formatter for Fandom discussions.""" message = None - discussion_post_type = post["_embedded"]["thread"][0].get("containerType", - "FORUM") # Can be FORUM, ARTICLE_COMMENT or WALL on UCP - if post_type == "TEXT": + if post_type == "FORUM": if not post["isReply"]: - if discussion_post_type == "FORUM": - message = _("[{author}](<{url}f/u/{creatorId}>) created [{title}](<{url}f/p/{threadId}>) in {forumName}").format( - author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"], title=post["title"], threadId=post["threadId"], forumName=post["forumName"]) - elif discussion_post_type == "ARTICLE_COMMENT": - discussion_logger.warning("Article comments are not yet implemented. For reasons see https://gitlab.com/piotrex43/RcGcDw/-/issues/126#note_366480037") - return - elif discussion_post_type == "WALL": - user_wall = _("unknown") # Fail safe - if post["forumName"].endswith(' Message Wall'): - user_wall = post["forumName"][:-13] - message = _("[{author}](<{url}f/u/{creatorId}>) created [{title}](<{wikiurl}wiki/Message_Wall:{user_wall}?threadId={threadid}>) on {user}'s Message Wall").format( - author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"], title=post["_embedded"]["thread"][0]["title"], user=user_wall, - wikiurl=settings["fandom_discussions"]["wiki_url"], user_wall=quote_plus(user_wall.replace(" ", "_")), threadid=post["threadId"] - ) + thread_funnel = post.get("funnel") + msg_text = "[{author}](<{url}f/u/{creatorId}>) created [{title}](<{url}f/p/{threadId}>) in {forumName}" + if thread_funnel == "POLL": + msg_text = "[{author}](<{url}f/u/{creatorId}>) created a poll [{title}](<{url}f/p/{threadId}>) in {forumName}" + elif thread_funnel != "TEXT": + logger.warning("No entry for {event} with params: {params}".format(event=thread_funnel, params=post)) + message = _(msg_text).format(author=post["createdBy"]["name"], url=wiki, creatorId=post["creatorId"], title=post["title"], threadId=post["threadId"], forumName=post["forumName"]) else: - if discussion_post_type == "FORUM": - message = _("[{author}](<{url}f/u/{creatorId}>) created a [reply](<{url}f/p/{threadId}/r/{postId}>) to [{title}](<{url}f/p/{threadId}>) in {forumName}").format( - author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"], threadId=post["threadId"], postId=post["id"], title=post["_embedded"]["thread"][0]["title"], forumName=post["forumName"] - ) - elif discussion_post_type == "ARTICLE_COMMENT": - discussion_logger.warning("Article comments are not yet implemented. For reasons see https://gitlab.com/piotrex43/RcGcDw/-/issues/126#note_366480037") - return - elif discussion_post_type == "WALL": - user_wall = _("unknown") # Fail safe - if post["forumName"].endswith(' Message Wall'): - user_wall = post["forumName"][:-13] - message = _( - "[{author}](<{url}f/u/{creatorId}>) replied to [{title}](<{wikiurl}wiki/Message_Wall:{user_wall}?threadId={threadid}#{replyId}>) on {user}'s Message Wall").format( - author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"], title=post["_embedded"]["thread"][0]["title"], user=user_wall, - wikiurl=settings["fandom_discussions"]["wiki_url"], user_wall=quote_plus(user_wall.replace(" ", "_")), threadid=post["threadId"], replyId=post["id"]) - elif post_type == "POLL": - message = _( - "[{author}](<{url}f/u/{creatorId}>) created a poll [{title}](<{url}f/p/{threadId}>) in {forumName}").format( - author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], - creatorId=post["creatorId"], title=post["title"], threadId=post["threadId"], forumName=post["forumName"]) + message = _("[{author}](<{url}f/u/{creatorId}>) created a [reply](<{url}f/p/{threadId}/r/{postId}>) to [{title}](<{url}f/p/{threadId}>) in {forumName}").format(author=post["createdBy"]["name"], url=wiki, creatorId=post["creatorId"], threadId=post["threadId"], postId=post["id"], title=post["_embedded"]["thread"][0]["title"], forumName=post["forumName"]) + elif post_type == "WALL": + user_wall = _("unknown") # Fail safe + if post["forumName"].endswith(' Message Wall'): + user_wall = post["forumName"][:-13] + if not post["isReply"]: + message = _("[{author}](<{url}f/u/{creatorId}>) created [{title}](<{url}wiki/Message_Wall:{user_wall}?threadId={threadId}>) on [{user}'s Message Wall](<{url}wiki/Message_Wall:{user_wall}>)").format(author=post["createdBy"]["name"], url=wiki, creatorId=post["creatorId"], title=post["_embedded"]["thread"][0]["title"], user=user_wall, user_wall=quote_plus(user_wall.replace(" ", "_")), threadId=post["threadId"]) + else: + message = _("[{author}](<{url}f/u/{creatorId}>) created a [reply](<{url}wiki/Message_Wall:{user_wall}?threadId={threadId}#{replyId}>) to [{title}](<{url}wiki/Message_Wall:{user_wall}?threadId={threadId}) on [{user}'s Message Wall](<{url}wiki/Message_Wall:{user_wall}>)").format(author=post["createdBy"]["name"], url=wiki, creatorId=post["creatorId"], title=post["_embedded"]["thread"][0]["title"], user=user_wall, user_wall=quote_plus(user_wall.replace(" ", "_")), threadId=post["threadId"], replyId=post["id"]) + elif post_type == "ARTICLE_COMMENT": + article_page = _("unknown") # No page known + if not post["isReply"]: + message = _("[{author}](<{url}f/u/{creatorId}>) created a [comment](<{url}wiki/{article}?threadId={threadId}>) on [{article}](<{url}wiki/{article}>)").format(author=post["createdBy"]["name"], url=wiki, creatorId=post["creatorId"], article=article_page, threadId=post["threadId"]) + else: + message = _("[{author}](<{url}f/u/{creatorId}>) created a [reply](<{url}wiki/{article}?threadId={threadId}) to a [comment](<{url}wiki/{article}?threadId={threadId}#{replyId}>) on [{article}](<{url}wiki/{article}>)").format(author=post["createdBy"]["name"], url=wiki, creatorId=post["creatorId"], article=article_page, threadId=post["threadId"], replyId=post["id"]) else: - # UNKNOWN EVENT - send_to_discord(DiscordMessage("compact", "discussion", settings["fandom_discussions"]["webhookURL"], content=message)) + logger.warning("No entry for {event} with params: {params}".format(event=post_type, params=post)) + if not settings["support"]: + return + else: + content = _("Unknown event `{event}` by [{author}]({author_url}), report it on the [support server](<{support}>).").format(event=post_type, author=post["createdBy"]["name"], author_url=link_formatter(create_article_path("User:{user}".format(user=post["createdBy"]["name"]), "{url}wiki/$1".format(url=wiki))), support=settings["support"]) + await send_to_discord(DiscordMessage("compact", "discussion", message_target[1], content=message, wiki=wiki)) + + +async def feeds_embed_formatter(post_type, post, message_target, wiki, _): + """Embed formatter for Fandom discussions.""" + embed = DiscordMessage("embed", "discussion", message_target[1], wiki=wiki) + if post_type == "FORUM": + embed.set_author(post["createdBy"]["name"], "{url}f/u/{creatorId}".format(url=wiki, creatorId=post["creatorId"]), icon_url=post["createdBy"]["avatarUrl"]) + else: + embed.set_author(post["createdBy"]["name"], "{url}wiki/User:{creator}".format(url=wiki, creator=post["createdBy"]["name"]), icon_url=post["createdBy"]["avatarUrl"]) + if message_target[0][1] == 3: + if post.get("jsonModel") is not None: + npost = DiscussionsFromHellParser(post, wiki) + embed["description"] = npost.parse() + if npost.image_last: + embed["image"]["url"] = npost.image_last + embed["description"] = embed["description"].replace(npost.image_last, "") + else: # Fallback when model is not available + embed["description"] = post.get("rawContent", "") + embed["footer"]["text"] = post["forumName"] + embed["timestamp"] = datetime.datetime.fromtimestamp(post["creationDate"]["epochSecond"], tz=datetime.timezone.utc).isoformat() + if post_type == "FORUM": + if not post["isReply"]: + embed["url"] = "{url}f/p/{threadId}".format(url=wiki, threadId=post["threadId"]) + embed["title"] = _("Created \"{title}\"").format(title=post["title"]) + thread_funnel = post.get("funnel") + if thread_funnel == "POLL": + embed.event_type = "discussion/forum/poll" + poll = post["poll"] + embed["title"] = _("Created a poll \"{title}\"").format(title=poll["question"]) + image_type = False + if poll["answers"][0]["image"] is not None: + image_type = True + for num, option in enumerate(poll["answers"]): + embed.add_field(option["text"] if image_type is True else _("Option {}").format(num+1), + option["text"] if image_type is False else _("__[View image]({image_url})__").format(image_url=option["image"]["url"]), + inline=True) + elif thread_funnel == "TEXT": + embed.event_type = "discussion/forum/post" + else: + logger.warning("No entry for {event} with params: {params}".format(event=thread_funnel, params=post)) + else: + embed.event_type = "discussion/forum/reply" + embed["title"] = _("Replied to \"{title}\"").format(title=post["_embedded"]["thread"][0]["title"]) + embed["url"] = "{url}f/p/{threadId}/r/{postId}".format(url=wiki, threadId=post["threadId"], postId=post["id"]) + elif post_type == "WALL": + user_wall = _("unknown") # Fail safe + if post["forumName"].endswith(' Message Wall'): + user_wall = post["forumName"][:-13] + if not post["isReply"]: + embed.event_type = "discussion/wall/post" + embed["url"] = "{url}wiki/Message_Wall:{user_wall}?threadId={threadid}".format(url=wiki, user_wall=quote_plus(user_wall.replace(" ", "_")), threadid=post["threadId"]) + embed["title"] = _("Created \"{title}\" on {user}'s Message Wall").format(title=post["_embedded"]["thread"][0]["title"], user=user_wall) + else: + embed.event_type = "discussion/wall/reply" + embed["url"] = "{url}wiki/Message_Wall:{user_wall}?threadId={threadid}#{replyId}".format(url=wiki, user_wall=quote_plus(user_wall.replace(" ", "_")), threadid=post["threadId"], replyId=post["id"]) + embed["title"] = _("Replied to \"{title}\" on {user}'s Message Wall").format(title=post["_embedded"]["thread"][0]["title"], user=user_wall) + elif post_type == "ARTICLE_COMMENT": + article_page = _("unknown") # No page known + if not post["isReply"]: + embed.event_type = "discussion/comment/post" + # embed["url"] = "{url}wiki/{article}?threadId={threadid}".format(url=wiki, article=quote_plus(article_page.replace(" ", "_")), threadid=post["threadId"]) + embed["title"] = _("Commented on {article}").format(article=article_page) + else: + embed.event_type = "discussion/comment/reply" + # embed["url"] = "{url}wiki/{article}?threadId={threadid}#{replyId}".format(url=wiki, article=quote_plus(article_page.replace(" ", "_")), threadid=post["threadId"], replyId=post["id"]) + embed["title"] = _("Replied to a comment on {article}").format(article=article_page) + embed["footer"]["text"] = article_page + else: + logger.warning("No entry for {event} with params: {params}".format(event=post_type, params=post)) + embed["title"] = _("Unknown event `{event}`").format(event=post_type) + embed["color"] = 0 + if settings["support"]: + change_params = "[```json\n{params}\n```]({support})".format(params=json.dumps(post, indent=2), support=settings["support"]) + if len(change_params) > 1000: + embed.add_field(_("Report this on the support server"), settings["support"]) + else: + embed.add_field(_("Report this on the support server"), change_params) + embed.finish_embed() + await send_to_discord(embed) class DiscussionsFromHellParser: """This class converts fairly convoluted Fandom jsonModal of a discussion post into Markdown formatted usable thing. Takes string, returns string. Kudos to MarkusRost for allowing me to implement this formatter based on his code in Wiki-Bot.""" - def __init__(self, post): + def __init__(self, post, wiki): self.post = post + self.wiki = wiki self.jsonModal = json.loads(post.get("jsonModel", "{}")) self.markdown_text = "" self.item_num = 1 @@ -164,17 +166,17 @@ class DiscussionsFromHellParser: self.parse_content(item["content"], item["type"]) self.markdown_text += "\n" elif item["type"] == "openGraph": - if not item["attrs"]["wasAddedWithInlineLink"]: + if not item["attrs"].get("wasAddedWithInlineLink", False): self.markdown_text = "{old}{link}\n".format(old=self.markdown_text, link=item["attrs"]["url"]) elif item["type"] == "image": try: - discussion_logger.debug(item["attrs"]["id"]) + logger.debug(item["attrs"]["id"]) if item["attrs"]["id"] is not None: self.markdown_text = "{old}{img_url}\n".format(old=self.markdown_text, img_url=self.post["_embedded"]["contentImages"][int(item["attrs"]["id"])]["url"]) self.image_last = self.post["_embedded"]["contentImages"][int(item["attrs"]["id"])]["url"] except (IndexError, ValueError): - discussion_logger.warning("Image {} not found.".format(item["attrs"]["id"])) - discussion_logger.debug(self.markdown_text) + logger.warning("Image {} not found.".format(item["attrs"]["id"])) + logger.debug(self.markdown_text) elif item["type"] == "code_block": self.markdown_text += "```\n" if "content" in item: @@ -196,7 +198,7 @@ class DiscussionsFromHellParser: for mark in marks: if mark["type"] == "mention": prefix += "[" - suffix = "]({wiki}f/u/{userid}){suffix}".format(wiki=settings["fandom_discussions"]["wiki_url"], userid=mark["attrs"]["userId"], suffix=suffix) + suffix = "]({wiki}f/u/{userid}){suffix}".format(wiki=self.wiki, userid=mark["attrs"]["userId"], suffix=suffix) elif mark["type"] == "strong": prefix += "**" suffix = "**{suffix}".format(suffix=suffix) diff --git a/src/misc.py b/src/misc.py index 78c47b5..c8fcc3a 100644 --- a/src/misc.py +++ b/src/misc.py @@ -39,13 +39,13 @@ class LinkParser(HTMLParser): def handle_data(self, data): if self.recent_href: - self.new_string = self.new_string + "[{}](<{}>)".format(data.replace("//", "/\\/"), self.recent_href) + self.new_string = self.new_string + "[{}](<{}>)".format(escape_formatting(data), self.recent_href) self.recent_href = "" else: - self.new_string = self.new_string + data.replace("//", "/\\/") + self.new_string = self.new_string + escape_formatting(data) def handle_comment(self, data): - self.new_string = self.new_string + data.replace("//", "/\\/") + self.new_string = self.new_string + escape_formatting(data) def handle_endtag(self, tag): # logger.debug(self.new_string) @@ -71,7 +71,7 @@ def link_formatter(link: str) -> str: def escape_formatting(data: str) -> str: """Escape Discord formatting""" - return re.sub(r"([`_*~<>{}@/|\\])", "\\\\\\1", data, 0) + return re.sub(r"([`_*~:<>{}@/|\\\[\]\(\)])", "\\\\\\1", data, 0) def create_article_path(article: str, WIKI_ARTICLE_PATH: str) -> str: @@ -125,7 +125,7 @@ class ContentParser(HTMLParser): self.added = True def handle_data(self, data): - data = re.sub(r"([`_*~<>{}@/|\\])", "\\\\\\1", data, 0) + data = escape_formatting(data) if self.current_tag == "ins" and self.ins_length <= 1000: self.ins_length += len("**" + data + '**') if self.ins_length <= 1000: diff --git a/src/queue_handler.py b/src/queue_handler.py index f2a3f93..234f3f9 100644 --- a/src/queue_handler.py +++ b/src/queue_handler.py @@ -8,7 +8,7 @@ class UpdateDB: def __init__(self): self.updated = [] - def add(self, wiki, rc_id, feeds: None): + def add(self, wiki, rc_id, feeds=None): self.updated.append((wiki, rc_id, feeds)) def clear_list(self): diff --git a/src/wiki.py b/src/wiki.py index e97966b..d162ca1 100644 --- a/src/wiki.py +++ b/src/wiki.py @@ -202,7 +202,6 @@ async def essential_info(change: dict, changed_categories, local_wiki: Wiki, db_ return if "commenthidden" not in change: parsed_comment = parse_link(paths[3], change["parsedcomment"]) - parsed_comment = re.sub(r"(`|_|\*|~|{|}|\|\|)", "\\\\\\1", parsed_comment, 0) else: parsed_comment = _("~~hidden~~") if not parsed_comment: @@ -234,4 +233,5 @@ async def essential_feeds(change: dict, db_wiki: tuple, target: tuple): lang = langs[target[0][0]] appearance_mode = feeds_embed_formatter if target[0][1] > 0 else feeds_compact_formatter - await appearance_mode(change.get("funnel", "TEXT"), change, target, db_wiki["wiki"], _) + identification_string = change["_embedded"]["thread"][0]["containerType"] + await appearance_mode(identification_string, change, target, db_wiki["wiki"], _)