#!/usr/bin/python # -*- coding: utf-8 -*- # Recent changes Gamepedia compatible Discord webhook is a project for using a webhook as recent changes page from MediaWiki. # Copyright (C) 2018 Frisk # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # WARNING! SHITTY CODE AHEAD. ENTER ONLY IF YOU ARE SURE YOU CAN TAKE IT # You have been warned import time, logging, json, requests, datetime, re, gettext, math, random, os.path, schedule, sys from bs4 import BeautifulSoup from collections import defaultdict, Counter from urllib.parse import quote_plus from html.parser import HTMLParser with open("settings.json") as sfile: settings = json.load(sfile) if settings["limitrefetch"] < settings["limit"] and settings["limitrefetch"] != -1: settings["limitrefetch"] = settings["limit"] logging.basicConfig(level=settings["verbose_level"]) if settings["limitrefetch"] != -1 and os.path.exists("lastchange.txt") == False: with open("lastchange.txt", 'w') as sfile: sfile.write("99999999999") logging.info("Current settings: {settings}".format(settings=settings)) lang = gettext.translation('rcgcdw', localedir='locale', languages=[settings["lang"]]) lang.install() ngettext = lang.ngettext class MWError(Exception): pass class MyHTMLParser(HTMLParser): new_string = "" recent_href = "" def handle_starttag(self, tag, attrs): for attr in attrs: if attr[0] == 'href': self.recent_href = attr[1] if self.recent_href.startswith("//"): self.recent_href = "https:{rest}".format(rest=self.recent_href) elif not self.recent_href.startswith("https"): self.recent_href = "https://{wiki}.gamepedia.com".format(wiki=settings["wiki"]) + self.recent_href self.recent_href = self.recent_href.replace(")", "\\)") def handle_data(self, data): if self.recent_href: self.new_string = self.new_string + "[{}]({})".format(data, self.recent_href) self.recent_href = "" else: self.new_string = self.new_string + data def handle_comment(self, data): self.new_string = self.new_string + data def handle_endtag(self, tag): print(self.new_string) HTMLParse = MyHTMLParser() def send(message, name, avatar): send_to_discord({"content": message, "avatar_url": avatar, "username": name}) def safe_read(request, *keys): if request is None: return None try: request = request.json() for item in keys: request = request[item] except KeyError: logging.warning( "Failure while extracting data from request on key {key} in {change}".format(key=item, change=request)) return None except ValueError: logging.warning("Failure while extracting data from request in {change}".format(change=request)) return None return request def send_to_discord_webhook(data): try: result = requests.post(settings["webhookURL"], data=data, headers={**{'Content-Type': 'application/json'}, **settings["header"]}, timeout=10) except requests.exceptions.Timeout: logging.warning("Timeouted while sending data to the webhook.") return 3 except requests.exceptions.ConnectionError: logging.warning("Connection error while sending the data to a webhook") return 3 else: return handle_discord_http(result.status_code, data) def send_to_discord(data): if recent_changes.unsent_messages: recent_changes.unsent_messages.append(data) else: code = send_to_discord_webhook(data) if code == 3: recent_changes.unsent_messages.append(data) elif code == 2: time.sleep(5.0) recent_changes.unsent_messages.append(data) elif code < 2: time.sleep(2.5) pass def webhook_formatter(action, STATIC, **params): logging.debug("Received things: {thing}".format(thing=params)) colornumber = None if isinstance(STATIC["color"], str) else STATIC["color"] data = {"embeds": []} embed = defaultdict(dict) if "title" in params: article_encoded = params["title"].replace(" ", "_").replace(')', '\)') if re.match(r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b", params["user"]) is not None: author_url = "https://{wiki}.gamepedia.com/Special:Contributions/{user}".format(wiki=settings["wiki"], user=params["user"]) if params["user"] not in list(recent_changes.map_ips.keys()): contibs = safe_read(recent_changes.safe_request( "https://{wiki}.gamepedia.com/api.php?action=query&format=json&list=usercontribs&uclimit=max&ucuser={user}&ucprop=".format( wiki=settings["wiki"], user=params["user"])), "query", "usercontribs") if contibs is None: logging.warning( "WARNING: Something went wrong when checking amount of contributions for given IP address") params["user"] = params["user"] + "(?)" else: params["user"] = "{author} ({contribs})".format(author=params["user"], contribs=len(contibs)) recent_changes.map_ips[params["user"]] = len(contibs) else: recent_changes.map_ips[params["user"]] += 1 params["user"] = "{author} ({amount})".format(author=params["user"], amount=recent_changes.map_ips[params["user"]]) else: author_url = "https://{wiki}.gamepedia.com/User:{user}".format(wiki=settings["wiki"], user=params["user"].replace(" ", "_")) if action in ("edit", "new"): # edit or new page editsize = params["size"] print(editsize) if editsize > 0: if editsize > 6032: colornumber = 65280 else: colornumber = 35840 + (math.floor(editsize / 52)) * 256 elif editsize < 0: if editsize < -6032: colornumber = 16711680 else: colornumber = 9175040 + (math.floor((editsize * -1) / (52))) * 65536 elif editsize == 0: colornumber = 8750469 link = "https://{wiki}.gamepedia.com/index.php?title={article}&curid={pageid}&diff={diff}&oldid={oldrev}".format( wiki=settings["wiki"], pageid=params["pageid"], diff=params["diff"], oldrev=params["oldrev"], article=params["title"].replace(" ", "_")) embed["title"] = "{article} ({new}{minor}{editsize})".format(article=params["title"], editsize="+" + str( editsize) if editsize > 0 else editsize, new=_("(N!) ") if action == "new" else "", minor=_("m ") if action == "edit" and params[ "minor"] else "") elif action in ("upload/overwrite", "upload/upload"): # sending files license = None urls = safe_read(recent_changes.safe_request( "https://{wiki}.gamepedia.com/api.php?action=query&format=json&prop=imageinfo&list=&meta=&titles={filename}&iiprop=timestamp%7Curl&iilimit=2".format( wiki=settings["wiki"], filename=params["title"])), "query", "pages") undolink = "" link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_")) additional_info_retrieved = False if urls is not None: if "-1" not in urls: # oage removed before we asked for it img_info = next(iter(urls.values()))["imageinfo"] embed["image"]["url"] = img_info[0]["url"] additional_info_retrieved = True else: pass if params["overwrite"]: if additional_info_retrieved: img_timestamp = [x for x in img_info[1]["timestamp"] if x.isdigit()] undolink = "https://{wiki}.gamepedia.com/index.php?title={filename}&action=revert&oldimage={timestamp}%21{filenamewon}".format( wiki=settings["wiki"], filename=article_encoded, timestamp="".join(img_timestamp), filenamewon=article_encoded[5:]) embed["fields"] = [{"name": _("Options"), "value": _("([preview]({link}) | [undo]({undolink}))").format( link=embed["image"]["url"], undolink=undolink)}] embed["title"] = _("Uploaded a new version of {name}").format(name=params["title"]) else: embed["title"] = _("Uploaded {name}").format(name=params["title"]) article_content = safe_read(recent_changes.safe_request( "https://{wiki}.gamepedia.com/api.php?action=query&format=json&prop=revisions&titles={article}&rvprop=content".format( wiki=settings["wiki"], article=quote_plus(params["title"], safe=''))), "query", "pages") if article_content is None: logging.warning("Something went wrong when getting license for the image") return 0 if "-1" not in article_content: content = list(article_content.values())[0]['revisions'][0]['*'] try: matches = re.search(re.compile(settings["license_regex"], re.IGNORECASE), content) if matches is not None: license = matches.group("license") else: if re.search(re.compile(settings["license_regex_detect"], re.IGNORECASE), content) is None: license = _("**No license!**") else: license = "?" except IndexError: logging.error( "Given regex for the license detection is incorrect. It does not have a capturing group called \"license\" specified. Please fix license_regex value in the config!") license = "?" except re.error: logging.error( "Given regex for the license detection is incorrect. Please fix license_regex or license_regex_detect values in the config!") license = "?" if additional_info_retrieved: embed["fields"] = [ {"name": _("Options"), "value": _("([preview]({link}))").format(link=embed["image"]["url"])}] params["desc"] = _("{desc}\nLicense: {license}").format(desc=params["desc"], license=license if license is not None else "?") elif action == "delete/delete": link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_")) embed["title"] = _("Deleted page {article}").format(article=params["title"]) elif action == "delete/delete_redir": link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_")) embed["title"] = _("Deleted redirect {article} by overwriting").format(article=params["title"]) elif action == "move/move": link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["target"].replace(" ", "_")) params["desc"] = "{supress}. {desc}".format(desc=params["desc"], supress=_("No redirect has been made") if params[ "supress"] == True else _( "A redirect has been made")) embed["title"] = _("Moved {article} to {target}").format(article=params["title"], target=params["target"]) elif action == "move/move_redir": link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["target"].replace(" ", "_")) embed["title"] = _("Moved {article} to {title} over redirect").format(article=params["title"], title=params["target"]) elif action == "protect/move_prot": link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_")) embed["title"] = _("Moved protection settings from {article} to {title}").format(article=params["title"], title=params["target"]) elif action == "block/block": link = "https://{wiki}.gamepedia.com/{user}".format(wiki=settings["wiki"], user=params["blocked_user"].replace(" ", "_").replace(')', '\)')) user = params["blocked_user"].split(':')[1] block_time = _("infinity and beyond") if params["duration"] == "infinite" else params["duration"] embed["title"] = _("Blocked {blocked_user} for {time}").format(blocked_user=user, time=block_time) elif action == "block/reblock": link = "https://{wiki}.gamepedia.com/{user}".format(wiki=settings["wiki"], user=params["blocked_user"].replace(" ", "_").replace(')', '\)')) user = params["blocked_user"].split(':')[1] embed["title"] = _("Changed block settings for {blocked_user}").format(blocked_user=user) elif action == "block/unblock": link = "https://{wiki}.gamepedia.com/{user}".format(wiki=settings["wiki"], user=params["blocked_user"].replace(" ", "_").replace(')', '\)')) user = params["blocked_user"].split(':')[1] embed["title"] = _("Unblocked {blocked_user}").format(blocked_user=user) elif action == "curseprofile/comment-created": link = "https://{wiki}.gamepedia.com/Special:CommentPermalink/{commentid}".format(wiki=settings["wiki"], commentid=params["commentid"]) # link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"], target=params["target"].replace(" ", "_").replace(')', '\)')) old way of linking embed["title"] = _("Left a comment on {target}'s profile").format(target=params["target"]) if params[ "target"] != \ params[ "user"] else _( "Left a comment on their own profile") elif action == "curseprofile/comment-replied": # link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"], target=params["target"].replace(" ", "_").replace(')', '\)')) link = "https://{wiki}.gamepedia.com/Special:CommentPermalink/{commentid}".format(wiki=settings["wiki"], commentid=params["commentid"]) embed["title"] = _("Replied to a comment on {target}'s profile").format(target=params["target"]) if params[ "target"] != \ params[ "user"] else _( "Replied to a comment on their own profile") elif action == "curseprofile/comment-edited": # link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"], target=params["target"].replace(" ", "_").replace(')', '\)')) link = "https://{wiki}.gamepedia.com/Special:CommentPermalink/{commentid}".format(wiki=settings["wiki"], commentid=params["commentid"]) embed["title"] = _("Edited a comment on {target}'s profile").format(target=params["target"]) if params[ "target"] != \ params[ "user"] else _( "Edited a comment on their own profile") elif action == "curseprofile/profile-edited": link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"], target=params["target"].replace(" ", "_").replace( ')', '\)')) if params["field"] == "profile-location": field = _("Location") elif params["field"] == "profile-aboutme": field = _("About me") elif params["field"] == "profile-link-google": field = _("Google link") elif params["field"] == "profile-link-facebook": field = _("Facebook link") elif params["field"] == "profile-link-twitter": field = _("Twitter link") elif params["field"] == "profile-link-reddit": field = _("Reddit link") elif params["field"] == "profile-link-twitch": field = _("Twitch link") elif params["field"] == "profile-link-psn": field = _("PSN link") elif params["field"] == "profile-link-vk": field = _("VK link") elif params["field"] == "profile-link-xbl": field = _("XVL link") elif params["field"] == "profile-link-steam": field = _("Steam link") else: field = _("Unknown") embed["title"] = _("Edited {target}'s profile").format(target=params["target"]) if params["user"] != params[ "target"] else _("Edited their own profile") params["desc"] = _("{field} field changed to: {desc}").format(field=field, desc=params["desc"]) elif action == "curseprofile/comment-deleted": link = "https://{wiki}.gamepedia.com/Special:CommentPermalink/{commentid}".format(wiki=settings["wiki"], commentid=params["commentid"]) # link = "https://{wiki}.gamepedia.com/UserProfile:{target}".format(wiki=settings["wiki"], target=params["target"].replace(" ", "_").replace(')', '\)')) embed["title"] = _("Deleted a comment on {target}'s profile").format(target=params["target"]) elif action in ("rights/rights", "rights/autopromote"): link = "https://{wiki}.gamepedia.com/User:".format(wiki=settings["wiki"]) + params["title"].split(":")[1] if action == "rights/rights": embed["title"] = _("Changed group membership for {target}").format(target=params["title"].split(":")[1]) else: params["user"] = _("System") author_url = "" embed["title"] = _("{target} got autopromoted to a new usergroup").format( target=params["title"].split(":")[1]) if len(params["old_groups"]) < len(params["new_groups"]): embed["thumbnail"]["url"] = "https://i.imgur.com/WnGhF5g.gif" old_groups = [] new_groups = [] for name in params["old_groups"]: old_groups.append(_(name)) for name in params["new_groups"]: new_groups.append(_(name)) if len(old_groups) == 0: old_groups = [_("none")] if len(new_groups) == 0: new_groups = [_("none")] reason = ": {desc}".format(desc=params["desc"]) if params["desc"] != _("No description provided") else "" params["desc"] = _("Groups changed from {old_groups} to {new_groups}{reason}").format( old_groups=", ".join(old_groups), new_groups=', '.join(new_groups), reason=reason) elif action == "protect/protect": link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_")) embed["title"] = _("Protected {target}").format(target=params["title"]) params["desc"] = params["settings"] + " | " + params["desc"] elif action == "protect/modify": link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_")) embed["title"] = _("Changed protection level for {article}").format(article=params["title"]) params["desc"] = params["settings"] + " | " + params["desc"] elif action == "protect/unprotect": link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_")) embed["title"] = _("Removed protection from {article}").format(article=params["title"]) elif action == "delete/revision": amount = len(params["amount"]) link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_")) embed["title"] = ngettext("Changed visibility of revision on page {article} ", "Changed visibility of {amount} revisions on page {article} ", amount).format( article=params["title"], amount=amount) elif action == "import/upload": link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_")) embed["title"] = ngettext("Imported {article} with {count} revision", "Imported {article} with {count} revisions", params["amount"]).format( article=params["title"], count=params["amount"]) elif action == "delete/restore": link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_")) embed["title"] = _("Restored {article}").format(article=params["title"]) elif action == "delete/event": link = "https://{wiki}.gamepedia.com/Special:RecentChanges".format(wiki=settings["wiki"]) embed["title"] = _("Changed visibility of log events") elif action == "import/interwiki": link = "https://{wiki}.gamepedia.com/Special:RecentChanges".format(wiki=settings["wiki"]) embed["title"] = _("Imported interwiki") elif action == "abusefilter/modify": link = "https://{wiki}.gamepedia.com/Special:RecentChanges".format(wiki=settings["wiki"]) embed["title"] = _("Edited abuse filter number {number}").format(number=params["filternr"]) elif action == "merge/merge": link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_")) embed["title"] = _("Merged revision histories of {article} into {dest}").format(article=params["title"], dest=params["destination"]) elif action == "interwiki/iw_add": link = "https://{wiki}.gamepedia.com/Special:Interwiki".format(wiki=settings["wiki"]) embed["title"] = _("Added an entry to the interwiki table") params["desc"] = _("Prefix: {prefix}, website: {website} | {desc}").format(desc=params["desc"], prefix=params["prefix"], website=params["website"]) elif action == "interwiki/iw_edit": link = "https://{wiki}.gamepedia.com/Special:Interwiki".format(wiki=settings["wiki"]) embed["title"] = _("Edited an entry in interwiki table") params["desc"] = _("Prefix: {prefix}, website: {website} | {desc}").format(desc=params["desc"], prefix=params["prefix"], website=params["website"]) elif action == "interwiki/iw_delete": link = "https://{wiki}.gamepedia.com/Special:Interwiki".format(wiki=settings["wiki"]) embed["title"] = _("Deleted an entry in interwiki table") params["desc"] = _("Prefix: {prefix} | {desc}").format(desc=params["desc"], prefix=params["prefix"]) elif action == "contentmodel/change": link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_")) embed["title"] = _("Changed the content model of the page {article}").format(article=params["title"]) params["desc"] = _("Model changed from {old} to {new}: {reason}").format(old=params["oldmodel"], new=params["newmodel"], reason=params["desc"]) elif action == "sprite/sprite": link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_")) embed["title"] = _("Edited the sprite for {article}").format(article=params["title"]) elif action == "sprite/sheet": link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_")) embed["title"] = _("Created the sprite sheet for {article}").format(article=params["title"]) elif action == "sprite/slice": link = "https://{wiki}.gamepedia.com/{article}".format(wiki=settings["wiki"], article=params["title"].replace(" ", "_")) embed["title"] = _("Edited the slice for {article}").format(article=params["title"]) elif action == "managetags/create": link = "https://{wiki}.gamepedia.com/Special:Tags".format(wiki=settings["wiki"]) embed["title"] = _("Created a tag \"{tag}\"").format(tag=params["additional"]["tag"]) recent_changes.update_tags() elif action == "managetags/delete": link = "https://{wiki}.gamepedia.com/Special:Tags".format(wiki=settings["wiki"]) embed["title"] = _("Deleted a tag \"{tag}\"").format(tag=params["additional"]["tag"]) recent_changes.update_tags() elif action == "managetags/activate": link = "https://{wiki}.gamepedia.com/Special:Tags".format(wiki=settings["wiki"]) embed["title"] = _("Activated a tag \"{tag}\"").format(tag=params["additional"]["tag"]) elif action == "managetags/deactivate": link = "https://{wiki}.gamepedia.com/Special:Tags".format(wiki=settings["wiki"]) embed["title"] = _("Deactivated a tag \"{tag}\"").format(tag=params["additional"]["tag"]) elif action == "suppressed": link = "https://{wiki}.gamepedia.com/".format(wiki=settings["wiki"]) embed["title"] = _("Action has been hidden by Gamepedia staff.") else: logging.warning("No entry for {event} with params: {params}".format(event=action, params=params)) embed["author"]["name"] = params["user"] embed["author"]["url"] = author_url embed["author"]["icon_url"] = STATIC["icon"] embed["url"] = link if "desc" not in params: params["desc"] = "" embed["description"] = params["desc"] embed["color"] = random.randrange(1, 16777215) if colornumber is None else math.floor(colornumber) embed["timestamp"] = STATIC["timestamp"] if STATIC["tags"]: tag_displayname = [] if "fields" not in embed: embed["fields"] = [] for tag in STATIC["tags"]: if tag in recent_changes.tags: tag_displayname.append(recent_changes.tags[tag]) else: tag_displayname.append(tag) embed["fields"].append({"name": _("Tags"), "value": ", ".join(tag_displayname)}) data["embeds"].append(dict(embed)) data['avatar_url'] = settings["avatars"]["embed"] formatted_embed = json.dumps(data, indent=4) send_to_discord(formatted_embed) def handle_discord_http(code, formatted_embed): if 300 > code > 199: # message went through return 0 elif code == 400: # HTTP BAD REQUEST logging.error( "Following message has been rejected by Discord, please submit a bug on our bugtracker adding it:") logging.error(formatted_embed) return 1 elif code == 401 or code == 404: # HTTP UNAUTHORIZED AND NOT FOUND logging.error("Webhook URL is invalid or no longer in use, please replace it with proper one.") sys.exit(1) elif code == 429: logging.error("We are sending too many requests to the Discord, slowing down...") return 2 elif 499 < code < 600: logging.error( "Discord have trouble processing the event, and because the HTTP code returned is {} it means we blame them.".format( code)) return 3 def first_pass( change): # I've decided to split the embed formatter and change handler, maybe it's more messy this way, I don't know if "actionhidden" in change or "suppressed" in change and "suppressed" not in settings["ignored"]: webhook_formatter("suppressed", {"timestamp": change["timestamp"], "color": settings["appearance"]["suppressed"]["color"], "icon": settings["appearance"]["suppressed"]["icon"]}, user=change["user"]) return HTMLParse.feed(change["parsedcomment"]) # parsedcomment = (BeautifulSoup(change["parsedcomment"], "lxml")).get_text() parsedcomment = HTMLParse.new_string HTMLParse.new_string = "" logging.debug(change) STATIC_VARS = {"timestamp": change["timestamp"], "tags": change["tags"]} if not parsedcomment: parsedcomment = _("No description provided") if change["type"] == "edit" and "edit" not in settings["ignored"]: STATIC_VARS = {**STATIC_VARS, **{"color": settings["appearance"]["edit"]["color"], "icon": settings["appearance"]["edit"]["icon"]}} webhook_formatter("edit", STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, oldrev=change["old_revid"], pageid=change["pageid"], diff=change["revid"], size=change["newlen"] - change["oldlen"], minor=True if "minor" in change else False) elif change["type"] == "log": combination = "{logtype}/{logaction}".format(logtype=change["logtype"], logaction=change["logaction"]) if combination in settings["ignored"]: return logging.debug("combination is {}".format(combination)) try: STATIC_VARS = {**STATIC_VARS, **{"color": settings["appearance"][combination]["color"], "icon": settings["appearance"][combination]["icon"]}} except KeyError: STATIC_VARS = {**STATIC_VARS, **{"color": "", "icon": ""}} logging.error("No value in the settings has been given for {}".format(combination)) if combination == "protect/protect": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, settings=change["logparams"]["description"]) elif combination == "protect/modify": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, settings=change["logparams"]["description"]) elif combination == "protect/unprotect": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment) elif combination == "upload/overwrite": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, overwrite=True) elif combination == "upload/upload": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, overwrite=False) elif combination == "delete/delete": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment) elif combination == "delete/delete_redir": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment) elif combination == "delete/restore": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment) elif combination == "delete/revision": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, amount=change["logparams"]["ids"]) elif combination == "delete/event": webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment) elif combination == "import/upload": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, amount=change["logparams"]["count"]) elif combination == "import/interwiki": webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment) elif combination == "merge/merge": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, destination=change["logparams"]["dest_title"]) elif combination == "move/move": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, supress=True if "suppressredirect" in change["logparams"] else False, target=change["logparams"]['target_title']) elif combination == "move/move_redir": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, target=change["logparams"]["target_title"]) elif combination == "protect/move_prot": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["logparams"]["oldtitle_title"], desc=parsedcomment, target=change["title"]) elif combination == "block/block": webhook_formatter(combination, STATIC_VARS, user=change["user"], blocked_user=change["title"], desc=parsedcomment, duration=change["logparams"]["duration"]) elif combination == "block/unblock": webhook_formatter(combination, STATIC_VARS, user=change["user"], blocked_user=change["title"], desc=parsedcomment) elif combination == "block/reblock": webhook_formatter(combination, STATIC_VARS, user=change["user"], blocked_user=change["title"], desc=parsedcomment) elif combination == "rights/rights": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, old_groups=change["logparams"]["oldgroups"], new_groups=change["logparams"]["newgroups"]) elif combination == "rights/autopromote": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, old_groups=change["logparams"]["oldgroups"], new_groups=change["logparams"]["newgroups"]) elif combination == "abusefilter/modify": webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment, filternr=change["logparams"]['1']) elif combination == "interwiki/iw_add": webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment, prefix=change["logparams"]['0'], website=change["logparams"]['1']) elif combination == "interwiki/iw_edit": webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment, prefix=change["logparams"]['0'], website=change["logparams"]['1']) elif combination == "interwiki/iw_delete": webhook_formatter(combination, STATIC_VARS, user=change["user"], desc=parsedcomment, prefix=change["logparams"]['0']) elif combination == "curseprofile/comment-created": webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1], commentid=change["logparams"]["0"]) elif combination == "curseprofile/comment-edited": webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1], commentid=change["logparams"]["0"]) elif combination == "curseprofile/comment-deleted": webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1], commentid=change["logparams"]["0"]) elif combination == "curseprofile/profile-edited": webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1], field=change["logparams"]['0'], desc=change["parsedcomment"]) elif combination == "curseprofile/comment-replied": webhook_formatter(combination, STATIC_VARS, user=change["user"], target=change["title"].split(':')[1], commentid=change["logparams"]["0"]) elif combination == "contentmodel/change": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, oldmodel=change["logparams"]["oldmodel"], newmodel=change["logparams"]["newmodel"]) elif combination == "sprite/sprite": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment) elif combination == "sprite/sheet": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment) elif combination == "sprite/slice": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment) elif combination == "managetags/create": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, additional=change["logparams"]) elif combination == "managetags/delete": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, additional=change["logparams"]) elif combination == "managetags/activate": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, additional=change["logparams"]) elif combination == "managetags/deactivate": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, additional=change["logparams"]) elif combination == "tag/update": webhook_formatter(combination, STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment) else: logging.warning("No entry matches given change!") print(change) send(_("Unable to process the event"), _("error"), settings["avatars"]["no_event"]) return if change["type"] == "external": # not sure what happens then, but it's listed as possible type logging.warning("External event happened, ignoring.") print(change) return elif change["type"] == "new" and "new" not in settings["ignored"]: # new page STATIC_VARS = {**STATIC_VARS, **{"color": settings["appearance"]["new"]["color"], "icon": settings["appearance"]["new"]["icon"]}} webhook_formatter("new", STATIC_VARS, user=change["user"], title=change["title"], desc=parsedcomment, oldrev=change["old_revid"], pageid=change["pageid"], diff=change["revid"], size=change["newlen"]) def day_overview_request(): logging.info("Fetching daily overview... This may take up to 30 seconds!") timestamp = (datetime.datetime.utcnow() - datetime.timedelta(hours=24)).isoformat(timespec='milliseconds') logging.debug("timestamp is {}".format(timestamp)) complete = False result = [] passes = 0 continuearg = "" while not complete and passes < 10: request = recent_changes.safe_request( "https://{wiki}.gamepedia.com/api.php?action=query&format=json&list=recentchanges&rcend={timestamp}Z&rcprop=title%7Ctimestamp%7Csizes%7Cloginfo%7Cuser&rcshow=!bot&rclimit=500&rctype=edit%7Cnew%7Clog{continuearg}".format( wiki=settings["wiki"], timestamp=timestamp, continuearg=continuearg)) if request: try: request = request.json() rc = request['query']['recentchanges'] continuearg = request["continue"]["rccontinue"] if "continue" in request else None except ValueError: logging.warning("ValueError in fetching changes") recent_changes.downtime_controller() complete = 2 except KeyError: logging.warning("Wiki returned %s" % (request.json())) complete = 2 else: result += rc if continuearg: continuearg = "&rccontinue={}".format(continuearg) passes += 1 logging.debug( "continuing requesting next pages of recent changes with {} passes and continuearg being {}".format( passes, continuearg)) time.sleep(3.0) else: complete = 1 else: complete = 2 if passes == 10: logging.debug("quit the loop because there been too many passes") return (result, complete) def add_to_dict(dictionary, key): if key in dictionary: dictionary[key] += 1 else: dictionary[key] = 1 return dictionary def day_overview(): # time.strftime('%Y-%m-%dT%H:%M:%S.000Z', time.gmtime(time.time())) # (datetime.datetime.utcnow()+datetime.timedelta(hours=0)).isoformat(timespec='milliseconds')+'Z' result = day_overview_request() if result[1] == 1: activity = defaultdict(dict) hours = defaultdict(dict) edits = 0 files = 0 admin = 0 changed_bytes = 0 new_articles = 0 if not result[0] and not settings["send_empty_overview"]: return # no changes in this day for item in result[0]: activity = add_to_dict(activity, item["user"]) hours = add_to_dict(hours, datetime.datetime.strptime(item["timestamp"], "%Y-%m-%dT%H:%M:%SZ").hour) if "actionhidden" in item or "suppressed" in item: continue # while such actions have type value (edit/new/log) many other values are hidden and therefore can crash with key error, let's not process such events if item["type"] == "edit": edits += 1 changed_bytes += item["newlen"] - item["oldlen"] if item["type"] == "new": if item["ns"] == 0: new_articles += 1 changed_bytes += item["newlen"] if item["type"] == "log": files = files + 1 if item["logtype"] == item["logaction"] == "upload" else files admin = admin + 1 if item["logtype"] in ["delete", "merge", "block", "protect", "import", "rights", "abusefilter", "interwiki", "managetags"] else admin overall = round(new_articles + edits * 0.1 + files * 0.3 + admin * 0.1 + math.fabs(changed_bytes * 0.001), 2) embed = defaultdict(dict) embed["title"] = _("Daily overview") embed["url"] = "https://{wiki}.gamepedia.com/Special:Statistics".format(wiki=settings["wiki"]) embed["color"] = settings["appearance"]["daily_overview"]["color"] embed["author"]["icon_url"] = settings["appearance"]["daily_overview"]["icon"] embed["author"]["name"] = settings["wikiname"] embed["author"]["url"] = "https://{wiki}.gamepedia.com/".format(wiki=settings["wiki"]) if activity: v = activity.values() active_users = [] for user, numberu in Counter(activity).most_common(list(v).count(max(v))): # find most active users active_users.append(user) # the_one = random.choice(active_users) v = hours.values() active_hours = [] for hour, numberh in Counter(hours).most_common(list(v).count(max(v))): # find most active users active_hours.append(str(hour)) usramount = ngettext(" ({} action)", " ({} actions)", numberu).format(numberu) houramount = ngettext(" UTC ({} action)", " UTC ({} actions)", numberh).format(numberh) else: active_users = [_("But nobody came")] # a reference to my favorite game of all the time, sorry ^_^ active_hours = [_("But nobody came")] usramount = "" houramount = "" embed["fields"] = [] fields = ( (ngettext("Most active user", "Most active users", len(active_users)), ', '.join(active_users) + usramount), (_("Edits made"), edits), (_("New files"), files), (_("Admin actions"), admin), (_("Bytes changed"), changed_bytes), (_("New articles"), new_articles), (_("Unique contributors"), str(len(activity))), (ngettext("Most active hour", "Most active hours", len(active_hours)), ', '.join(active_hours) + houramount), (_("Day score"), str(overall))) for name, value in fields: embed["fields"].append({"name": name, "value": value}) data = {"embeds": [dict(embed)]} formatted_embed = json.dumps(data, indent=4) send_to_discord(formatted_embed) else: logging.debug("function requesting changes for day overview returned with error code") class recent_changes_class(object): starttime = time.time() ids = [] map_ips = {} recent_id = 0 downtimecredibility = 0 last_downtime = 0 clock = 0 tags = {} groups = {} unsent_messages = [] streak = -1 session = requests.Session() session.headers.update(settings["header"]) if settings["limitrefetch"] != -1: with open("lastchange.txt", "r") as record: file_content = record.read().strip() if file_content: file_id = int(file_content) logging.debug("File_id is {val}".format(val=file_id)) else: logging.debug("File is empty") file_id = 999999999 else: file_id = 999999999 # such value won't cause trouble, and it will make sure no refetch happen def handle_mw_errors(self, request): if "errors" in request: print(request["errors"]) raise MWError return request def log_in(self): # session.cookies.clear() if '@' not in settings["wiki_bot_login"]: logging.error( "Please provide proper nickname for login from https://{wiki}.gamepedia.com/Special:BotPasswords".format( wiki=settings["wiki"])) return if len(settings["wiki_bot_password"]) != 32: logging.error( "Password seems incorrect. It should be 32 characters long! Grab it from https://{wiki}.gamepedia.com/Special:BotPasswords".format( wiki=settings["wiki"])) return logging.info("Trying to log in to https://{wiki}.gamepedia.com...".format(wiki=settings["wiki"])) try: response = self.handle_mw_errors( self.session.post("https://{wiki}.gamepedia.com/api.php".format(wiki=settings["wiki"]), data={'action': 'query', 'format': 'json', 'utf8': '', 'meta': 'tokens', 'type': 'login'})) response = self.handle_mw_errors( self.session.post("https://{wiki}.gamepedia.com/api.php".format(wiki=settings["wiki"]), data={'action': 'login', 'format': 'json', 'utf8': '', 'lgname': settings["wiki_bot_login"], 'lgpassword': settings["wiki_bot_password"], 'lgtoken': response.json()['query']['tokens']['logintoken']})) except ValueError: logging.error("Logging in have not succeeded") return except MWError: logging.error("Logging in have not succeeded") return try: if response.json()['login']['result'] == "Success": logging.info("Logging to the wiki succeeded") else: logging.error("Logging in have not succeeded") except: logging.error("Logging in have not succeeded") def add_cache(self, change): self.ids.append(change["rcid"]) # self.recent_id = change["rcid"] if len(self.ids) > settings["limitrefetch"] + 5: self.ids.pop(0) def fetch(self, amount=settings["limit"]): if self.unsent_messages: logging.info( "{} messages waiting to be delivered to Discord due to Discord throwing errors/no connection to Discord servers.".format( len(self.unsent_messages))) for num, item in enumerate(self.unsent_messages): logging.debug( "Trying to send a message to Discord from the queue with id of {} and content {}".format(str(num), str(item))) if send_to_discord_webhook(item) < 2: logging.debug("Sending message succeeded") time.sleep(2.5) else: logging.debug("Sending message failed") break else: self.unsent_messages = [] logging.debug("Queue emptied, all messages delivered") self.unsent_messages = self.unsent_messages[num:] logging.debug(self.unsent_messages) last_check = self.fetch_changes(amount=amount) self.recent_id = last_check if last_check is not None else self.file_id if settings["limitrefetch"] != -1 and self.recent_id != self.file_id: self.file_id = self.recent_id with open("lastchange.txt", "w") as record: record.write(str(self.file_id)) logging.debug("Most recent rcid is: {}".format(self.recent_id)) def fetch_changes(self, amount, clean=False): if len(self.ids) == 0: logging.debug("ids is empty, triggering clean fetch") clean = True changes = self.safe_request( "https://{wiki}.gamepedia.com/api.php?action=query&format=json&list=recentchanges&rcshow=!bot&rcprop=title%7Ctimestamp%7Cids%7Cloginfo%7Cparsedcomment%7Csizes%7Cflags%7Ctags%7Cuser&rclimit={amount}&rctype=edit%7Cnew%7Clog%7Cexternal".format( wiki=settings["wiki"], amount=amount)) if changes: try: changes = changes.json()['query']['recentchanges'] changes.reverse() except ValueError: logging.warning("ValueError in fetching changes") if changes.url == "https://www.gamepedia.com": logging.critical( "The wiki specified in the settings most probably doesn't exist, got redirected to gamepedia.com") sys.exit(1) self.downtime_controller() return None except KeyError: logging.warning("Wiki returned %s" % (changes.json())) return None else: if self.downtimecredibility > 0: self.downtimecredibility -= 1 if self.streak > -1: self.streak += 1 if self.streak > 8: self.streak = -1 send(_("Connection to {wiki} seems to be stable now.").format(wiki=settings["wikiname"]), _("Connection status"), settings["avatars"]["connection_restored"]) for change in changes: if change["rcid"] in self.ids or change["rcid"] < self.recent_id: logging.debug("Change ({}) is in ids or is lower than recent_id {}".format(change["rcid"], self.recent_id)) continue logging.debug(self.ids) logging.debug(self.recent_id) self.add_cache(change) if clean and not (self.recent_id == 0 and change["rcid"] > self.file_id): logging.debug("Rejected {val}".format(val=change["rcid"])) continue first_pass(change) return change["rcid"] def safe_request(self, url): try: request = self.session.get(url, timeout=10) except requests.exceptions.Timeout: logging.warning("Reached timeout error for request on link {url}".format(url=url)) self.downtime_controller() return None except requests.exceptions.ConnectionError: logging.warning("Reached connection error for request on link {url}".format(url=url)) self.downtime_controller() return None else: return request def check_connection(self, looped=False): online = 0 for website in ["https://google.com", "https://instagram.com", "https://steamcommunity.com"]: try: requests.get(website, timeout=10) online += 1 except requests.exceptions.ConnectionError: pass except requests.exceptions.Timeout: pass if online < 1: logging.error("Failure when checking Internet connection at {time}".format( time=time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime()))) self.downtimecredibility = 0 if not looped: while 1: # recursed loop, check for connection (every 10 seconds) as long as three services are down, don't do anything else if self.check_connection(looped=True): recent_changes.fetch(amount=settings["limitrefetch"]) break time.sleep(10) return False return True def downtime_controller(self): if not settings["show_updown_messages"]: return if self.streak > -1: # reset the streak of successful connections when bad one happens self.streak = 0 if self.downtimecredibility < 60: self.downtimecredibility += 15 else: if ( time.time() - self.last_downtime) > 1800 and self.check_connection(): # check if last downtime happened within 30 minutes, if yes, don't send a message send(_("{wiki} seems to be down or unreachable.").format(wiki=settings["wikiname"]), _("Connection status"), settings["avatars"]["connection_failed"]) self.last_downtime = time.time() self.streak = 0 def clear_cache(self): self.map_ips = {} def update_tags(self): tags_read = safe_read(self.safe_request( "https://{wiki}.gamepedia.com/api.php?action=query&format=json&list=tags&tglimit=max&tgprop=name|displayname".format( wiki=settings["wiki"])), "query", "tags") if tags_read: for tag in tags_read: self.tags[tag["name"]] = (BeautifulSoup(tag["displayname"], "lxml")).get_text() else: logging.warning("Could not retrive tags. Internal names will be used!") recent_changes = recent_changes_class() if settings["wiki_bot_login"] and settings["wiki_bot_password"]: recent_changes.log_in() recent_changes.update_tags() time.sleep(1.0) recent_changes.fetch(amount=settings["limitrefetch"] if settings["limitrefetch"] != -1 else settings["limit"]) schedule.every(settings["cooldown"]).seconds.do(recent_changes.fetch) if 1 == 2: print(_("director"), _("bot"), _("editor"), _("directors"), _("sysop"), _("bureaucrat"), _("reviewer"), _("autoreview"), _("autopatrol"), _("wiki_guardian")) if settings["overview"]: schedule.every().day.at("{}:{}".format(time.strptime(settings["overview_time"], '%H:%M').tm_hour, time.strptime(settings["overview_time"], '%H:%M').tm_min)).do(day_overview) schedule.every().day.at("00:00").do(recent_changes.clear_cache) while 1: time.sleep(1.0) schedule.run_pending()