Merge branch 'master' into wiki-rate-limiting

This commit is contained in:
Frisk 2020-08-03 13:08:57 +02:00
commit c1831b992b
No known key found for this signature in database
GPG key ID: 213F7C15068AF8AC
7 changed files with 268 additions and 191 deletions

View file

@ -15,7 +15,7 @@ from src.exceptions import *
from src.misc import get_paths
from src.msgqueue import messagequeue
from src.queue_handler import DBHandler
from src.wiki import Wiki, process_cats, process_mwmsgs, essential_info
from src.wiki import Wiki, process_cats, process_mwmsgs, essential_info, essential_feeds
from src.discord import DiscordMessage, formatter_exception_logger, msg_sender_exception_logger
logging.config.dictConfig(settings["logging"])
@ -72,11 +72,12 @@ async def wiki_scanner():
'SELECT webhook, wiki, lang, display, wikiid, rcid, postid FROM rcgcdw GROUP BY wiki')
for db_wiki in fetch_all.fetchall():
logger.debug("Wiki {}".format(db_wiki["wiki"]))
extended = False
if db_wiki["wiki"] not in all_wikis:
logger.info("Registering new wiki locally: {}".format(db_wiki["wiki"]))
all_wikis[db_wiki["wiki"]] = Wiki()
local_wiki = all_wikis[db_wiki["wiki"]] # set a reference to a wiki object from memory
if db_wiki["rcid"] != -1:
extended = False
if local_wiki.mw_messages is None:
extended = True
async with aiohttp.ClientSession(headers=settings["header"],
@ -122,8 +123,8 @@ async def wiki_scanner():
if change["rcid"] > db_wiki["rcid"]:
for target in targets.items():
try:
await essential_info(change, categorize_events, local_wiki, db_wiki, target, paths,
recent_changes_resp)
await essential_info(change, categorize_events, local_wiki, db_wiki,
target, paths, recent_changes_resp)
except:
if command_line_args.debug:
raise # reraise the issue
@ -132,8 +133,58 @@ async def wiki_scanner():
await formatter_exception_logger(db_wiki["wiki"], change, traceback.format_exc())
if recent_changes:
DBHandler.add(db_wiki["wiki"], change["rcid"])
await asyncio.sleep(delay=2.0) # temporary measure until rate limiting is not implemented
if db_wiki["wikiid"] is not None:
header = settings["header"]
header["Accept"] = "application/hal+json"
async with aiohttp.ClientSession(headers=header,
timeout=aiohttp.ClientTimeout(3.0)) as session:
try:
feeds_response = await local_wiki.fetch_feeds(db_wiki["wikiid"], session)
except (WikiServerError, WikiError):
logger.error("Exeption when fetching the wiki")
continue # ignore this wiki if it throws errors
try:
discussion_feed_resp = await feeds_response.json(encoding="UTF-8")
if "title" in discussion_feed_resp:
error = discussion_feed_resp["error"]
if error == "site doesn't exists":
db_cursor.execute("UPDATE rcgcdw SET wikiid = ? WHERE wiki = ?",
(None, db_wiki["wiki"],))
DBHandler.update_db()
continue
raise WikiError
discussion_feed = discussion_feed_resp["_embedded"]["doc:posts"]
discussion_feed.reverse()
except aiohttp.ContentTypeError:
logger.exception("Wiki seems to be resulting in non-json content.")
continue
except:
logger.exception("On loading json of response.")
continue
if db_wiki["postid"] is None: # new wiki, just get the last post to not spam the channel
if len(discussion_feed) > 0:
DBHandler.add(db_wiki["wiki"], discussion_feed[-1]["id"], True)
else:
DBHandler.add(db_wiki["wiki"], "0", True)
DBHandler.update_db()
continue
targets = generate_targets(db_wiki["wiki"])
for post in discussion_feed:
if post["id"] > db_wiki["postid"]:
for target in targets.items():
try:
await essential_feeds(post, db_wiki, target)
except:
if command_line_args.debug:
raise # reraise the issue
else:
logger.exception("Exception on Feeds formatter")
await formatter_exception_logger(db_wiki["wiki"], post, traceback.format_exc())
if discussion_feed:
DBHandler.add(db_wiki["wiki"], post["id"], True)
await asyncio.sleep(delay=calc_delay)
DBHandler.update_db()
except asyncio.CancelledError:
raise

View file

@ -1,72 +1,79 @@
import datetime, logging
import json
import gettext
from urllib.parse import quote_plus
from src.config import settings
from src.misc import send_to_discord, escape_formatting
from discord import DiscordMessage
from src.i18n import disc
_ = disc.gettext
from src.misc import link_formatter, create_article_path, escape_formatting
from src.discord import DiscordMessage
from src.msgqueue import send_to_discord
discussion_logger = logging.getLogger("rcgcdw.discussion_formatter")
logger = logging.getLogger("rcgcdw.discussion_formatters")
def embed_formatter(post, post_type):
"""Embed formatter for Fandom discussions."""
embed = DiscordMessage("embed", "discussion", settings["fandom_discussions"]["webhookURL"])
embed.set_author(post["createdBy"]["name"], "{wikiurl}f/u/{creatorId}".format(
wikiurl=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"]), icon_url=post["createdBy"]["avatarUrl"])
discussion_post_type = post["_embedded"]["thread"][0].get("containerType", "FORUM") # Can be FORUM, ARTICLE_COMMENT or WALL on UCP
if post_type == "TEXT":
if post["isReply"]:
if discussion_post_type == "FORUM":
embed.event_type = "discussion/forum/reply"
embed["title"] = _("Replied to \"{title}\"").format(title=post["_embedded"]["thread"][0]["title"])
embed["url"] = "{wikiurl}f/p/{threadId}/r/{postId}".format(
wikiurl=settings["fandom_discussions"]["wiki_url"], threadId=post["threadId"], postId=post["id"])
elif discussion_post_type == "ARTICLE_COMMENT":
discussion_logger.warning("Article comments are not yet implemented. For reasons see https://gitlab.com/piotrex43/RcGcDw/-/issues/126#note_366480037")
return
elif discussion_post_type == "WALL":
user_wall = _("unknown") # Fail safe
embed.event_type = "discussion/wall/reply"
if post["forumName"].endswith(' Message Wall'):
user_wall = post["forumName"][:-13]
embed["url"] = "{wikiurl}wiki/Message_Wall:{user_wall}?threadId={threadid}#{replyId}".format(wikiurl=settings["fandom_discussions"]["wiki_url"], user_wall=quote_plus(user_wall.replace(" ", "_")), threadid=post["threadId"], replyId=post["id"])
embed["title"] = _("Replied to \"{title}\" on {user}'s Message Wall").format(title=post["_embedded"]["thread"][0]["title"], user=user_wall)
async def feeds_compact_formatter(post_type, post, message_target, wiki, _):
"""Compact formatter for Fandom discussions."""
message = None
if post_type == "FORUM":
if not post["isReply"]:
thread_funnel = post.get("funnel")
msg_text = "[{author}](<{url}f/u/{creatorId}>) created [{title}](<{url}f/p/{threadId}>) in {forumName}"
if thread_funnel == "POLL":
msg_text = "[{author}](<{url}f/u/{creatorId}>) created a poll [{title}](<{url}f/p/{threadId}>) in {forumName}"
elif thread_funnel != "TEXT":
logger.warning("No entry for {event} with params: {params}".format(event=thread_funnel, params=post))
message = _(msg_text).format(author=post["createdBy"]["name"], url=wiki, creatorId=post["creatorId"], title=post["title"], threadId=post["threadId"], forumName=post["forumName"])
else:
if discussion_post_type == "FORUM":
embed.event_type = "discussion/forum/post"
embed["title"] = _("Created \"{title}\"").format(title=post["title"])
embed["url"] = "{wikiurl}f/p/{threadId}".format(wikiurl=settings["fandom_discussions"]["wiki_url"],
threadId=post["threadId"])
elif discussion_post_type == "ARTICLE_COMMENT":
discussion_logger.warning("Article comments are not yet implemented. For reasons see https://gitlab.com/piotrex43/RcGcDw/-/issues/126#note_366480037")
return
elif discussion_post_type == "WALL":
message = _("[{author}](<{url}f/u/{creatorId}>) created a [reply](<{url}f/p/{threadId}/r/{postId}>) to [{title}](<{url}f/p/{threadId}>) in {forumName}").format(author=post["createdBy"]["name"], url=wiki, creatorId=post["creatorId"], threadId=post["threadId"], postId=post["id"], title=post["_embedded"]["thread"][0]["title"], forumName=post["forumName"])
elif post_type == "WALL":
user_wall = _("unknown") # Fail safe
embed.event_type = "discussion/wall/post"
if post["forumName"].endswith(' Message Wall'):
user_wall = post["forumName"][:-13]
embed["url"] = "{wikiurl}wiki/Message_Wall:{user_wall}?threadId={threadid}".format(
wikiurl=settings["fandom_discussions"]["wiki_url"], user_wall=quote_plus(user_wall.replace(" ", "_")),
threadid=post["threadId"])
embed["title"] = _("Created \"{title}\" on {user}'s Message Wall").format(title=post["_embedded"]["thread"][0]["title"], user=user_wall)
if settings["fandom_discussions"]["appearance"]["embed"]["show_content"]:
if not post["isReply"]:
message = _("[{author}](<{url}f/u/{creatorId}>) created [{title}](<{url}wiki/Message_Wall:{user_wall}?threadId={threadId}>) on [{user}'s Message Wall](<{url}wiki/Message_Wall:{user_wall}>)").format(author=post["createdBy"]["name"], url=wiki, creatorId=post["creatorId"], title=post["_embedded"]["thread"][0]["title"], user=user_wall, user_wall=quote_plus(user_wall.replace(" ", "_")), threadId=post["threadId"])
else:
message = _("[{author}](<{url}f/u/{creatorId}>) created a [reply](<{url}wiki/Message_Wall:{user_wall}?threadId={threadId}#{replyId}>) to [{title}](<{url}wiki/Message_Wall:{user_wall}?threadId={threadId}) on [{user}'s Message Wall](<{url}wiki/Message_Wall:{user_wall}>)").format(author=post["createdBy"]["name"], url=wiki, creatorId=post["creatorId"], title=post["_embedded"]["thread"][0]["title"], user=user_wall, user_wall=quote_plus(user_wall.replace(" ", "_")), threadId=post["threadId"], replyId=post["id"])
elif post_type == "ARTICLE_COMMENT":
article_page = _("unknown") # No page known
if not post["isReply"]:
message = _("[{author}](<{url}f/u/{creatorId}>) created a [comment](<{url}wiki/{article}?threadId={threadId}>) on [{article}](<{url}wiki/{article}>)").format(author=post["createdBy"]["name"], url=wiki, creatorId=post["creatorId"], article=article_page, threadId=post["threadId"])
else:
message = _("[{author}](<{url}f/u/{creatorId}>) created a [reply](<{url}wiki/{article}?threadId={threadId}) to a [comment](<{url}wiki/{article}?threadId={threadId}#{replyId}>) on [{article}](<{url}wiki/{article}>)").format(author=post["createdBy"]["name"], url=wiki, creatorId=post["creatorId"], article=article_page, threadId=post["threadId"], replyId=post["id"])
else:
logger.warning("No entry for {event} with params: {params}".format(event=post_type, params=post))
if not settings["support"]:
return
else:
content = _("Unknown event `{event}` by [{author}]({author_url}), report it on the [support server](<{support}>).").format(event=post_type, author=post["createdBy"]["name"], author_url=link_formatter(create_article_path("User:{user}".format(user=post["createdBy"]["name"]), "{url}wiki/$1".format(url=wiki))), support=settings["support"])
await send_to_discord(DiscordMessage("compact", "discussion", message_target[1], content=message, wiki=wiki))
async def feeds_embed_formatter(post_type, post, message_target, wiki, _):
"""Embed formatter for Fandom discussions."""
embed = DiscordMessage("embed", "discussion", message_target[1], wiki=wiki)
if post_type == "FORUM":
embed.set_author(post["createdBy"]["name"], "{url}f/u/{creatorId}".format(url=wiki, creatorId=post["creatorId"]), icon_url=post["createdBy"]["avatarUrl"])
else:
embed.set_author(post["createdBy"]["name"], "{url}wiki/User:{creator}".format(url=wiki, creator=post["createdBy"]["name"]), icon_url=post["createdBy"]["avatarUrl"])
if message_target[0][1] == 3:
if post.get("jsonModel") is not None:
npost = DiscussionsFromHellParser(post)
npost = DiscussionsFromHellParser(post, wiki)
embed["description"] = npost.parse()
if npost.image_last:
embed["image"]["url"] = npost.image_last
embed["description"] = embed["description"].replace(npost.image_last, "")
else: # Fallback when model is not available
embed["description"] = post.get("rawContent", "")
elif post_type == "POLL":
embed["footer"]["text"] = post["forumName"]
embed["timestamp"] = datetime.datetime.fromtimestamp(post["creationDate"]["epochSecond"], tz=datetime.timezone.utc).isoformat()
if post_type == "FORUM":
if not post["isReply"]:
embed["url"] = "{url}f/p/{threadId}".format(url=wiki, threadId=post["threadId"])
embed["title"] = _("Created \"{title}\"").format(title=post["title"])
thread_funnel = post.get("funnel")
if thread_funnel == "POLL":
embed.event_type = "discussion/forum/poll"
poll = post["poll"]
embed["title"] = _("Created a poll titled \"{title}\"").format(title=poll["question"])
embed["title"] = _("Created a poll \"{title}\"").format(title=poll["question"])
image_type = False
if poll["answers"][0]["image"] is not None:
image_type = True
@ -74,63 +81,57 @@ def embed_formatter(post, post_type):
embed.add_field(option["text"] if image_type is True else _("Option {}").format(num+1),
option["text"] if image_type is False else _("__[View image]({image_url})__").format(image_url=option["image"]["url"]),
inline=True)
embed["footer"]["text"] = post["forumName"]
embed["timestamp"] = datetime.datetime.fromtimestamp(post["creationDate"]["epochSecond"], tz=datetime.timezone.utc).isoformat()
embed.finish_embed()
send_to_discord(embed)
def compact_formatter(post, post_type):
"""Compact formatter for Fandom discussions."""
message = None
discussion_post_type = post["_embedded"]["thread"][0].get("containerType",
"FORUM") # Can be FORUM, ARTICLE_COMMENT or WALL on UCP
if post_type == "TEXT":
if not post["isReply"]:
if discussion_post_type == "FORUM":
message = _("[{author}](<{url}f/u/{creatorId}>) created [{title}](<{url}f/p/{threadId}>) in {forumName}").format(
author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"], title=post["title"], threadId=post["threadId"], forumName=post["forumName"])
elif discussion_post_type == "ARTICLE_COMMENT":
discussion_logger.warning("Article comments are not yet implemented. For reasons see https://gitlab.com/piotrex43/RcGcDw/-/issues/126#note_366480037")
return
elif discussion_post_type == "WALL":
user_wall = _("unknown") # Fail safe
if post["forumName"].endswith(' Message Wall'):
user_wall = post["forumName"][:-13]
message = _("[{author}](<{url}f/u/{creatorId}>) created [{title}](<{wikiurl}wiki/Message_Wall:{user_wall}?threadId={threadid}>) on {user}'s Message Wall").format(
author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"], title=post["_embedded"]["thread"][0]["title"], user=user_wall,
wikiurl=settings["fandom_discussions"]["wiki_url"], user_wall=quote_plus(user_wall.replace(" ", "_")), threadid=post["threadId"]
)
elif thread_funnel == "TEXT":
embed.event_type = "discussion/forum/post"
else:
if discussion_post_type == "FORUM":
message = _("[{author}](<{url}f/u/{creatorId}>) created a [reply](<{url}f/p/{threadId}/r/{postId}>) to [{title}](<{url}f/p/{threadId}>) in {forumName}").format(
author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"], threadId=post["threadId"], postId=post["id"], title=post["_embedded"]["thread"][0]["title"], forumName=post["forumName"]
)
elif discussion_post_type == "ARTICLE_COMMENT":
discussion_logger.warning("Article comments are not yet implemented. For reasons see https://gitlab.com/piotrex43/RcGcDw/-/issues/126#note_366480037")
return
elif discussion_post_type == "WALL":
logger.warning("No entry for {event} with params: {params}".format(event=thread_funnel, params=post))
else:
embed.event_type = "discussion/forum/reply"
embed["title"] = _("Replied to \"{title}\"").format(title=post["_embedded"]["thread"][0]["title"])
embed["url"] = "{url}f/p/{threadId}/r/{postId}".format(url=wiki, threadId=post["threadId"], postId=post["id"])
elif post_type == "WALL":
user_wall = _("unknown") # Fail safe
if post["forumName"].endswith(' Message Wall'):
user_wall = post["forumName"][:-13]
message = _(
"[{author}](<{url}f/u/{creatorId}>) replied to [{title}](<{wikiurl}wiki/Message_Wall:{user_wall}?threadId={threadid}#{replyId}>) on {user}'s Message Wall").format(
author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"], title=post["_embedded"]["thread"][0]["title"], user=user_wall,
wikiurl=settings["fandom_discussions"]["wiki_url"], user_wall=quote_plus(user_wall.replace(" ", "_")), threadid=post["threadId"], replyId=post["id"])
elif post_type == "POLL":
message = _(
"[{author}](<{url}f/u/{creatorId}>) created a poll [{title}](<{url}f/p/{threadId}>) in {forumName}").format(
author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"],
creatorId=post["creatorId"], title=post["title"], threadId=post["threadId"], forumName=post["forumName"])
send_to_discord(DiscordMessage("compact", "discussion", settings["fandom_discussions"]["webhookURL"], content=message))
if not post["isReply"]:
embed.event_type = "discussion/wall/post"
embed["url"] = "{url}wiki/Message_Wall:{user_wall}?threadId={threadid}".format(url=wiki, user_wall=quote_plus(user_wall.replace(" ", "_")), threadid=post["threadId"])
embed["title"] = _("Created \"{title}\" on {user}'s Message Wall").format(title=post["_embedded"]["thread"][0]["title"], user=user_wall)
else:
embed.event_type = "discussion/wall/reply"
embed["url"] = "{url}wiki/Message_Wall:{user_wall}?threadId={threadid}#{replyId}".format(url=wiki, user_wall=quote_plus(user_wall.replace(" ", "_")), threadid=post["threadId"], replyId=post["id"])
embed["title"] = _("Replied to \"{title}\" on {user}'s Message Wall").format(title=post["_embedded"]["thread"][0]["title"], user=user_wall)
elif post_type == "ARTICLE_COMMENT":
article_page = _("unknown") # No page known
if not post["isReply"]:
embed.event_type = "discussion/comment/post"
# embed["url"] = "{url}wiki/{article}?threadId={threadid}".format(url=wiki, article=quote_plus(article_page.replace(" ", "_")), threadid=post["threadId"])
embed["title"] = _("Commented on {article}").format(article=article_page)
else:
embed.event_type = "discussion/comment/reply"
# embed["url"] = "{url}wiki/{article}?threadId={threadid}#{replyId}".format(url=wiki, article=quote_plus(article_page.replace(" ", "_")), threadid=post["threadId"], replyId=post["id"])
embed["title"] = _("Replied to a comment on {article}").format(article=article_page)
embed["footer"]["text"] = article_page
else:
logger.warning("No entry for {event} with params: {params}".format(event=post_type, params=post))
embed["title"] = _("Unknown event `{event}`").format(event=post_type)
embed["color"] = 0
if settings["support"]:
change_params = "[```json\n{params}\n```]({support})".format(params=json.dumps(post, indent=2), support=settings["support"])
if len(change_params) > 1000:
embed.add_field(_("Report this on the support server"), settings["support"])
else:
embed.add_field(_("Report this on the support server"), change_params)
embed.finish_embed()
await send_to_discord(embed)
class DiscussionsFromHellParser:
"""This class converts fairly convoluted Fandom jsonModal of a discussion post into Markdown formatted usable thing. Takes string, returns string.
Kudos to MarkusRost for allowing me to implement this formatter based on his code in Wiki-Bot."""
def __init__(self, post):
def __init__(self, post, wiki):
self.post = post
self.wiki = wiki
self.jsonModal = json.loads(post.get("jsonModel", "{}"))
self.markdown_text = ""
self.item_num = 1
@ -165,17 +166,17 @@ class DiscussionsFromHellParser:
self.parse_content(item["content"], item["type"])
self.markdown_text += "\n"
elif item["type"] == "openGraph":
if not item["attrs"]["wasAddedWithInlineLink"]:
if not item["attrs"].get("wasAddedWithInlineLink", False):
self.markdown_text = "{old}{link}\n".format(old=self.markdown_text, link=item["attrs"]["url"])
elif item["type"] == "image":
try:
discussion_logger.debug(item["attrs"]["id"])
logger.debug(item["attrs"]["id"])
if item["attrs"]["id"] is not None:
self.markdown_text = "{old}{img_url}\n".format(old=self.markdown_text, img_url=self.post["_embedded"]["contentImages"][int(item["attrs"]["id"])]["url"])
self.image_last = self.post["_embedded"]["contentImages"][int(item["attrs"]["id"])]["url"]
except (IndexError, ValueError):
discussion_logger.warning("Image {} not found.".format(item["attrs"]["id"]))
discussion_logger.debug(self.markdown_text)
logger.warning("Image {} not found.".format(item["attrs"]["id"]))
logger.debug(self.markdown_text)
elif item["type"] == "code_block":
self.markdown_text += "```\n"
if "content" in item:
@ -197,7 +198,7 @@ class DiscussionsFromHellParser:
for mark in marks:
if mark["type"] == "mention":
prefix += "["
suffix = "]({wiki}f/u/{userid}){suffix}".format(wiki=settings["fandom_discussions"]["wiki_url"], userid=mark["attrs"]["userId"], suffix=suffix)
suffix = "]({wiki}f/u/{userid}){suffix}".format(wiki=self.wiki, userid=mark["attrs"]["userId"], suffix=suffix)
elif mark["type"] == "strong":
prefix += "**"
suffix = "**{suffix}".format(suffix=suffix)

View file

@ -95,7 +95,7 @@ async def compact_formatter(action, change, parsed_comment, categories, recent_c
link = link_formatter(create_article_path("Special:Contributions/{user}".format(user=user), WIKI_ARTICLE_PATH))
except ValueError:
link = link_formatter(create_article_path(change["title"], WIKI_ARTICLE_PATH))
if change["logparams"]["duration"] == "infinite":
if change["logparams"]["duration"] in ["infinite", "infinity"]:
block_time = _("for infinity and beyond")
else:
english_length = re.sub(r"(\d+)", "", change["logparams"][
@ -330,7 +330,7 @@ async def compact_formatter(action, change, parsed_comment, categories, recent_c
await send_to_discord(DiscordMessage("compact", action, message_target[1], content=content, wiki=WIKI_SCRIPT_PATH))
async def embed_formatter(action, change, parsed_comment, categories, recent_changes, target, _, ngettext, paths, additional_data=None):
async def embed_formatter(action, change, parsed_comment, categories, recent_changes, message_target, _, ngettext, paths, additional_data=None):
"""Recent Changes embed formatter, part of RcGcDw"""
if additional_data is None:
additional_data = {"namespaces": {}, "tags": {}}
@ -338,7 +338,7 @@ async def embed_formatter(action, change, parsed_comment, categories, recent_cha
WIKI_SCRIPT_PATH = paths[1]
WIKI_ARTICLE_PATH = paths[2]
WIKI_JUST_DOMAIN = paths[3]
embed = DiscordMessage("embed", action, target[1], wiki=WIKI_SCRIPT_PATH)
embed = DiscordMessage("embed", action, message_target[1], wiki=WIKI_SCRIPT_PATH)
if parsed_comment is None:
parsed_comment = _("No description provided")
if action != "suppressed":
@ -369,7 +369,7 @@ async def embed_formatter(action, change, parsed_comment, categories, recent_cha
embed["title"] = "{redirect}{article} ({new}{minor}{bot}{space}{editsize})".format(redirect="" if "redirect" in change else "", article=change["title"], editsize="+" + str(
editsize) if editsize > 0 else editsize, new=_("(N!) ") if action == "new" else "",
minor=_("m") if action == "edit" and "minor" in change else "", bot=_('b') if "bot" in change else "", space=" " if "bot" in change or (action == "edit" and "minor" in change) or action == "new" else "")
if target[0][1] == 3:
if message_target[0][1] == 3:
if action == "new":
changed_content = await recent_changes.safe_request(
"{wiki}?action=compare&format=json&fromtext=&torev={diff}&topst=1&prop=diff".format(
@ -433,7 +433,7 @@ async def embed_formatter(action, change, parsed_comment, categories, recent_cha
wiki=WIKI_SCRIPT_PATH, filename=article_encoded, archiveid=revision["archivename"])
embed.add_field(_("Options"), _("([preview]({link}) | [undo]({undolink}))").format(
link=image_direct_url, undolink=undolink))
if target[0][1] > 1:
if message_target[0][1] > 1:
embed["image"]["url"] = image_direct_url
if action == "upload/overwrite":
embed["title"] = _("Uploaded a new version of {name}").format(name=change["title"])
@ -443,7 +443,7 @@ async def embed_formatter(action, change, parsed_comment, categories, recent_cha
embed["title"] = _("Uploaded {name}").format(name=change["title"])
if additional_info_retrieved:
embed.add_field(_("Options"), _("([preview]({link}))").format(link=image_direct_url))
if target[0][1] > 1:
if message_target[0][1] > 1:
embed["image"]["url"] = image_direct_url
elif action == "delete/delete":
link = create_article_path(change["title"], WIKI_ARTICLE_PATH)
@ -472,7 +472,7 @@ async def embed_formatter(action, change, parsed_comment, categories, recent_cha
link = create_article_path("Special:Contributions/{user}".format(user=user), WIKI_ARTICLE_PATH)
except ValueError:
link = create_article_path(change["title"], WIKI_ARTICLE_PATH)
if change["logparams"]["duration"] == "infinite":
if change["logparams"]["duration"] in ["infinite", "infinity"]:
block_time = _("for infinity and beyond")
else:
english_length = re.sub(r"(\d+)", "", change["logparams"]["duration"]) # note that translation won't work for millenia and century yet

View file

View file

@ -39,13 +39,13 @@ class LinkParser(HTMLParser):
def handle_data(self, data):
if self.recent_href:
self.new_string = self.new_string + "[{}](<{}>)".format(data.replace("//", "/\\/"), self.recent_href)
self.new_string = self.new_string + "[{}](<{}>)".format(escape_formatting(data), self.recent_href)
self.recent_href = ""
else:
self.new_string = self.new_string + data.replace("//", "/\\/")
self.new_string = self.new_string + escape_formatting(data)
def handle_comment(self, data):
self.new_string = self.new_string + data.replace("//", "/\\/")
self.new_string = self.new_string + escape_formatting(data)
def handle_endtag(self, tag):
# logger.debug(self.new_string)
@ -71,7 +71,7 @@ def link_formatter(link: str) -> str:
def escape_formatting(data: str) -> str:
"""Escape Discord formatting"""
return re.sub(r"([`_*~<>{}@/|\\])", "\\\\\\1", data, 0)
return re.sub(r"([`_*~:<>{}@/|\\\[\]\(\)])", "\\\\\\1", data, 0)
def create_article_path(article: str, WIKI_ARTICLE_PATH: str) -> str:
@ -125,7 +125,7 @@ class ContentParser(HTMLParser):
self.added = True
def handle_data(self, data):
data = re.sub(r"([`_*~<>{}@/|\\])", "\\\\\\1", data, 0)
data = escape_formatting(data)
if self.current_tag == "ins" and self.ins_length <= 1000:
self.ins_length += len("**" + data + '**')
if self.ins_length <= 1000:

View file

@ -8,15 +8,16 @@ class UpdateDB:
def __init__(self):
self.updated = []
def add(self, wiki, rc_id):
self.updated.append((wiki, rc_id))
def add(self, wiki, rc_id, feeds=None):
self.updated.append((wiki, rc_id, feeds))
def clear_list(self):
self.updated.clear()
def update_db(self):
for update in self.updated:
db_cursor.execute("UPDATE rcgcdw SET rcid = ? WHERE wiki = ?", (update[1], update[0],))
update_type = "postid" if update[2] is not None else "rcid"
db_cursor.execute("UPDATE rcgcdw SET {} = ? WHERE wiki = ?".format(update_type), (update[1],update[0],))
db_connection.commit()
self.clear_list()

View file

@ -4,6 +4,7 @@ import logging, aiohttp
from src.exceptions import *
from src.database import db_cursor, db_connection
from src.formatters.rc import embed_formatter, compact_formatter
from src.formatters.discussions import feeds_embed_formatter, feeds_compact_formatter
from src.misc import parse_link
from src.i18n import langs
import src.discord
@ -49,6 +50,18 @@ class Wiki:
raise WikiServerError
return response
@staticmethod
async def fetch_feeds(wiki_id, session: aiohttp.ClientSession) -> aiohttp.ClientResponse:
url_path = "https://services.fandom.com/discussion/{wikiid}/posts".format(wikiid=wiki_id)
params = {"sortDirection": "descending", "sortKey": "creation_date", "limit": 20}
try:
response = await session.get(url_path, params=params)
response.raise_for_status()
except (aiohttp.ClientConnectionError, aiohttp.ServerTimeoutError, asyncio.TimeoutError, aiohttp.ClientResponseError):
logger.exception("A connection error occurred while requesting {}".format(url_path))
raise WikiServerError
return response
@staticmethod
async def safe_request(url, *keys):
try:
@ -190,7 +203,6 @@ async def essential_info(change: dict, changed_categories, local_wiki: Wiki, db_
return
if "commenthidden" not in change:
parsed_comment = parse_link(paths[3], change["parsedcomment"])
parsed_comment = re.sub(r"(`|_|\*|~|{|}|\|\|)", "\\\\\\1", parsed_comment, 0)
else:
parsed_comment = _("~~hidden~~")
if not parsed_comment:
@ -212,3 +224,15 @@ async def essential_info(change: dict, changed_categories, local_wiki: Wiki, db_
except KeyError:
additional_data["tags"][tag["name"]] = None # Tags with no displ
await appearance_mode(identification_string, change, parsed_comment, changed_categories, local_wiki, target, _, ngettext, paths, additional_data=additional_data)
async def essential_feeds(change: dict, db_wiki: tuple, target: tuple):
"""Prepares essential information for both embed and compact message format."""
def _(string: str) -> str:
"""Our own translation string to make it compatible with async"""
return lang.gettext(string)
lang = langs[target[0][0]]
appearance_mode = feeds_embed_formatter if target[0][1] > 0 else feeds_compact_formatter
identification_string = change["_embedded"]["thread"][0]["containerType"]
await appearance_mode(identification_string, change, target, db_wiki["wiki"], _)