RcGcDw/rcgcdw.py
2019-01-28 10:05:40 +00:00

1137 lines
60 KiB
Python

#!/usr/bin/python
# -*- coding: utf-8 -*-
# Recent changes Gamepedia compatible Discord webhook is a project for using a webhook as recent changes page from MediaWiki.
# Copyright (C) 2018 Frisk
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# WARNING! SHITTY CODE AHEAD. ENTER ONLY IF YOU ARE SURE YOU CAN TAKE IT
# You have been warned
import time, logging, json, requests, datetime, re, gettext, math, random, os.path, schedule, sys
from bs4 import BeautifulSoup
from collections import defaultdict, Counter
from urllib.parse import quote_plus
from html.parser import HTMLParser
with open("settings.json") as sfile:
settings = json.load(sfile)
if settings["limitrefetch"] < settings["limit"] and settings["limitrefetch"] != -1:
settings["limitrefetch"] = settings["limit"]
logged_in = False
logging.basicConfig(level=settings["verbose_level"])
if settings["limitrefetch"] != -1 and os.path.exists("lastchange.txt") == False:
with open("lastchange.txt", 'w') as sfile:
sfile.write("99999999999")
logging.debug("Current settings: {settings}".format(settings=settings))
lang = gettext.translation('rcgcdw', localedir='locale', languages=[settings["lang"]])
lang.install()
ngettext = lang.ngettext
class MWError(Exception):
pass
class LinkParser(HTMLParser):
new_string = ""
recent_href = ""
def handle_starttag(self, tag, attrs):
for attr in attrs:
if attr[0] == 'href':
self.recent_href = attr[1]
if self.recent_href.startswith("//"):
self.recent_href = "https:{rest}".format(rest=self.recent_href)
elif not self.recent_href.startswith("http"):
self.recent_href = "https://{wiki}.gamepedia.com".format(wiki=settings["wiki"]) + self.recent_href
self.recent_href = self.recent_href.replace(")", "\\)")
def handle_data(self, data):
if self.recent_href:
self.new_string = self.new_string + "[{}]({})".format(data, self.recent_href)
self.recent_href = ""
else:
self.new_string = self.new_string + data
def handle_comment(self, data):
self.new_string = self.new_string + data
def handle_endtag(self, tag):
logging.debug(self.new_string)
LinkParser = LinkParser()
def send(message, name, avatar):
dictionary_creator = {}
dictionary_creator["content"] = message
if name:
dictionary_creator["username"] = name
if avatar:
dictionary_creator["avatar_url"] = avatar
send_to_discord(dictionary_creator)
def safe_read(request, *keys):
if request is None:
return None
try:
request = request.json()
for item in keys:
request = request[item]
except KeyError:
logging.warning(
"Failure while extracting data from request on key {key} in {change}".format(key=item, change=request))
return None
except ValueError:
logging.warning("Failure while extracting data from request in {change}".format(change=request))
return None
return request
def send_to_discord_webhook(data):
header = settings["header"]
if "content" not in data:
header['Content-Type'] = 'application/json'
else:
header['Content-Type'] = 'application/x-www-form-urlencoded'
try:
result = requests.post(settings["webhookURL"], data=data,
headers=header, timeout=10)
except requests.exceptions.Timeout:
logging.warning("Timeouted while sending data to the webhook.")
return 3
except requests.exceptions.ConnectionError:
logging.warning("Connection error while sending the data to a webhook")
return 3
else:
return handle_discord_http(result.status_code, data, result)
def send_to_discord(data):
if recent_changes.unsent_messages:
recent_changes.unsent_messages.append(data)
else:
code = send_to_discord_webhook(data)
if code == 3:
recent_changes.unsent_messages.append(data)
elif code == 2:
time.sleep(5.0)
recent_changes.unsent_messages.append(data)
elif code < 2:
time.sleep(2.5)
pass
def webhook_formatter(action, STATIC, **params):
logging.debug("Received things: {thing}".format(thing=params))
colornumber = None if isinstance(STATIC["color"], str) else STATIC["color"]
data = {"embeds": []}
embed = defaultdict(dict)
if STATIC["ipaction"]:
author_url = "https://{wiki}.gamepedia.com/Special:Contributions/{user}".format(wiki=settings["wiki"],
user=params["user"])
logging.debug("current user: {} with cache of IPs: {}".format(params["user"], recent_changes.map_ips.keys()))
if params["user"] not in list(recent_changes.map_ips.keys()):
contibs = safe_read(recent_changes.safe_request(
"https://{wiki}.gamepedia.com/api.php?action=query&format=json&list=usercontribs&uclimit=max&ucuser={user}&ucstart={timestamp}&ucprop=".format(
wiki=settings["wiki"], user=params["user"], timestamp=STATIC["timestamp"])), "query", "usercontribs")
if contibs is None:
logging.warning(
"WARNING: Something went wrong when checking amount of contributions for given IP address")
params["user"] = params["user"] + "(?)"
else:
recent_changes.map_ips[params["user"]] = len(contibs)
logging.debug("1Current params user {} and state of map_ips {}".format(params["user"], recent_changes.map_ips))
params["user"] = "{author} ({contribs})".format(author=params["user"], contribs=len(contibs))
else:
logging.debug(
"2Current params user {} and state of map_ips {}".format(params["user"], recent_changes.map_ips))
if action in ("edit", "new"):
recent_changes.map_ips[params["user"]] += 1
params["user"] = "{author} ({amount})".format(author=params["user"],
amount=recent_changes.map_ips[params["user"]])
else:
author_url = "https://{wiki}.gamepedia.com/User:{user}".format(wiki=settings["wiki"],
user=params["user"].replace(" ", "_"))
if action in ("edit", "new"): # edit or new page
editsize = params["size"]
if editsize > 0:
if editsize > 6032:
colornumber = 65280
else:
colornumber = 35840 + (math.floor(editsize / 52)) * 256
elif editsize < 0:
if editsize < -6032:
colornumber = 16711680
else:
colornumber = 9175040 + (math.floor((editsize * -1) / (52))) * 65536
elif editsize == 0:
colornumber = 8750469
link = "https://{wiki}.gamepedia.com/index.php?title={article}&curid={pageid}&diff={diff}&oldid={oldrev}".format(
wiki=settings["wiki"], pageid=params["pageid"], diff=params["diff"], oldrev=params["oldrev"],
article=params["title"].replace(" ", "_"))
embed["title"] = "{redirect}{article} ({new}{minor}{editsize})".format(redirect="" if STATIC["redirect"] else "",article=params["title"], editsize="+" + str(
editsize) if editsize > 0 else editsize, new=_("(N!) ") if action == "new" else "",
minor=_("m ") if action == "edit" and params[
"minor"] else "")
elif action in ("upload/overwrite", "upload/upload"): # sending files
license = None
urls = safe_read(recent_changes.safe_request(
"https://{wiki}.gamepedia.com/api.php?action=query&format=json&prop=imageinfo&list=&meta=&titles={filename}&iiprop=timestamp%7Curl&iilimit=2".format(
wiki=settings["wiki"], filename=params["title"])), "query", "pages")
undolink = ""
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["title"].replace(" ", "_"))
additional_info_retrieved = False
if urls is not None:
logging.debug(urls)
if "-1" not in urls: # oage removed before we asked for it
img_info = next(iter(urls.values()))["imageinfo"]
embed["image"]["url"] = img_info[0]["url"]
additional_info_retrieved = True
else:
pass
if params["overwrite"]:
if additional_info_retrieved:
article_encoded = params["title"].replace(" ", "_").replace(')', '\)')
img_timestamp = [x for x in img_info[1]["timestamp"] if x.isdigit()]
undolink = "https://{wiki}.gamepedia.com/index.php?title={filename}&action=revert&oldimage={timestamp}%21{filenamewon}".format(
wiki=settings["wiki"], filename=article_encoded, timestamp="".join(img_timestamp),
filenamewon=article_encoded.split(":", 1)[1])
embed["fields"] = [{"name": _("Options"), "value": _("([preview]({link}) | [undo]({undolink}))").format(
link=embed["image"]["url"], undolink=undolink)}]
embed["title"] = _("Uploaded a new version of {name}").format(name=params["title"])
else:
embed["title"] = _("Uploaded {name}").format(name=params["title"])
article_content = safe_read(recent_changes.safe_request(
"https://{wiki}.gamepedia.com/api.php?action=query&format=json&prop=revisions&titles={article}&rvprop=content".format(
wiki=settings["wiki"], article=quote_plus(params["title"], safe=''))), "query", "pages")
if article_content is None:
logging.warning("Something went wrong when getting license for the image")
return 0
if "-1" not in article_content:
content = list(article_content.values())[0]['revisions'][0]['*']
try:
matches = re.search(re.compile(settings["license_regex"], re.IGNORECASE), content)
if matches is not None:
license = matches.group("license")
else:
if re.search(re.compile(settings["license_regex_detect"], re.IGNORECASE), content) is None:
license = _("**No license!**")
else:
license = "?"
except IndexError:
logging.error(
"Given regex for the license detection is incorrect. It does not have a capturing group called \"license\" specified. Please fix license_regex value in the config!")
license = "?"
except re.error:
logging.error(
"Given regex for the license detection is incorrect. Please fix license_regex or license_regex_detect values in the config!")
license = "?"
if additional_info_retrieved:
embed["fields"] = [
{"name": _("Options"), "value": _("([preview]({link}))").format(link=embed["image"]["url"])}]
params["desc"] = _("{desc}\nLicense: {license}").format(desc=params["desc"],
license=license if license is not None else "?")
elif action == "delete/delete":
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["title"].replace(" ", "_"))
embed["title"] = _("Deleted page {article}").format(article=params["title"])
elif action == "delete/delete_redir":
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["title"].replace(" ", "_"))
embed["title"] = _("Deleted redirect {article} by overwriting").format(article=params["title"])
elif action == "move/move":
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["target"].replace(" ", "_"))
params["desc"] = "{supress}. {desc}".format(desc=params["desc"],
supress=_("No redirect has been made") if params[
"supress"] == True else _(
"A redirect has been made"))
embed["title"] = _("Moved {redirect}{article} to {target}").format(redirect="" if STATIC["redirect"] else "", article=params["title"], target=params["target"])
elif action == "move/move_redir":
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["target"].replace(" ", "_"))
embed["title"] = _("Moved {redirect}{article} to {title} over redirect").format(redirect="" if STATIC["redirect"] else "", article=params["title"],
title=params["target"])
elif action == "protect/move_prot":
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["title"].replace(" ", "_"))
embed["title"] = _("Moved protection settings from {redirect}{article} to {title}").format(redirect="" if STATIC["redirect"] else "", article=params["title"],
title=params["target"])
elif action == "block/block":
link = "https://{wiki}.gamepedia.com/{user}".format(wiki=settings["wiki"],
user=params["blocked_user"].replace(" ", "_").replace(')',
'\)'))
user = params["blocked_user"].split(':')[1]
block_time = _("infinity and beyond") if params["duration"] == "infinite" else params["duration"]
embed["title"] = _("Blocked {blocked_user} for {time}").format(blocked_user=user, time=block_time)
elif action == "block/reblock":
link = "https://{wiki}.gamepedia.com/{user}".format(wiki=settings["wiki"],
user=params["blocked_user"].replace(" ", "_").replace(')',
'\)'))
user = params["blocked_user"].split(':')[1]
embed["title"] = _("Changed block settings for {blocked_user}").format(blocked_user=user)
elif action == "block/unblock":
link = "https://{wiki}.gamepedia.com/{user}".format(wiki=settings["wiki"],
user=params["blocked_user"].replace(" ", "_").replace(')',
'\)'))
user = params["blocked_user"].split(':')[1]
embed["title"] = _("Unblocked {blocked_user}").format(blocked_user=user)
elif action == "curseprofile/comment-created":
link = "https://{wiki}.gamepedia.com/Special:CommentPermalink/{commentid}".format(wiki=settings["wiki"],
commentid=params["commentid"])
# link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"], target=params["target"].replace(" ", "_").replace(')', '\)')) old way of linking
embed["title"] = _("Left a comment on {target}'s profile").format(target=params["target"]) if params[
"target"] != \
params[
"user"] else _(
"Left a comment on their own profile")
elif action == "curseprofile/comment-replied":
# link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"], target=params["target"].replace(" ", "_").replace(')', '\)'))
link = "https://{wiki}.gamepedia.com/Special:CommentPermalink/{commentid}".format(wiki=settings["wiki"],
commentid=params["commentid"])
embed["title"] = _("Replied to a comment on {target}'s profile").format(target=params["target"]) if params[
"target"] != \
params[
"user"] else _(
"Replied to a comment on their own profile")
elif action == "curseprofile/comment-edited":
# link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"], target=params["target"].replace(" ", "_").replace(')', '\)'))
link = "https://{wiki}.gamepedia.com/Special:CommentPermalink/{commentid}".format(wiki=settings["wiki"],
commentid=params["commentid"])
embed["title"] = _("Edited a comment on {target}'s profile").format(target=params["target"]) if params[
"target"] != \
params[
"user"] else _(
"Edited a comment on their own profile")
elif action == "curseprofile/profile-edited":
link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"],
target=params["target"].replace(" ",
"_").replace(
')', '\)'))
if params["field"] == "profile-location":
field = _("Location")
elif params["field"] == "profile-aboutme":
field = _("About me")
elif params["field"] == "profile-link-google":
field = _("Google link")
elif params["field"] == "profile-link-facebook":
field = _("Facebook link")
elif params["field"] == "profile-link-twitter":
field = _("Twitter link")
elif params["field"] == "profile-link-reddit":
field = _("Reddit link")
elif params["field"] == "profile-link-twitch":
field = _("Twitch link")
elif params["field"] == "profile-link-psn":
field = _("PSN link")
elif params["field"] == "profile-link-vk":
field = _("VK link")
elif params["field"] == "profile-link-xbl":
field = _("XVL link")
elif params["field"] == "profile-link-steam":
field = _("Steam link")
else:
field = _("Unknown")
embed["title"] = _("Edited {target}'s profile").format(target=params["target"]) if params["user"] != params[
"target"] else _("Edited their own profile")
params["desc"] = _("{field} field changed to: {desc}").format(field=field, desc=params["desc"])
elif action == "curseprofile/comment-deleted":
link = "https://{wiki}.gamepedia.com/Special:CommentPermalink/{commentid}".format(wiki=settings["wiki"],
commentid=params["commentid"])
# link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"], target=params["target"].replace(" ", "_").replace(')', '\)'))
embed["title"] = _("Deleted a comment on {target}'s profile").format(target=params["target"])
elif action in ("rights/rights", "rights/autopromote"):
link = "https://{wiki}.gamepedia.com/User:".format(wiki=settings["wiki"]) + params["title"].split(":")[1]
if action == "rights/rights":
embed["title"] = _("Changed group membership for {target}").format(target=params["title"].split(":")[1])
else:
params["user"] = _("System")
author_url = ""
embed["title"] = _("{target} got autopromoted to a new usergroup").format(
target=params["title"].split(":")[1])
if len(params["old_groups"]) < len(params["new_groups"]):
embed["thumbnail"]["url"] = "https://i.imgur.com/WnGhF5g.gif"
old_groups = []
new_groups = []
for name in params["old_groups"]:
old_groups.append(_(name))
for name in params["new_groups"]:
new_groups.append(_(name))
if len(old_groups) == 0:
old_groups = [_("none")]
if len(new_groups) == 0:
new_groups = [_("none")]
reason = ": {desc}".format(desc=params["desc"]) if params["desc"] != _("No description provided") else ""
params["desc"] = _("Groups changed from {old_groups} to {new_groups}{reason}").format(
old_groups=", ".join(old_groups), new_groups=', '.join(new_groups), reason=reason)
elif action == "protect/protect":
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["title"].replace(" ", "_"))
embed["title"] = _("Protected {target}").format(target=params["title"])
params["desc"] = "{settings}{cascade} | {reason}".format(settings=params["settings"],
cascade=_(" [cascading]") if params["cascade"] else "",
reason=params["desc"])
elif action == "protect/modify":
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["title"].replace(" ", "_"))
embed["title"] = _("Changed protection level for {article}").format(article=params["title"])
params["desc"] = "{settings}{cascade} | {reason}".format(settings=params["settings"],
cascade=_(" [cascading]") if params["cascade"] else "",
reason=params["desc"])
elif action == "protect/unprotect":
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["title"].replace(" ", "_"))
embed["title"] = _("Removed protection from {article}").format(article=params["title"])
elif action == "delete/revision":
amount = len(params["amount"])
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["title"].replace(" ", "_"))
embed["title"] = ngettext("Changed visibility of revision on page {article} ",
"Changed visibility of {amount} revisions on page {article} ", amount).format(
article=params["title"], amount=amount)
elif action == "import/upload":
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["title"].replace(" ", "_"))
embed["title"] = ngettext("Imported {article} with {count} revision",
"Imported {article} with {count} revisions", params["amount"]).format(
article=params["title"], count=params["amount"])
elif action == "delete/restore":
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["title"].replace(" ", "_"))
embed["title"] = _("Restored {article}").format(article=params["title"])
elif action == "delete/event":
link = "https://{wiki}.gamepedia.com/Special:RecentChanges".format(wiki=settings["wiki"])
embed["title"] = _("Changed visibility of log events")
elif action == "import/interwiki":
link = "https://{wiki}.gamepedia.com/Special:RecentChanges".format(wiki=settings["wiki"])
embed["title"] = _("Imported interwiki")
elif action == "abusefilter/modify":
link = "https://{wiki}.gamepedia.com/Special:AbuseFilter/history/{number}/diff/prev/{historyid}".format(wiki=settings["wiki"], number=params["filternr"], historyid=params["historyid"])
embed["title"] = _("Edited abuse filter number {number}").format(number=params["filternr"])
elif action == "abusefilter/create":
link = "https://{wiki}.gamepedia.com/Special:AbuseFilter/{number}".format(wiki=settings["wiki"], number=params["filternr"])
embed["title"] = _("Created abuse filter number {number}").format(number=params["filternr"])
elif action == "merge/merge":
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["title"].replace(" ", "_"))
embed["title"] = _("Merged revision histories of {article} into {dest}").format(article=params["title"],
dest=params["destination"])
elif action == "interwiki/iw_add":
link = "https://{wiki}.gamepedia.com/Special:Interwiki".format(wiki=settings["wiki"])
embed["title"] = _("Added an entry to the interwiki table")
params["desc"] = _("Prefix: {prefix}, website: {website} | {desc}").format(desc=params["desc"],
prefix=params["prefix"],
website=params["website"])
elif action == "interwiki/iw_edit":
link = "https://{wiki}.gamepedia.com/Special:Interwiki".format(wiki=settings["wiki"])
embed["title"] = _("Edited an entry in interwiki table")
params["desc"] = _("Prefix: {prefix}, website: {website} | {desc}").format(desc=params["desc"],
prefix=params["prefix"],
website=params["website"])
elif action == "interwiki/iw_delete":
link = "https://{wiki}.gamepedia.com/Special:Interwiki".format(wiki=settings["wiki"])
embed["title"] = _("Deleted an entry in interwiki table")
params["desc"] = _("Prefix: {prefix} | {desc}").format(desc=params["desc"], prefix=params["prefix"])
elif action == "contentmodel/change":
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["title"].replace(" ", "_"))
embed["title"] = _("Changed the content model of the page {article}").format(article=params["title"])
params["desc"] = _("Model changed from {old} to {new}: {reason}").format(old=params["oldmodel"],
new=params["newmodel"],
reason=params["desc"])
elif action == "sprite/sprite":
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["title"].replace(" ", "_"))
embed["title"] = _("Edited the sprite for {article}").format(article=params["title"])
elif action == "sprite/sheet":
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["title"].replace(" ", "_"))
embed["title"] = _("Created the sprite sheet for {article}").format(article=params["title"])
elif action == "sprite/slice":
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["title"].replace(" ", "_"))
embed["title"] = _("Edited the slice for {article}").format(article=params["title"])
elif action == "managetags/create":
link = "https://{wiki}.gamepedia.com/Special:Tags".format(wiki=settings["wiki"])
embed["title"] = _("Created a tag \"{tag}\"").format(tag=params["additional"]["tag"])
recent_changes.init_info()
elif action == "managetags/delete":
link = "https://{wiki}.gamepedia.com/Special:Tags".format(wiki=settings["wiki"])
embed["title"] = _("Deleted a tag \"{tag}\"").format(tag=params["additional"]["tag"])
recent_changes.init_info()
elif action == "managetags/activate":
link = "https://{wiki}.gamepedia.com/Special:Tags".format(wiki=settings["wiki"])
embed["title"] = _("Activated a tag \"{tag}\"").format(tag=params["additional"]["tag"])
elif action == "managetags/deactivate":
link = "https://{wiki}.gamepedia.com/Special:Tags".format(wiki=settings["wiki"])
embed["title"] = _("Deactivated a tag \"{tag}\"").format(tag=params["additional"]["tag"])
elif action == "suppressed":
link = "https://{wiki}.gamepedia.com/".format(wiki=settings["wiki"])
embed["title"] = _("Action has been hidden by administration.")
else:
logging.warning("No entry for {event} with params: {params}".format(event=action, params=params))
embed["author"]["name"] = params["user"]
embed["author"]["url"] = author_url
embed["author"]["icon_url"] = STATIC["icon"]
embed["url"] = link
if "desc" not in params:
params["desc"] = ""
embed["description"] = params["desc"]
embed["color"] = random.randrange(1, 16777215) if colornumber is None else math.floor(colornumber)
embed["timestamp"] = STATIC["timestamp"]
if STATIC["tags"]:
tag_displayname = []
if "fields" not in embed:
embed["fields"] = []
for tag in STATIC["tags"]:
if tag in recent_changes.tags:
tag_displayname.append(recent_changes.tags[tag])
else:
tag_displayname.append(tag)
embed["fields"].append({"name": _("Tags"), "value": ", ".join(tag_displayname)})
logging.debug("Current params in edit action: {}".format(params))
if "changed_categories" in STATIC and STATIC["changed_categories"] is not None and not (len(STATIC["changed_categories"]["new"]) == 0 and len(STATIC["changed_categories"]["removed"]) == 0):
if "fields" not in embed:
embed["fields"] = []
# embed["fields"].append({"name": _("Changed categories"), "value": ", ".join(params["new_categories"][0:15]) + ("" if (len(params["new_categories"]) < 15) else _(" and {} more").format(len(params["new_categories"])-14))})
new_cat = (_("**Added**: ") + ", ".join(STATIC["changed_categories"]["new"][0:16]) + ("\n" if len(STATIC["changed_categories"]["new"])<=15 else _(" and {} more\n").format(len(STATIC["changed_categories"]["new"])-15) ) ) if STATIC["changed_categories"]["new"] else ""
del_cat = (_("**Removed**: ") + ", ".join(STATIC["changed_categories"]["removed"][0:16]) + ("" if len(STATIC["changed_categories"]["removed"])<=15 else _(" and {} more").format(len(STATIC["changed_categories"]["removed"])-15) ) ) if STATIC["changed_categories"]["removed"] else ""
embed["fields"].append({"name": _("Changed categories"), "value": new_cat + del_cat})
data["embeds"].append(dict(embed))
data['avatar_url'] = settings["avatars"]["embed"]
formatted_embed = json.dumps(data, indent=4)
send_to_discord(formatted_embed)
def handle_discord_http(code, formatted_embed, result):
if 300 > code > 199: # message went through
return 0
elif code == 400: # HTTP BAD REQUEST result.status_code, data, result, header
logging.error(
"Following message has been rejected by Discord, please submit a bug on our bugtracker adding it:")
logging.error(formatted_embed)
logging.error(result.text)
return 1
elif code == 401 or code == 404: # HTTP UNAUTHORIZED AND NOT FOUND
logging.error("Webhook URL is invalid or no longer in use, please replace it with proper one.")
sys.exit(1)
elif code == 429:
logging.error("We are sending too many requests to the Discord, slowing down...")
return 2
elif 499 < code < 600:
logging.error(
"Discord have trouble processing the event, and because the HTTP code returned is {} it means we blame them.".format(
code))
return 3
def first_pass(
change, changed_categories): # I've decided to split the embed formatter and change handler, maybe it's more messy this way, I don't know
if ("actionhidden" in change or "suppressed" in change) and "suppressed" not in settings["ignored"]:
webhook_formatter("suppressed",
{"timestamp": change["timestamp"], "color": settings["appearance"]["suppressed"]["color"],
"icon": settings["appearance"]["suppressed"]["icon"]}, user=change["user"])
return
if "commenthidden" not in change:
LinkParser.feed(change["parsedcomment"])
# parsedcomment = (BeautifulSoup(change["parsedcomment"], "lxml")).get_text()
parsedcomment = LinkParser.new_string
LinkParser.new_string = ""
else:
parsedcomment = _("~~hidden~~")
logging.debug(change)
STATIC_VARS = {"timestamp": change["timestamp"], "tags": change["tags"], "redirect": (True if "redirect" in change else False), "ipaction": (True if "anon" in change else False), "changed_categories": changed_categories}
if not parsedcomment:
parsedcomment = _("No description provided")
parsedcomment = re.sub(r"(`|_|\*|~|<|>|{|})", "\\\\\\1", parsedcomment, 0)
if change["type"] == "edit" and "edit" not in settings["ignored"]:
logging.debug("List of categories in first_pass: {}".format(changed_categories))
if "userhidden" in change:
change["user"] = _("hidden")
STATIC_VARS = {**STATIC_VARS, **{"color": settings["appearance"]["edit"]["color"],
"icon": settings["appearance"]["edit"]["icon"]}}
webhook_formatter("edit", STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
oldrev=change["old_revid"], pageid=change["pageid"], diff=change["revid"],
size=change["newlen"] - change["oldlen"], minor=True if "minor" in change else False)
elif change["type"] == "log":
combination = "{logtype}/{logaction}".format(logtype=change["logtype"], logaction=change["logaction"])
if combination in settings["ignored"]:
return
logging.debug("combination is {}".format(combination))
try:
STATIC_VARS = {**STATIC_VARS, **{"color": settings["appearance"][combination]["color"],
"icon": settings["appearance"][combination]["icon"]}}
except KeyError:
STATIC_VARS = {**STATIC_VARS, **{"color": "", "icon": ""}}
logging.error("No value in the settings has been given for {}".format(combination))
if combination == "protect/protect":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
settings=change["logparams"]["description"], cascade=True if "cascade" in change["logparams"] else False)
elif combination == "protect/modify":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
settings=change["logparams"]["description"], cascade=True if "cascade" in change["logparams"] else False)
elif combination == "protect/unprotect":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment)
elif combination == "upload/overwrite":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
overwrite=True)
elif combination == "upload/upload":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
overwrite=False)
elif combination == "delete/delete":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment)
elif combination == "delete/delete_redir":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment)
elif combination == "delete/restore":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment)
elif combination == "delete/revision":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
amount=change["logparams"]["ids"])
elif combination == "delete/event":
webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment)
elif combination == "import/upload":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
amount=change["logparams"]["count"])
elif combination == "import/interwiki":
webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment)
elif combination == "merge/merge":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
destination=change["logparams"]["dest_title"])
elif combination == "move/move":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
supress=True if "suppressredirect" in change["logparams"] else False,
target=change["logparams"]['target_title'])
elif combination == "move/move_redir":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
target=change["logparams"]["target_title"])
elif combination == "protect/move_prot":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["logparams"]["oldtitle_title"], desc=parsedcomment,
target=change["title"])
elif combination == "block/block":
webhook_formatter(combination, STATIC_VARS, user=change["user"], blocked_user=change["title"],
desc=parsedcomment, duration=change["logparams"]["duration"])
elif combination == "block/unblock":
webhook_formatter(combination, STATIC_VARS, user=change["user"], blocked_user=change["title"],
desc=parsedcomment)
elif combination == "block/reblock":
webhook_formatter(combination, STATIC_VARS, user=change["user"], blocked_user=change["title"],
desc=parsedcomment)
elif combination == "rights/rights":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
old_groups=change["logparams"]["oldgroups"], new_groups=change["logparams"]["newgroups"])
elif combination == "rights/autopromote":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
old_groups=change["logparams"]["oldgroups"], new_groups=change["logparams"]["newgroups"])
elif combination == "abusefilter/modify":
webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment,
filternr=change["logparams"]['newId'], historyid=change["logparams"]["historyId"])
elif combination == "abusefilter/create":
webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment,
filternr=change["logparams"]['newId'])
elif combination == "interwiki/iw_add":
webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment,
prefix=change["logparams"]['0'], website=change["logparams"]['1'])
elif combination == "interwiki/iw_edit":
webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment,
prefix=change["logparams"]['0'], website=change["logparams"]['1'])
elif combination == "interwiki/iw_delete":
webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment,
prefix=change["logparams"]['0'])
elif combination == "curseprofile/comment-created":
webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1],
commentid=change["logparams"]["4:comment_id"])
elif combination == "curseprofile/comment-edited":
webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1],
commentid=change["logparams"]["4:comment_id"])
elif combination == "curseprofile/comment-deleted":
webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1],
commentid=change["logparams"]["4:comment_id"])
elif combination == "curseprofile/profile-edited":
webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1],
field=change["logparams"]['4:section'], desc=change["parsedcomment"])
elif combination == "curseprofile/comment-replied":
webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1],
commentid=change["logparams"]["4:comment_id"])
elif combination == "contentmodel/change":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
oldmodel=change["logparams"]["oldmodel"], newmodel=change["logparams"]["newmodel"])
elif combination == "sprite/sprite":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment)
elif combination == "sprite/sheet":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment)
elif combination == "sprite/slice":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment)
elif combination == "managetags/create":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
additional=change["logparams"])
elif combination == "managetags/delete":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
additional=change["logparams"])
elif combination == "managetags/activate":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
additional=change["logparams"])
elif combination == "managetags/deactivate":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
additional=change["logparams"])
elif combination == "tag/update":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment)
else:
logging.warning("No entry matches given change!")
logging.warning("Entry: {}".format(change))
send(_("Unable to process the event"), _("error"), settings["avatars"]["no_event"])
return
# elif change["type"] == "external": # not sure what happens then, but it's listed as possible type
# logging.warning("External event happened, ignoring.")
# print(change)
# return
elif change["type"] == "new" and "new" not in settings["ignored"]: # new page
STATIC_VARS = {**STATIC_VARS, **{"color": settings["appearance"]["new"]["color"],
"icon": settings["appearance"]["new"]["icon"]}}
webhook_formatter("new", STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
oldrev=change["old_revid"], pageid=change["pageid"], diff=change["revid"],
size=change["newlen"])
elif change["type"] == "categorize":
return
else:
logging.warning("This event is not implemented in the bot.")
logging.debug("Cannot process event {}".format(change))
return
def day_overview_request():
logging.info("Fetching daily overview... This may take up to 30 seconds!")
timestamp = (datetime.datetime.utcnow() - datetime.timedelta(hours=24)).isoformat(timespec='milliseconds')
logging.debug("timestamp is {}".format(timestamp))
complete = False
result = []
passes = 0
continuearg = ""
while not complete and passes < 10:
request = recent_changes.safe_request(
"https://{wiki}.gamepedia.com/api.php?action=query&format=json&list=recentchanges&rcend={timestamp}Z&rcprop=title%7Ctimestamp%7Csizes%7Cloginfo%7Cuser&rcshow=!bot&rclimit=500&rctype=edit%7Cnew%7Clog{continuearg}".format(
wiki=settings["wiki"], timestamp=timestamp, continuearg=continuearg))
if request:
try:
request = request.json()
rc = request['query']['recentchanges']
continuearg = request["continue"]["rccontinue"] if "continue" in request else None
except ValueError:
logging.warning("ValueError in fetching changes")
recent_changes.downtime_controller()
complete = 2
except KeyError:
logging.warning("Wiki returned %s" % (request.json()))
complete = 2
else:
result += rc
if continuearg:
continuearg = "&rccontinue={}".format(continuearg)
passes += 1
logging.debug(
"continuing requesting next pages of recent changes with {} passes and continuearg being {}".format(
passes, continuearg))
time.sleep(3.0)
else:
complete = 1
else:
complete = 2
if passes == 10:
logging.debug("quit the loop because there been too many passes")
return (result, complete)
def add_to_dict(dictionary, key):
if key in dictionary:
dictionary[key] += 1
else:
dictionary[key] = 1
return dictionary
def day_overview(): # time.strftime('%Y-%m-%dT%H:%M:%S.000Z', time.gmtime(time.time()))
# (datetime.datetime.utcnow()+datetime.timedelta(hours=0)).isoformat(timespec='milliseconds')+'Z'
result = day_overview_request()
if result[1] == 1:
activity = defaultdict(dict)
hours = defaultdict(dict)
articles = defaultdict(dict)
edits = 0
files = 0
admin = 0
changed_bytes = 0
new_articles = 0
if not result[0] and not settings["send_empty_overview"]:
return # no changes in this day
for item in result[0]:
if "actionhidden" in item or "suppressed" in item or "userhidden" in item:
continue # while such actions have type value (edit/new/log) many other values are hidden and therefore can crash with key error, let's not process such events
activity = add_to_dict(activity, item["user"])
hours = add_to_dict(hours, datetime.datetime.strptime(item["timestamp"], "%Y-%m-%dT%H:%M:%SZ").hour)
if item["type"] == "edit":
edits += 1
changed_bytes += item["newlen"] - item["oldlen"]
if item["ns"] == 0:
articles = add_to_dict(articles, item["title"])
if item["type"] == "new":
if item["ns"] == 0:
new_articles += 1
changed_bytes += item["newlen"]
if item["type"] == "log":
files = files + 1 if item["logtype"] == item["logaction"] == "upload" else files
admin = admin + 1 if item["logtype"] in ["delete", "merge", "block", "protect", "import", "rights",
"abusefilter", "interwiki", "managetags"] else admin
overall = round(new_articles + edits * 0.1 + files * 0.3 + admin * 0.1 + math.fabs(changed_bytes * 0.001), 2)
embed = defaultdict(dict)
embed["title"] = _("Daily overview")
embed["url"] = "https://{wiki}.gamepedia.com/Special:Statistics".format(wiki=settings["wiki"])
embed["color"] = settings["appearance"]["daily_overview"]["color"]
embed["author"]["icon_url"] = settings["appearance"]["daily_overview"]["icon"]
embed["author"]["name"] = settings["wikiname"]
embed["author"]["url"] = "https://{wiki}.gamepedia.com/".format(wiki=settings["wiki"])
if activity:
#v = activity.values()
active_users = []
for user, numberu in Counter(activity).most_common(3): # find most active users
active_users.append(user + ngettext(" ({} action)", " ({} actions)", numberu).format(numberu))
# the_one = random.choice(active_users)
#v = articles.values()
active_articles = []
for article, numbere in Counter(articles).most_common(3): # find most active users
active_articles.append(article + ngettext(" ({} edit)", " ({} edits)", numbere).format(numbere))
v = hours.values()
active_hours = []
for hour, numberh in Counter(hours).most_common(list(v).count(max(v))): # find most active hours
active_hours.append(str(hour))
houramount = ngettext(" UTC ({} action)", " UTC ({} actions)", numberh).format(numberh)
else:
active_users = [_("But nobody came")] # a reference to my favorite game of all the time, sorry ^_^
active_hours = [_("But nobody came")]
active_articles = [_("But nobody came")]
usramount = ""
houramount = ""
embed["fields"] = []
fields = (
(ngettext("Most active user", "Most active users", len(active_users)), ', '.join(active_users)),
(ngettext("Most edited article", "Most edited articles", len(active_articles)), ', '.join(active_articles)),
(_("Edits made"), edits), (_("New files"), files), (_("Admin actions"), admin),
(_("Bytes changed"), changed_bytes), (_("New articles"), new_articles),
(_("Unique contributors"), str(len(activity))),
(ngettext("Most active hour", "Most active hours", len(active_hours)), ', '.join(active_hours) + houramount),
(_("Day score"), str(overall)))
for name, value in fields:
embed["fields"].append({"name": name, "value": value})
data = {"embeds": [dict(embed)]}
formatted_embed = json.dumps(data, indent=4)
send_to_discord(formatted_embed)
else:
logging.debug("function requesting changes for day overview returned with error code")
class Recent_Changes_Class(object):
ids = []
map_ips = {}
recent_id = 0
downtimecredibility = 0
last_downtime = 0
tags = {}
groups = {}
streak = -1
unsent_messages = []
mw_messages = {}
session = requests.Session()
session.headers.update(settings["header"])
if settings["limitrefetch"] != -1:
with open("lastchange.txt", "r") as record:
file_content = record.read().strip()
if file_content:
file_id = int(file_content)
logging.debug("File_id is {val}".format(val=file_id))
else:
logging.debug("File is empty")
file_id = 999999999
else:
file_id = 999999999 # such value won't cause trouble, and it will make sure no refetch happen
def handle_mw_errors(self, request):
if "errors" in request:
logging.error(request["errors"])
raise MWError
return request
def log_in(self):
global logged_in
# session.cookies.clear()
if '@' not in settings["wiki_bot_login"]:
logging.error(
"Please provide proper nickname for login from https://{wiki}.gamepedia.com/Special:BotPasswords".format(
wiki=settings["wiki"]))
return
if len(settings["wiki_bot_password"]) != 32:
logging.error(
"Password seems incorrect. It should be 32 characters long! Grab it from https://{wiki}.gamepedia.com/Special:BotPasswords".format(
wiki=settings["wiki"]))
return
logging.info("Trying to log in to https://{wiki}.gamepedia.com...".format(wiki=settings["wiki"]))
try:
response = self.handle_mw_errors(
self.session.post("https://{wiki}.gamepedia.com/api.php".format(wiki=settings["wiki"]),
data={'action': 'query', 'format': 'json', 'utf8': '', 'meta': 'tokens',
'type': 'login'}))
response = self.handle_mw_errors(
self.session.post("https://{wiki}.gamepedia.com/api.php".format(wiki=settings["wiki"]),
data={'action': 'login', 'format': 'json', 'utf8': '',
'lgname': settings["wiki_bot_login"],
'lgpassword': settings["wiki_bot_password"],
'lgtoken': response.json()['query']['tokens']['logintoken']}))
except ValueError:
logging.error("Logging in have not succeeded")
return
except MWError:
logging.error("Logging in have not succeeded")
return
try:
if response.json()['login']['result'] == "Success":
logging.info("Logging to the wiki succeeded")
logged_in = True
else:
logging.error("Logging in have not succeeded")
except:
logging.error("Logging in have not succeeded")
def add_cache(self, change):
self.ids.append(change["rcid"])
# self.recent_id = change["rcid"]
if len(self.ids) > settings["limitrefetch"] + 5:
self.ids.pop(0)
def fetch(self, amount=settings["limit"]):
if self.unsent_messages:
logging.info(
"{} messages waiting to be delivered to Discord due to Discord throwing errors/no connection to Discord servers.".format(
len(self.unsent_messages)))
for num, item in enumerate(self.unsent_messages):
logging.debug(
"Trying to send a message to Discord from the queue with id of {} and content {}".format(str(num),
str(item)))
if send_to_discord_webhook(item) < 2:
logging.debug("Sending message succeeded")
time.sleep(2.5)
else:
logging.debug("Sending message failed")
break
else:
self.unsent_messages = []
logging.debug("Queue emptied, all messages delivered")
self.unsent_messages = self.unsent_messages[num:]
logging.debug(self.unsent_messages)
last_check = self.fetch_changes(amount=amount)
self.recent_id = last_check if last_check is not None else self.file_id
if settings["limitrefetch"] != -1 and self.recent_id != self.file_id:
self.file_id = self.recent_id
with open("lastchange.txt", "w") as record:
record.write(str(self.file_id))
logging.debug("Most recent rcid is: {}".format(self.recent_id))
return self.recent_id
def fetch_changes(self, amount, clean=False):
global logged_in
if len(self.ids) == 0:
logging.debug("ids is empty, triggering clean fetch")
clean = True
changes = self.safe_request(
"https://{wiki}.gamepedia.com/api.php?action=query&format=json&list=recentchanges&rcshow=!bot&rcprop=title%7Credirect%7Ctimestamp%7Cids%7Cloginfo%7Cparsedcomment%7Csizes%7Cflags%7Ctags%7Cuser&rclimit={amount}&rctype=edit%7Cnew%7Clog%7Cexternal{categorize}".format(
wiki=settings["wiki"], amount=amount, categorize="%7Ccategorize" if settings["show_added_categories"] else ""))
if changes:
try:
changes = changes.json()['query']['recentchanges']
changes.reverse()
except ValueError:
logging.warning("ValueError in fetching changes")
if changes.url == "https://www.gamepedia.com":
logging.critical(
"The wiki specified in the settings most probably doesn't exist, got redirected to gamepedia.com")
sys.exit(1)
self.downtime_controller()
return None
except KeyError:
logging.warning("Wiki returned %s" % (changes.json()))
return None
else:
if self.downtimecredibility > 0:
self.downtimecredibility -= 1
if self.streak > -1:
self.streak += 1
if self.streak > 8:
self.streak = -1
send(_("Connection to {wiki} seems to be stable now.").format(wiki=settings["wikiname"]),
_("Connection status"), settings["avatars"]["connection_restored"])
# In the first for loop we analize the categorize events and figure if we will need more changes to fetch
# in order to cover all of the edits
categorize_events = {}
new_events = 0
for change in changes:
if not (change["rcid"] in self.ids or change["rcid"] < self.recent_id) and not clean:
new_events += 1
logging.debug(
"New event: {}".format(change["rcid"]))
if new_events == settings["limit"]:
if amount < 500:
# call the function again with max limit for more results, ignore the ones in this request
logging.debug("There were too many new events, requesting max amount of events from the wiki.")
return self.fetch(amount=5000 if logged_in else 500)
else:
logging.debug(
"There were too many new events, but the limit was high enough we don't care anymore about fetching them all.")
if change["type"] == "categorize":
if "commenthidden" not in change:
cat_title = change["title"].split(':', 1)[1]
# I so much hate this, blame Markus for making me do this
if change["revid"] not in categorize_events:
categorize_events[change["revid"]] = {"new": [], "removed": []}
comment_to_match = re.sub('<.*?a>', '', change["parsedcomment"])
if recent_changes.mw_messages["recentchanges-page-added-to-category"].replace("[[:$1]]", "") in comment_to_match:
categorize_events[change["revid"]]["new"].append(cat_title)
logging.debug("Matched {} to added category for {}".format(cat_title, change["revid"]))
elif recent_changes.mw_messages["recentchanges-page-removed-from-category"].replace("[[:$1]]", "") in comment_to_match:
categorize_events[change["revid"]]["removed"].append(cat_title)
logging.debug("Matched {} to removed category for {}".format(cat_title, change["revid"]))
else:
logging.debug("Unknown match for category change with messages {} and {} and comment_to_match {}".format(recent_changes.mw_messages["recentchanges-page-added-to-category"].replace("[[:$1]]", ""), recent_changes.mw_messages["recentchanges-page-removed-from-category"].replace("[[:$1]]", ""), comment_to_match))
else:
logging.debug("Log entry got suppressed, ignoring entry.")
# if change["revid"] in categorize_events:
# categorize_events[change["revid"]].append(cat_title)
# else:
# logging.debug("New category '{}' for {}".format(cat_title, change["revid"]))
# categorize_events[change["revid"]] = {cat_title: }
for change in changes:
if change["rcid"] in self.ids or change["rcid"] < self.recent_id:
logging.debug("Change ({}) is in ids or is lower than recent_id {}".format(change["rcid"],
self.recent_id))
continue
logging.debug(self.ids)
logging.debug(self.recent_id)
self.add_cache(change)
if clean and not (self.recent_id == 0 and change["rcid"] > self.file_id):
logging.debug("Rejected {val}".format(val=change["rcid"]))
continue
first_pass(change, categorize_events.get(change.get("revid"), None))
return change["rcid"]
def safe_request(self, url):
try:
request = self.session.get(url, timeout=10)
except requests.exceptions.Timeout:
logging.warning("Reached timeout error for request on link {url}".format(url=url))
self.downtime_controller()
return None
except requests.exceptions.ConnectionError:
logging.warning("Reached connection error for request on link {url}".format(url=url))
self.downtime_controller()
return None
else:
if 499 < request.status_code < 600:
self.downtime_controller()
return None
return request
def check_connection(self, looped=False):
online = 0
for website in ["https://google.com", "https://instagram.com", "https://steamcommunity.com"]:
try:
requests.get(website, timeout=10)
online += 1
except requests.exceptions.ConnectionError:
pass
except requests.exceptions.Timeout:
pass
if online < 1:
logging.error("Failure when checking Internet connection at {time}".format(
time=time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime())))
self.downtimecredibility = 0
if not looped:
while 1: # recursed loop, check for connection (every 10 seconds) as long as three services are down, don't do anything else
if self.check_connection(looped=True):
recent_changes.fetch(amount=settings["limitrefetch"])
break
time.sleep(10)
return False
return True
def downtime_controller(self):
if not settings["show_updown_messages"]:
return
if self.streak > -1: # reset the streak of successful connections when bad one happens
self.streak = 0
if self.downtimecredibility < 60:
self.downtimecredibility += 15
else:
if (
time.time() - self.last_downtime) > 1800 and self.check_connection(): # check if last downtime happened within 30 minutes, if yes, don't send a message
send(_("{wiki} seems to be down or unreachable.").format(wiki=settings["wikiname"]),
_("Connection status"), settings["avatars"]["connection_failed"])
self.last_downtime = time.time()
self.streak = 0
def clear_cache(self):
self.map_ips = {}
def init_info(self):
startup_info = safe_read(self.safe_request(
"https://{wiki}.gamepedia.com/api.php?action=query&format=json&uselang=content&list=tags|recentchanges&meta=allmessages&utf8=1&tglimit=max&tgprop=name|displayname&ammessages=recentchanges-page-added-to-category|recentchanges-page-removed-from-category&amenableparser=1&amincludelocal=1".format(
wiki=settings["wiki"])), "query")
if startup_info:
if "tags" in startup_info and "allmessages" in startup_info:
for tag in startup_info["tags"]:
self.tags[tag["name"]] = (BeautifulSoup(tag["displayname"], "lxml")).get_text()
for message in startup_info["allmessages"]:
self.mw_messages[message["name"]] = message["*"]
else:
logging.warning("Could not retrieve initial wiki information. Some features may not work correctly!")
logging.debug(startup_info)
else:
logging.error("Could not retrieve initial wiki information. Possibly internet connection issue?")
recent_changes = Recent_Changes_Class()
try:
if settings["wiki_bot_login"] and settings["wiki_bot_password"]:
recent_changes.log_in()
recent_changes.init_info()
except requests.exceptions.ConnectionError:
logging.critical("A connection can't be established with the wiki. Exiting...")
sys.exit(1)
time.sleep(1.0)
recent_changes.fetch(amount=settings["limitrefetch"] if settings["limitrefetch"] != -1 else settings["limit"])
schedule.every(settings["cooldown"]).seconds.do(recent_changes.fetch)
if 1 == 2:
print(_("director"), _("bot"), _("editor"), _("directors"), _("sysop"), _("bureaucrat"), _("reviewer"),
_("autoreview"), _("autopatrol"), _("wiki_guardian"))
if settings["overview"]:
schedule.every().day.at("{}:{}".format(time.strptime(settings["overview_time"], '%H:%M').tm_hour,
time.strptime(settings["overview_time"], '%H:%M').tm_min)).do(day_overview)
schedule.every().day.at("00:00").do(recent_changes.clear_cache)
while 1:
time.sleep(1.0)
schedule.run_pending()