Fixed a fix, code refactoring

This commit is contained in:
Frisk 2018-09-30 18:14:44 +02:00
parent e6523e3a96
commit 229c745acb
No known key found for this signature in database
GPG key ID: 0E9A7D3C0A01586C

397
rcgcdw.py
View file

@ -39,12 +39,15 @@ lang = gettext.translation('rcgcdw', localedir='locale', languages=[settings["la
lang.install()
ngettext = lang.ngettext
class MWError(Exception):
pass
class MyHTMLParser(HTMLParser):
new_string = ""
recent_href = ""
def handle_starttag(self, tag, attrs):
for attr in attrs:
if attr[0] == 'href':
@ -53,23 +56,29 @@ class MyHTMLParser(HTMLParser):
self.recent_href = "https:{rest}".format(rest=self.recent_href)
elif not self.recent_href.startswith("https"):
self.recent_href = "https://{wiki}.gamepedia.com".format(wiki=settings["wiki"]) + self.recent_href
self.recent_href = self.recent_href.replace(")", "\)")
self.recent_href = self.recent_href.replace(")", "\\)")
def handle_data(self, data):
if self.recent_href:
self.new_string = self.new_string + "[{}]({})".format(data, self.recent_href)
self.recent_href = ""
else:
self.new_string = self.new_string + data
def handle_comment(self, data):
self.new_string = self.new_string + data
def handle_endtag(self, tag):
print(self.new_string)
HTMLParse = MyHTMLParser()
def send(message, name, avatar):
send_to_discord({"content": message, "avatar_url": avatar, "username": name})
def safe_read(request, *keys):
if request is None:
return None
@ -78,16 +87,19 @@ def safe_read(request, *keys):
for item in keys:
request = request[item]
except KeyError:
logging.warning("Failure while extracting data from request on key {key} in {change}".format(key=item, change=request))
logging.warning(
"Failure while extracting data from request on key {key} in {change}".format(key=item, change=request))
return None
except ValueError:
logging.warning("Failure while extracting data from request in {change}".format(change=request))
return None
return request
def send_to_discord_webhook(data):
try:
result = requests.post(settings["webhookURL"], data=data, headers={**{'Content-Type': 'application/json'}, **settings["header"]}, timeout=10)
result = requests.post(settings["webhookURL"], data=data,
headers={**{'Content-Type': 'application/json'}, **settings["header"]}, timeout=10)
except requests.exceptions.Timeout:
logging.warning("Timeouted while sending data to the webhook.")
return 3
@ -97,6 +109,7 @@ def send_to_discord_webhook(data):
else:
return handle_discord_http(result.status_code, data)
def send_to_discord(data):
if recent_changes.unsent_messages:
recent_changes.unsent_messages.append(data)
@ -111,6 +124,7 @@ def send_to_discord(data):
time.sleep(2.5)
pass
def webhook_formatter(action, STATIC, **params):
logging.debug("Received things: {thing}".format(thing=params))
colornumber = None if isinstance(STATIC["color"], str) else STATIC["color"]
@ -120,20 +134,26 @@ def webhook_formatter(action, STATIC, **params):
if "title" in params:
article_encoded = params["title"].replace(" ", "_").replace(')', '\)')
if re.match(r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b", params["user"]) is not None:
author_url = "https://{wiki}.gamepedia.com/Special:Contributions/{user}".format(wiki=settings["wiki"], user=params["user"])
author_url = "https://{wiki}.gamepedia.com/Special:Contributions/{user}".format(wiki=settings["wiki"],
user=params["user"])
if params["user"] not in list(recent_changes.map_ips.keys()):
contibs = safe_read(recent_changes.safe_request("https://{wiki}.gamepedia.com/api.php?action=query&format=json&list=usercontribs&uclimit=max&ucuser={user}&ucprop=".format(wiki=settings["wiki"], user=params["user"])), "query", "usercontribs")
contibs = safe_read(recent_changes.safe_request(
"https://{wiki}.gamepedia.com/api.php?action=query&format=json&list=usercontribs&uclimit=max&ucuser={user}&ucprop=".format(
wiki=settings["wiki"], user=params["user"])), "query", "usercontribs")
if contibs is None:
logging.warning("WARNING: Something went wrong when checking amount of contributions for given IP address")
logging.warning(
"WARNING: Something went wrong when checking amount of contributions for given IP address")
params["user"] = params["user"] + "(?)"
else:
params["user"] = "{author} ({contribs})".format(author=params["user"], contribs=len(contibs))
recent_changes.map_ips[params["user"]] = len(contibs)
else:
recent_changes.map_ips[params["user"]] += 1
params["user"] = "{author} ({amount})".format(author=params["user"], amount=recent_changes.map_ips[params["user"]])
params["user"] = "{author} ({amount})".format(author=params["user"],
amount=recent_changes.map_ips[params["user"]])
else:
author_url = "https://{wiki}.gamepedia.com/User:{user}".format(wiki=settings["wiki"], user=params["user"].replace(" ", "_"))
author_url = "https://{wiki}.gamepedia.com/User:{user}".format(wiki=settings["wiki"],
user=params["user"].replace(" ", "_"))
if action in ("edit", "new"): # edit or new page
editsize = params["size"]
print(editsize)
@ -149,13 +169,21 @@ def webhook_formatter(action, STATIC, **params):
colornumber = 9175040 + (math.floor((editsize * -1) / (52))) * 65536
elif editsize == 0:
colornumber = 8750469
link = "https://{wiki}.gamepedia.com/index.php?title={article}&curid={pageid}&diff={diff}&oldid={oldrev}".format(wiki=settings["wiki"], pageid=params["pageid"], diff=params["diff"], oldrev=params["oldrev"], article=params["title"].replace(" ", "_"))
embed["title"] = "{article} ({new}{minor}{editsize})".format(article=params["title"], editsize="+"+str(editsize) if editsize>0 else editsize, new= _("(N!) ") if action == "new" else "", minor=_("m ") if action == "edit" and params["minor"] else "")
link = "https://{wiki}.gamepedia.com/index.php?title={article}&curid={pageid}&diff={diff}&oldid={oldrev}".format(
wiki=settings["wiki"], pageid=params["pageid"], diff=params["diff"], oldrev=params["oldrev"],
article=params["title"].replace(" ", "_"))
embed["title"] = "{article} ({new}{minor}{editsize})".format(article=params["title"], editsize="+" + str(
editsize) if editsize > 0 else editsize, new=_("(N!) ") if action == "new" else "",
minor=_("m ") if action == "edit" and params[
"minor"] else "")
elif action in ("upload/overwrite", "upload/upload"): # sending files
license = None
urls = safe_read(recent_changes.safe_request("https://{wiki}.gamepedia.com/api.php?action=query&format=json&prop=imageinfo&list=&meta=&titles={filename}&iiprop=timestamp%7Curl&iilimit=2".format(wiki=settings["wiki"], filename=params["title"])), "query", "pages")
urls = safe_read(recent_changes.safe_request(
"https://{wiki}.gamepedia.com/api.php?action=query&format=json&prop=imageinfo&list=&meta=&titles={filename}&iiprop=timestamp%7Curl&iilimit=2".format(
wiki=settings["wiki"], filename=params["title"])), "query", "pages")
undolink = ""
link ="https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_"))
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["title"].replace(" ", "_"))
additional_info_retrieved = False
if urls is not None:
if "-1" not in urls: # oage removed before we asked for it
@ -167,12 +195,17 @@ def webhook_formatter(action, STATIC, **params):
if params["overwrite"]:
if additional_info_retrieved:
img_timestamp = [x for x in img_info[1]["timestamp"] if x.isdigit()]
undolink = "https://{wiki}.gamepedia.com/index.php?title={filename}&action=revert&oldimage={timestamp}%21{filenamewon}".format(wiki=settings["wiki"], filename=article_encoded, timestamp="".join(img_timestamp), filenamewon = article_encoded[5:])
embed["fields"] = [{"name": _("Options"), "value": _("([preview]({link}) | [undo]({undolink}))").format(link=embed["image"]["url"], undolink=undolink)}]
undolink = "https://{wiki}.gamepedia.com/index.php?title={filename}&action=revert&oldimage={timestamp}%21{filenamewon}".format(
wiki=settings["wiki"], filename=article_encoded, timestamp="".join(img_timestamp),
filenamewon=article_encoded[5:])
embed["fields"] = [{"name": _("Options"), "value": _("([preview]({link}) | [undo]({undolink}))").format(
link=embed["image"]["url"], undolink=undolink)}]
embed["title"] = _("Uploaded a new version of {name}").format(name=params["title"])
else:
embed["title"] = _("Uploaded {name}").format(name=params["title"])
article_content = safe_read(recent_changes.safe_request("https://{wiki}.gamepedia.com/api.php?action=query&format=json&prop=revisions&titles={article}&rvprop=content".format(wiki=settings["wiki"], article=quote_plus(params["title"], safe=''))), "query", "pages")
article_content = safe_read(recent_changes.safe_request(
"https://{wiki}.gamepedia.com/api.php?action=query&format=json&prop=revisions&titles={article}&rvprop=content".format(
wiki=settings["wiki"], article=quote_plus(params["title"], safe=''))), "query", "pages")
if article_content is None:
logging.warning("Something went wrong when getting license for the image")
return 0
@ -188,57 +221,83 @@ def webhook_formatter(action, STATIC, **params):
else:
license = "?"
except IndexError:
logging.error("Given regex for the license detection is incorrect. It does not have a capturing group called \"license\" specified. Please fix license_regex value in the config!")
logging.error(
"Given regex for the license detection is incorrect. It does not have a capturing group called \"license\" specified. Please fix license_regex value in the config!")
license = "?"
except re.error:
logging.error("Given regex for the license detection is incorrect. Please fix license_regex or license_regex_detect values in the config!")
logging.error(
"Given regex for the license detection is incorrect. Please fix license_regex or license_regex_detect values in the config!")
license = "?"
if additional_info_retrieved:
embed["fields"] = [{"name": _("Options"), "value": _("([preview]({link}))").format(link=embed["image"]["url"])}]
params["desc"] = _("{desc}\nLicense: {license}").format(desc=params["desc"], license=license if license is not None else "?")
embed["fields"] = [
{"name": _("Options"), "value": _("([preview]({link}))").format(link=embed["image"]["url"])}]
params["desc"] = _("{desc}\nLicense: {license}").format(desc=params["desc"],
license=license if license is not None else "?")
elif action == "delete/delete":
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_"))
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["title"].replace(" ", "_"))
embed["title"] = _("Deleted page {article}").format(article=params["title"])
elif action == "delete/delete_redir":
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_"))
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["title"].replace(" ", "_"))
embed["title"] = _("Deleted redirect {article} by overwriting").format(article=params["title"])
elif action == "move/move":
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["target"].replace(" ", "_"))
params["desc"] = "{supress}. {desc}".format(desc=params["desc"], supress=_("No redirect has been made") if params["supress"] == True else _("A redirect has been made"))
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["target"].replace(" ", "_"))
params["desc"] = "{supress}. {desc}".format(desc=params["desc"],
supress=_("No redirect has been made") if params[
"supress"] == True else _(
"A redirect has been made"))
embed["title"] = _("Moved {article} to {target}").format(article=params["title"], target=params["target"])
elif action == "move/move_redir":
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["target"].replace(" ", "_"))
embed["title"] = _("Moved {article} to {title} over redirect").format(article=params["title"], title=params["target"])
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["target"].replace(" ", "_"))
embed["title"] = _("Moved {article} to {title} over redirect").format(article=params["title"],
title=params["target"])
elif action == "protect/move_prot":
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_"))
embed["title"] = _("Moved protection settings from {article} to {title}").format(article=params["title"], title=params["target"])
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["title"].replace(" ", "_"))
embed["title"] = _("Moved protection settings from {article} to {title}").format(article=params["title"],
title=params["target"])
elif action == "block/block":
link = "https://{wiki}.gamepedia.com/{user}".format(wiki=settings["wiki"], user=params["blocked_user"].replace(" ", "_").replace(')', '\)'))
link = "https://{wiki}.gamepedia.com/{user}".format(wiki=settings["wiki"],
user=params["blocked_user"].replace(" ", "_").replace(')',
'\)'))
user = params["blocked_user"].split(':')[1]
time = _("infinity and beyond") if params["duration"] == "infinite" else params["duration"]
embed["title"] = _("Blocked {blocked_user} for {time}").format(blocked_user=user, time=time)
elif action == "block/reblock":
link = "https://{wiki}.gamepedia.com/{user}".format(wiki=settings["wiki"], user=params["blocked_user"].replace(" ", "_").replace(')', '\)'))
link = "https://{wiki}.gamepedia.com/{user}".format(wiki=settings["wiki"],
user=params["blocked_user"].replace(" ", "_").replace(')',
'\)'))
user = params["blocked_user"].split(':')[1]
embed["title"] = _("Changed block settings for {blocked_user}").format(blocked_user=user)
elif action == "block/unblock":
link = "https://{wiki}.gamepedia.com/{user}".format(wiki=settings["wiki"], user=params["blocked_user"].replace(" ", "_").replace(')', '\)'))
link = "https://{wiki}.gamepedia.com/{user}".format(wiki=settings["wiki"],
user=params["blocked_user"].replace(" ", "_").replace(')',
'\)'))
user = params["blocked_user"].split(':')[1]
embed["title"] = _("Unblocked {blocked_user}").format(blocked_user=user)
elif action == "curseprofile/comment-created":
link = "https://{wiki}.gamepedia.com/Special:CommentPermalink/{commentid}".format(wiki=settings["wiki"], commentid=params["commentid"])
link = "https://{wiki}.gamepedia.com/Special:CommentPermalink/{commentid}".format(wiki=settings["wiki"],
commentid=params["commentid"])
# link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"], target=params["target"].replace(" ", "_").replace(')', '\)')) old way of linking
embed["title"] = _("Left a comment on {target}'s profile").format(target=params["target"])
elif action == "curseprofile/comment-replied":
# link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"], target=params["target"].replace(" ", "_").replace(')', '\)'))
link = "https://{wiki}.gamepedia.com/Special:CommentPermalink/{commentid}".format(wiki=settings["wiki"], commentid=params["commentid"])
link = "https://{wiki}.gamepedia.com/Special:CommentPermalink/{commentid}".format(wiki=settings["wiki"],
commentid=params["commentid"])
embed["title"] = _("Replied to a comment on {target}'s profile").format(target=params["target"])
elif action == "curseprofile/comment-edited":
# link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"], target=params["target"].replace(" ", "_").replace(')', '\)'))
link = "https://{wiki}.gamepedia.com/Special:CommentPermalink/{commentid}".format(wiki=settings["wiki"], commentid=params["commentid"])
link = "https://{wiki}.gamepedia.com/Special:CommentPermalink/{commentid}".format(wiki=settings["wiki"],
commentid=params["commentid"])
embed["title"] = _("Edited a comment on {target}'s profile").format(target=params["target"])
elif action == "curseprofile/profile-edited":
link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"], target=params["target"].replace(" ", "_").replace(')', '\)'))
link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"],
target=params["target"].replace(" ",
"_").replace(
')', '\)'))
if params["field"] == "profile-location":
field = _("Location")
elif params["field"] == "profile-aboutme":
@ -266,7 +325,8 @@ def webhook_formatter(action, STATIC, **params):
embed["title"] = _("Edited {target}'s profile").format(target=params["target"])
params["desc"] = _("{field} field changed to: {desc}").format(field=field, desc=params["desc"])
elif action == "curseprofile/comment-deleted":
link = "https://{wiki}.gamepedia.com/Special:CommentPermalink/{commentid}".format(wiki=settings["wiki"], commentid=params["commentid"])
link = "https://{wiki}.gamepedia.com/Special:CommentPermalink/{commentid}".format(wiki=settings["wiki"],
commentid=params["commentid"])
# link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"], target=params["target"].replace(" ", "_").replace(')', '\)'))
embed["title"] = _("Deleted a comment on {target}'s profile").format(target=params["target"])
elif action in ("rights/rights", "rights/autopromote"):
@ -276,7 +336,8 @@ def webhook_formatter(action, STATIC, **params):
else:
params["user"] = _("System")
author_url = ""
embed["title"] = _("{target} got autopromoted to a new usergroup").format(target=params["title"].split(":")[1])
embed["title"] = _("{target} got autopromoted to a new usergroup").format(
target=params["title"].split(":")[1])
if len(params["old_groups"]) < len(params["new_groups"]):
embed["thumbnail"]["url"] = "https://i.imgur.com/WnGhF5g.gif"
old_groups = []
@ -290,27 +351,38 @@ def webhook_formatter(action, STATIC, **params):
if len(new_groups) == 0:
new_groups = [_("none")]
reason = ": {desc}".format(desc=params["desc"]) if params["desc"] != _("No description provided") else ""
params["desc"] = _("Groups changed from {old_groups} to {new_groups}{reason}").format(old_groups=", ".join(old_groups), new_groups=', '.join(new_groups), reason=reason)
params["desc"] = _("Groups changed from {old_groups} to {new_groups}{reason}").format(
old_groups=", ".join(old_groups), new_groups=', '.join(new_groups), reason=reason)
elif action == "protect/protect":
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_"))
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["title"].replace(" ", "_"))
embed["title"] = _("Protected {target}").format(target=params["title"])
params["desc"] = params["settings"] + " | " + params["desc"]
elif action == "protect/modify":
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_"))
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["title"].replace(" ", "_"))
embed["title"] = _("Changed protection level for {article}").format(article=params["title"])
params["desc"] = params["settings"] + " | " + params["desc"]
elif action == "protect/unprotect":
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_"))
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["title"].replace(" ", "_"))
embed["title"] = _("Removed protection from {article}").format(article=params["title"])
elif action == "delete/revision":
amount = len(params["amount"])
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_"))
embed["title"] = ngettext("Changed visibility of revision on page {article} ", "Changed visibility of {amount} revisions on page {article} ", amount).format(article=params["title"], amount=amount)
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["title"].replace(" ", "_"))
embed["title"] = ngettext("Changed visibility of revision on page {article} ",
"Changed visibility of {amount} revisions on page {article} ", amount).format(
article=params["title"], amount=amount)
elif action == "import/upload":
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_"))
embed["title"] = ngettext("Imported {article} with {count} revision", "Imported {article} with {count} revisions", params["amount"]).format(article=params["title"], count=params["amount"])
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["title"].replace(" ", "_"))
embed["title"] = ngettext("Imported {article} with {count} revision",
"Imported {article} with {count} revisions", params["amount"]).format(
article=params["title"], count=params["amount"])
elif action == "delete/restore":
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_"))
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["title"].replace(" ", "_"))
embed["title"] = _("Restored {article}").format(article=params["title"])
elif action == "delete/event":
link = "https://{wiki}.gamepedia.com/Special:RecentChanges".format(wiki=settings["wiki"])
@ -322,32 +394,44 @@ def webhook_formatter(action, STATIC, **params):
link = "https://{wiki}.gamepedia.com/Special:RecentChanges".format(wiki=settings["wiki"])
embed["title"] = _("Edited abuse filter number {number}").format(number=params["filternr"])
elif action == "merge/merge":
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_"))
embed["title"] = _("Merged revision histories of {article} into {dest}").format(article=params["title"], dest=params["destination"])
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["title"].replace(" ", "_"))
embed["title"] = _("Merged revision histories of {article} into {dest}").format(article=params["title"],
dest=params["destination"])
elif action == "interwiki/iw_add":
link = "https://{wiki}.gamepedia.com/Special:Interwiki".format(wiki=settings["wiki"])
embed["title"] = _("Added an entry to the interwiki table")
params["desc"] =_("Prefix: {prefix}, website: {website} | {desc}").format(desc=params["desc"], prefix=params["prefix"], website=params["website"])
params["desc"] = _("Prefix: {prefix}, website: {website} | {desc}").format(desc=params["desc"],
prefix=params["prefix"],
website=params["website"])
elif action == "interwiki/iw_edit":
link = "https://{wiki}.gamepedia.com/Special:Interwiki".format(wiki=settings["wiki"])
embed["title"] = _("Edited an entry in interwiki table")
params["desc"] =_("Prefix: {prefix}, website: {website} | {desc}").format(desc=params["desc"], prefix=params["prefix"], website=params["website"])
params["desc"] = _("Prefix: {prefix}, website: {website} | {desc}").format(desc=params["desc"],
prefix=params["prefix"],
website=params["website"])
elif action == "interwiki/iw_delete":
link = "https://{wiki}.gamepedia.com/Special:Interwiki".format(wiki=settings["wiki"])
embed["title"] = _("Deleted an entry in interwiki table")
params["desc"] = _("Prefix: {prefix} | {desc}").format(desc=params["desc"], prefix=params["prefix"])
elif action == "contentmodel/change":
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_"))
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["title"].replace(" ", "_"))
embed["title"] = _("Changed the content model of the page {article}").format(article=params["title"])
params["desc"] = _("Model changed from {old} to {new}: {reason}").format(old=params["oldmodel"], new=params["newmodel"], reason=params["desc"])
params["desc"] = _("Model changed from {old} to {new}: {reason}").format(old=params["oldmodel"],
new=params["newmodel"],
reason=params["desc"])
elif action == "sprite/sprite":
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_"))
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["title"].replace(" ", "_"))
embed["title"] = _("Edited the sprite for {article}").format(article=params["title"])
elif action == "sprite/sheet":
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_"))
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["title"].replace(" ", "_"))
embed["title"] = _("Created the sprite sheet for {article}").format(article=params["title"])
elif action == "sprite/slice":
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_"))
link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"],
article=params["title"].replace(" ", "_"))
embed["title"] = _("Edited the slice for {article}").format(article=params["title"])
elif action == "managetags/create":
link = "https://{wiki}.gamepedia.com/Special:Tags".format(wiki=settings["wiki"])
@ -392,11 +476,13 @@ def webhook_formatter(action, STATIC, **params):
formatted_embed = json.dumps(data, indent=4)
send_to_discord(formatted_embed)
def handle_discord_http(code, formatted_embed):
if 300 > code > 199: # message went through
return 0
elif code == 400: # HTTP BAD REQUEST
logging.error("Following message has been rejected by Discord, please submit a bug on our bugtracker adding it:")
logging.error(
"Following message has been rejected by Discord, please submit a bug on our bugtracker adding it:")
logging.error(formatted_embed)
return 1
elif code == 401 or code == 404: # HTTP UNAUTHORIZED AND NOT FOUND
@ -406,12 +492,18 @@ def handle_discord_http(code, formatted_embed):
logging.error("We are sending too many requests to the Discord, slowing down...")
return 2
elif 499 < code < 600:
logging.error("Discord have trouble processing the event, and because the HTTP code returned is {} it means we blame them.".format(code))
logging.error(
"Discord have trouble processing the event, and because the HTTP code returned is {} it means we blame them.".format(
code))
return 3
def first_pass(change): #I've decided to split the embed formatter and change handler, maybe it's more messy this way, I don't know
def first_pass(
change): # I've decided to split the embed formatter and change handler, maybe it's more messy this way, I don't know
if "actionhidden" in change or "suppressed" in change and "suppressed" not in settings["ignored"]:
webhook_formatter("suppressed", {"timestamp": change["timestamp"], "color": settings["appearance"]["suppressed"]["color"], "icon": settings["appearance"]["suppressed"]["icon"]}, user=change["user"])
webhook_formatter("suppressed",
{"timestamp": change["timestamp"], "color": settings["appearance"]["suppressed"]["color"],
"icon": settings["appearance"]["suppressed"]["icon"]}, user=change["user"])
return
parse_output = HTMLParse.feed(change["parsedcomment"])
# parsedcomment = (BeautifulSoup(change["parsedcomment"], "lxml")).get_text()
@ -422,28 +514,36 @@ def first_pass(change): #I've decided to split the embed formatter and change ha
if not parsedcomment:
parsedcomment = _("No description provided")
if change["type"] == "edit" and "edit" not in settings["ignored"]:
STATIC_VARS = {**STATIC_VARS ,**{"color": settings["appearance"]["edit"]["color"], "icon": settings["appearance"]["edit"]["icon"]}}
webhook_formatter("edit", STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, oldrev=change["old_revid"], pageid=change["pageid"], diff=change["revid"], size=change["newlen"]-change["oldlen"], minor= True if "minor" in change else False)
STATIC_VARS = {**STATIC_VARS, **{"color": settings["appearance"]["edit"]["color"],
"icon": settings["appearance"]["edit"]["icon"]}}
webhook_formatter("edit", STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
oldrev=change["old_revid"], pageid=change["pageid"], diff=change["revid"],
size=change["newlen"] - change["oldlen"], minor=True if "minor" in change else False)
elif change["type"] == "log":
combination = "{logtype}/{logaction}".format(logtype=change["logtype"], logaction=change["logaction"])
if combination in settings["ignored"]:
return
logging.debug("combination is {}".format(combination))
try:
STATIC_VARS = {**STATIC_VARS ,**{"color": settings["appearance"][combination]["color"], "icon": settings["appearance"][combination]["icon"]}}
STATIC_VARS = {**STATIC_VARS, **{"color": settings["appearance"][combination]["color"],
"icon": settings["appearance"][combination]["icon"]}}
except KeyError:
STATIC_VARS = {**STATIC_VARS, **{"color": "", "icon": ""}}
logging.error("No value in the settings has been given for {}".format(combination))
if combination == "protect/protect":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, settings=change["logparams"]["description"])
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
settings=change["logparams"]["description"])
elif combination == "protect/modify":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, settings=change["logparams"]["description"])
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
settings=change["logparams"]["description"])
elif combination == "protect/unprotect":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment)
elif combination == "upload/overwrite":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, overwrite=True)
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
overwrite=True)
elif combination == "upload/upload":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, overwrite=False)
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
overwrite=False)
elif combination == "delete/delete":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment)
elif combination == "delete/delete_redir":
@ -451,51 +551,73 @@ def first_pass(change): #I've decided to split the embed formatter and change ha
elif combination == "delete/restore":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment)
elif combination == "delete/revision":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, amount=change["logparams"]["ids"])
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
amount=change["logparams"]["ids"])
elif combination == "delete/event":
webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment)
elif combination == "import/upload":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, amount=change["logparams"]["count"])
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
amount=change["logparams"]["count"])
elif combination == "import/interwiki":
webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment)
elif combination == "merge/merge":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, destination=change["logparams"]["dest_title"])
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
destination=change["logparams"]["dest_title"])
elif combination == "move/move":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, supress=True if "suppressredirect" in change["logparams"] else False, target=change["logparams"]['target_title'])
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
supress=True if "suppressredirect" in change["logparams"] else False,
target=change["logparams"]['target_title'])
elif combination == "move/move_redir":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, target=change["logparams"]["target_title"])
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
target=change["logparams"]["target_title"])
elif combination == "protect/move_prot":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, target=change["logparams"]["oldtitle_title"])
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
target=change["logparams"]["oldtitle_title"])
elif combination == "block/block":
webhook_formatter(combination, STATIC_VARS, user=change["user"], blocked_user=change["title"], desc=parsedcomment, duration=change["logparams"]["duration"])
webhook_formatter(combination, STATIC_VARS, user=change["user"], blocked_user=change["title"],
desc=parsedcomment, duration=change["logparams"]["duration"])
elif combination == "block/unblock":
webhook_formatter(combination, STATIC_VARS, user=change["user"], blocked_user=change["title"], desc=parsedcomment)
webhook_formatter(combination, STATIC_VARS, user=change["user"], blocked_user=change["title"],
desc=parsedcomment)
elif combination == "block/reblock":
webhook_formatter(combination, STATIC_VARS, user=change["user"], blocked_user=change["title"], desc=parsedcomment)
webhook_formatter(combination, STATIC_VARS, user=change["user"], blocked_user=change["title"],
desc=parsedcomment)
elif combination == "rights/rights":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, old_groups=change["logparams"]["oldgroups"], new_groups=change["logparams"]["newgroups"])
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
old_groups=change["logparams"]["oldgroups"], new_groups=change["logparams"]["newgroups"])
elif combination == "rights/autopromote":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, old_groups=change["logparams"]["oldgroups"], new_groups=change["logparams"]["newgroups"])
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
old_groups=change["logparams"]["oldgroups"], new_groups=change["logparams"]["newgroups"])
elif combination == "abusefilter/modify":
webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment, filternr=change["logparams"]['1'])
webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment,
filternr=change["logparams"]['1'])
elif combination == "interwiki/iw_add":
webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment, prefix=change["logparams"]['0'], website=change["logparams"]['1'])
webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment,
prefix=change["logparams"]['0'], website=change["logparams"]['1'])
elif combination == "interwiki/iw_edit":
webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment, prefix=change["logparams"]['0'], website=change["logparams"]['1'])
webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment,
prefix=change["logparams"]['0'], website=change["logparams"]['1'])
elif combination == "interwiki/iw_delete":
webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment, prefix=change["logparams"]['0'])
webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment,
prefix=change["logparams"]['0'])
elif combination == "curseprofile/comment-created":
webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1], commentid=change["logparams"]["0"])
webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1],
commentid=change["logparams"]["0"])
elif combination == "curseprofile/comment-edited":
webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1], commentid=change["logparams"]["0"])
webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1],
commentid=change["logparams"]["0"])
elif combination == "curseprofile/comment-deleted":
webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1], commentid=change["logparams"]["0"])
webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1],
commentid=change["logparams"]["0"])
elif combination == "curseprofile/profile-edited":
webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1], field=change["logparams"]['0'], desc=change["parsedcomment"])
webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1],
field=change["logparams"]['0'], desc=change["parsedcomment"])
elif combination == "curseprofile/comment-replied":
webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1], commentid=change["logparams"]["0"])
webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1],
commentid=change["logparams"]["0"])
elif combination == "contentmodel/change":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, oldmodel=change["logparams" ]["oldmodel"], newmodel=change["logparams" ]["newmodel"])
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
oldmodel=change["logparams"]["oldmodel"], newmodel=change["logparams"]["newmodel"])
elif combination == "sprite/sprite":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment)
elif combination == "sprite/sheet":
@ -503,13 +625,17 @@ def first_pass(change): #I've decided to split the embed formatter and change ha
elif combination == "sprite/slice":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment)
elif combination == "managetags/create":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, additional=change["logparams"])
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
additional=change["logparams"])
elif combination == "managetags/delete":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, additional=change["logparams"])
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
additional=change["logparams"])
elif combination == "managetags/activate":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, additional=change["logparams"])
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
additional=change["logparams"])
elif combination == "managetags/deactivate":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, additional=change["logparams"])
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
additional=change["logparams"])
elif combination == "tag/update":
webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment)
else:
@ -522,8 +648,12 @@ def first_pass(change): #I've decided to split the embed formatter and change ha
print(change)
return
elif change["type"] == "new" and "new" not in settings["ignored"]: # new page
STATIC_VARS = {**STATIC_VARS ,**{"color": settings["appearance"]["new"]["color"], "icon": settings["appearance"]["new"]["icon"]}}
webhook_formatter("new", STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, oldrev=change["old_revid"], pageid=change["pageid"], diff=change["revid"], size=change["newlen"])
STATIC_VARS = {**STATIC_VARS, **{"color": settings["appearance"]["new"]["color"],
"icon": settings["appearance"]["new"]["icon"]}}
webhook_formatter("new", STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment,
oldrev=change["old_revid"], pageid=change["pageid"], diff=change["revid"],
size=change["newlen"])
def day_overview_request():
logging.info("Fetching daily overview... This may take up to 30 seconds!")
@ -534,7 +664,9 @@ def day_overview_request():
passes = 0
continuearg = ""
while not complete and passes < 10:
request = recent_changes.safe_request("https://{wiki}.gamepedia.com/api.php?action=query&format=json&list=recentchanges&rcend={timestamp}Z&rcprop=title%7Ctimestamp%7Csizes%7Cloginfo%7Cuser&rcshow=!bot&rclimit=500&rctype=edit%7Cnew%7Clog{continuearg}".format(wiki=settings["wiki"], timestamp=timestamp, continuearg=continuearg))
request = recent_changes.safe_request(
"https://{wiki}.gamepedia.com/api.php?action=query&format=json&list=recentchanges&rcend={timestamp}Z&rcprop=title%7Ctimestamp%7Csizes%7Cloginfo%7Cuser&rcshow=!bot&rclimit=500&rctype=edit%7Cnew%7Clog{continuearg}".format(
wiki=settings["wiki"], timestamp=timestamp, continuearg=continuearg))
if request:
try:
request = request.json()
@ -552,7 +684,9 @@ def day_overview_request():
if continuearg:
continuearg = "&rccontinue={}".format(continuearg)
passes += 1
logging.debug("continuing requesting next pages of recent changes with {} passes and continuearg being {}".format(passes, continuearg))
logging.debug(
"continuing requesting next pages of recent changes with {} passes and continuearg being {}".format(
passes, continuearg))
time.sleep(3.0)
else:
complete = 1
@ -562,6 +696,7 @@ def day_overview_request():
logging.debug("quit the loop because there been too many passes")
return (result, complete)
def add_to_dict(dictionary, key):
if key in dictionary:
dictionary[key] += 1
@ -569,6 +704,7 @@ def add_to_dict(dictionary, key):
dictionary[key] = 1
return dictionary
def day_overview(): # time.strftime('%Y-%m-%dT%H:%M:%S.000Z', time.gmtime(time.time()))
# (datetime.datetime.utcnow()+datetime.timedelta(hours=0)).isoformat(timespec='milliseconds')+'Z'
result = day_overview_request()
@ -594,7 +730,8 @@ def day_overview(): #time.strftime('%Y-%m-%dT%H:%M:%S.000Z', time.gmtime(time.ti
changed_bytes += item["newlen"]
if item["type"] == "log":
files = files + 1 if item["logtype"] == item["logaction"] == "upload" else files
admin = admin+1 if item["logtype"] in ["delete", "merge", "block", "protect", "import", "rights", "abusefilter", "interwiki", "managetags"] else admin
admin = admin + 1 if item["logtype"] in ["delete", "merge", "block", "protect", "import", "rights",
"abusefilter", "interwiki", "managetags"] else admin
overall = round(new_articles + edits * 0.1 + files * 0.3 + admin * 0.1 + math.fabs(changed_bytes * 0.001), 2)
embed = defaultdict(dict)
embed["title"] = _("Daily overview")
@ -621,7 +758,13 @@ def day_overview(): #time.strftime('%Y-%m-%dT%H:%M:%S.000Z', time.gmtime(time.ti
usramount = ""
houramount = ""
embed["fields"] = []
fields = ((ngettext("Most active user", "Most active users", len(active_users)), ', '.join(active_users) + usramount), (_("Edits made"), edits), (_("New files"), files), (_("Admin actions"), admin), (_("Bytes changed"), changed_bytes), (_("New articles"), new_articles), (_("Unique contributors"), str(len(activity))), (ngettext("Most active hour", "Most active hours", len(active_hours)), ', '.join(active_hours) + houramount), (_("Day score"), str(overall)))
fields = (
(ngettext("Most active user", "Most active users", len(active_users)), ', '.join(active_users) + usramount),
(_("Edits made"), edits), (_("New files"), files), (_("Admin actions"), admin),
(_("Bytes changed"), changed_bytes), (_("New articles"), new_articles),
(_("Unique contributors"), str(len(activity))),
(ngettext("Most active hour", "Most active hours", len(active_hours)), ', '.join(active_hours) + houramount),
(_("Day score"), str(overall)))
for name, value in fields:
embed["fields"].append({"name": name, "value": value})
data = {}
@ -631,6 +774,7 @@ def day_overview(): #time.strftime('%Y-%m-%dT%H:%M:%S.000Z', time.gmtime(time.ti
else:
logging.debug("function requesting changes for day overview returned with error code")
class recent_changes_class(object):
starttime = time.time()
ids = []
@ -666,15 +810,27 @@ class recent_changes_class(object):
def log_in(self):
# session.cookies.clear()
if '@' not in settings["wiki_bot_login"]:
logging.error("Please provide proper nickname for login from https://{wiki}.gamepedia.com/Special:BotPasswords".format(wiki=settings["wiki"]))
logging.error(
"Please provide proper nickname for login from https://{wiki}.gamepedia.com/Special:BotPasswords".format(
wiki=settings["wiki"]))
return
if len(settings["wiki_bot_password"]) != 32:
logging.error("Password seems incorrect. It should be 32 characters long! Grab it from https://{wiki}.gamepedia.com/Special:BotPasswords".format(wiki=settings["wiki"]))
logging.error(
"Password seems incorrect. It should be 32 characters long! Grab it from https://{wiki}.gamepedia.com/Special:BotPasswords".format(
wiki=settings["wiki"]))
return
logging.info("Trying to log in to https://{wiki}.gamepedia.com...".format(wiki=settings["wiki"]))
try:
response = self.handle_mw_errors(self.session.post("https://{wiki}.gamepedia.com/api.php".format(wiki=settings["wiki"]), data={'action': 'query', 'format': 'json', 'utf8': '', 'meta': 'tokens', 'type': 'login'}))
response = self.handle_mw_errors(self.session.post("https://{wiki}.gamepedia.com/api.php".format(wiki=settings["wiki"]), data={'action': 'login', 'format': 'json', 'utf8': '', 'lgname': settings["wiki_bot_login"], 'lgpassword':settings["wiki_bot_password"], 'lgtoken': response.json()['query']['tokens']['logintoken']}))
response = self.handle_mw_errors(
self.session.post("https://{wiki}.gamepedia.com/api.php".format(wiki=settings["wiki"]),
data={'action': 'query', 'format': 'json', 'utf8': '', 'meta': 'tokens',
'type': 'login'}))
response = self.handle_mw_errors(
self.session.post("https://{wiki}.gamepedia.com/api.php".format(wiki=settings["wiki"]),
data={'action': 'login', 'format': 'json', 'utf8': '',
'lgname': settings["wiki_bot_login"],
'lgpassword': settings["wiki_bot_password"],
'lgtoken': response.json()['query']['tokens']['logintoken']}))
except ValueError:
logging.error("Logging in have not succeeded")
return
@ -697,9 +853,13 @@ class recent_changes_class(object):
def fetch(self, amount=settings["limit"]):
if self.unsent_messages:
logging.info("{} messages waiting to be delivered to Discord due to Discord throwing errors/no connection to Discord servers.".format(len(self.unsent_messages)))
logging.info(
"{} messages waiting to be delivered to Discord due to Discord throwing errors/no connection to Discord servers.".format(
len(self.unsent_messages)))
for num, item in enumerate(self.unsent_messages):
logging.debug("Trying to send a message to Discord from the queue with id of {} and content {}".format(str(num), str(item)))
logging.debug(
"Trying to send a message to Discord from the queue with id of {} and content {}".format(str(num),
str(item)))
if send_to_discord_webhook(item) < 2:
logging.debug("Sending message succeeded")
time.sleep(2.5)
@ -723,7 +883,9 @@ class recent_changes_class(object):
if len(self.ids) == 0:
logging.debug("ids is empty, triggering clean fetch")
clean = True
changes = self.safe_request("https://{wiki}.gamepedia.com/api.php?action=query&format=json&list=recentchanges&rcshow=!bot&rcprop=title%7Ctimestamp%7Cids%7Cloginfo%7Cparsedcomment%7Csizes%7Cflags%7Ctags%7Cuser&rclimit={amount}&rctype=edit%7Cnew%7Clog%7Cexternal".format(wiki=settings["wiki"], amount=amount))
changes = self.safe_request(
"https://{wiki}.gamepedia.com/api.php?action=query&format=json&list=recentchanges&rcshow=!bot&rcprop=title%7Ctimestamp%7Cids%7Cloginfo%7Cparsedcomment%7Csizes%7Cflags%7Ctags%7Cuser&rclimit={amount}&rctype=edit%7Cnew%7Clog%7Cexternal".format(
wiki=settings["wiki"], amount=amount))
if changes:
try:
changes = changes.json()['query']['recentchanges']
@ -731,7 +893,8 @@ class recent_changes_class(object):
except ValueError:
logging.warning("ValueError in fetching changes")
if changes.url == "https://www.gamepedia.com":
logging.critical("The wiki specified in the settings most probably doesn't exist, got redirected to gamepedia.com")
logging.critical(
"The wiki specified in the settings most probably doesn't exist, got redirected to gamepedia.com")
sys.exit(1)
self.downtime_controller()
return None
@ -745,10 +908,12 @@ class recent_changes_class(object):
self.streak += 1
if self.streak > 8:
self.streak = -1
send(_("Connection to {wiki} seems to be stable now.").format(wiki=settings["wikiname"]), _("Connection status"), settings["avatars"]["connection_restored"])
send(_("Connection to {wiki} seems to be stable now.").format(wiki=settings["wikiname"]),
_("Connection status"), settings["avatars"]["connection_restored"])
for change in changes:
if change["rcid"] in self.ids or change["rcid"] < self.recent_id:
logging.debug("Change ({}) is in ids or is lower than recent_id {}".format(change["rcid"], self.recent_id))
logging.debug("Change ({}) is in ids or is lower than recent_id {}".format(change["rcid"],
self.recent_id))
continue
logging.debug(self.ids)
logging.debug(self.recent_id)
@ -784,7 +949,8 @@ class recent_changes_class(object):
except requests.exceptions.Timeout:
pass
if online < 1:
logging.error("Failure when checking Internet connection at {time}".format(time=time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime())))
logging.error("Failure when checking Internet connection at {time}".format(
time=time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime())))
self.downtimecredibility = 0
if looped == False:
while 1: # recursed loop, check for connection (every 10 seconds) as long as three services are down, don't do anything else
@ -803,8 +969,10 @@ class recent_changes_class(object):
if self.downtimecredibility < 60:
self.downtimecredibility += 15
else:
if(time.time() - self.last_downtime)>1800 and self.check_connection(): #check if last downtime happened within 30 minutes, if yes, don't send a message
send(_("{wiki} seems to be down or unreachable.").format(wiki=settings["wikiname"]), _("Connection status"), settings["avatars"]["connection_failed"])
if (
time.time() - self.last_downtime) > 1800 and self.check_connection(): # check if last downtime happened within 30 minutes, if yes, don't send a message
send(_("{wiki} seems to be down or unreachable.").format(wiki=settings["wikiname"]),
_("Connection status"), settings["avatars"]["connection_failed"])
self.last_downtime = time.time()
self.streak = 0
@ -812,13 +980,16 @@ class recent_changes_class(object):
self.map_ips = {}
def update_tags(self):
tags_read = safe_read(self.safe_request("https://{wiki}.gamepedia.com/api.php?action=query&format=json&list=tags&tglimit=max&tgprop=name|displayname".format(wiki=settings["wiki"])), "query", "tags")
tags_read = safe_read(self.safe_request(
"https://{wiki}.gamepedia.com/api.php?action=query&format=json&list=tags&tglimit=max&tgprop=name|displayname".format(
wiki=settings["wiki"])), "query", "tags")
if tags_read:
for tag in tags_read:
self.tags[tag["name"]] = (BeautifulSoup(tag["displayname"], "lxml")).get_text()
else:
logging.warning("Could not retrive tags. Internal names will be used!")
recent_changes = recent_changes_class()
if settings["wiki_bot_login"] and settings["wiki_bot_password"]:
recent_changes.log_in()
@ -828,10 +999,12 @@ recent_changes.fetch(amount=settings["limitrefetch" ] if settings["limitrefetch"
schedule.every(settings["cooldown"]).seconds.do(recent_changes.fetch)
if 1 == 2:
print (_("director"), _("bot"), _("editor"), _("directors"), _("sysop"), _("bureaucrat"), _("reviewer"), _("autoreview"), _("autopatrol"), _("wiki_guardian"))
print(_("director"), _("bot"), _("editor"), _("directors"), _("sysop"), _("bureaucrat"), _("reviewer"),
_("autoreview"), _("autopatrol"), _("wiki_guardian"))
if settings["overview"]:
schedule.every().day.at("{}:{}".format(time.strptime(settings["overview_time"], '%H:%M').tm_hour, time.strptime(settings["overview_time"], '%H:%M').tm_min)).do(day_overview)
schedule.every().day.at("{}:{}".format(time.strptime(settings["overview_time"], '%H:%M').tm_hour,
time.strptime(settings["overview_time"], '%H:%M').tm_min)).do(day_overview)
schedule.every().day.at("00:00").do(recent_changes.clear_cache)
while 1: