Further work and some improvements

This commit is contained in:
Frisk 2021-04-27 15:10:29 +02:00
parent 7eb06e5335
commit 1b3ffb0228
No known key found for this signature in database
GPG key ID: 213F7C15068AF8AC
7 changed files with 119 additions and 73 deletions

View file

@ -19,6 +19,7 @@ from src.discord.message import DiscordMessage
from src.api import formatter from src.api import formatter
from src.i18n import rc_formatters from src.i18n import rc_formatters
from src.api.context import Context from src.api.context import Context
from src.api.util import embed_helper
from src.configloader import settings from src.configloader import settings
from src.exceptions import * from src.exceptions import *
@ -27,28 +28,25 @@ _ = rc_formatters.gettext
logger = logging.getLogger("extensions.base") logger = logging.getLogger("extensions.base")
# Page edit - event edit
@formatter.embed(event="edit", mode="embed") @formatter.embed(event="edit", mode="embed")
def embed_edit(ctx: Context, change: dict) -> DiscordMessage: def embed_edit(ctx: Context, change: dict) -> DiscordMessage:
embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url) embed = DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url)
embed_helper(ctx, embed, change)
action = ctx.event action = ctx.event
editsize = change["newlen"] - change["oldlen"] editsize = change["newlen"] - change["oldlen"]
if editsize > 0: if editsize > 0:
if editsize > 6032: embed["color"] = min(65280, 35840 + (math.floor(editsize / 52)) * 256) # Choose shade of green
embed["color"] = 65280
else:
embed["color"] = 35840 + (math.floor(editsize / 52)) * 256
elif editsize < 0: elif editsize < 0:
if editsize < -6032: embed["color"] = min(16711680, 9175040 + (math.floor(abs(editsize) / 52)) * 65536) # Choose shade of red
embed["color"] = 16711680
else:
embed["color"] = 9175040 + (math.floor((editsize * -1) / 52)) * 65536
elif editsize == 0: elif editsize == 0:
embed["color"] = 8750469 embed["color"] = 8750469
if change["title"].startswith("MediaWiki:Tag-"): # Refresh tag list when tag display name is edited if change["title"].startswith("MediaWiki:Tag-"): # Refresh tag list when tag display name is edited
ctx.client.refresh_internal_data() ctx.client.refresh_internal_data()
embed.set_link("{wiki}index.php?title={article}&curid={pageid}&diff={diff}&oldid={oldrev}".format( embed["link"] = "{wiki}index.php?title={article}&curid={pageid}&diff={diff}&oldid={oldrev}".format(
wiki=ctx.client.WIKI_SCRIPT_PATH, pageid=change["pageid"], diff=change["revid"], oldrev=change["old_revid"], wiki=ctx.client.WIKI_SCRIPT_PATH, pageid=change["pageid"], diff=change["revid"], oldrev=change["old_revid"],
article=change["title"].replace(" ", "_").replace("%", "%25").replace("\\", "%5C").replace("&", "%26"))) article=change["title"].replace(" ", "_").replace("%", "%25").replace("\\", "%5C").replace("&", "%26"))
embed["title"] = "{redirect}{article} ({new}{minor}{bot}{space}{editsize})".format( embed["title"] = "{redirect}{article} ({new}{minor}{bot}{space}{editsize})".format(
redirect="" if "redirect" in change else "", article=change["title"], editsize="+" + str( redirect="" if "redirect" in change else "", article=change["title"], editsize="+" + str(
editsize) if editsize > 0 else editsize, new=_("(N!) ") if action == "new" else "", editsize) if editsize > 0 else editsize, new=_("(N!) ") if action == "new" else "",
@ -58,14 +56,12 @@ def embed_edit(ctx: Context, change: dict) -> DiscordMessage:
try: try:
if action == "new": if action == "new":
changed_content = ctx.client.make_api_request( changed_content = ctx.client.make_api_request(
"{wiki}?action=compare&format=json&fromtext=&torev={diff}&topst=1&prop=diff".format( "?action=compare&format=json&torev={diff}&topst=1&prop=diff".format(diff=change["revid"]
wiki=ctx.client.WIKI_API_PATH, diff=change["revid"]
), "compare", "*") ), "compare", "*")
else: else:
changed_content = ctx.client.make_api_request( changed_content = ctx.client.make_api_request(
"{wiki}?action=compare&format=json&fromrev={oldrev}&torev={diff}&topst=1&prop=diff".format( "?action=compare&format=json&fromrev={oldrev}&torev={diff}&topst=1&prop=diff".format(
wiki=ctx.client.WIKI_API_PATH, diff=change["revid"], oldrev=change["old_revid"] diff=change["revid"], oldrev=change["old_revid"]), "compare", "*")
), "compare", "*")
except ServerError: except ServerError:
changed_content = None changed_content = None
if changed_content: if changed_content:
@ -90,6 +86,44 @@ def embed_edit(ctx: Context, change: dict) -> DiscordMessage:
logger.warning("Unable to download data on the edit content!") logger.warning("Unable to download data on the edit content!")
return embed return embed
@formatter.compact(event="edit", mode="compact") @formatter.compact(event="edit", mode="compact")
def compact_edit(ctx: Context, change: dict): def compact_edit(ctx: Context, change: dict):
return DiscordMessage() action = ctx.event
edit_link = link_formatter("{wiki}index.php?title={article}&curid={pageid}&diff={diff}&oldid={oldrev}".format(
wiki=ctx.client.WIKI_SCRIPT_PATH, pageid=change["pageid"], diff=change["revid"], oldrev=change["old_revid"],
article=change["title"]))
logger.debug(edit_link)
edit_size = change["newlen"] - change["oldlen"]
sign = ""
if edit_size > 0:
sign = "+"
bold = ""
if abs(edit_size) > 500:
bold = "**"
if change["title"].startswith("MediaWiki:Tag-"):
pass
if action == "edit":
content = _(
"[{author}]({author_url}) edited [{article}]({edit_link}){comment} {bold}({sign}{edit_size}){bold}").format(
author=author, author_url=author_url, article=change["title"], edit_link=edit_link, comment=parsed_comment,
edit_size=edit_size, sign=sign, bold=bold)
else:
content = _(
"[{author}]({author_url}) created [{article}]({edit_link}){comment} {bold}({sign}{edit_size}){bold}").format(
author=author, author_url=author_url, article=change["title"], edit_link=edit_link, comment=parsed_comment,
edit_size=edit_size, sign=sign, bold=bold)
return DiscordMessage(ctx.message_type, ctx.event, ctx.webhook_url, content=content)
# Page creation - event new
@formatter.embed(event="new", mode="embed")
def embed_new(ctx, change):
return embed_edit(ctx, change)
@formatter.compact(event="new", mode="compact")
def compact_new(ctx, change):
return compact_edit(ctx, change)

View file

@ -14,9 +14,14 @@
# along with RcGcDw. If not, see <http://www.gnu.org/licenses/>. # along with RcGcDw. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations
import src.misc import src.misc
from typing import Union from typing import Union
from collections import OrderedDict from collections import OrderedDict
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from src.wiki import Wiki
class Client: class Client:
@ -25,7 +30,7 @@ class Client:
""" """
def __init__(self, hooks, wiki): def __init__(self, hooks, wiki):
self._formatters = hooks self._formatters = hooks
self.__recent_changes = wiki self.__recent_changes: Wiki = wiki
self.WIKI_API_PATH = src.misc.WIKI_API_PATH self.WIKI_API_PATH = src.misc.WIKI_API_PATH
self.WIKI_ARTICLE_PATH = src.misc.WIKI_ARTICLE_PATH self.WIKI_ARTICLE_PATH = src.misc.WIKI_ARTICLE_PATH
self.WIKI_SCRIPT_PATH = src.misc.WIKI_SCRIPT_PATH self.WIKI_SCRIPT_PATH = src.misc.WIKI_SCRIPT_PATH
@ -37,7 +42,7 @@ class Client:
"""Refreshes internal storage data for wiki tags and MediaWiki messages.""" """Refreshes internal storage data for wiki tags and MediaWiki messages."""
self.__recent_changes.init_info() self.__recent_changes.init_info()
def make_api_request(self, params: Union[str, OrderedDict], *json_path: list[str], timeout: int=10, allow_redirects: bool=False): def make_api_request(self, params: Union[str, OrderedDict], *json_path: str, timeout: int = 10, allow_redirects: bool = False):
"""Method to GET request data from the wiki's API with error handling including recognition of MediaWiki errors. """Method to GET request data from the wiki's API with error handling including recognition of MediaWiki errors.
Parameters: Parameters:
@ -58,7 +63,11 @@ class Client:
BadRequest: When params argument is of wrong type BadRequest: When params argument is of wrong type
MediaWikiError: When MediaWiki returns an error MediaWikiError: When MediaWiki returns an error
""" """
return self.__recent_changes.api_request(params, *json_path, timeout, allow_redirects) return self.__recent_changes.api_request(params, *json_path, timeout=timeout, allow_redirects=allow_redirects)
def get_formatters(self): def get_formatters(self):
return self._formatters return self._formatters
def get_ipmapper(self) -> dict:
"""Returns a dict mapping IPs with amount of their edits"""
return self.__recent_changes.map_ips

View file

@ -12,11 +12,16 @@
# #
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with RcGcDw. If not, see <http://www.gnu.org/licenses/>. # along with RcGcDw. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from src.api.client import Client
class Context: class Context:
"""Context object containing client and some metadata regarding specific formatter call""" """Context object containing client and some metadata regarding specific formatter call"""
def __init__(self, message_type: str, webhook_url: str, client): def __init__(self, message_type: str, webhook_url: str, client: Client):
self.client = client self.client = client
self.webhook_url = webhook_url self.webhook_url = webhook_url
self.message_type = message_type self.message_type = message_type

View file

@ -47,7 +47,8 @@ def embed(**kwargs):
""" """
Decorator to register a formatter are return a function Decorator to register a formatter are return a function
:param kwargs: :key event: Event string
:key mode: Discord Message mode
:return: :return:
""" """
@ -62,8 +63,8 @@ def compact(**kwargs):
""" """
Decorator to register a formatter are return a function Decorator to register a formatter are return a function
:param func: :key event: Event string
:param kwargs: :key mode: Discord Message mode
:return: :return:
""" """

View file

@ -12,14 +12,18 @@
# #
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with RcGcDw. If not, see <http://www.gnu.org/licenses/>. # along with RcGcDw. If not, see <http://www.gnu.org/licenses/>.
from __future__ import annotations
import re import re
from urllib.parse import quote from urllib.parse import quote
from typing import Optional, Callable from typing import Optional, Callable, TYPE_CHECKING
from src.discord.message import DiscordMessage from src.discord.message import DiscordMessage
from src.configloader import settings from src.configloader import settings
import src.misc import src.misc
import logging import logging
if TYPE_CHECKING:
from src.api.context import Context
logger = logging.getLogger("src.api.util") logger = logging.getLogger("src.api.util")
def default_message(event: str, formatter_hooks: dict) -> Callable: def default_message(event: str, formatter_hooks: dict) -> Callable:
@ -42,42 +46,42 @@ def create_article_path(article: str) -> str:
return src.misc.WIKI_ARTICLE_PATH.replace("$1", article) return src.misc.WIKI_ARTICLE_PATH.replace("$1", article)
def embed_helper(ctx, message: DiscordMessage, change: dict): def embed_helper(ctx: Context, message: DiscordMessage, change: dict) -> None:
"""Helps in preparing common edit/log fields for events. Sets up: """Helps in preparing common edit/log fields for events. Passed arguments automatically become saturated with needed data.
author, author_url, icon""" Currently handles: setting usernames"""
# TODO Repurpose it so change['user'] stays the same
def format_user(change, recent_changes, action): if "anon" in change:
if "anon" in change: author_url = create_article_path("Special:Contributions/{user}".format(
author_url = create_article_path("Special:Contributions/{user}".format( user=change["user"].replace(" ", "_"))) # Replace here needed in case of #75
user=change["user"].replace(" ", "_"))) # Replace here needed in case of #75 ip_mapper = ctx.client.get_ipmapper()
logger.debug("current user: {} with cache of IPs: {}".format(change["user"], recent_changes.map_ips.keys())) logger.debug("current user: {} with cache of IPs: {}".format(change["user"], ip_mapper.keys()))
if change["user"] not in list(recent_changes.map_ips.keys()): if change["user"] not in list(ip_mapper.keys()):
contibs = ctx.client.make_api_request( contibs = ctx.client.make_api_request(
"{wiki}?action=query&format=json&list=usercontribs&uclimit=max&ucuser={user}&ucstart={timestamp}&ucprop=".format( "{wiki}?action=query&format=json&list=usercontribs&uclimit=max&ucuser={user}&ucstart={timestamp}&ucprop=".format(
wiki=ctx.client.WIKI_API_PATH, user=change["user"], timestamp=change["timestamp"]), "query", wiki=ctx.client.WIKI_API_PATH, user=change["user"], timestamp=change["timestamp"]), "query",
"usercontribs") "usercontribs")
if contibs is None: if contibs is None:
logger.warning( logger.warning(
"WARNING: Something went wrong when checking amount of contributions for given IP address") "WARNING: Something went wrong when checking amount of contributions for given IP address")
if settings.get("hide_ips", False): if settings.get("hide_ips", False):
change["user"] = _("Unregistered user") change["user"] = _("Unregistered user")
change["user"] = change["user"] + "(?)" change["user"] = change["user"] + "(?)"
else:
recent_changes.map_ips[change["user"]] = len(contibs)
logger.debug(
"Current params user {} and state of map_ips {}".format(change["user"], recent_changes.map_ips))
if settings.get("hide_ips", False):
change["user"] = _("Unregistered user")
change["user"] = "{author} ({contribs})".format(author=change["user"], contribs=len(contibs))
else: else:
ip_mapper[change["user"]] = len(contibs)
logger.debug( logger.debug(
"Current params user {} and state of map_ips {}".format(change["user"], recent_changes.map_ips)) "Current params user {} and state of map_ips {}".format(change["user"], ip_mapper))
if action in ("edit", "new"): if settings.get("hide_ips", False):
recent_changes.map_ips[change["user"]] += 1 change["user"] = _("Unregistered user")
change["user"] = "{author} ({amount})".format( change["user"] = "{author} ({contribs})".format(author=change["user"], contribs=len(contibs))
author=change["user"] if settings.get("hide_ips", False) is False else _("Unregistered user"),
amount=recent_changes.map_ips[change["user"]])
else: else:
author_url = create_article_path("User:{}".format(change["user"].replace(" ", "_"))) logger.debug(
message.set_author(change["user"], author_url) "Current params user {} and state of map_ips {}".format(change["user"], ip_mapper))
if ctx.event in ("edit", "new"):
ip_mapper[change["user"]] += 1
change["user"] = "{author} ({amount})".format(
author=change["user"] if settings.get("hide_ips", False) is False else _("Unregistered user"),
amount=ip_mapper[change["user"]])
else:
author_url = create_article_path("User:{}".format(change["user"].replace(" ", "_")))
message.set_author(change["user"], author_url)

View file

@ -262,6 +262,7 @@ def rc_processor(change, changed_categories):
identification_string = change.get("type", "unknown") # If event doesn't have a type identification_string = change.get("type", "unknown") # If event doesn't have a type
if identification_string in settings["ignored"]: if identification_string in settings["ignored"]:
return return
context.event = identification_string
discord_message: Optional[DiscordMessage] = default_message(identification_string, formatter_hooks)(context, change) discord_message: Optional[DiscordMessage] = default_message(identification_string, formatter_hooks)(context, change)
send_to_discord(discord_message, metadata) send_to_discord(discord_message, metadata)

View file

@ -280,7 +280,7 @@ class Wiki(object):
sys.exit(0) sys.exit(0)
return request return request
def api_request(self, params: Union[str, OrderedDict], *json_path: list[str], timeout: int=10, allow_redirects: bool=False): def api_request(self, params: Union[str, OrderedDict], *json_path: str, timeout: int = 10, allow_redirects: bool = False):
"""Method to GET request data from the wiki's API with error handling including recognition of MediaWiki errors. """Method to GET request data from the wiki's API with error handling including recognition of MediaWiki errors.
Parameters: Parameters:
@ -303,22 +303,14 @@ class Wiki(object):
""" """
# Making request # Making request
try: try:
if isinstance(params, str): if isinstance(params, str): # Todo Make it so there are some default arguments like warning/error format appended
request = self.session.get(WIKI_API_PATH + params, timeout=timeout, allow_redirects=allow_redirects) request = self.session.get(WIKI_API_PATH + params, timeout=timeout, allow_redirects=allow_redirects)
elif isinstance(params, OrderedDict): elif isinstance(params, OrderedDict):
request = self.session.get(WIKI_API_PATH, params=params, timeout=timeout, allow_redirects=allow_redirects) request = self.session.get(WIKI_API_PATH, params=params, timeout=timeout, allow_redirects=allow_redirects)
else: else:
raise BadRequest(params) raise BadRequest(params)
except requests.exceptions.Timeout: except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.ChunkedEncodingError) as exc:
logger.warning("Reached timeout error for request on link {url}".format(url=WIKI_API_PATH+str(params))) logger.warning("Reached {error} error for request on link {url}".format(error=repr(exc), url=WIKI_API_PATH+str(params)))
self.downtime_controller(True)
raise ServerError
except requests.exceptions.ConnectionError:
logger.warning("Reached connection error for request on link {url}".format(url=WIKI_API_PATH+str(params)))
self.downtime_controller(True)
raise ServerError
except requests.exceptions.ChunkedEncodingError:
logger.warning("Detected faulty response from the web server for request on link {url}".format(url=WIKI_API_PATH+str(params)))
self.downtime_controller(True) self.downtime_controller(True)
raise ServerError raise ServerError
# Catching HTTP errors # Catching HTTP errors
@ -337,7 +329,7 @@ class Wiki(object):
# JSON Extraction # JSON Extraction
try: try:
request_json = parse_mw_request_info(request.json(), request.url) request_json = parse_mw_request_info(request.json(), request.url)
for item in request_json: for item in json_path:
request_json = request_json[item] request_json = request_json[item]
except ValueError: except ValueError:
logger.warning("ValueError when extracting JSON data on {url}".format(url=request.url)) logger.warning("ValueError when extracting JSON data on {url}".format(url=request.url))