diff --git a/rcgcdw.py b/rcgcdw.py index 151e272..dfae8bf 100644 --- a/rcgcdw.py +++ b/rcgcdw.py @@ -1,24 +1,24 @@ #!/usr/bin/python # -*- coding: utf-8 -*- -#Recent changes Gamepedia compatible Discord webhook is a project for using a webhook as recent changes page from MediaWiki. -#Copyright (C) 2018 Frisk +# Recent changes Gamepedia compatible Discord webhook is a project for using a webhook as recent changes page from MediaWiki. +# Copyright (C) 2018 Frisk -#This program is free software: you can redistribute it and/or modify -#it under the terms of the GNU Affero General Public License as published -#by the Free Software Foundation, either version 3 of the License, or -#(at your option) any later version. +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. -#This program is distributed in the hope that it will be useful, -#but WITHOUT ANY WARRANTY; without even the implied warranty of -#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -#GNU Affero General Public License for more details. +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. -#You should have received a copy of the GNU Affero General Public License -#along with this program. If not, see . +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . -#WARNING! SHITTY CODE AHEAD. ENTER ONLY IF YOU ARE SURE YOU CAN TAKE IT -#You have been warned +# WARNING! SHITTY CODE AHEAD. ENTER ONLY IF YOU ARE SURE YOU CAN TAKE IT +# You have been warned import time, logging, json, requests, datetime, re, gettext, math, random, os.path, schedule, sys from bs4 import BeautifulSoup @@ -28,7 +28,7 @@ from html.parser import HTMLParser with open("settings.json") as sfile: settings = json.load(sfile) - if settings["limitrefetch"] < settings["limit"] and settings["limitrefetch"]!=-1: + if settings["limitrefetch"] < settings["limit"] and settings["limitrefetch"] != -1: settings["limitrefetch"] = settings["limit"] logging.basicConfig(level=settings["verbose_level"]) if settings["limitrefetch"] != -1 and os.path.exists("lastchange.txt") == False: @@ -39,37 +39,46 @@ lang = gettext.translation('rcgcdw', localedir='locale', languages=[settings["la lang.install() ngettext = lang.ngettext + class MWError(Exception): - pass + pass + class MyHTMLParser(HTMLParser): new_string = "" recent_href = "" + def handle_starttag(self, tag, attrs): for attr in attrs: if attr[0] == 'href': - self.recent_href=attr[1] + self.recent_href = attr[1] if self.recent_href.startswith("//"): self.recent_href = "https:{rest}".format(rest=self.recent_href) elif not self.recent_href.startswith("https"): self.recent_href = "https://{wiki}.gamepedia.com".format(wiki=settings["wiki"]) + self.recent_href - self.recent_href = self.recent_href.replace(")", "\)") + self.recent_href = self.recent_href.replace(")", "\\)") + def handle_data(self, data): if self.recent_href: - self.new_string = self.new_string+"[{}]({})".format(data, self.recent_href) + self.new_string = self.new_string + "[{}]({})".format(data, self.recent_href) self.recent_href = "" else: - self.new_string = self.new_string+data + self.new_string = self.new_string + data + def handle_comment(self, data): - self.new_string = self.new_string+data + self.new_string = self.new_string + data + def handle_endtag(self, tag): - print (self.new_string) - + print(self.new_string) + + HTMLParse = MyHTMLParser() + def send(message, name, avatar): send_to_discord({"content": message, "avatar_url": avatar, "username": name}) - + + def safe_read(request, *keys): if request is None: return None @@ -78,16 +87,19 @@ def safe_read(request, *keys): for item in keys: request = request[item] except KeyError: - logging.warning("Failure while extracting data from request on key {key} in {change}".format(key=item, change=request)) + logging.warning( + "Failure while extracting data from request on key {key} in {change}".format(key=item, change=request)) return None except ValueError: logging.warning("Failure while extracting data from request in {change}".format(change=request)) return None return request + def send_to_discord_webhook(data): try: - result = requests.post(settings["webhookURL"], data=data, headers={**{'Content-Type': 'application/json'}, **settings["header"]}, timeout=10) + result = requests.post(settings["webhookURL"], data=data, + headers={**{'Content-Type': 'application/json'}, **settings["header"]}, timeout=10) except requests.exceptions.Timeout: logging.warning("Timeouted while sending data to the webhook.") return 3 @@ -96,7 +108,8 @@ def send_to_discord_webhook(data): return 3 else: return handle_discord_http(result.status_code, data) - + + def send_to_discord(data): if recent_changes.unsent_messages: recent_changes.unsent_messages.append(data) @@ -110,7 +123,8 @@ def send_to_discord(data): elif code < 2: time.sleep(2.5) pass - + + def webhook_formatter(action, STATIC, **params): logging.debug("Received things: {thing}".format(thing=params)) colornumber = None if isinstance(STATIC["color"], str) else STATIC["color"] @@ -120,45 +134,59 @@ def webhook_formatter(action, STATIC, **params): if "title" in params: article_encoded = params["title"].replace(" ", "_").replace(')', '\)') if re.match(r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b", params["user"]) is not None: - author_url = "https://{wiki}.gamepedia.com/Special:Contributions/{user}".format(wiki=settings["wiki"], user=params["user"]) + author_url = "https://{wiki}.gamepedia.com/Special:Contributions/{user}".format(wiki=settings["wiki"], + user=params["user"]) if params["user"] not in list(recent_changes.map_ips.keys()): - contibs = safe_read(recent_changes.safe_request("https://{wiki}.gamepedia.com/api.php?action=query&format=json&list=usercontribs&uclimit=max&ucuser={user}&ucprop=".format(wiki=settings["wiki"], user=params["user"])), "query", "usercontribs") + contibs = safe_read(recent_changes.safe_request( + "https://{wiki}.gamepedia.com/api.php?action=query&format=json&list=usercontribs&uclimit=max&ucuser={user}&ucprop=".format( + wiki=settings["wiki"], user=params["user"])), "query", "usercontribs") if contibs is None: - logging.warning("WARNING: Something went wrong when checking amount of contributions for given IP address") + logging.warning( + "WARNING: Something went wrong when checking amount of contributions for given IP address") params["user"] = params["user"] + "(?)" else: params["user"] = "{author} ({contribs})".format(author=params["user"], contribs=len(contibs)) - recent_changes.map_ips[params["user"]]=len(contibs) + recent_changes.map_ips[params["user"]] = len(contibs) else: - recent_changes.map_ips[params["user"]]+=1 - params["user"] = "{author} ({amount})".format(author=params["user"], amount=recent_changes.map_ips[params["user"]]) + recent_changes.map_ips[params["user"]] += 1 + params["user"] = "{author} ({amount})".format(author=params["user"], + amount=recent_changes.map_ips[params["user"]]) else: - author_url = "https://{wiki}.gamepedia.com/User:{user}".format(wiki=settings["wiki"], user=params["user"].replace(" ", "_")) - if action in ("edit", "new"): #edit or new page + author_url = "https://{wiki}.gamepedia.com/User:{user}".format(wiki=settings["wiki"], + user=params["user"].replace(" ", "_")) + if action in ("edit", "new"): # edit or new page editsize = params["size"] - print (editsize) + print(editsize) if editsize > 0: if editsize > 6032: colornumber = 65280 else: - colornumber = 35840 + (math.floor(editsize/(52)))*256 + colornumber = 35840 + (math.floor(editsize / (52))) * 256 elif editsize < 0: if editsize < -6032: colornumber = 16711680 else: - colornumber = 9175040 + (math.floor((editsize*-1)/(52)))*65536 + colornumber = 9175040 + (math.floor((editsize * -1) / (52))) * 65536 elif editsize == 0: colornumber = 8750469 - link = "https://{wiki}.gamepedia.com/index.php?title={article}&curid={pageid}&diff={diff}&oldid={oldrev}".format(wiki=settings["wiki"], pageid=params["pageid"], diff=params["diff"], oldrev=params["oldrev"], article=params["title"].replace(" ", "_")) - embed["title"] = "{article} ({new}{minor}{editsize})".format(article=params["title"], editsize="+"+str(editsize) if editsize>0 else editsize, new= _("(N!) ") if action == "new" else "", minor=_("m ") if action == "edit" and params["minor"] else "") - elif action in ("upload/overwrite", "upload/upload"): #sending files + link = "https://{wiki}.gamepedia.com/index.php?title={article}&curid={pageid}&diff={diff}&oldid={oldrev}".format( + wiki=settings["wiki"], pageid=params["pageid"], diff=params["diff"], oldrev=params["oldrev"], + article=params["title"].replace(" ", "_")) + embed["title"] = "{article} ({new}{minor}{editsize})".format(article=params["title"], editsize="+" + str( + editsize) if editsize > 0 else editsize, new=_("(N!) ") if action == "new" else "", + minor=_("m ") if action == "edit" and params[ + "minor"] else "") + elif action in ("upload/overwrite", "upload/upload"): # sending files license = None - urls = safe_read(recent_changes.safe_request("https://{wiki}.gamepedia.com/api.php?action=query&format=json&prop=imageinfo&list=&meta=&titles={filename}&iiprop=timestamp%7Curl&iilimit=2".format(wiki=settings["wiki"], filename=params["title"])), "query", "pages") + urls = safe_read(recent_changes.safe_request( + "https://{wiki}.gamepedia.com/api.php?action=query&format=json&prop=imageinfo&list=&meta=&titles={filename}&iiprop=timestamp%7Curl&iilimit=2".format( + wiki=settings["wiki"], filename=params["title"])), "query", "pages") undolink = "" - link ="https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_")) + link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], + article=params["title"].replace(" ", "_")) additional_info_retrieved = False if urls is not None: - if "-1" not in urls: #oage removed before we asked for it + if "-1" not in urls: # oage removed before we asked for it img_info = next(iter(urls.values()))["imageinfo"] embed["image"]["url"] = img_info[0]["url"] additional_info_retrieved = True @@ -167,12 +195,17 @@ def webhook_formatter(action, STATIC, **params): if params["overwrite"]: if additional_info_retrieved: img_timestamp = [x for x in img_info[1]["timestamp"] if x.isdigit()] - undolink = "https://{wiki}.gamepedia.com/index.php?title={filename}&action=revert&oldimage={timestamp}%21{filenamewon}".format(wiki=settings["wiki"], filename=article_encoded, timestamp="".join(img_timestamp), filenamewon = article_encoded[5:]) - embed["fields"] = [{"name": _("Options"), "value": _("([preview]({link}) | [undo]({undolink}))").format(link=embed["image"]["url"], undolink=undolink)}] + undolink = "https://{wiki}.gamepedia.com/index.php?title={filename}&action=revert&oldimage={timestamp}%21{filenamewon}".format( + wiki=settings["wiki"], filename=article_encoded, timestamp="".join(img_timestamp), + filenamewon=article_encoded[5:]) + embed["fields"] = [{"name": _("Options"), "value": _("([preview]({link}) | [undo]({undolink}))").format( + link=embed["image"]["url"], undolink=undolink)}] embed["title"] = _("Uploaded a new version of {name}").format(name=params["title"]) else: embed["title"] = _("Uploaded {name}").format(name=params["title"]) - article_content = safe_read(recent_changes.safe_request("https://{wiki}.gamepedia.com/api.php?action=query&format=json&prop=revisions&titles={article}&rvprop=content".format(wiki=settings["wiki"], article=quote_plus(params["title"], safe=''))), "query", "pages") + article_content = safe_read(recent_changes.safe_request( + "https://{wiki}.gamepedia.com/api.php?action=query&format=json&prop=revisions&titles={article}&rvprop=content".format( + wiki=settings["wiki"], article=quote_plus(params["title"], safe=''))), "query", "pages") if article_content is None: logging.warning("Something went wrong when getting license for the image") return 0 @@ -188,57 +221,83 @@ def webhook_formatter(action, STATIC, **params): else: license = "?" except IndexError: - logging.error("Given regex for the license detection is incorrect. It does not have a capturing group called \"license\" specified. Please fix license_regex value in the config!") + logging.error( + "Given regex for the license detection is incorrect. It does not have a capturing group called \"license\" specified. Please fix license_regex value in the config!") license = "?" except re.error: - logging.error("Given regex for the license detection is incorrect. Please fix license_regex or license_regex_detect values in the config!") + logging.error( + "Given regex for the license detection is incorrect. Please fix license_regex or license_regex_detect values in the config!") license = "?" if additional_info_retrieved: - embed["fields"] = [{"name": _("Options"), "value": _("([preview]({link}))").format(link=embed["image"]["url"])}] - params["desc"] = _("{desc}\nLicense: {license}").format(desc=params["desc"], license=license if license is not None else "?") + embed["fields"] = [ + {"name": _("Options"), "value": _("([preview]({link}))").format(link=embed["image"]["url"])}] + params["desc"] = _("{desc}\nLicense: {license}").format(desc=params["desc"], + license=license if license is not None else "?") elif action == "delete/delete": - link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_")) + link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], + article=params["title"].replace(" ", "_")) embed["title"] = _("Deleted page {article}").format(article=params["title"]) elif action == "delete/delete_redir": - link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_")) + link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], + article=params["title"].replace(" ", "_")) embed["title"] = _("Deleted redirect {article} by overwriting").format(article=params["title"]) elif action == "move/move": - link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["target"].replace(" ", "_")) - params["desc"] = "{supress}. {desc}".format(desc=params["desc"], supress=_("No redirect has been made") if params["supress"] == True else _("A redirect has been made")) - embed["title"] = _("Moved {article} to {target}").format(article = params["title"], target=params["target"]) + link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], + article=params["target"].replace(" ", "_")) + params["desc"] = "{supress}. {desc}".format(desc=params["desc"], + supress=_("No redirect has been made") if params[ + "supress"] == True else _( + "A redirect has been made")) + embed["title"] = _("Moved {article} to {target}").format(article=params["title"], target=params["target"]) elif action == "move/move_redir": - link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["target"].replace(" ", "_")) - embed["title"] = _("Moved {article} to {title} over redirect").format(article=params["title"], title=params["target"]) + link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], + article=params["target"].replace(" ", "_")) + embed["title"] = _("Moved {article} to {title} over redirect").format(article=params["title"], + title=params["target"]) elif action == "protect/move_prot": - link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_")) - embed["title"] = _("Moved protection settings from {article} to {title}").format(article=params["title"], title=params["target"]) + link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], + article=params["title"].replace(" ", "_")) + embed["title"] = _("Moved protection settings from {article} to {title}").format(article=params["title"], + title=params["target"]) elif action == "block/block": - link = "https://{wiki}.gamepedia.com/{user}".format(wiki=settings["wiki"], user=params["blocked_user"].replace(" ", "_").replace(')', '\)')) + link = "https://{wiki}.gamepedia.com/{user}".format(wiki=settings["wiki"], + user=params["blocked_user"].replace(" ", "_").replace(')', + '\)')) user = params["blocked_user"].split(':')[1] - time =_( "infinity and beyond") if params["duration"] == "infinite" else params["duration"] + time = _("infinity and beyond") if params["duration"] == "infinite" else params["duration"] embed["title"] = _("Blocked {blocked_user} for {time}").format(blocked_user=user, time=time) elif action == "block/reblock": - link = "https://{wiki}.gamepedia.com/{user}".format(wiki=settings["wiki"], user=params["blocked_user"].replace(" ", "_").replace(')', '\)')) + link = "https://{wiki}.gamepedia.com/{user}".format(wiki=settings["wiki"], + user=params["blocked_user"].replace(" ", "_").replace(')', + '\)')) user = params["blocked_user"].split(':')[1] embed["title"] = _("Changed block settings for {blocked_user}").format(blocked_user=user) elif action == "block/unblock": - link = "https://{wiki}.gamepedia.com/{user}".format(wiki=settings["wiki"], user=params["blocked_user"].replace(" ", "_").replace(')', '\)')) + link = "https://{wiki}.gamepedia.com/{user}".format(wiki=settings["wiki"], + user=params["blocked_user"].replace(" ", "_").replace(')', + '\)')) user = params["blocked_user"].split(':')[1] embed["title"] = _("Unblocked {blocked_user}").format(blocked_user=user) elif action == "curseprofile/comment-created": - link = "https://{wiki}.gamepedia.com/Special:CommentPermalink/{commentid}".format(wiki=settings["wiki"], commentid=params["commentid"]) - #link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"], target=params["target"].replace(" ", "_").replace(')', '\)')) old way of linking + link = "https://{wiki}.gamepedia.com/Special:CommentPermalink/{commentid}".format(wiki=settings["wiki"], + commentid=params["commentid"]) + # link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"], target=params["target"].replace(" ", "_").replace(')', '\)')) old way of linking embed["title"] = _("Left a comment on {target}'s profile").format(target=params["target"]) elif action == "curseprofile/comment-replied": - #link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"], target=params["target"].replace(" ", "_").replace(')', '\)')) - link = "https://{wiki}.gamepedia.com/Special:CommentPermalink/{commentid}".format(wiki=settings["wiki"], commentid=params["commentid"]) + # link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"], target=params["target"].replace(" ", "_").replace(')', '\)')) + link = "https://{wiki}.gamepedia.com/Special:CommentPermalink/{commentid}".format(wiki=settings["wiki"], + commentid=params["commentid"]) embed["title"] = _("Replied to a comment on {target}'s profile").format(target=params["target"]) elif action == "curseprofile/comment-edited": - #link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"], target=params["target"].replace(" ", "_").replace(')', '\)')) - link = "https://{wiki}.gamepedia.com/Special:CommentPermalink/{commentid}".format(wiki=settings["wiki"], commentid=params["commentid"]) + # link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"], target=params["target"].replace(" ", "_").replace(')', '\)')) + link = "https://{wiki}.gamepedia.com/Special:CommentPermalink/{commentid}".format(wiki=settings["wiki"], + commentid=params["commentid"]) embed["title"] = _("Edited a comment on {target}'s profile").format(target=params["target"]) elif action == "curseprofile/profile-edited": - link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"], target=params["target"].replace(" ", "_").replace(')', '\)')) + link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"], + target=params["target"].replace(" ", + "_").replace( + ')', '\)')) if params["field"] == "profile-location": field = _("Location") elif params["field"] == "profile-aboutme": @@ -266,17 +325,19 @@ def webhook_formatter(action, STATIC, **params): embed["title"] = _("Edited {target}'s profile").format(target=params["target"]) params["desc"] = _("{field} field changed to: {desc}").format(field=field, desc=params["desc"]) elif action == "curseprofile/comment-deleted": - link = "https://{wiki}.gamepedia.com/Special:CommentPermalink/{commentid}".format(wiki=settings["wiki"], commentid=params["commentid"]) - #link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"], target=params["target"].replace(" ", "_").replace(')', '\)')) + link = "https://{wiki}.gamepedia.com/Special:CommentPermalink/{commentid}".format(wiki=settings["wiki"], + commentid=params["commentid"]) + # link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"], target=params["target"].replace(" ", "_").replace(')', '\)')) embed["title"] = _("Deleted a comment on {target}'s profile").format(target=params["target"]) elif action in ("rights/rights", "rights/autopromote"): - link = "https://{wiki}.gamepedia.com/User:".format(wiki=settings["wiki"])+params["title"].split(":")[1] + link = "https://{wiki}.gamepedia.com/User:".format(wiki=settings["wiki"]) + params["title"].split(":")[1] if action == "rights/rights": embed["title"] = _("Changed group membership for {target}").format(target=params["title"].split(":")[1]) else: params["user"] = _("System") author_url = "" - embed["title"] = _("{target} got autopromoted to a new usergroup").format(target=params["title"].split(":")[1]) + embed["title"] = _("{target} got autopromoted to a new usergroup").format( + target=params["title"].split(":")[1]) if len(params["old_groups"]) < len(params["new_groups"]): embed["thumbnail"]["url"] = "https://i.imgur.com/WnGhF5g.gif" old_groups = [] @@ -289,28 +350,39 @@ def webhook_formatter(action, STATIC, **params): old_groups = [_("none")] if len(new_groups) == 0: new_groups = [_("none")] - reason = ": {desc}".format(desc=params["desc"]) if params["desc"]!=_("No description provided") else "" - params["desc"] = _("Groups changed from {old_groups} to {new_groups}{reason}").format(old_groups=", ".join(old_groups), new_groups=', '.join(new_groups), reason=reason) + reason = ": {desc}".format(desc=params["desc"]) if params["desc"] != _("No description provided") else "" + params["desc"] = _("Groups changed from {old_groups} to {new_groups}{reason}").format( + old_groups=", ".join(old_groups), new_groups=', '.join(new_groups), reason=reason) elif action == "protect/protect": - link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_")) + link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], + article=params["title"].replace(" ", "_")) embed["title"] = _("Protected {target}").format(target=params["title"]) params["desc"] = params["settings"] + " | " + params["desc"] elif action == "protect/modify": - link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_")) + link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], + article=params["title"].replace(" ", "_")) embed["title"] = _("Changed protection level for {article}").format(article=params["title"]) params["desc"] = params["settings"] + " | " + params["desc"] elif action == "protect/unprotect": - link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_")) + link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], + article=params["title"].replace(" ", "_")) embed["title"] = _("Removed protection from {article}").format(article=params["title"]) elif action == "delete/revision": amount = len(params["amount"]) - link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_")) - embed["title"] = ngettext("Changed visibility of revision on page {article} ", "Changed visibility of {amount} revisions on page {article} ", amount).format(article=params["title"], amount=amount) + link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], + article=params["title"].replace(" ", "_")) + embed["title"] = ngettext("Changed visibility of revision on page {article} ", + "Changed visibility of {amount} revisions on page {article} ", amount).format( + article=params["title"], amount=amount) elif action == "import/upload": - link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_")) - embed["title"] = ngettext("Imported {article} with {count} revision", "Imported {article} with {count} revisions", params["amount"]).format(article=params["title"], count=params["amount"]) + link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], + article=params["title"].replace(" ", "_")) + embed["title"] = ngettext("Imported {article} with {count} revision", + "Imported {article} with {count} revisions", params["amount"]).format( + article=params["title"], count=params["amount"]) elif action == "delete/restore": - link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_")) + link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], + article=params["title"].replace(" ", "_")) embed["title"] = _("Restored {article}").format(article=params["title"]) elif action == "delete/event": link = "https://{wiki}.gamepedia.com/Special:RecentChanges".format(wiki=settings["wiki"]) @@ -322,32 +394,44 @@ def webhook_formatter(action, STATIC, **params): link = "https://{wiki}.gamepedia.com/Special:RecentChanges".format(wiki=settings["wiki"]) embed["title"] = _("Edited abuse filter number {number}").format(number=params["filternr"]) elif action == "merge/merge": - link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_")) - embed["title"] = _("Merged revision histories of {article} into {dest}").format(article=params["title"], dest=params["destination"]) + link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], + article=params["title"].replace(" ", "_")) + embed["title"] = _("Merged revision histories of {article} into {dest}").format(article=params["title"], + dest=params["destination"]) elif action == "interwiki/iw_add": link = "https://{wiki}.gamepedia.com/Special:Interwiki".format(wiki=settings["wiki"]) embed["title"] = _("Added an entry to the interwiki table") - params["desc"] =_("Prefix: {prefix}, website: {website} | {desc}").format(desc=params["desc"], prefix=params["prefix"], website=params["website"]) + params["desc"] = _("Prefix: {prefix}, website: {website} | {desc}").format(desc=params["desc"], + prefix=params["prefix"], + website=params["website"]) elif action == "interwiki/iw_edit": link = "https://{wiki}.gamepedia.com/Special:Interwiki".format(wiki=settings["wiki"]) embed["title"] = _("Edited an entry in interwiki table") - params["desc"] =_("Prefix: {prefix}, website: {website} | {desc}").format(desc=params["desc"], prefix=params["prefix"], website=params["website"]) + params["desc"] = _("Prefix: {prefix}, website: {website} | {desc}").format(desc=params["desc"], + prefix=params["prefix"], + website=params["website"]) elif action == "interwiki/iw_delete": link = "https://{wiki}.gamepedia.com/Special:Interwiki".format(wiki=settings["wiki"]) embed["title"] = _("Deleted an entry in interwiki table") - params["desc"] =_("Prefix: {prefix} | {desc}").format(desc=params["desc"], prefix=params["prefix"]) + params["desc"] = _("Prefix: {prefix} | {desc}").format(desc=params["desc"], prefix=params["prefix"]) elif action == "contentmodel/change": - link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_")) + link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], + article=params["title"].replace(" ", "_")) embed["title"] = _("Changed the content model of the page {article}").format(article=params["title"]) - params["desc"] = _("Model changed from {old} to {new}: {reason}").format(old=params["oldmodel"], new=params["newmodel"], reason=params["desc"]) + params["desc"] = _("Model changed from {old} to {new}: {reason}").format(old=params["oldmodel"], + new=params["newmodel"], + reason=params["desc"]) elif action == "sprite/sprite": - link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_")) + link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], + article=params["title"].replace(" ", "_")) embed["title"] = _("Edited the sprite for {article}").format(article=params["title"]) elif action == "sprite/sheet": - link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_")) + link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], + article=params["title"].replace(" ", "_")) embed["title"] = _("Created the sprite sheet for {article}").format(article=params["title"]) elif action == "sprite/slice": - link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_")) + link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], + article=params["title"].replace(" ", "_")) embed["title"] = _("Edited the slice for {article}").format(article=params["title"]) elif action == "managetags/create": link = "https://{wiki}.gamepedia.com/Special:Tags".format(wiki=settings["wiki"]) @@ -391,30 +475,38 @@ def webhook_formatter(action, STATIC, **params): data['avatar_url'] = settings["avatars"]["embed"] formatted_embed = json.dumps(data, indent=4) send_to_discord(formatted_embed) - + + def handle_discord_http(code, formatted_embed): - if 300 > code > 199: #message went through + if 300 > code > 199: # message went through return 0 - elif code == 400: #HTTP BAD REQUEST - logging.error("Following message has been rejected by Discord, please submit a bug on our bugtracker adding it:") + elif code == 400: # HTTP BAD REQUEST + logging.error( + "Following message has been rejected by Discord, please submit a bug on our bugtracker adding it:") logging.error(formatted_embed) return 1 - elif code == 401 or code == 404: #HTTP UNAUTHORIZED AND NOT FOUND + elif code == 401 or code == 404: # HTTP UNAUTHORIZED AND NOT FOUND logging.error("Webhook URL is invalid or no longer in use, please replace it with proper one.") sys.exit(1) elif code == 429: logging.error("We are sending too many requests to the Discord, slowing down...") return 2 elif 499 < code < 600: - logging.error("Discord have trouble processing the event, and because the HTTP code returned is {} it means we blame them.".format(code)) + logging.error( + "Discord have trouble processing the event, and because the HTTP code returned is {} it means we blame them.".format( + code)) return 3 - -def first_pass(change): #I've decided to split the embed formatter and change handler, maybe it's more messy this way, I don't know + + +def first_pass( + change): # I've decided to split the embed formatter and change handler, maybe it's more messy this way, I don't know if "actionhidden" in change or "suppressed" in change and "suppressed" not in settings["ignored"]: - webhook_formatter("suppressed", {"timestamp": change["timestamp"], "color": settings["appearance"]["suppressed"]["color"], "icon": settings["appearance"]["suppressed"]["icon"]}, user=change["user"]) + webhook_formatter("suppressed", + {"timestamp": change["timestamp"], "color": settings["appearance"]["suppressed"]["color"], + "icon": settings["appearance"]["suppressed"]["icon"]}, user=change["user"]) return parse_output = HTMLParse.feed(change["parsedcomment"]) - #parsedcomment = (BeautifulSoup(change["parsedcomment"], "lxml")).get_text() + # parsedcomment = (BeautifulSoup(change["parsedcomment"], "lxml")).get_text() parsedcomment = HTMLParse.new_string HTMLParse.new_string = "" logging.debug(change) @@ -422,120 +514,160 @@ def first_pass(change): #I've decided to split the embed formatter and change ha if not parsedcomment: parsedcomment = _("No description provided") if change["type"] == "edit" and "edit" not in settings["ignored"]: - STATIC_VARS = {**STATIC_VARS ,**{"color": settings["appearance"]["edit"]["color"], "icon": settings["appearance"]["edit"]["icon"]}} - webhook_formatter("edit", STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, oldrev=change["old_revid"], pageid=change["pageid"], diff=change["revid"], size=change["newlen"]-change["oldlen"], minor= True if "minor" in change else False) + STATIC_VARS = {**STATIC_VARS, **{"color": settings["appearance"]["edit"]["color"], + "icon": settings["appearance"]["edit"]["icon"]}} + webhook_formatter("edit", STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, + oldrev=change["old_revid"], pageid=change["pageid"], diff=change["revid"], + size=change["newlen"] - change["oldlen"], minor=True if "minor" in change else False) elif change["type"] == "log": combination = "{logtype}/{logaction}".format(logtype=change["logtype"], logaction=change["logaction"]) if combination in settings["ignored"]: return logging.debug("combination is {}".format(combination)) try: - STATIC_VARS = {**STATIC_VARS ,**{"color": settings["appearance"][combination]["color"], "icon": settings["appearance"][combination]["icon"]}} + STATIC_VARS = {**STATIC_VARS, **{"color": settings["appearance"][combination]["color"], + "icon": settings["appearance"][combination]["icon"]}} except KeyError: - STATIC_VARS = {**STATIC_VARS ,**{"color": "", "icon": ""}} + STATIC_VARS = {**STATIC_VARS, **{"color": "", "icon": ""}} logging.error("No value in the settings has been given for {}".format(combination)) if combination == "protect/protect": - webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, settings=change["logparams"]["description"]) - elif combination=="protect/modify": - webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, settings=change["logparams"]["description"]) - elif combination=="protect/unprotect": + webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, + settings=change["logparams"]["description"]) + elif combination == "protect/modify": + webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, + settings=change["logparams"]["description"]) + elif combination == "protect/unprotect": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment) - elif combination=="upload/overwrite": - webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, overwrite=True) - elif combination=="upload/upload": - webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, overwrite=False) - elif combination=="delete/delete": + elif combination == "upload/overwrite": + webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, + overwrite=True) + elif combination == "upload/upload": + webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, + overwrite=False) + elif combination == "delete/delete": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment) - elif combination=="delete/delete_redir": + elif combination == "delete/delete_redir": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment) - elif combination=="delete/restore": + elif combination == "delete/restore": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment) - elif combination=="delete/revision": - webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, amount=change["logparams"]["ids"]) - elif combination=="delete/event": + elif combination == "delete/revision": + webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, + amount=change["logparams"]["ids"]) + elif combination == "delete/event": webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment) - elif combination=="import/upload": - webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, amount=change["logparams"]["count"]) - elif combination=="import/interwiki": + elif combination == "import/upload": + webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, + amount=change["logparams"]["count"]) + elif combination == "import/interwiki": webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment) - elif combination=="merge/merge" : - webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, destination=change["logparams"]["dest_title"]) - elif combination=="move/move": - webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, supress=True if "suppressredirect" in change["logparams"] else False, target=change["logparams"]['target_title']) - elif combination=="move/move_redir": - webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, target=change["logparams"]["target_title"]) - elif combination=="protect/move_prot": - webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, target=change["logparams"]["oldtitle_title"]) - elif combination=="block/block": - webhook_formatter(combination, STATIC_VARS, user=change["user"], blocked_user=change["title"], desc=parsedcomment, duration=change["logparams"]["duration"]) - elif combination=="block/unblock": - webhook_formatter(combination, STATIC_VARS, user=change["user"], blocked_user=change["title"], desc=parsedcomment) - elif combination=="block/reblock": - webhook_formatter(combination, STATIC_VARS, user=change["user"], blocked_user=change["title"], desc=parsedcomment) - elif combination=="rights/rights": - webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, old_groups=change["logparams"]["oldgroups"], new_groups=change["logparams"]["newgroups"]) - elif combination=="rights/autopromote": - webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, old_groups=change["logparams"]["oldgroups"], new_groups=change["logparams"]["newgroups"]) - elif combination=="abusefilter/modify": - webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment, filternr=change["logparams"]['1']) - elif combination=="interwiki/iw_add": - webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment, prefix=change["logparams"]['0'], website=change["logparams"]['1']) - elif combination=="interwiki/iw_edit": - webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment, prefix=change["logparams"]['0'], website=change["logparams"]['1']) - elif combination=="interwiki/iw_delete": - webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment, prefix=change["logparams"]['0']) - elif combination=="curseprofile/comment-created": - webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1], commentid=change["logparams"]["0"]) - elif combination=="curseprofile/comment-edited": - webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1], commentid=change["logparams"]["0"]) - elif combination=="curseprofile/comment-deleted": - webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1], commentid=change["logparams"]["0"]) - elif combination=="curseprofile/profile-edited": - webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1], field=change["logparams"]['0'], desc=change["parsedcomment"]) - elif combination=="curseprofile/comment-replied": - webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1], commentid=change["logparams"]["0"]) - elif combination=="contentmodel/change": - webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, oldmodel=change["logparams" ]["oldmodel"], newmodel=change["logparams" ]["newmodel"]) - elif combination=="sprite/sprite": + elif combination == "merge/merge": + webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, + destination=change["logparams"]["dest_title"]) + elif combination == "move/move": + webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, + supress=True if "suppressredirect" in change["logparams"] else False, + target=change["logparams"]['target_title']) + elif combination == "move/move_redir": + webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, + target=change["logparams"]["target_title"]) + elif combination == "protect/move_prot": + webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, + target=change["logparams"]["oldtitle_title"]) + elif combination == "block/block": + webhook_formatter(combination, STATIC_VARS, user=change["user"], blocked_user=change["title"], + desc=parsedcomment, duration=change["logparams"]["duration"]) + elif combination == "block/unblock": + webhook_formatter(combination, STATIC_VARS, user=change["user"], blocked_user=change["title"], + desc=parsedcomment) + elif combination == "block/reblock": + webhook_formatter(combination, STATIC_VARS, user=change["user"], blocked_user=change["title"], + desc=parsedcomment) + elif combination == "rights/rights": + webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, + old_groups=change["logparams"]["oldgroups"], new_groups=change["logparams"]["newgroups"]) + elif combination == "rights/autopromote": + webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, + old_groups=change["logparams"]["oldgroups"], new_groups=change["logparams"]["newgroups"]) + elif combination == "abusefilter/modify": + webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment, + filternr=change["logparams"]['1']) + elif combination == "interwiki/iw_add": + webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment, + prefix=change["logparams"]['0'], website=change["logparams"]['1']) + elif combination == "interwiki/iw_edit": + webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment, + prefix=change["logparams"]['0'], website=change["logparams"]['1']) + elif combination == "interwiki/iw_delete": + webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment, + prefix=change["logparams"]['0']) + elif combination == "curseprofile/comment-created": + webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1], + commentid=change["logparams"]["0"]) + elif combination == "curseprofile/comment-edited": + webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1], + commentid=change["logparams"]["0"]) + elif combination == "curseprofile/comment-deleted": + webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1], + commentid=change["logparams"]["0"]) + elif combination == "curseprofile/profile-edited": + webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1], + field=change["logparams"]['0'], desc=change["parsedcomment"]) + elif combination == "curseprofile/comment-replied": + webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1], + commentid=change["logparams"]["0"]) + elif combination == "contentmodel/change": + webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, + oldmodel=change["logparams"]["oldmodel"], newmodel=change["logparams"]["newmodel"]) + elif combination == "sprite/sprite": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment) - elif combination=="sprite/sheet": + elif combination == "sprite/sheet": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment) - elif combination=="sprite/slice": + elif combination == "sprite/slice": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment) - elif combination=="managetags/create": - webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, additional=change["logparams"]) - elif combination=="managetags/delete": - webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, additional=change["logparams"]) - elif combination=="managetags/activate": - webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, additional=change["logparams"]) - elif combination=="managetags/deactivate": - webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, additional=change["logparams"]) - elif combination=="tag/update": + elif combination == "managetags/create": + webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, + additional=change["logparams"]) + elif combination == "managetags/delete": + webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, + additional=change["logparams"]) + elif combination == "managetags/activate": + webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, + additional=change["logparams"]) + elif combination == "managetags/deactivate": + webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, + additional=change["logparams"]) + elif combination == "tag/update": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment) else: logging.warning("No entry matches given change!") - print (change) + print(change) send(_("Unable to process the event"), _("error"), settings["avatars"]["no_event"]) return - if change["type"] == "external": #not sure what happens then, but it's listed as possible type + if change["type"] == "external": # not sure what happens then, but it's listed as possible type logging.warning("External event happened, ignoring.") - print (change) + print(change) return - elif change["type"] == "new" and "new" not in settings["ignored"]: #new page - STATIC_VARS = {**STATIC_VARS ,**{"color": settings["appearance"]["new"]["color"], "icon": settings["appearance"]["new"]["icon"]}} - webhook_formatter("new", STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, oldrev=change["old_revid"], pageid=change["pageid"], diff=change["revid"], size=change["newlen"]) - + elif change["type"] == "new" and "new" not in settings["ignored"]: # new page + STATIC_VARS = {**STATIC_VARS, **{"color": settings["appearance"]["new"]["color"], + "icon": settings["appearance"]["new"]["icon"]}} + webhook_formatter("new", STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, + oldrev=change["old_revid"], pageid=change["pageid"], diff=change["revid"], + size=change["newlen"]) + + def day_overview_request(): logging.info("Fetching daily overview... This may take up to 30 seconds!") - timestamp = (datetime.datetime.utcnow()-datetime.timedelta(hours=24)).isoformat(timespec='milliseconds') + timestamp = (datetime.datetime.utcnow() - datetime.timedelta(hours=24)).isoformat(timespec='milliseconds') logging.debug("timestamp is {}".format(timestamp)) complete = False result = [] passes = 0 continuearg = "" while not complete and passes < 10: - request = recent_changes.safe_request("https://{wiki}.gamepedia.com/api.php?action=query&format=json&list=recentchanges&rcend={timestamp}Z&rcprop=title%7Ctimestamp%7Csizes%7Cloginfo%7Cuser&rcshow=!bot&rclimit=500&rctype=edit%7Cnew%7Clog{continuearg}".format(wiki=settings["wiki"], timestamp=timestamp, continuearg=continuearg)) - if request: + request = recent_changes.safe_request( + "https://{wiki}.gamepedia.com/api.php?action=query&format=json&list=recentchanges&rcend={timestamp}Z&rcprop=title%7Ctimestamp%7Csizes%7Cloginfo%7Cuser&rcshow=!bot&rclimit=500&rctype=edit%7Cnew%7Clog{continuearg}".format( + wiki=settings["wiki"], timestamp=timestamp, continuearg=continuearg)) + if request: try: request = request.json() rc = request['query']['recentchanges'] @@ -548,11 +680,13 @@ def day_overview_request(): logging.warning("Wiki returned %s" % (request.json())) complete = 2 else: - result+= rc + result += rc if continuearg: continuearg = "&rccontinue={}".format(continuearg) - passes+=1 - logging.debug("continuing requesting next pages of recent changes with {} passes and continuearg being {}".format(passes, continuearg)) + passes += 1 + logging.debug( + "continuing requesting next pages of recent changes with {} passes and continuearg being {}".format( + passes, continuearg)) time.sleep(3.0) else: complete = 1 @@ -562,15 +696,17 @@ def day_overview_request(): logging.debug("quit the loop because there been too many passes") return (result, complete) + def add_to_dict(dictionary, key): if key in dictionary: - dictionary[key]+=1 + dictionary[key] += 1 else: - dictionary[key]=1 + dictionary[key] = 1 return dictionary -def day_overview(): #time.strftime('%Y-%m-%dT%H:%M:%S.000Z', time.gmtime(time.time())) - #(datetime.datetime.utcnow()+datetime.timedelta(hours=0)).isoformat(timespec='milliseconds')+'Z' + +def day_overview(): # time.strftime('%Y-%m-%dT%H:%M:%S.000Z', time.gmtime(time.time())) + # (datetime.datetime.utcnow()+datetime.timedelta(hours=0)).isoformat(timespec='milliseconds')+'Z' result = day_overview_request() if result[1] == 1: activity = defaultdict(dict) @@ -584,18 +720,19 @@ def day_overview(): #time.strftime('%Y-%m-%dT%H:%M:%S.000Z', time.gmtime(time.ti activity = add_to_dict(activity, item["user"]) hours = add_to_dict(hours, datetime.datetime.strptime(item["timestamp"], "%Y-%m-%dT%H:%M:%SZ").hour) if "actionhidden" in item or "suppressed" in item: - continue #while such actions have type value (edit/new/log) many other values are hidden and therefore can crash with key error, let's not process such events - if item["type"]=="edit": + continue # while such actions have type value (edit/new/log) many other values are hidden and therefore can crash with key error, let's not process such events + if item["type"] == "edit": edits += 1 - changed_bytes += item["newlen"]-item["oldlen"] + changed_bytes += item["newlen"] - item["oldlen"] if item["type"] == "new": if item["ns"] == 0: - new_articles+=1 + new_articles += 1 changed_bytes += item["newlen"] if item["type"] == "log": - files = files+1 if item["logtype"] == item["logaction"] == "upload" else files - admin = admin+1 if item["logtype"] in ["delete", "merge", "block", "protect", "import", "rights", "abusefilter", "interwiki", "managetags"] else admin - overall = round(new_articles+edits*0.1+files*0.3+admin*0.1+math.fabs(changed_bytes*0.001), 2) + files = files + 1 if item["logtype"] == item["logaction"] == "upload" else files + admin = admin + 1 if item["logtype"] in ["delete", "merge", "block", "protect", "import", "rights", + "abusefilter", "interwiki", "managetags"] else admin + overall = round(new_articles + edits * 0.1 + files * 0.3 + admin * 0.1 + math.fabs(changed_bytes * 0.001), 2) embed = defaultdict(dict) embed["title"] = _("Daily overview") embed["url"] = "https://{wiki}.gamepedia.com/Special:Statistics".format(wiki=settings["wiki"]) @@ -606,22 +743,28 @@ def day_overview(): #time.strftime('%Y-%m-%dT%H:%M:%S.000Z', time.gmtime(time.ti if activity: v = activity.values() active_users = [] - for user, numberu in Counter(activity).most_common(list(v).count(max(v))): #find most active users + for user, numberu in Counter(activity).most_common(list(v).count(max(v))): # find most active users active_users.append(user) the_one = random.choice(active_users) v = hours.values() active_hours = [] - for hour, numberh in Counter(hours).most_common(list(v).count(max(v))): #find most active users + for hour, numberh in Counter(hours).most_common(list(v).count(max(v))): # find most active users active_hours.append(str(hour)) usramount = ngettext(" ({} action)", " ({} actions)", numberu).format(numberu) houramount = ngettext(" UTC ({} action)", " UTC ({} actions)", numberh).format(numberh) else: - active_users = [_("But nobody came")] #a reference to my favorite game of all the time, sorry ^_^ + active_users = [_("But nobody came")] # a reference to my favorite game of all the time, sorry ^_^ active_hours = [_("But nobody came")] usramount = "" houramount = "" embed["fields"] = [] - fields = ((ngettext("Most active user", "Most active users", len(active_users)), ', '.join(active_users) + usramount), (_("Edits made"), edits), (_("New files"), files), (_("Admin actions"), admin), (_("Bytes changed"), changed_bytes), (_("New articles"), new_articles), (_("Unique contributors"), str(len(activity))), (ngettext("Most active hour", "Most active hours", len(active_hours)), ', '.join(active_hours) + houramount), (_("Day score"), str(overall))) + fields = ( + (ngettext("Most active user", "Most active users", len(active_users)), ', '.join(active_users) + usramount), + (_("Edits made"), edits), (_("New files"), files), (_("Admin actions"), admin), + (_("Bytes changed"), changed_bytes), (_("New articles"), new_articles), + (_("Unique contributors"), str(len(activity))), + (ngettext("Most active hour", "Most active hours", len(active_hours)), ', '.join(active_hours) + houramount), + (_("Day score"), str(overall))) for name, value in fields: embed["fields"].append({"name": name, "value": value}) data = {} @@ -631,6 +774,7 @@ def day_overview(): #time.strftime('%Y-%m-%dT%H:%M:%S.000Z', time.gmtime(time.ti else: logging.debug("function requesting changes for day overview returned with error code") + class recent_changes_class(object): starttime = time.time() ids = [] @@ -653,10 +797,10 @@ class recent_changes_class(object): logging.debug("File_id is {val}".format(val=file_id)) else: logging.debug("File is empty") - file_id = 999999999 + file_id = 999999999 else: - file_id = 999999999 #such value won't cause trouble, and it will make sure no refetch happen - + file_id = 999999999 # such value won't cause trouble, and it will make sure no refetch happen + def handle_mw_errors(self, request): if "errors" in request: print(request["errors"]) @@ -664,17 +808,29 @@ class recent_changes_class(object): return request def log_in(self): - #session.cookies.clear() + # session.cookies.clear() if '@' not in settings["wiki_bot_login"]: - logging.error("Please provide proper nickname for login from https://{wiki}.gamepedia.com/Special:BotPasswords".format(wiki=settings["wiki"])) + logging.error( + "Please provide proper nickname for login from https://{wiki}.gamepedia.com/Special:BotPasswords".format( + wiki=settings["wiki"])) return if len(settings["wiki_bot_password"]) != 32: - logging.error("Password seems incorrect. It should be 32 characters long! Grab it from https://{wiki}.gamepedia.com/Special:BotPasswords".format(wiki=settings["wiki"])) + logging.error( + "Password seems incorrect. It should be 32 characters long! Grab it from https://{wiki}.gamepedia.com/Special:BotPasswords".format( + wiki=settings["wiki"])) return logging.info("Trying to log in to https://{wiki}.gamepedia.com...".format(wiki=settings["wiki"])) try: - response = self.handle_mw_errors(self.session.post("https://{wiki}.gamepedia.com/api.php".format(wiki=settings["wiki"]), data={'action': 'query', 'format': 'json', 'utf8': '', 'meta': 'tokens', 'type': 'login'})) - response = self.handle_mw_errors(self.session.post("https://{wiki}.gamepedia.com/api.php".format(wiki=settings["wiki"]), data={'action': 'login', 'format': 'json', 'utf8': '', 'lgname': settings["wiki_bot_login"], 'lgpassword':settings["wiki_bot_password"], 'lgtoken': response.json()['query']['tokens']['logintoken']})) + response = self.handle_mw_errors( + self.session.post("https://{wiki}.gamepedia.com/api.php".format(wiki=settings["wiki"]), + data={'action': 'query', 'format': 'json', 'utf8': '', 'meta': 'tokens', + 'type': 'login'})) + response = self.handle_mw_errors( + self.session.post("https://{wiki}.gamepedia.com/api.php".format(wiki=settings["wiki"]), + data={'action': 'login', 'format': 'json', 'utf8': '', + 'lgname': settings["wiki_bot_login"], + 'lgpassword': settings["wiki_bot_password"], + 'lgtoken': response.json()['query']['tokens']['logintoken']})) except ValueError: logging.error("Logging in have not succeeded") return @@ -682,24 +838,28 @@ class recent_changes_class(object): logging.error("Logging in have not succeeded") return try: - if response.json()['login']['result']=="Success": + if response.json()['login']['result'] == "Success": logging.info("Logging to the wiki succeeded") else: logging.error("Logging in have not succeeded") except: logging.error("Logging in have not succeeded") - + def add_cache(self, change): self.ids.append(change["rcid"]) - #self.recent_id = change["rcid"] - if len(self.ids) > settings["limitrefetch"]+5: + # self.recent_id = change["rcid"] + if len(self.ids) > settings["limitrefetch"] + 5: self.ids.pop(0) - + def fetch(self, amount=settings["limit"]): if self.unsent_messages: - logging.info("{} messages waiting to be delivered to Discord due to Discord throwing errors/no connection to Discord servers.".format(len(self.unsent_messages))) + logging.info( + "{} messages waiting to be delivered to Discord due to Discord throwing errors/no connection to Discord servers.".format( + len(self.unsent_messages))) for num, item in enumerate(self.unsent_messages): - logging.debug("Trying to send a message to Discord from the queue with id of {} and content {}".format(str(num), str(item))) + logging.debug( + "Trying to send a message to Discord from the queue with id of {} and content {}".format(str(num), + str(item))) if send_to_discord_webhook(item) < 2: logging.debug("Sending message succeeded") time.sleep(2.5) @@ -718,12 +878,14 @@ class recent_changes_class(object): with open("lastchange.txt", "w") as record: record.write(str(self.file_id)) logging.debug("Most recent rcid is: {}".format(self.recent_id)) - + def fetch_changes(self, amount, clean=False): if len(self.ids) == 0: logging.debug("ids is empty, triggering clean fetch") clean = True - changes = self.safe_request("https://{wiki}.gamepedia.com/api.php?action=query&format=json&list=recentchanges&rcshow=!bot&rcprop=title%7Ctimestamp%7Cids%7Cloginfo%7Cparsedcomment%7Csizes%7Cflags%7Ctags%7Cuser&rclimit={amount}&rctype=edit%7Cnew%7Clog%7Cexternal".format(wiki=settings["wiki"], amount=amount)) + changes = self.safe_request( + "https://{wiki}.gamepedia.com/api.php?action=query&format=json&list=recentchanges&rcshow=!bot&rcprop=title%7Ctimestamp%7Cids%7Cloginfo%7Cparsedcomment%7Csizes%7Cflags%7Ctags%7Cuser&rclimit={amount}&rctype=edit%7Cnew%7Clog%7Cexternal".format( + wiki=settings["wiki"], amount=amount)) if changes: try: changes = changes.json()['query']['recentchanges'] @@ -731,7 +893,8 @@ class recent_changes_class(object): except ValueError: logging.warning("ValueError in fetching changes") if changes.url == "https://www.gamepedia.com": - logging.critical("The wiki specified in the settings most probably doesn't exist, got redirected to gamepedia.com") + logging.critical( + "The wiki specified in the settings most probably doesn't exist, got redirected to gamepedia.com") sys.exit(1) self.downtime_controller() return None @@ -742,13 +905,15 @@ class recent_changes_class(object): if self.downtimecredibility > 0: self.downtimecredibility -= 1 if self.streak > -1: - self.streak+=1 + self.streak += 1 if self.streak > 8: self.streak = -1 - send(_("Connection to {wiki} seems to be stable now.").format(wiki=settings["wikiname"]), _("Connection status"), settings["avatars"]["connection_restored"]) + send(_("Connection to {wiki} seems to be stable now.").format(wiki=settings["wikiname"]), + _("Connection status"), settings["avatars"]["connection_restored"]) for change in changes: if change["rcid"] in self.ids or change["rcid"] < self.recent_id: - logging.debug("Change ({}) is in ids or is lower than recent_id {}".format(change["rcid"], self.recent_id)) + logging.debug("Change ({}) is in ids or is lower than recent_id {}".format(change["rcid"], + self.recent_id)) continue logging.debug(self.ids) logging.debug(self.recent_id) @@ -758,7 +923,7 @@ class recent_changes_class(object): continue first_pass(change) return change["rcid"] - + def safe_request(self, url): try: request = self.session.get(url, timeout=10) @@ -772,7 +937,7 @@ class recent_changes_class(object): return None else: return request - + def check_connection(self, looped=False): online = 0 for website in ["https://google.com", "https://instagram.com", "https://steamcommunity.com"]: @@ -784,56 +949,64 @@ class recent_changes_class(object): except requests.exceptions.Timeout: pass if online < 1: - logging.error("Failure when checking Internet connection at {time}".format(time=time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime()))) + logging.error("Failure when checking Internet connection at {time}".format( + time=time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime()))) self.downtimecredibility = 0 if looped == False: - while 1: #recursed loop, check for connection (every 10 seconds) as long as three services are down, don't do anything else + while 1: # recursed loop, check for connection (every 10 seconds) as long as three services are down, don't do anything else if self.check_connection(looped=True): recent_changes.fetch(amount=settings["limitrefetch"]) break time.sleep(10) return False return True - + def downtime_controller(self): if settings["show_updown_messages"] == False: return - if self.streak > -1: #reset the streak of successful connections when bad one happens + if self.streak > -1: # reset the streak of successful connections when bad one happens self.streak = 0 - if self.downtimecredibility<60: - self.downtimecredibility+=15 + if self.downtimecredibility < 60: + self.downtimecredibility += 15 else: - if(time.time() - self.last_downtime)>1800 and self.check_connection(): #check if last downtime happened within 30 minutes, if yes, don't send a message - send(_("{wiki} seems to be down or unreachable.").format(wiki=settings["wikiname"]), _("Connection status"), settings["avatars"]["connection_failed"]) + if ( + time.time() - self.last_downtime) > 1800 and self.check_connection(): # check if last downtime happened within 30 minutes, if yes, don't send a message + send(_("{wiki} seems to be down or unreachable.").format(wiki=settings["wikiname"]), + _("Connection status"), settings["avatars"]["connection_failed"]) self.last_downtime = time.time() self.streak = 0 - + def clear_cache(self): self.map_ips = {} - + def update_tags(self): - tags_read = safe_read(self.safe_request("https://{wiki}.gamepedia.com/api.php?action=query&format=json&list=tags&tglimit=max&tgprop=name|displayname".format(wiki=settings["wiki"])), "query", "tags") + tags_read = safe_read(self.safe_request( + "https://{wiki}.gamepedia.com/api.php?action=query&format=json&list=tags&tglimit=max&tgprop=name|displayname".format( + wiki=settings["wiki"])), "query", "tags") if tags_read: for tag in tags_read: self.tags[tag["name"]] = (BeautifulSoup(tag["displayname"], "lxml")).get_text() else: logging.warning("Could not retrive tags. Internal names will be used!") - + + recent_changes = recent_changes_class() if settings["wiki_bot_login"] and settings["wiki_bot_password"]: recent_changes.log_in() recent_changes.update_tags() time.sleep(1.0) -recent_changes.fetch(amount=settings["limitrefetch" ] if settings["limitrefetch"] != -1 else settings["limit"]) - +recent_changes.fetch(amount=settings["limitrefetch"] if settings["limitrefetch"] != -1 else settings["limit"]) + schedule.every(settings["cooldown"]).seconds.do(recent_changes.fetch) -if 1==2: - print (_("director"), _("bot"), _("editor"), _("directors"), _("sysop"), _("bureaucrat"), _("reviewer"), _("autoreview"), _("autopatrol"), _("wiki_guardian")) +if 1 == 2: + print(_("director"), _("bot"), _("editor"), _("directors"), _("sysop"), _("bureaucrat"), _("reviewer"), + _("autoreview"), _("autopatrol"), _("wiki_guardian")) if settings["overview"]: - schedule.every().day.at("{}:{}".format(time.strptime(settings["overview_time"], '%H:%M').tm_hour, time.strptime(settings["overview_time"], '%H:%M').tm_min)).do(day_overview) + schedule.every().day.at("{}:{}".format(time.strptime(settings["overview_time"], '%H:%M').tm_hour, + time.strptime(settings["overview_time"], '%H:%M').tm_min)).do(day_overview) schedule.every().day.at("00:00").do(recent_changes.clear_cache) - + while 1: time.sleep(1.0) schedule.run_pending()