RcGcDw/misc.py

313 lines
10 KiB
Python
Raw Normal View History

2019-05-20 15:28:55 +00:00
# -*- coding: utf-8 -*-
# Recent changes Goat compatible Discord webhook is a project for using a webhook as recent changes page from MediaWiki.
2019-05-20 13:32:23 +00:00
# Copyright (C) 2018 Frisk
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
2020-04-05 21:50:36 +00:00
import json, logging, sys, re, time
2019-05-20 13:11:30 +00:00
from html.parser import HTMLParser
from urllib.parse import urlparse, urlunparse
import requests
from configloader import settings
2019-05-20 19:01:45 +00:00
import gettext
# Initialize translation
2019-05-20 19:01:45 +00:00
t = gettext.translation('misc', localedir='locale', languages=[settings["lang"]])
_ = t.gettext
# Create a custom logger
misc_logger = logging.getLogger("rcgcdw.misc")
data_template = {"rcid": 99999999999, "discussion_id": 0,
"daily_overview": {"edits": None, "new_files": None, "admin_actions": None, "bytes_changed": None,
"new_articles": None, "unique_editors": None, "day_score": None, "days_tracked": 0}}
WIKI_API_PATH: str = ""
WIKI_ARTICLE_PATH: str = ""
WIKI_SCRIPT_PATH: str = ""
WIKI_JUST_DOMAIN: str = ""
class DataFile:
"""Data class which instance of is shared by multiple modules to remain consistent and do not cause too many IO operations."""
def __init__(self):
self.data = self.load_datafile()
@staticmethod
def generate_datafile():
"""Generate a data.json file from a template."""
try:
with open("data.json", 'w') as data:
data.write(json.dumps(data_template, indent=4))
except PermissionError:
misc_logger.critical("Could not create a data file (no permissions). No way to store last edit.")
sys.exit(1)
def load_datafile(self) -> dict:
"""Read a data.json file and return a dictionary with contents
:rtype: dict
"""
try:
with open("data.json") as data:
return json.loads(data.read())
except FileNotFoundError:
self.generate_datafile()
misc_logger.info("The data file could not be found. Generating a new one...")
return data_template
def save_datafile(self):
"""Overwrites the data.json file with given dictionary"""
try:
with open("data.json", "w") as data_file:
data_file.write(json.dumps(self.data, indent=4))
except PermissionError:
misc_logger.critical("Could not modify a data file (no permissions). No way to store last edit.")
sys.exit(1)
class MessageQueue:
"""Message queue class for undelivered messages"""
def __init__(self):
self._queue = []
def __repr__(self):
return self._queue
def __len__(self):
return len(self._queue)
def __iter__(self):
return self._queue
def clear(self):
self._queue.clear()
def add_message(self, message):
self._queue.append(message)
def cut_messages(self, item_num):
self._queue = self._queue[item_num:]
messagequeue = MessageQueue()
datafile = DataFile()
def weighted_average(value, weight, new_value):
"""Calculates weighted average of value number with weight weight and new_value with weight 1"""
return round(((value * weight) + new_value) / (weight + 1), 2)
def link_formatter(link):
"""Formats a link to not embed it"""
return "<" + re.sub(r"([)])", "\\\\\\1", link).replace(" ", "_") + ">"
2019-05-20 13:11:30 +00:00
class ContentParser(HTMLParser):
more = _("\n__And more__")
current_tag = ""
small_prev_ins = ""
small_prev_del = ""
ins_length = len(more)
del_length = len(more)
added = False
def handle_starttag(self, tagname, attribs):
if tagname == "ins" or tagname == "del":
self.current_tag = tagname
if tagname == "td" and 'diff-addedline' in attribs[0]:
self.current_tag = tagname + "a"
if tagname == "td" and 'diff-deletedline' in attribs[0]:
self.current_tag = tagname + "d"
if tagname == "td" and 'diff-marker' in attribs[0]:
self.added = True
def handle_data(self, data):
2019-09-28 12:17:26 +00:00
data = re.sub(r"([`_*~<>{}@/|\\])", "\\\\\\1", data, 0)
2019-05-20 13:11:30 +00:00
if self.current_tag == "ins" and self.ins_length <= 1000:
self.ins_length += len("**" + data + '**')
if self.ins_length <= 1000:
self.small_prev_ins = self.small_prev_ins + "**" + data + '**'
else:
self.small_prev_ins = self.small_prev_ins + self.more
if self.current_tag == "del" and self.del_length <= 1000:
self.del_length += len("~~" + data + '~~')
if self.del_length <= 1000:
self.small_prev_del = self.small_prev_del + "~~" + data + '~~'
else:
self.small_prev_del = self.small_prev_del + self.more
if (self.current_tag == "afterins" or self.current_tag == "tda") and self.ins_length <= 1000:
self.ins_length += len(data)
if self.ins_length <= 1000:
self.small_prev_ins = self.small_prev_ins + data
else:
self.small_prev_ins = self.small_prev_ins + self.more
if (self.current_tag == "afterdel" or self.current_tag == "tdd") and self.del_length <= 1000:
self.del_length += len(data)
if self.del_length <= 1000:
self.small_prev_del = self.small_prev_del + data
else:
self.small_prev_del = self.small_prev_del + self.more
if self.added:
if data == '+' and self.ins_length <= 1000:
self.ins_length += 1
if self.ins_length <= 1000:
self.small_prev_ins = self.small_prev_ins + '\n'
else:
self.small_prev_ins = self.small_prev_ins + self.more
if data == '' and self.del_length <= 1000:
self.del_length += 1
if self.del_length <= 1000:
self.small_prev_del = self.small_prev_del + '\n'
else:
self.small_prev_del = self.small_prev_del + self.more
self.added = False
def handle_endtag(self, tagname):
if tagname == "ins":
self.current_tag = "afterins"
elif tagname == "del":
self.current_tag = "afterdel"
else:
self.current_tag = ""
def safe_read(request, *keys):
if request is None:
return None
try:
request = request.json()
for item in keys:
request = request[item]
except KeyError:
misc_logger.warning(
"Failure while extracting data from request on key {key} in {change}".format(key=item, change=request))
return None
except ValueError:
misc_logger.warning("Failure while extracting data from request in {change}".format(change=request))
return None
return request
def handle_discord_http(code, formatted_embed, result):
if 300 > code > 199: # message went through
return 0
elif code == 400: # HTTP BAD REQUEST result.status_code, data, result, header
misc_logger.error(
"Following message has been rejected by Discord, please submit a bug on our bugtracker adding it:")
misc_logger.error(formatted_embed)
misc_logger.error(result.text)
return 1
elif code == 401 or code == 404: # HTTP UNAUTHORIZED AND NOT FOUND
misc_logger.error("Webhook URL is invalid or no longer in use, please replace it with proper one.")
sys.exit(1)
elif code == 429:
misc_logger.error("We are sending too many requests to the Discord, slowing down...")
return 2
elif 499 < code < 600:
misc_logger.error(
"Discord have trouble processing the event, and because the HTTP code returned is {} it means we blame them.".format(
code))
return 3
def add_to_dict(dictionary, key):
if key in dictionary:
dictionary[key] += 1
else:
dictionary[key] = 1
return dictionary
def prepare_paths():
global WIKI_API_PATH
global WIKI_ARTICLE_PATH
global WIKI_SCRIPT_PATH
global WIKI_JUST_DOMAIN
"""Set the URL paths for article namespace and script namespace
WIKI_API_PATH will be: WIKI_DOMAIN/api.php
WIKI_ARTICLE_PATH will be: WIKI_DOMAIN/articlepath/$1 where $1 is the replaced string
WIKI_SCRIPT_PATH will be: WIKI_DOMAIN/
WIKI_JUST_DOMAIN will be: WIKI_DOMAIN"""
def quick_try_url(url):
"""Quickly test if URL is the proper script path,
False if it appears invalid
dictionary when it appears valid"""
try:
request = requests.get(url, timeout=5)
if request.status_code == requests.codes.ok:
if request.json()["query"]["general"] is not None:
return request
return False
except (KeyError, requests.exceptions.ConnectionError):
return False
try:
parsed_url = urlparse(settings["wiki_url"])
except KeyError:
misc_logger.critical("wiki_url is not specified in the settings. Please provide the wiki url in the settings and start the script again.")
sys.exit(1)
for url_scheme in (settings["wiki_url"], settings["wiki_url"].split("wiki")[0], urlunparse((*parsed_url[0:2], "", "", "", ""))): # check different combinations, it's supposed to be idiot-proof
tested = quick_try_url(url_scheme + "/api.php?action=query&format=json&meta=siteinfo")
if tested:
WIKI_API_PATH = urlunparse((*parsed_url[0:2], "", "", "", "")) + tested.json()["query"]["general"]["scriptpath"] + "/api.php"
WIKI_SCRIPT_PATH = urlunparse((*parsed_url[0:2], "", "", "", "")) + tested.json()["query"]["general"]["scriptpath"] + "/"
WIKI_ARTICLE_PATH = urlunparse((*parsed_url[0:2], "", "", "", "")) + tested.json()["query"]["general"]["articlepath"]
WIKI_JUST_DOMAIN = urlunparse((*parsed_url[0:2], "", "", "", ""))
break
else:
misc_logger.critical("Could not verify wikis paths. Please make sure you have given the proper wiki URL in settings.json and your Internet connection is working.")
sys.exit(1)
2020-04-05 00:07:56 +00:00
prepare_paths()
def create_article_path(article: str) -> str:
"""Takes the string and creates an URL with it as the article name"""
return WIKI_ARTICLE_PATH.replace("$1", article)
2020-04-05 21:50:36 +00:00
def send_to_discord_webhook(data):
header = settings["header"]
if isinstance(data, str):
header['Content-Type'] = 'application/json'
else:
header['Content-Type'] = 'application/x-www-form-urlencoded'
try:
result = requests.post(settings["webhookURL"], data=data,
headers=header, timeout=10)
except requests.exceptions.Timeout:
misc_logger.warning("Timeouted while sending data to the webhook.")
return 3
except requests.exceptions.ConnectionError:
misc_logger.warning("Connection error while sending the data to a webhook")
return 3
else:
return handle_discord_http(result.status_code, data, result)
def send_to_discord(data):
if messagequeue:
messagequeue.add_message(data)
else:
code = send_to_discord_webhook(data)
if code == 3:
messagequeue.add_message(data)
elif code == 2:
time.sleep(5.0)
messagequeue.add_message(data)
elif code < 2:
time.sleep(2.0)
pass