Added code

This commit is contained in:
Frisk 2020-07-18 14:12:00 +02:00
parent f5980aa2bc
commit 6c9dd2245d
No known key found for this signature in database
GPG key ID: 213F7C15068AF8AC
6 changed files with 302 additions and 23 deletions

View file

@ -1,10 +1,11 @@
import logging.config import logging.config
from src.config import settings from src.config import settings
import sqlite3 import sqlite3
from src.wiki import Wiki, process_event, process_mwmsgs from src.wiki import Wiki, process_cats, process_mwmsgs
import asyncio, aiohttp import asyncio, aiohttp
from src.exceptions import * from src.exceptions import *
from src.database import db_cursor from src.database import db_cursor
from queue_handler import DBHandler
logging.config.dictConfig(settings["logging"]) logging.config.dictConfig(settings["logging"])
logger = logging.getLogger("rcgcdb.bot") logger = logging.getLogger("rcgcdb.bot")
@ -45,7 +46,7 @@ async def main_loop():
wiki_response = await local_wiki.fetch_wiki(extended, db_wiki[3], db_wiki[4]) wiki_response = await local_wiki.fetch_wiki(extended, db_wiki[3], db_wiki[4])
await local_wiki.check_status(wiki[0], wiki_response.status, db_wiki[1]) await local_wiki.check_status(wiki[0], wiki_response.status, db_wiki[1])
except (WikiServerError, WikiError): except (WikiServerError, WikiError):
continue # ignore this wikis if it throws errors continue # ignore this wiki if it throws errors
try: try:
recent_changes_resp = await wiki_response.json(encoding="UTF-8") recent_changes_resp = await wiki_response.json(encoding="UTF-8")
recent_changes = recent_changes_resp['query']['recentchanges'].reverse() recent_changes = recent_changes_resp['query']['recentchanges'].reverse()
@ -54,7 +55,19 @@ async def main_loop():
continue continue
if extended: if extended:
await process_mwmsgs(recent_changes_resp, local_wiki, mw_msgs) await process_mwmsgs(recent_changes_resp, local_wiki, mw_msgs)
categorize_events = {}
if db_wiki[6] is None: # new wiki, just get the last rc to not spam the channel
if len(recent_changes) > 0:
DBHandler.add(db_wiki[0], recent_changes[-1]["rcid"])
continue
else:
DBHandler.add(db_wiki[0], 0)
continue
for change in recent_changes: for change in recent_changes:
await process_cats(change, local_wiki, mw_msgs, categorize_events)
for change in recent_changes: # Yeah, second loop since the categories require to be all loaded up
if change["rcid"] < db_wiki[6]: if change["rcid"] < db_wiki[6]:
await process_event(change, local_wiki)
await asyncio.sleep(delay=calc_delay) await asyncio.sleep(delay=calc_delay)

View file

@ -0,0 +1,209 @@
import datetime, logging
import json
import gettext
from urllib.parse import quote_plus
from src.config import settings
from src.misc import DiscordMessage, send_to_discord, escape_formatting
from src.i18n import disc
_ = disc.gettext
discussion_logger = logging.getLogger("rcgcdw.discussion_formatter")
def embed_formatter(post, post_type):
"""Embed formatter for Fandom discussions."""
embed = DiscordMessage("embed", "discussion", settings["fandom_discussions"]["webhookURL"])
embed.set_author(post["createdBy"]["name"], "{wikiurl}f/u/{creatorId}".format(
wikiurl=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"]), icon_url=post["createdBy"]["avatarUrl"])
discussion_post_type = post["_embedded"]["thread"][0].get("containerType", "FORUM") # Can be FORUM, ARTICLE_COMMENT or WALL on UCP
if post_type == "TEXT":
if post["isReply"]:
if discussion_post_type == "FORUM":
embed.event_type = "discussion/forum/reply"
embed["title"] = _("Replied to \"{title}\"").format(title=post["_embedded"]["thread"][0]["title"])
embed["url"] = "{wikiurl}f/p/{threadId}/r/{postId}".format(
wikiurl=settings["fandom_discussions"]["wiki_url"], threadId=post["threadId"], postId=post["id"])
elif discussion_post_type == "ARTICLE_COMMENT":
discussion_logger.warning("Article comments are not yet implemented. For reasons see https://gitlab.com/piotrex43/RcGcDw/-/issues/126#note_366480037")
return
elif discussion_post_type == "WALL":
user_wall = _("unknown") # Fail safe
embed.event_type = "discussion/wall/reply"
if post["forumName"].endswith(' Message Wall'):
user_wall = post["forumName"][:-13]
embed["url"] = "{wikiurl}wiki/Message_Wall:{user_wall}?threadId={threadid}#{replyId}".format(wikiurl=settings["fandom_discussions"]["wiki_url"], user_wall=quote_plus(user_wall.replace(" ", "_")), threadid=post["threadId"], replyId=post["id"])
embed["title"] = _("Replied to \"{title}\" on {user}'s Message Wall").format(title=post["_embedded"]["thread"][0]["title"], user=user_wall)
else:
if discussion_post_type == "FORUM":
embed.event_type = "discussion/forum/post"
embed["title"] = _("Created \"{title}\"").format(title=post["title"])
embed["url"] = "{wikiurl}f/p/{threadId}".format(wikiurl=settings["fandom_discussions"]["wiki_url"],
threadId=post["threadId"])
elif discussion_post_type == "ARTICLE_COMMENT":
discussion_logger.warning("Article comments are not yet implemented. For reasons see https://gitlab.com/piotrex43/RcGcDw/-/issues/126#note_366480037")
return
elif discussion_post_type == "WALL":
user_wall = _("unknown") # Fail safe
embed.event_type = "discussion/wall/post"
if post["forumName"].endswith(' Message Wall'):
user_wall = post["forumName"][:-13]
embed["url"] = "{wikiurl}wiki/Message_Wall:{user_wall}?threadId={threadid}".format(
wikiurl=settings["fandom_discussions"]["wiki_url"], user_wall=quote_plus(user_wall.replace(" ", "_")),
threadid=post["threadId"])
embed["title"] = _("Created \"{title}\" on {user}'s Message Wall").format(title=post["_embedded"]["thread"][0]["title"], user=user_wall)
if settings["fandom_discussions"]["appearance"]["embed"]["show_content"]:
if post.get("jsonModel") is not None:
npost = DiscussionsFromHellParser(post)
embed["description"] = npost.parse()
if npost.image_last:
embed["image"]["url"] = npost.image_last
embed["description"] = embed["description"].replace(npost.image_last, "")
else: # Fallback when model is not available
embed["description"] = post.get("rawContent", "")
elif post_type == "POLL":
embed.event_type = "discussion/forum/poll"
poll = post["poll"]
embed["title"] = _("Created a poll titled \"{title}\"").format(title=poll["question"])
image_type = False
if poll["answers"][0]["image"] is not None:
image_type = True
for num, option in enumerate(poll["answers"]):
embed.add_field(option["text"] if image_type is True else _("Option {}").format(num+1),
option["text"] if image_type is False else _("__[View image]({image_url})__").format(image_url=option["image"]["url"]),
inline=True)
embed["footer"]["text"] = post["forumName"]
embed["timestamp"] = datetime.datetime.fromtimestamp(post["creationDate"]["epochSecond"], tz=datetime.timezone.utc).isoformat()
embed.finish_embed()
send_to_discord(embed)
def compact_formatter(post, post_type):
"""Compact formatter for Fandom discussions."""
message = None
discussion_post_type = post["_embedded"]["thread"][0].get("containerType",
"FORUM") # Can be FORUM, ARTICLE_COMMENT or WALL on UCP
if post_type == "TEXT":
if not post["isReply"]:
if discussion_post_type == "FORUM":
message = _("[{author}](<{url}f/u/{creatorId}>) created [{title}](<{url}f/p/{threadId}>) in {forumName}").format(
author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"], title=post["title"], threadId=post["threadId"], forumName=post["forumName"])
elif discussion_post_type == "ARTICLE_COMMENT":
discussion_logger.warning("Article comments are not yet implemented. For reasons see https://gitlab.com/piotrex43/RcGcDw/-/issues/126#note_366480037")
return
elif discussion_post_type == "WALL":
user_wall = _("unknown") # Fail safe
if post["forumName"].endswith(' Message Wall'):
user_wall = post["forumName"][:-13]
message = _("[{author}](<{url}f/u/{creatorId}>) created [{title}](<{wikiurl}wiki/Message_Wall:{user_wall}?threadId={threadid}>) on {user}'s Message Wall").format(
author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"], title=post["_embedded"]["thread"][0]["title"], user=user_wall,
wikiurl=settings["fandom_discussions"]["wiki_url"], user_wall=quote_plus(user_wall.replace(" ", "_")), threadid=post["threadId"]
)
else:
if discussion_post_type == "FORUM":
message = _("[{author}](<{url}f/u/{creatorId}>) created a [reply](<{url}f/p/{threadId}/r/{postId}>) to [{title}](<{url}f/p/{threadId}>) in {forumName}").format(
author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"], threadId=post["threadId"], postId=post["id"], title=post["_embedded"]["thread"][0]["title"], forumName=post["forumName"]
)
elif discussion_post_type == "ARTICLE_COMMENT":
discussion_logger.warning("Article comments are not yet implemented. For reasons see https://gitlab.com/piotrex43/RcGcDw/-/issues/126#note_366480037")
return
elif discussion_post_type == "WALL":
user_wall = _("unknown") # Fail safe
if post["forumName"].endswith(' Message Wall'):
user_wall = post["forumName"][:-13]
message = _(
"[{author}](<{url}f/u/{creatorId}>) replied to [{title}](<{wikiurl}wiki/Message_Wall:{user_wall}?threadId={threadid}#{replyId}>) on {user}'s Message Wall").format(
author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"], creatorId=post["creatorId"], title=post["_embedded"]["thread"][0]["title"], user=user_wall,
wikiurl=settings["fandom_discussions"]["wiki_url"], user_wall=quote_plus(user_wall.replace(" ", "_")), threadid=post["threadId"], replyId=post["id"])
elif post_type == "POLL":
message = _(
"[{author}](<{url}f/u/{creatorId}>) created a poll [{title}](<{url}f/p/{threadId}>) in {forumName}").format(
author=post["createdBy"]["name"], url=settings["fandom_discussions"]["wiki_url"],
creatorId=post["creatorId"], title=post["title"], threadId=post["threadId"], forumName=post["forumName"])
send_to_discord(DiscordMessage("compact", "discussion", settings["fandom_discussions"]["webhookURL"], content=message))
class DiscussionsFromHellParser:
"""This class converts fairly convoluted Fandom jsonModal of a discussion post into Markdown formatted usable thing. Takes string, returns string.
Kudos to MarkusRost for allowing me to implement this formatter based on his code in Wiki-Bot."""
def __init__(self, post):
self.post = post
self.jsonModal = json.loads(post.get("jsonModel", "{}"))
self.markdown_text = ""
self.item_num = 1
self.image_last = None
def parse(self) -> str:
"""Main parsing logic"""
self.parse_content(self.jsonModal["content"])
if len(self.markdown_text) > 2000:
self.markdown_text = self.markdown_text[0:2000] + ""
return self.markdown_text
def parse_content(self, content, ctype=None):
self.image_last = None
for item in content:
if ctype == "bulletList":
self.markdown_text += "\t"
if ctype == "orderedList":
self.markdown_text += "\t{num}. ".format(num=self.item_num)
self.item_num += 1
if item["type"] == "text":
if "marks" in item:
prefix, suffix = self.convert_marks(item["marks"])
self.markdown_text = "{old}{pre}{text}{suf}".format(old=self.markdown_text, pre=prefix, text=escape_formatting(item["text"]), suf=suffix)
else:
if ctype == "code_block":
self.markdown_text += item["text"] # ignore formatting on preformatted text which cannot have additional formatting anyways
else:
self.markdown_text += escape_formatting(item["text"])
elif item["type"] == "paragraph":
if "content" in item:
self.parse_content(item["content"], item["type"])
self.markdown_text += "\n"
elif item["type"] == "openGraph":
if not item["attrs"]["wasAddedWithInlineLink"]:
self.markdown_text = "{old}{link}\n".format(old=self.markdown_text, link=item["attrs"]["url"])
elif item["type"] == "image":
try:
discussion_logger.debug(item["attrs"]["id"])
if item["attrs"]["id"] is not None:
self.markdown_text = "{old}{img_url}\n".format(old=self.markdown_text, img_url=self.post["_embedded"]["contentImages"][int(item["attrs"]["id"])]["url"])
self.image_last = self.post["_embedded"]["contentImages"][int(item["attrs"]["id"])]["url"]
except (IndexError, ValueError):
discussion_logger.warning("Image {} not found.".format(item["attrs"]["id"]))
discussion_logger.debug(self.markdown_text)
elif item["type"] == "code_block":
self.markdown_text += "```\n"
if "content" in item:
self.parse_content(item["content"], item["type"])
self.markdown_text += "\n```\n"
elif item["type"] == "bulletList":
if "content" in item:
self.parse_content(item["content"], item["type"])
elif item["type"] == "orderedList":
self.item_num = 1
if "content" in item:
self.parse_content(item["content"], item["type"])
elif item["type"] == "listItem":
self.parse_content(item["content"], item["type"])
def convert_marks(self, marks):
prefix = ""
suffix = ""
for mark in marks:
if mark["type"] == "mention":
prefix += "["
suffix = "]({wiki}f/u/{userid}){suffix}".format(wiki=settings["fandom_discussions"]["wiki_url"], userid=mark["attrs"]["userId"], suffix=suffix)
elif mark["type"] == "strong":
prefix += "**"
suffix = "**{suffix}".format(suffix=suffix)
elif mark["type"] == "link":
prefix += "["
suffix = "]({link}){suffix}".format(link=mark["attrs"]["href"], suffix=suffix)
elif mark["type"] == "em":
prefix += "_"
suffix = "_" + suffix
return prefix, suffix

View file

@ -1,16 +1,25 @@
from src.discord import DiscordMessage import ipaddress
import math
import re import re
from src.i18n import ngettext import time
import logging
from urllib.parse import quote_plus
def create_article_path(article: str) -> str: from bs4 import BeautifulSoup
"""Takes the string and creates an URL with it as the article name"""
return WIKI_ARTICLE_PATH.replace("$1", article)
def link_formatter(link) -> str: #from src.configloader import settings
"""Formats a link to not embed it""" #from src.misc import link_formatter, create_article_path, WIKI_SCRIPT_PATH, send_to_discord, DiscordMessage, safe_read, \
return "<" + re.sub(r"([)])", "\\\\\\1", link).replace(" ", "_") + ">" # WIKI_API_PATH, ContentParser, profile_field_name, LinkParser
from src.i18n import lang
#from src.rc import recent_changes, pull_comment
ngettext = lang.ngettext
def compact_formatter(action, change, parsed_comment, categories): logger = logging.getLogger("rcgcdw.rc_formatters")
#from src.rcgcdw import recent_changes, ngettext, logger, profile_field_name, LinkParser, pull_comment
LinkParser = LinkParser()
def compact_formatter(action, change, parsed_comment, categories, recent_changes):
if action != "suppressed": if action != "suppressed":
author_url = link_formatter(create_article_path("User:{user}".format(user=change["user"]))) author_url = link_formatter(create_article_path("User:{user}".format(user=change["user"])))
author = change["user"] author = change["user"]
@ -25,8 +34,8 @@ def compact_formatter(action, change, parsed_comment, categories):
sign = "+" sign = "+"
else: else:
sign = "" sign = ""
if change["title"].startswith("MediaWiki:Tag-"): # Refresh tag list when tag display name is edited if change["title"].startswith("MediaWiki:Tag-"):
recent_changes.init_info() pass
if action == "edit": if action == "edit":
content = _("[{author}]({author_url}) edited [{article}]({edit_link}){comment} ({sign}{edit_size})").format(author=author, author_url=author_url, article=change["title"], edit_link=edit_link, comment=parsed_comment, edit_size=edit_size, sign=sign) content = _("[{author}]({author_url}) edited [{article}]({edit_link}){comment} ({sign}{edit_size})").format(author=author, author_url=author_url, article=change["title"], edit_link=edit_link, comment=parsed_comment, edit_size=edit_size, sign=sign)
else: else:
@ -298,7 +307,7 @@ def compact_formatter(action, change, parsed_comment, categories):
send_to_discord(DiscordMessage("compact", action, settings["webhookURL"], content=content)) send_to_discord(DiscordMessage("compact", action, settings["webhookURL"], content=content))
def embed_formatter(action, change, parsed_comment, categories): def embed_formatter(action, change, parsed_comment, categories, recent_changes):
embed = DiscordMessage("embed", action, settings["webhookURL"]) embed = DiscordMessage("embed", action, settings["webhookURL"])
if parsed_comment is None: if parsed_comment is None:
parsed_comment = _("No description provided") parsed_comment = _("No description provided")
@ -527,21 +536,21 @@ def embed_formatter(action, change, parsed_comment, categories):
embed["title"] = _("Unblocked {blocked_user}").format(blocked_user=user) embed["title"] = _("Unblocked {blocked_user}").format(blocked_user=user)
elif action == "curseprofile/comment-created": elif action == "curseprofile/comment-created":
if settings["appearance"]["embed"]["show_edit_changes"]: if settings["appearance"]["embed"]["show_edit_changes"]:
parsed_comment = pull_comment(change["logparams"]["4:comment_id"]) parsed_comment = recent_changes.pull_comment(change["logparams"]["4:comment_id"])
link = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"])) link = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]))
embed["title"] = _("Left a comment on {target}'s profile").format(target=change["title"].split(':')[1]) if change["title"].split(':')[1] != \ embed["title"] = _("Left a comment on {target}'s profile").format(target=change["title"].split(':')[1]) if change["title"].split(':')[1] != \
change["user"] else _( change["user"] else _(
"Left a comment on their own profile") "Left a comment on their own profile")
elif action == "curseprofile/comment-replied": elif action == "curseprofile/comment-replied":
if settings["appearance"]["embed"]["show_edit_changes"]: if settings["appearance"]["embed"]["show_edit_changes"]:
parsed_comment = pull_comment(change["logparams"]["4:comment_id"]) parsed_comment = recent_changes.pull_comment(change["logparams"]["4:comment_id"])
link = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"])) link = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]))
embed["title"] = _("Replied to a comment on {target}'s profile").format(target=change["title"].split(':')[1]) if change["title"].split(':')[1] != \ embed["title"] = _("Replied to a comment on {target}'s profile").format(target=change["title"].split(':')[1]) if change["title"].split(':')[1] != \
change["user"] else _( change["user"] else _(
"Replied to a comment on their own profile") "Replied to a comment on their own profile")
elif action == "curseprofile/comment-edited": elif action == "curseprofile/comment-edited":
if settings["appearance"]["embed"]["show_edit_changes"]: if settings["appearance"]["embed"]["show_edit_changes"]:
parsed_comment = pull_comment(change["logparams"]["4:comment_id"]) parsed_comment = recent_changes.pull_comment(change["logparams"]["4:comment_id"])
link = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"])) link = create_article_path("Special:CommentPermalink/{commentid}".format(commentid=change["logparams"]["4:comment_id"]))
embed["title"] = _("Edited a comment on {target}'s profile").format(target=change["title"].split(':')[1]) if change["title"].split(':')[1] != \ embed["title"] = _("Edited a comment on {target}'s profile").format(target=change["title"].split(':')[1]) if change["title"].split(':')[1] != \
change["user"] else _( change["user"] else _(

View file

@ -3,10 +3,16 @@ import sys, logging, gettext
logger = logging.getLogger("rcgcdb.i18n") logger = logging.getLogger("rcgcdb.i18n")
try: try:
lang = gettext.translation('rcgcdb', localedir='locale', languages=["en"]) en = gettext.translation('rcgcdb', localedir='locale', languages=["en"])
de = gettext.translation('rcgcdb', localedir='locale', languages=["de"])
pl = gettext.translation('rcgcdb', localedir='locale', languages=["pl"])
pt = gettext.translation('rcgcdb', localedir='locale', languages=["pt"])
ru = gettext.translation('rcgcdb', localedir='locale', languages=["ru"])
uk = gettext.translation('rcgcdb', localedir='locale', languages=["uk"])
fr = gettext.translation('rcgcdb', localedir='locale', languages=["fr"])
langs = {"en": en, "de": de, "pl": pl, "pt": pt, "ru": ru, "uk": uk, "fr": fr}
except FileNotFoundError: except FileNotFoundError:
logger.critical("No language files have been found. Make sure locale folder is located in the directory.") logger.critical("No language files have been found. Make sure locale folder is located in the directory.")
sys.exit(1) sys.exit(1)
lang.install() #ngettext = en.ngettext
ngettext = lang.ngettext

View file

@ -14,3 +14,6 @@ class UpdateDB():
def update_db(self): def update_db(self):
for update in self.updated: for update in self.updated:
DBHandler = UpdateDB()

View file

@ -4,6 +4,8 @@ import re
import logging, aiohttp import logging, aiohttp
from src.exceptions import * from src.exceptions import *
from src.database import db_cursor, db_connection from src.database import db_cursor, db_connection
from src.formatters.rc import embed_formatter, compact_formatter
from i18n import langs
import src.discord import src.discord
logger = logging.getLogger("rcgcdb.wiki") logger = logging.getLogger("rcgcdb.wiki")
@ -59,8 +61,7 @@ class Wiki:
db_connection.commit() db_connection.commit()
async def process_event(event: dict, local_wiki: Wiki, category_msgs: dict): async def process_cats(event: dict, local_wiki: Wiki, category_msgs: dict, categorize_events: dict):
categorize_events = {}
if event["type"] == "categorize": if event["type"] == "categorize":
if "commenthidden" not in event: if "commenthidden" not in event:
if local_wiki.mw_messages: if local_wiki.mw_messages:
@ -115,3 +116,41 @@ async def process_mwmsgs(wiki_response: dict, local_wiki: Wiki, mw_msgs: dict):
key = len(mw_msgs) key = len(mw_msgs)
mw_msgs[key] = msgs # it may be a little bit messy for sure, however I don't expect any reason to remove mw_msgs entries by one mw_msgs[key] = msgs # it may be a little bit messy for sure, however I don't expect any reason to remove mw_msgs entries by one
local_wiki.mw_messages = key local_wiki.mw_messages = key
def essential_info(change, changed_categories, local_wiki, db_wiki):
"""Prepares essential information for both embed and compact message format."""
logger.debug(change)
lang = langs[db_wiki[1]]
appearance_mode = embed_formatter
if ("actionhidden" in change or "suppressed" in change): # if event is hidden using suppression
appearance_mode("suppressed", change, "", changed_categories, recent_changes)
return
if "commenthidden" not in change:
LinkParser.feed(change["parsedcomment"])
parsed_comment = LinkParser.new_string
LinkParser.new_string = ""
parsed_comment = re.sub(r"(`|_|\*|~|{|}|\|\|)", "\\\\\\1", parsed_comment, 0)
else:
parsed_comment = _("~~hidden~~")
if not parsed_comment:
parsed_comment = None
if change["type"] in ["edit", "new"]:
logger.debug("List of categories in essential_info: {}".format(changed_categories))
if "userhidden" in change:
change["user"] = _("hidden")
identification_string = change["type"]
elif change["type"] == "log":
identification_string = "{logtype}/{logaction}".format(logtype=change["logtype"], logaction=change["logaction"])
if identification_string not in supported_logs:
logger.warning(
"This event is not implemented in the script. Please make an issue on the tracker attaching the following info: wiki url, time, and this information: {}".format(
change))
return
elif change["type"] == "categorize":
return
else:
logger.warning("This event is not implemented in the script. Please make an issue on the tracker attaching the following info: wiki url, time, and this information: {}".format(change))
return
if identification_string in settings["ignored"]:
return
appearance_mode(identification_string, change, parsed_comment, changed_categories, recent_changes)