mirror of
https://gitlab.com/chicken-riders/RcGcDw.git
synced 2025-02-23 00:24:09 +00:00
1054 lines
54 KiB
Python
1054 lines
54 KiB
Python
#!/usr/bin/python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
# Recent changes Gamepedia compatible Discord webhook is a project for using a webhook as recent changes page from MediaWiki.
|
|
# Copyright (C) 2018 Frisk
|
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
# WARNING! SHITTY CODE AHEAD. ENTER ONLY IF YOU ARE SURE YOU CAN TAKE IT
|
|
# You have been warned
|
|
|
|
import time, logging, json, requests, datetime, re, gettext, math, random, os.path, schedule, sys
|
|
from bs4 import BeautifulSoup
|
|
from collections import defaultdict, Counter
|
|
from urllib.parse import quote_plus
|
|
from html.parser import HTMLParser
|
|
|
|
with open("settings.json") as sfile:
|
|
settings = json.load(sfile)
|
|
if settings["limitrefetch"] < settings["limit"] and settings["limitrefetch"] != -1:
|
|
settings["limitrefetch"] = settings["limit"]
|
|
logged_in = False
|
|
logging.basicConfig(level=settings["verbose_level"])
|
|
if settings["limitrefetch"] != -1 and os.path.exists("lastchange.txt") == False:
|
|
with open("lastchange.txt", 'w') as sfile:
|
|
sfile.write("99999999999")
|
|
logging.info("Current settings: {settings}".format(settings=settings))
|
|
lang = gettext.translation('rcgcdw', localedir='locale', languages=[settings["lang"]])
|
|
lang.install()
|
|
ngettext = lang.ngettext
|
|
|
|
|
|
class MWError(Exception):
|
|
pass
|
|
|
|
|
|
class MyHTMLParser(HTMLParser):
|
|
new_string = ""
|
|
recent_href = ""
|
|
|
|
def handle_starttag(self, tag, attrs):
|
|
for attr in attrs:
|
|
if attr[0] == 'href':
|
|
self.recent_href = attr[1]
|
|
if self.recent_href.startswith("//"):
|
|
self.recent_href = "https:{rest}".format(rest=self.recent_href)
|
|
elif not self.recent_href.startswith("https"):
|
|
self.recent_href = "https://{wiki}.gamepedia.com".format(wiki=settings["wiki"]) + self.recent_href
|
|
self.recent_href = self.recent_href.replace(")", "\\)")
|
|
|
|
def handle_data(self, data):
|
|
if self.recent_href:
|
|
self.new_string = self.new_string + "[{}]({})".format(data, self.recent_href)
|
|
self.recent_href = ""
|
|
else:
|
|
self.new_string = self.new_string + data
|
|
|
|
def handle_comment(self, data):
|
|
self.new_string = self.new_string + data
|
|
|
|
def handle_endtag(self, tag):
|
|
print(self.new_string)
|
|
|
|
|
|
HTMLParse = MyHTMLParser()
|
|
|
|
|
|
def send(message, name, avatar):
|
|
send_to_discord({"content": message, "avatar_url": avatar, "username": name})
|
|
|
|
|
|
def safe_read(request, *keys):
|
|
if request is None:
|
|
return None
|
|
try:
|
|
request = request.json()
|
|
for item in keys:
|
|
request = request[item]
|
|
except KeyError:
|
|
logging.warning(
|
|
"Failure while extracting data from request on key {key} in {change}".format(key=item, change=request))
|
|
return None
|
|
except ValueError:
|
|
logging.warning("Failure while extracting data from request in {change}".format(change=request))
|
|
return None
|
|
return request
|
|
|
|
|
|
def send_to_discord_webhook(data):
|
|
try:
|
|
result = requests.post(settings["webhookURL"], data=data,
|
|
headers={**{'Content-Type': 'application/json'}, **settings["header"]}, timeout=10)
|
|
except requests.exceptions.Timeout:
|
|
logging.warning("Timeouted while sending data to the webhook.")
|
|
return 3
|
|
except requests.exceptions.ConnectionError:
|
|
logging.warning("Connection error while sending the data to a webhook")
|
|
return 3
|
|
else:
|
|
return handle_discord_http(result.status_code, data)
|
|
|
|
|
|
def send_to_discord(data):
|
|
if recent_changes.unsent_messages:
|
|
recent_changes.unsent_messages.append(data)
|
|
else:
|
|
code = send_to_discord_webhook(data)
|
|
if code == 3:
|
|
recent_changes.unsent_messages.append(data)
|
|
elif code == 2:
|
|
time.sleep(5.0)
|
|
recent_changes.unsent_messages.append(data)
|
|
elif code < 2:
|
|
time.sleep(2.5)
|
|
pass
|
|
|
|
|
|
def webhook_formatter(action, STATIC, **params):
|
|
logging.debug("Received things: {thing}".format(thing=params))
|
|
colornumber = None if isinstance(STATIC["color"], str) else STATIC["color"]
|
|
data = {"embeds": []}
|
|
embed = defaultdict(dict)
|
|
if "title" in params:
|
|
article_encoded = params["title"].replace(" ", "_").replace(')', '\)')
|
|
if re.match(r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b", params["user"]) is not None:
|
|
author_url = "https://{wiki}.gamepedia.com/Special:Contributions/{user}".format(wiki=settings["wiki"],
|
|
user=params["user"])
|
|
if params["user"] not in list(recent_changes.map_ips.keys()):
|
|
contibs = safe_read(recent_changes.safe_request(
|
|
"https://{wiki}.gamepedia.com/api.php?action=query&format=json&list=usercontribs&uclimit=max&ucuser={user}&ucprop=".format(
|
|
wiki=settings["wiki"], user=params["user"])), "query", "usercontribs")
|
|
if contibs is None:
|
|
logging.warning(
|
|
"WARNING: Something went wrong when checking amount of contributions for given IP address")
|
|
params["user"] = params["user"] + "(?)"
|
|
else:
|
|
params["user"] = "{author} ({contribs})".format(author=params["user"], contribs=len(contibs))
|
|
recent_changes.map_ips[params["user"]] = len(contibs)
|
|
else:
|
|
recent_changes.map_ips[params["user"]] += 1
|
|
params["user"] = "{author} ({amount})".format(author=params["user"],
|
|
amount=recent_changes.map_ips[params["user"]])
|
|
else:
|
|
author_url = "https://{wiki}.gamepedia.com/User:{user}".format(wiki=settings["wiki"],
|
|
user=params["user"].replace(" ", "_"))
|
|
if action in ("edit", "new"): # edit or new page
|
|
editsize = params["size"]
|
|
print(editsize)
|
|
if editsize > 0:
|
|
if editsize > 6032:
|
|
colornumber = 65280
|
|
else:
|
|
colornumber = 35840 + (math.floor(editsize / 52)) * 256
|
|
elif editsize < 0:
|
|
if editsize < -6032:
|
|
colornumber = 16711680
|
|
else:
|
|
colornumber = 9175040 + (math.floor((editsize * -1) / (52))) * 65536
|
|
elif editsize == 0:
|
|
colornumber = 8750469
|
|
link = "https://{wiki}.gamepedia.com/index.php?title={article}&curid={pageid}&diff={diff}&oldid={oldrev}".format(
|
|
wiki=settings["wiki"], pageid=params["pageid"], diff=params["diff"], oldrev=params["oldrev"],
|
|
article=params["title"].replace(" ", "_"))
|
|
embed["title"] = "{article} ({new}{minor}{editsize})".format(article=params["title"], editsize="+" + str(
|
|
editsize) if editsize > 0 else editsize, new=_("(N!) ") if action == "new" else "",
|
|
minor=_("m ") if action == "edit" and params[
|
|
"minor"] else "")
|
|
elif action in ("upload/overwrite", "upload/upload"): # sending files
|
|
license = None
|
|
urls = safe_read(recent_changes.safe_request(
|
|
"https://{wiki}.gamepedia.com/api.php?action=query&format=json&prop=imageinfo&list=&meta=&titles={filename}&iiprop=timestamp%7Curl&iilimit=2".format(
|
|
wiki=settings["wiki"], filename=params["title"])), "query", "pages")
|
|
undolink = ""
|
|
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
|
|
article=params["title"].replace(" ", "_"))
|
|
additional_info_retrieved = False
|
|
if urls is not None:
|
|
if "-1" not in urls: # oage removed before we asked for it
|
|
img_info = next(iter(urls.values()))["imageinfo"]
|
|
embed["image"]["url"] = img_info[0]["url"]
|
|
additional_info_retrieved = True
|
|
else:
|
|
pass
|
|
if params["overwrite"]:
|
|
if additional_info_retrieved:
|
|
img_timestamp = [x for x in img_info[1]["timestamp"] if x.isdigit()]
|
|
undolink = "https://{wiki}.gamepedia.com/index.php?title={filename}&action=revert&oldimage={timestamp}%21{filenamewon}".format(
|
|
wiki=settings["wiki"], filename=article_encoded, timestamp="".join(img_timestamp),
|
|
filenamewon=article_encoded[5:])
|
|
embed["fields"] = [{"name": _("Options"), "value": _("([preview]({link}) | [undo]({undolink}))").format(
|
|
link=embed["image"]["url"], undolink=undolink)}]
|
|
embed["title"] = _("Uploaded a new version of {name}").format(name=params["title"])
|
|
else:
|
|
embed["title"] = _("Uploaded {name}").format(name=params["title"])
|
|
article_content = safe_read(recent_changes.safe_request(
|
|
"https://{wiki}.gamepedia.com/api.php?action=query&format=json&prop=revisions&titles={article}&rvprop=content".format(
|
|
wiki=settings["wiki"], article=quote_plus(params["title"], safe=''))), "query", "pages")
|
|
if article_content is None:
|
|
logging.warning("Something went wrong when getting license for the image")
|
|
return 0
|
|
if "-1" not in article_content:
|
|
content = list(article_content.values())[0]['revisions'][0]['*']
|
|
try:
|
|
matches = re.search(re.compile(settings["license_regex"], re.IGNORECASE), content)
|
|
if matches is not None:
|
|
license = matches.group("license")
|
|
else:
|
|
if re.search(re.compile(settings["license_regex_detect"], re.IGNORECASE), content) is None:
|
|
license = _("**No license!**")
|
|
else:
|
|
license = "?"
|
|
except IndexError:
|
|
logging.error(
|
|
"Given regex for the license detection is incorrect. It does not have a capturing group called \"license\" specified. Please fix license_regex value in the config!")
|
|
license = "?"
|
|
except re.error:
|
|
logging.error(
|
|
"Given regex for the license detection is incorrect. Please fix license_regex or license_regex_detect values in the config!")
|
|
license = "?"
|
|
if additional_info_retrieved:
|
|
embed["fields"] = [
|
|
{"name": _("Options"), "value": _("([preview]({link}))").format(link=embed["image"]["url"])}]
|
|
params["desc"] = _("{desc}\nLicense: {license}").format(desc=params["desc"],
|
|
license=license if license is not None else "?")
|
|
elif action == "delete/delete":
|
|
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
|
|
article=params["title"].replace(" ", "_"))
|
|
embed["title"] = _("Deleted page {article}").format(article=params["title"])
|
|
elif action == "delete/delete_redir":
|
|
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
|
|
article=params["title"].replace(" ", "_"))
|
|
embed["title"] = _("Deleted redirect {article} by overwriting").format(article=params["title"])
|
|
elif action == "move/move":
|
|
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
|
|
article=params["target"].replace(" ", "_"))
|
|
params["desc"] = "{supress}. {desc}".format(desc=params["desc"],
|
|
supress=_("No redirect has been made") if params[
|
|
"supress"] == True else _(
|
|
"A redirect has been made"))
|
|
embed["title"] = _("Moved {article} to {target}").format(article=params["title"], target=params["target"])
|
|
elif action == "move/move_redir":
|
|
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
|
|
article=params["target"].replace(" ", "_"))
|
|
embed["title"] = _("Moved {article} to {title} over redirect").format(article=params["title"],
|
|
title=params["target"])
|
|
elif action == "protect/move_prot":
|
|
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
|
|
article=params["title"].replace(" ", "_"))
|
|
embed["title"] = _("Moved protection settings from {article} to {title}").format(article=params["title"],
|
|
title=params["target"])
|
|
elif action == "block/block":
|
|
link = "https://{wiki}.gamepedia.com/{user}".format(wiki=settings["wiki"],
|
|
user=params["blocked_user"].replace(" ", "_").replace(')',
|
|
'\)'))
|
|
user = params["blocked_user"].split(':')[1]
|
|
block_time = _("infinity and beyond") if params["duration"] == "infinite" else params["duration"]
|
|
embed["title"] = _("Blocked {blocked_user} for {time}").format(blocked_user=user, time=block_time)
|
|
elif action == "block/reblock":
|
|
link = "https://{wiki}.gamepedia.com/{user}".format(wiki=settings["wiki"],
|
|
user=params["blocked_user"].replace(" ", "_").replace(')',
|
|
'\)'))
|
|
user = params["blocked_user"].split(':')[1]
|
|
embed["title"] = _("Changed block settings for {blocked_user}").format(blocked_user=user)
|
|
elif action == "block/unblock":
|
|
link = "https://{wiki}.gamepedia.com/{user}".format(wiki=settings["wiki"],
|
|
user=params["blocked_user"].replace(" ", "_").replace(')',
|
|
'\)'))
|
|
user = params["blocked_user"].split(':')[1]
|
|
embed["title"] = _("Unblocked {blocked_user}").format(blocked_user=user)
|
|
elif action == "curseprofile/comment-created":
|
|
link = "https://{wiki}.gamepedia.com/Special:CommentPermalink/{commentid}".format(wiki=settings["wiki"],
|
|
commentid=params["commentid"])
|
|
# link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"], target=params["target"].replace(" ", "_").replace(')', '\)')) old way of linking
|
|
embed["title"] = _("Left a comment on {target}'s profile").format(target=params["target"]) if params[
|
|
"target"] != \
|
|
params[
|
|
"user"] else _(
|
|
"Left a comment on their own profile")
|
|
elif action == "curseprofile/comment-replied":
|
|
# link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"], target=params["target"].replace(" ", "_").replace(')', '\)'))
|
|
link = "https://{wiki}.gamepedia.com/Special:CommentPermalink/{commentid}".format(wiki=settings["wiki"],
|
|
commentid=params["commentid"])
|
|
embed["title"] = _("Replied to a comment on {target}'s profile").format(target=params["target"]) if params[
|
|
"target"] != \
|
|
params[
|
|
"user"] else _(
|
|
"Replied to a comment on their own profile")
|
|
elif action == "curseprofile/comment-edited":
|
|
# link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"], target=params["target"].replace(" ", "_").replace(')', '\)'))
|
|
link = "https://{wiki}.gamepedia.com/Special:CommentPermalink/{commentid}".format(wiki=settings["wiki"],
|
|
commentid=params["commentid"])
|
|
embed["title"] = _("Edited a comment on {target}'s profile").format(target=params["target"]) if params[
|
|
"target"] != \
|
|
params[
|
|
"user"] else _(
|
|
"Edited a comment on their own profile")
|
|
elif action == "curseprofile/profile-edited":
|
|
link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"],
|
|
target=params["target"].replace(" ",
|
|
"_").replace(
|
|
')', '\)'))
|
|
if params["field"] == "profile-location":
|
|
field = _("Location")
|
|
elif params["field"] == "profile-aboutme":
|
|
field = _("About me")
|
|
elif params["field"] == "profile-link-google":
|
|
field = _("Google link")
|
|
elif params["field"] == "profile-link-facebook":
|
|
field = _("Facebook link")
|
|
elif params["field"] == "profile-link-twitter":
|
|
field = _("Twitter link")
|
|
elif params["field"] == "profile-link-reddit":
|
|
field = _("Reddit link")
|
|
elif params["field"] == "profile-link-twitch":
|
|
field = _("Twitch link")
|
|
elif params["field"] == "profile-link-psn":
|
|
field = _("PSN link")
|
|
elif params["field"] == "profile-link-vk":
|
|
field = _("VK link")
|
|
elif params["field"] == "profile-link-xbl":
|
|
field = _("XVL link")
|
|
elif params["field"] == "profile-link-steam":
|
|
field = _("Steam link")
|
|
else:
|
|
field = _("Unknown")
|
|
embed["title"] = _("Edited {target}'s profile").format(target=params["target"]) if params["user"] != params[
|
|
"target"] else _("Edited their own profile")
|
|
params["desc"] = _("{field} field changed to: {desc}").format(field=field, desc=params["desc"])
|
|
elif action == "curseprofile/comment-deleted":
|
|
link = "https://{wiki}.gamepedia.com/Special:CommentPermalink/{commentid}".format(wiki=settings["wiki"],
|
|
commentid=params["commentid"])
|
|
# link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"], target=params["target"].replace(" ", "_").replace(')', '\)'))
|
|
embed["title"] = _("Deleted a comment on {target}'s profile").format(target=params["target"])
|
|
elif action in ("rights/rights", "rights/autopromote"):
|
|
link = "https://{wiki}.gamepedia.com/User:".format(wiki=settings["wiki"]) + params["title"].split(":")[1]
|
|
if action == "rights/rights":
|
|
embed["title"] = _("Changed group membership for {target}").format(target=params["title"].split(":")[1])
|
|
else:
|
|
params["user"] = _("System")
|
|
author_url = ""
|
|
embed["title"] = _("{target} got autopromoted to a new usergroup").format(
|
|
target=params["title"].split(":")[1])
|
|
if len(params["old_groups"]) < len(params["new_groups"]):
|
|
embed["thumbnail"]["url"] = "https://i.imgur.com/WnGhF5g.gif"
|
|
old_groups = []
|
|
new_groups = []
|
|
for name in params["old_groups"]:
|
|
old_groups.append(_(name))
|
|
for name in params["new_groups"]:
|
|
new_groups.append(_(name))
|
|
if len(old_groups) == 0:
|
|
old_groups = [_("none")]
|
|
if len(new_groups) == 0:
|
|
new_groups = [_("none")]
|
|
reason = ": {desc}".format(desc=params["desc"]) if params["desc"] != _("No description provided") else ""
|
|
params["desc"] = _("Groups changed from {old_groups} to {new_groups}{reason}").format(
|
|
old_groups=", ".join(old_groups), new_groups=', '.join(new_groups), reason=reason)
|
|
elif action == "protect/protect":
|
|
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
|
|
article=params["title"].replace(" ", "_"))
|
|
embed["title"] = _("Protected {target}").format(target=params["title"])
|
|
params["desc"] = params["settings"] + " | " + params["desc"]
|
|
elif action == "protect/modify":
|
|
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
|
|
article=params["title"].replace(" ", "_"))
|
|
embed["title"] = _("Changed protection level for {article}").format(article=params["title"])
|
|
params["desc"] = params["settings"] + " | " + params["desc"]
|
|
elif action == "protect/unprotect":
|
|
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
|
|
article=params["title"].replace(" ", "_"))
|
|
embed["title"] = _("Removed protection from {article}").format(article=params["title"])
|
|
elif action == "delete/revision":
|
|
amount = len(params["amount"])
|
|
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
|
|
article=params["title"].replace(" ", "_"))
|
|
embed["title"] = ngettext("Changed visibility of revision on page {article} ",
|
|
"Changed visibility of {amount} revisions on page {article} ", amount).format(
|
|
article=params["title"], amount=amount)
|
|
elif action == "import/upload":
|
|
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
|
|
article=params["title"].replace(" ", "_"))
|
|
embed["title"] = ngettext("Imported {article} with {count} revision",
|
|
"Imported {article} with {count} revisions", params["amount"]).format(
|
|
article=params["title"], count=params["amount"])
|
|
elif action == "delete/restore":
|
|
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
|
|
article=params["title"].replace(" ", "_"))
|
|
embed["title"] = _("Restored {article}").format(article=params["title"])
|
|
elif action == "delete/event":
|
|
link = "https://{wiki}.gamepedia.com/Special:RecentChanges".format(wiki=settings["wiki"])
|
|
embed["title"] = _("Changed visibility of log events")
|
|
elif action == "import/interwiki":
|
|
link = "https://{wiki}.gamepedia.com/Special:RecentChanges".format(wiki=settings["wiki"])
|
|
embed["title"] = _("Imported interwiki")
|
|
elif action == "abusefilter/modify":
|
|
link = "https://{wiki}.gamepedia.com/Special:RecentChanges".format(wiki=settings["wiki"])
|
|
embed["title"] = _("Edited abuse filter number {number}").format(number=params["filternr"])
|
|
elif action == "merge/merge":
|
|
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
|
|
article=params["title"].replace(" ", "_"))
|
|
embed["title"] = _("Merged revision histories of {article} into {dest}").format(article=params["title"],
|
|
dest=params["destination"])
|
|
elif action == "interwiki/iw_add":
|
|
link = "https://{wiki}.gamepedia.com/Special:Interwiki".format(wiki=settings["wiki"])
|
|
embed["title"] = _("Added an entry to the interwiki table")
|
|
params["desc"] = _("Prefix: {prefix}, website: {website} | {desc}").format(desc=params["desc"],
|
|
prefix=params["prefix"],
|
|
website=params["website"])
|
|
elif action == "interwiki/iw_edit":
|
|
link = "https://{wiki}.gamepedia.com/Special:Interwiki".format(wiki=settings["wiki"])
|
|
embed["title"] = _("Edited an entry in interwiki table")
|
|
params["desc"] = _("Prefix: {prefix}, website: {website} | {desc}").format(desc=params["desc"],
|
|
prefix=params["prefix"],
|
|
website=params["website"])
|
|
elif action == "interwiki/iw_delete":
|
|
link = "https://{wiki}.gamepedia.com/Special:Interwiki".format(wiki=settings["wiki"])
|
|
embed["title"] = _("Deleted an entry in interwiki table")
|
|
params["desc"] = _("Prefix: {prefix} | {desc}").format(desc=params["desc"], prefix=params["prefix"])
|
|
elif action == "contentmodel/change":
|
|
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
|
|
article=params["title"].replace(" ", "_"))
|
|
embed["title"] = _("Changed the content model of the page {article}").format(article=params["title"])
|
|
params["desc"] = _("Model changed from {old} to {new}: {reason}").format(old=params["oldmodel"],
|
|
new=params["newmodel"],
|
|
reason=params["desc"])
|
|
elif action == "sprite/sprite":
|
|
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
|
|
article=params["title"].replace(" ", "_"))
|
|
embed["title"] = _("Edited the sprite for {article}").format(article=params["title"])
|
|
elif action == "sprite/sheet":
|
|
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
|
|
article=params["title"].replace(" ", "_"))
|
|
embed["title"] = _("Created the sprite sheet for {article}").format(article=params["title"])
|
|
elif action == "sprite/slice":
|
|
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
|
|
article=params["title"].replace(" ", "_"))
|
|
embed["title"] = _("Edited the slice for {article}").format(article=params["title"])
|
|
elif action == "managetags/create":
|
|
link = "https://{wiki}.gamepedia.com/Special:Tags".format(wiki=settings["wiki"])
|
|
embed["title"] = _("Created a tag \"{tag}\"").format(tag=params["additional"]["tag"])
|
|
recent_changes.update_tags()
|
|
elif action == "managetags/delete":
|
|
link = "https://{wiki}.gamepedia.com/Special:Tags".format(wiki=settings["wiki"])
|
|
embed["title"] = _("Deleted a tag \"{tag}\"").format(tag=params["additional"]["tag"])
|
|
recent_changes.update_tags()
|
|
elif action == "managetags/activate":
|
|
link = "https://{wiki}.gamepedia.com/Special:Tags".format(wiki=settings["wiki"])
|
|
embed["title"] = _("Activated a tag \"{tag}\"").format(tag=params["additional"]["tag"])
|
|
elif action == "managetags/deactivate":
|
|
link = "https://{wiki}.gamepedia.com/Special:Tags".format(wiki=settings["wiki"])
|
|
embed["title"] = _("Deactivated a tag \"{tag}\"").format(tag=params["additional"]["tag"])
|
|
elif action == "suppressed":
|
|
link = "https://{wiki}.gamepedia.com/".format(wiki=settings["wiki"])
|
|
embed["title"] = _("Action has been hidden by Gamepedia staff.")
|
|
else:
|
|
logging.warning("No entry for {event} with params: {params}".format(event=action, params=params))
|
|
embed["author"]["name"] = params["user"]
|
|
embed["author"]["url"] = author_url
|
|
embed["author"]["icon_url"] = STATIC["icon"]
|
|
embed["url"] = link
|
|
if "desc" not in params:
|
|
params["desc"] = ""
|
|
embed["description"] = params["desc"]
|
|
embed["color"] = random.randrange(1, 16777215) if colornumber is None else math.floor(colornumber)
|
|
embed["timestamp"] = STATIC["timestamp"]
|
|
if STATIC["tags"]:
|
|
tag_displayname = []
|
|
if "fields" not in embed:
|
|
embed["fields"] = []
|
|
for tag in STATIC["tags"]:
|
|
if tag in recent_changes.tags:
|
|
tag_displayname.append(recent_changes.tags[tag])
|
|
else:
|
|
tag_displayname.append(tag)
|
|
embed["fields"].append({"name": _("Tags"), "value": ", ".join(tag_displayname)})
|
|
if "new_categories" in params and params["new_categories"]:
|
|
embed["categories"] = []
|
|
embed["categories"].append({"name": _("Added categories"), "value": ", ".join(params["new_categories"][0:15]) + "" if len(params["new_categories"]) < 15 else _(" and {} more").format(len(params["new_categories"])-14)})
|
|
data["embeds"].append(dict(embed))
|
|
data['avatar_url'] = settings["avatars"]["embed"]
|
|
formatted_embed = json.dumps(data, indent=4)
|
|
send_to_discord(formatted_embed)
|
|
|
|
|
|
def handle_discord_http(code, formatted_embed):
|
|
if 300 > code > 199: # message went through
|
|
return 0
|
|
elif code == 400: # HTTP BAD REQUEST
|
|
logging.error(
|
|
"Following message has been rejected by Discord, please submit a bug on our bugtracker adding it:")
|
|
logging.error(formatted_embed)
|
|
return 1
|
|
elif code == 401 or code == 404: # HTTP UNAUTHORIZED AND NOT FOUND
|
|
logging.error("Webhook URL is invalid or no longer in use, please replace it with proper one.")
|
|
sys.exit(1)
|
|
elif code == 429:
|
|
logging.error("We are sending too many requests to the Discord, slowing down...")
|
|
return 2
|
|
elif 499 < code < 600:
|
|
logging.error(
|
|
"Discord have trouble processing the event, and because the HTTP code returned is {} it means we blame them.".format(
|
|
code))
|
|
return 3
|
|
|
|
|
|
def first_pass(
|
|
change, added_categories): # I've decided to split the embed formatter and change handler, maybe it's more messy this way, I don't know
|
|
if "actionhidden" in change or "suppressed" in change and "suppressed" not in settings["ignored"]:
|
|
webhook_formatter("suppressed",
|
|
{"timestamp": change["timestamp"], "color": settings["appearance"]["suppressed"]["color"],
|
|
"icon": settings["appearance"]["suppressed"]["icon"]}, user=change["user"])
|
|
return
|
|
HTMLParse.feed(change["parsedcomment"])
|
|
# parsedcomment = (BeautifulSoup(change["parsedcomment"], "lxml")).get_text()
|
|
parsedcomment = HTMLParse.new_string
|
|
HTMLParse.new_string = ""
|
|
logging.debug(change)
|
|
STATIC_VARS = {"timestamp": change["timestamp"], "tags": change["tags"]}
|
|
if not parsedcomment:
|
|
parsedcomment = _("No description provided")
|
|
if change["type"] == "edit" and "edit" not in settings["ignored"]:
|
|
STATIC_VARS = {**STATIC_VARS, **{"color": settings["appearance"]["edit"]["color"],
|
|
"icon": settings["appearance"]["edit"]["icon"]}}
|
|
webhook_formatter("edit", STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
|
|
oldrev=change["old_revid"], pageid=change["pageid"], diff=change["revid"],
|
|
size=change["newlen"] - change["oldlen"], minor=True if "minor" in change else False, new_categories=added_categories)
|
|
elif change["type"] == "log":
|
|
combination = "{logtype}/{logaction}".format(logtype=change["logtype"], logaction=change["logaction"])
|
|
if combination in settings["ignored"]:
|
|
return
|
|
logging.debug("combination is {}".format(combination))
|
|
try:
|
|
STATIC_VARS = {**STATIC_VARS, **{"color": settings["appearance"][combination]["color"],
|
|
"icon": settings["appearance"][combination]["icon"]}}
|
|
except KeyError:
|
|
STATIC_VARS = {**STATIC_VARS, **{"color": "", "icon": ""}}
|
|
logging.error("No value in the settings has been given for {}".format(combination))
|
|
if combination == "protect/protect":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
|
|
settings=change["logparams"]["description"])
|
|
elif combination == "protect/modify":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
|
|
settings=change["logparams"]["description"])
|
|
elif combination == "protect/unprotect":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment)
|
|
elif combination == "upload/overwrite":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
|
|
overwrite=True)
|
|
elif combination == "upload/upload":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
|
|
overwrite=False)
|
|
elif combination == "delete/delete":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment)
|
|
elif combination == "delete/delete_redir":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment)
|
|
elif combination == "delete/restore":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment)
|
|
elif combination == "delete/revision":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
|
|
amount=change["logparams"]["ids"])
|
|
elif combination == "delete/event":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment)
|
|
elif combination == "import/upload":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
|
|
amount=change["logparams"]["count"])
|
|
elif combination == "import/interwiki":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment)
|
|
elif combination == "merge/merge":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
|
|
destination=change["logparams"]["dest_title"])
|
|
elif combination == "move/move":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
|
|
supress=True if "suppressredirect" in change["logparams"] else False,
|
|
target=change["logparams"]['target_title'])
|
|
elif combination == "move/move_redir":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
|
|
target=change["logparams"]["target_title"])
|
|
elif combination == "protect/move_prot":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["logparams"]["oldtitle_title"], desc=parsedcomment,
|
|
target=change["title"])
|
|
elif combination == "block/block":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], blocked_user=change["title"],
|
|
desc=parsedcomment, duration=change["logparams"]["duration"])
|
|
elif combination == "block/unblock":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], blocked_user=change["title"],
|
|
desc=parsedcomment)
|
|
elif combination == "block/reblock":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], blocked_user=change["title"],
|
|
desc=parsedcomment)
|
|
elif combination == "rights/rights":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
|
|
old_groups=change["logparams"]["oldgroups"], new_groups=change["logparams"]["newgroups"])
|
|
elif combination == "rights/autopromote":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
|
|
old_groups=change["logparams"]["oldgroups"], new_groups=change["logparams"]["newgroups"])
|
|
elif combination == "abusefilter/modify":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment,
|
|
filternr=change["logparams"]['1'])
|
|
elif combination == "interwiki/iw_add":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment,
|
|
prefix=change["logparams"]['0'], website=change["logparams"]['1'])
|
|
elif combination == "interwiki/iw_edit":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment,
|
|
prefix=change["logparams"]['0'], website=change["logparams"]['1'])
|
|
elif combination == "interwiki/iw_delete":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment,
|
|
prefix=change["logparams"]['0'])
|
|
elif combination == "curseprofile/comment-created":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1],
|
|
commentid=change["logparams"]["0"])
|
|
elif combination == "curseprofile/comment-edited":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1],
|
|
commentid=change["logparams"]["0"])
|
|
elif combination == "curseprofile/comment-deleted":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1],
|
|
commentid=change["logparams"]["0"])
|
|
elif combination == "curseprofile/profile-edited":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1],
|
|
field=change["logparams"]['0'], desc=change["parsedcomment"])
|
|
elif combination == "curseprofile/comment-replied":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1],
|
|
commentid=change["logparams"]["0"])
|
|
elif combination == "contentmodel/change":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
|
|
oldmodel=change["logparams"]["oldmodel"], newmodel=change["logparams"]["newmodel"])
|
|
elif combination == "sprite/sprite":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment)
|
|
elif combination == "sprite/sheet":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment)
|
|
elif combination == "sprite/slice":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment)
|
|
elif combination == "managetags/create":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
|
|
additional=change["logparams"])
|
|
elif combination == "managetags/delete":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
|
|
additional=change["logparams"])
|
|
elif combination == "managetags/activate":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
|
|
additional=change["logparams"])
|
|
elif combination == "managetags/deactivate":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
|
|
additional=change["logparams"])
|
|
elif combination == "tag/update":
|
|
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment)
|
|
else:
|
|
logging.warning("No entry matches given change!")
|
|
print(change)
|
|
send(_("Unable to process the event"), _("error"), settings["avatars"]["no_event"])
|
|
return
|
|
if change["type"] == "external": # not sure what happens then, but it's listed as possible type
|
|
logging.warning("External event happened, ignoring.")
|
|
print(change)
|
|
return
|
|
elif change["type"] == "new" and "new" not in settings["ignored"]: # new page
|
|
STATIC_VARS = {**STATIC_VARS, **{"color": settings["appearance"]["new"]["color"],
|
|
"icon": settings["appearance"]["new"]["icon"]}}
|
|
webhook_formatter("new", STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
|
|
oldrev=change["old_revid"], pageid=change["pageid"], diff=change["revid"],
|
|
size=change["newlen"])
|
|
|
|
|
|
def day_overview_request():
|
|
logging.info("Fetching daily overview... This may take up to 30 seconds!")
|
|
timestamp = (datetime.datetime.utcnow() - datetime.timedelta(hours=24)).isoformat(timespec='milliseconds')
|
|
logging.debug("timestamp is {}".format(timestamp))
|
|
complete = False
|
|
result = []
|
|
passes = 0
|
|
continuearg = ""
|
|
while not complete and passes < 10:
|
|
request = recent_changes.safe_request(
|
|
"https://{wiki}.gamepedia.com/api.php?action=query&format=json&list=recentchanges&rcend={timestamp}Z&rcprop=title%7Ctimestamp%7Csizes%7Cloginfo%7Cuser&rcshow=!bot&rclimit=500&rctype=edit%7Cnew%7Clog{continuearg}".format(
|
|
wiki=settings["wiki"], timestamp=timestamp, continuearg=continuearg))
|
|
if request:
|
|
try:
|
|
request = request.json()
|
|
rc = request['query']['recentchanges']
|
|
continuearg = request["continue"]["rccontinue"] if "continue" in request else None
|
|
except ValueError:
|
|
logging.warning("ValueError in fetching changes")
|
|
recent_changes.downtime_controller()
|
|
complete = 2
|
|
except KeyError:
|
|
logging.warning("Wiki returned %s" % (request.json()))
|
|
complete = 2
|
|
else:
|
|
result += rc
|
|
if continuearg:
|
|
continuearg = "&rccontinue={}".format(continuearg)
|
|
passes += 1
|
|
logging.debug(
|
|
"continuing requesting next pages of recent changes with {} passes and continuearg being {}".format(
|
|
passes, continuearg))
|
|
time.sleep(3.0)
|
|
else:
|
|
complete = 1
|
|
else:
|
|
complete = 2
|
|
if passes == 10:
|
|
logging.debug("quit the loop because there been too many passes")
|
|
return (result, complete)
|
|
|
|
|
|
def add_to_dict(dictionary, key):
|
|
if key in dictionary:
|
|
dictionary[key] += 1
|
|
else:
|
|
dictionary[key] = 1
|
|
return dictionary
|
|
|
|
|
|
def day_overview(): # time.strftime('%Y-%m-%dT%H:%M:%S.000Z', time.gmtime(time.time()))
|
|
# (datetime.datetime.utcnow()+datetime.timedelta(hours=0)).isoformat(timespec='milliseconds')+'Z'
|
|
result = day_overview_request()
|
|
if result[1] == 1:
|
|
activity = defaultdict(dict)
|
|
hours = defaultdict(dict)
|
|
edits = 0
|
|
files = 0
|
|
admin = 0
|
|
changed_bytes = 0
|
|
new_articles = 0
|
|
if not result[0] and not settings["send_empty_overview"]:
|
|
return # no changes in this day
|
|
for item in result[0]:
|
|
activity = add_to_dict(activity, item["user"])
|
|
hours = add_to_dict(hours, datetime.datetime.strptime(item["timestamp"], "%Y-%m-%dT%H:%M:%SZ").hour)
|
|
if "actionhidden" in item or "suppressed" in item:
|
|
continue # while such actions have type value (edit/new/log) many other values are hidden and therefore can crash with key error, let's not process such events
|
|
if item["type"] == "edit":
|
|
edits += 1
|
|
changed_bytes += item["newlen"] - item["oldlen"]
|
|
if item["type"] == "new":
|
|
if item["ns"] == 0:
|
|
new_articles += 1
|
|
changed_bytes += item["newlen"]
|
|
if item["type"] == "log":
|
|
files = files + 1 if item["logtype"] == item["logaction"] == "upload" else files
|
|
admin = admin + 1 if item["logtype"] in ["delete", "merge", "block", "protect", "import", "rights",
|
|
"abusefilter", "interwiki", "managetags"] else admin
|
|
overall = round(new_articles + edits * 0.1 + files * 0.3 + admin * 0.1 + math.fabs(changed_bytes * 0.001), 2)
|
|
embed = defaultdict(dict)
|
|
embed["title"] = _("Daily overview")
|
|
embed["url"] = "https://{wiki}.gamepedia.com/Special:Statistics".format(wiki=settings["wiki"])
|
|
embed["color"] = settings["appearance"]["daily_overview"]["color"]
|
|
embed["author"]["icon_url"] = settings["appearance"]["daily_overview"]["icon"]
|
|
embed["author"]["name"] = settings["wikiname"]
|
|
embed["author"]["url"] = "https://{wiki}.gamepedia.com/".format(wiki=settings["wiki"])
|
|
if activity:
|
|
v = activity.values()
|
|
active_users = []
|
|
for user, numberu in Counter(activity).most_common(list(v).count(max(v))): # find most active users
|
|
active_users.append(user)
|
|
# the_one = random.choice(active_users)
|
|
v = hours.values()
|
|
active_hours = []
|
|
for hour, numberh in Counter(hours).most_common(list(v).count(max(v))): # find most active users
|
|
active_hours.append(str(hour))
|
|
usramount = ngettext(" ({} action)", " ({} actions)", numberu).format(numberu)
|
|
houramount = ngettext(" UTC ({} action)", " UTC ({} actions)", numberh).format(numberh)
|
|
else:
|
|
active_users = [_("But nobody came")] # a reference to my favorite game of all the time, sorry ^_^
|
|
active_hours = [_("But nobody came")]
|
|
usramount = ""
|
|
houramount = ""
|
|
embed["fields"] = []
|
|
fields = (
|
|
(ngettext("Most active user", "Most active users", len(active_users)), ', '.join(active_users) + usramount),
|
|
(_("Edits made"), edits), (_("New files"), files), (_("Admin actions"), admin),
|
|
(_("Bytes changed"), changed_bytes), (_("New articles"), new_articles),
|
|
(_("Unique contributors"), str(len(activity))),
|
|
(ngettext("Most active hour", "Most active hours", len(active_hours)), ', '.join(active_hours) + houramount),
|
|
(_("Day score"), str(overall)))
|
|
for name, value in fields:
|
|
embed["fields"].append({"name": name, "value": value})
|
|
data = {"embeds": [dict(embed)]}
|
|
formatted_embed = json.dumps(data, indent=4)
|
|
send_to_discord(formatted_embed)
|
|
else:
|
|
logging.debug("function requesting changes for day overview returned with error code")
|
|
|
|
|
|
class recent_changes_class(object):
|
|
starttime = time.time()
|
|
ids = []
|
|
map_ips = {}
|
|
recent_id = 0
|
|
downtimecredibility = 0
|
|
last_downtime = 0
|
|
clock = 0
|
|
tags = {}
|
|
groups = {}
|
|
unsent_messages = []
|
|
streak = -1
|
|
session = requests.Session()
|
|
session.headers.update(settings["header"])
|
|
if settings["limitrefetch"] != -1:
|
|
with open("lastchange.txt", "r") as record:
|
|
file_content = record.read().strip()
|
|
if file_content:
|
|
file_id = int(file_content)
|
|
logging.debug("File_id is {val}".format(val=file_id))
|
|
else:
|
|
logging.debug("File is empty")
|
|
file_id = 999999999
|
|
else:
|
|
file_id = 999999999 # such value won't cause trouble, and it will make sure no refetch happen
|
|
|
|
def handle_mw_errors(self, request):
|
|
if "errors" in request:
|
|
print(request["errors"])
|
|
raise MWError
|
|
return request
|
|
|
|
def log_in(self):
|
|
global logged_in
|
|
# session.cookies.clear()
|
|
if '@' not in settings["wiki_bot_login"]:
|
|
logging.error(
|
|
"Please provide proper nickname for login from https://{wiki}.gamepedia.com/Special:BotPasswords".format(
|
|
wiki=settings["wiki"]))
|
|
return
|
|
if len(settings["wiki_bot_password"]) != 32:
|
|
logging.error(
|
|
"Password seems incorrect. It should be 32 characters long! Grab it from https://{wiki}.gamepedia.com/Special:BotPasswords".format(
|
|
wiki=settings["wiki"]))
|
|
return
|
|
logging.info("Trying to log in to https://{wiki}.gamepedia.com...".format(wiki=settings["wiki"]))
|
|
try:
|
|
response = self.handle_mw_errors(
|
|
self.session.post("https://{wiki}.gamepedia.com/api.php".format(wiki=settings["wiki"]),
|
|
data={'action': 'query', 'format': 'json', 'utf8': '', 'meta': 'tokens',
|
|
'type': 'login'}))
|
|
response = self.handle_mw_errors(
|
|
self.session.post("https://{wiki}.gamepedia.com/api.php".format(wiki=settings["wiki"]),
|
|
data={'action': 'login', 'format': 'json', 'utf8': '',
|
|
'lgname': settings["wiki_bot_login"],
|
|
'lgpassword': settings["wiki_bot_password"],
|
|
'lgtoken': response.json()['query']['tokens']['logintoken']}))
|
|
except ValueError:
|
|
logging.error("Logging in have not succeeded")
|
|
return
|
|
except MWError:
|
|
logging.error("Logging in have not succeeded")
|
|
return
|
|
try:
|
|
if response.json()['login']['result'] == "Success":
|
|
logging.info("Logging to the wiki succeeded")
|
|
logged_in = True
|
|
else:
|
|
logging.error("Logging in have not succeeded")
|
|
except:
|
|
logging.error("Logging in have not succeeded")
|
|
|
|
def add_cache(self, change):
|
|
self.ids.append(change["rcid"])
|
|
# self.recent_id = change["rcid"]
|
|
if len(self.ids) > settings["limitrefetch"] + 5:
|
|
self.ids.pop(0)
|
|
|
|
def fetch(self, amount=settings["limit"]):
|
|
if self.unsent_messages:
|
|
logging.info(
|
|
"{} messages waiting to be delivered to Discord due to Discord throwing errors/no connection to Discord servers.".format(
|
|
len(self.unsent_messages)))
|
|
for num, item in enumerate(self.unsent_messages):
|
|
logging.debug(
|
|
"Trying to send a message to Discord from the queue with id of {} and content {}".format(str(num),
|
|
str(item)))
|
|
if send_to_discord_webhook(item) < 2:
|
|
logging.debug("Sending message succeeded")
|
|
time.sleep(2.5)
|
|
else:
|
|
logging.debug("Sending message failed")
|
|
break
|
|
else:
|
|
self.unsent_messages = []
|
|
logging.debug("Queue emptied, all messages delivered")
|
|
self.unsent_messages = self.unsent_messages[num:]
|
|
logging.debug(self.unsent_messages)
|
|
last_check = self.fetch_changes(amount=amount)
|
|
self.recent_id = last_check if last_check is not None else self.file_id
|
|
if settings["limitrefetch"] != -1 and self.recent_id != self.file_id:
|
|
self.file_id = self.recent_id
|
|
with open("lastchange.txt", "w") as record:
|
|
record.write(str(self.file_id))
|
|
logging.debug("Most recent rcid is: {}".format(self.recent_id))
|
|
return self.recent_id
|
|
|
|
def fetch_changes(self, amount, clean=False):
|
|
global logged_in
|
|
if len(self.ids) == 0:
|
|
logging.debug("ids is empty, triggering clean fetch")
|
|
clean = True
|
|
changes = self.safe_request(
|
|
"https://{wiki}.gamepedia.com/api.php?action=query&format=json&list=recentchanges&rcshow=!bot&rcprop=title%7Ctimestamp%7Cids%7Cloginfo%7Cparsedcomment%7Csizes%7Cflags%7Ctags%7Cuser&rclimit={amount}&rctype=edit%7Cnew%7Clog%7Cexternal%7Ccategorize".format(
|
|
wiki=settings["wiki"], amount=amount))
|
|
if changes:
|
|
try:
|
|
changes = changes.json()['query']['recentchanges']
|
|
changes.reverse()
|
|
except ValueError:
|
|
logging.warning("ValueError in fetching changes")
|
|
if changes.url == "https://www.gamepedia.com":
|
|
logging.critical(
|
|
"The wiki specified in the settings most probably doesn't exist, got redirected to gamepedia.com")
|
|
sys.exit(1)
|
|
self.downtime_controller()
|
|
return None
|
|
except KeyError:
|
|
logging.warning("Wiki returned %s" % (changes.json()))
|
|
return None
|
|
else:
|
|
if self.downtimecredibility > 0:
|
|
self.downtimecredibility -= 1
|
|
if self.streak > -1:
|
|
self.streak += 1
|
|
if self.streak > 8:
|
|
self.streak = -1
|
|
send(_("Connection to {wiki} seems to be stable now.").format(wiki=settings["wikiname"]),
|
|
_("Connection status"), settings["avatars"]["connection_restored"])
|
|
# In the first for loop we analize the categorize events and figure if we will need more changes to fetch
|
|
# in order to cover all of the edits
|
|
categorize_events = {}
|
|
new_events = 0
|
|
for change in changes:
|
|
if not (change["rcid"] in self.ids or change["rcid"] < self.recent_id):
|
|
new_events += 1
|
|
if new_events == settings["limit"]:
|
|
if amount < 500:
|
|
# call the function again with max limit for more results, ignore the ones in this request
|
|
logging.debug("There were too many new events, requesting max amount of events from the wiki.")
|
|
return self.fetch(amount=5000 if logged_in else 500)
|
|
else:
|
|
logging.debug(
|
|
"There were too many new events, but the limit was high enough we don't care anymore about fetching them all.")
|
|
if change["type"] == "categorize":
|
|
if change["revid"] in categorize_events:
|
|
categorize_events[change["revid"]].append(change["title"])
|
|
else:
|
|
categorize_events[change["revid"]] = [change["title"]]
|
|
for change in changes:
|
|
if change["rcid"] in self.ids or change["rcid"] < self.recent_id:
|
|
logging.debug("Change ({}) is in ids or is lower than recent_id {}".format(change["rcid"],
|
|
self.recent_id))
|
|
continue
|
|
logging.debug(self.ids)
|
|
logging.debug(self.recent_id)
|
|
self.add_cache(change)
|
|
if clean and not (self.recent_id == 0 and change["rcid"] > self.file_id):
|
|
logging.debug("Rejected {val}".format(val=change["rcid"]))
|
|
continue
|
|
first_pass(change, categorize_events.get(change.get("revid"), []))
|
|
return change["rcid"]
|
|
|
|
def safe_request(self, url):
|
|
try:
|
|
request = self.session.get(url, timeout=10)
|
|
except requests.exceptions.Timeout:
|
|
logging.warning("Reached timeout error for request on link {url}".format(url=url))
|
|
self.downtime_controller()
|
|
return None
|
|
except requests.exceptions.ConnectionError:
|
|
logging.warning("Reached connection error for request on link {url}".format(url=url))
|
|
self.downtime_controller()
|
|
return None
|
|
else:
|
|
return request
|
|
|
|
def check_connection(self, looped=False):
|
|
online = 0
|
|
for website in ["https://google.com", "https://instagram.com", "https://steamcommunity.com"]:
|
|
try:
|
|
requests.get(website, timeout=10)
|
|
online += 1
|
|
except requests.exceptions.ConnectionError:
|
|
pass
|
|
except requests.exceptions.Timeout:
|
|
pass
|
|
if online < 1:
|
|
logging.error("Failure when checking Internet connection at {time}".format(
|
|
time=time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime())))
|
|
self.downtimecredibility = 0
|
|
if not looped:
|
|
while 1: # recursed loop, check for connection (every 10 seconds) as long as three services are down, don't do anything else
|
|
if self.check_connection(looped=True):
|
|
recent_changes.fetch(amount=settings["limitrefetch"])
|
|
break
|
|
time.sleep(10)
|
|
return False
|
|
return True
|
|
|
|
def downtime_controller(self):
|
|
if not settings["show_updown_messages"]:
|
|
return
|
|
if self.streak > -1: # reset the streak of successful connections when bad one happens
|
|
self.streak = 0
|
|
if self.downtimecredibility < 60:
|
|
self.downtimecredibility += 15
|
|
else:
|
|
if (
|
|
time.time() - self.last_downtime) > 1800 and self.check_connection(): # check if last downtime happened within 30 minutes, if yes, don't send a message
|
|
send(_("{wiki} seems to be down or unreachable.").format(wiki=settings["wikiname"]),
|
|
_("Connection status"), settings["avatars"]["connection_failed"])
|
|
self.last_downtime = time.time()
|
|
self.streak = 0
|
|
|
|
def clear_cache(self):
|
|
self.map_ips = {}
|
|
|
|
def update_tags(self):
|
|
tags_read = safe_read(self.safe_request(
|
|
"https://{wiki}.gamepedia.com/api.php?action=query&format=json&list=tags&tglimit=max&tgprop=name|displayname".format(
|
|
wiki=settings["wiki"])), "query", "tags")
|
|
if tags_read:
|
|
for tag in tags_read:
|
|
self.tags[tag["name"]] = (BeautifulSoup(tag["displayname"], "lxml")).get_text()
|
|
else:
|
|
logging.warning("Could not retrive tags. Internal names will be used!")
|
|
|
|
|
|
recent_changes = recent_changes_class()
|
|
if settings["wiki_bot_login"] and settings["wiki_bot_password"]:
|
|
recent_changes.log_in()
|
|
recent_changes.update_tags()
|
|
time.sleep(1.0)
|
|
recent_changes.fetch(amount=settings["limitrefetch"] if settings["limitrefetch"] != -1 else settings["limit"])
|
|
|
|
schedule.every(settings["cooldown"]).seconds.do(recent_changes.fetch)
|
|
if 1 == 2:
|
|
print(_("director"), _("bot"), _("editor"), _("directors"), _("sysop"), _("bureaucrat"), _("reviewer"),
|
|
_("autoreview"), _("autopatrol"), _("wiki_guardian"))
|
|
|
|
if settings["overview"]:
|
|
schedule.every().day.at("{}:{}".format(time.strptime(settings["overview_time"], '%H:%M').tm_hour,
|
|
time.strptime(settings["overview_time"], '%H:%M').tm_min)).do(day_overview)
|
|
schedule.every().day.at("00:00").do(recent_changes.clear_cache)
|
|
|
|
while 1:
|
|
time.sleep(1.0)
|
|
schedule.run_pending()
|