2021-12-28 14:08:56 +00:00
|
|
|
# This file is part of Recent changes Goat compatible Discord webhook (RcGcDw).
|
|
|
|
#
|
|
|
|
# RcGcDw is free software: you can redistribute it and/or modify
|
|
|
|
# it under the terms of the GNU General Public License as published by
|
|
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
|
|
# (at your option) any later version.
|
|
|
|
#
|
|
|
|
# RcGcDw is distributed in the hope that it will be useful,
|
|
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
# GNU General Public License for more details.
|
|
|
|
#
|
|
|
|
# You should have received a copy of the GNU General Public License
|
|
|
|
# along with RcGcDw. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
from http.server import BaseHTTPRequestHandler, HTTPServer
|
|
|
|
import json
|
|
|
|
import urllib.parse
|
|
|
|
import requests
|
|
|
|
|
|
|
|
response_jsons: dict[str, dict] = {}
|
|
|
|
|
2021-12-30 14:46:18 +00:00
|
|
|
class EndOfContent(Exception):
|
|
|
|
pass
|
2021-12-28 14:08:56 +00:00
|
|
|
|
|
|
|
def load_response(name: str):
|
2021-12-30 14:46:18 +00:00
|
|
|
with open("data/response_{}.json".format(name), "r") as response_file:
|
2021-12-28 14:08:56 +00:00
|
|
|
response_json: dict = json.loads(response_file.read())
|
|
|
|
response_jsons[name] = response_json
|
|
|
|
|
|
|
|
|
|
|
|
def get_response(name: str):
|
|
|
|
return response_jsons.get(name)
|
|
|
|
|
|
|
|
|
2021-12-30 14:46:18 +00:00
|
|
|
[load_response(x) for x in ["recentchanges", "recentchanges2", "init", "error", "siteinfo", "image", "userinfo"]]
|
2021-12-28 14:08:56 +00:00
|
|
|
|
|
|
|
|
|
|
|
messages_collector = []
|
2021-12-30 14:46:18 +00:00
|
|
|
askedfor = False
|
2021-12-28 14:08:56 +00:00
|
|
|
|
|
|
|
# Return server response based on some output from Minecraft Wiki
|
|
|
|
class MockServerRequestHandler(BaseHTTPRequestHandler):
|
|
|
|
def do_GET(self):
|
2021-12-30 14:46:18 +00:00
|
|
|
global askedfor
|
2021-12-28 14:08:56 +00:00
|
|
|
# We assume testing will be for API endpoint only since RcGcDw doesn't do requests to other URLs so no need to check main path
|
|
|
|
# For simplicity, return a dictionary of query arguments, we assume duplicate keys will not appear
|
|
|
|
query = {k: y for (k, y) in urllib.parse.parse_qsl(self.path.split("?")[1])}
|
|
|
|
if query.get("action") == "query":
|
|
|
|
# Regular pooled query for recentchanges
|
|
|
|
if query.get("list") == "recentchanges":
|
|
|
|
self.send_essentials_ok()
|
2021-12-30 14:46:18 +00:00
|
|
|
if askedfor is False:
|
2021-12-28 14:08:56 +00:00
|
|
|
# Limit amount of events accordingly to required amount just in case
|
2021-12-30 14:46:18 +00:00
|
|
|
response_jsons["recentchanges"]["query"]["recentchanges"] = get_response("recentchanges")["query"]["recentchanges"][0:int(query.get("rclimit", 20))]
|
|
|
|
response_content = json.dumps(get_response("recentchanges"))
|
|
|
|
askedfor = True
|
2021-12-28 14:08:56 +00:00
|
|
|
else:
|
2021-12-30 14:46:18 +00:00
|
|
|
response_jsons["recentchanges2"]["query"]["recentchanges"] = get_response("recentchanges2")["query"]["recentchanges"][0:int(query.get("rclimit", 20))]
|
|
|
|
response_content = json.dumps(get_response("recentchanges2"))
|
2021-12-28 14:08:56 +00:00
|
|
|
self.wfile.write(response_content.encode('utf-8'))
|
|
|
|
# Init info
|
|
|
|
elif query.get("list") == "tags" and query.get("meta") == "allmessages|siteinfo":
|
|
|
|
self.send_essentials_ok()
|
2021-12-30 14:46:18 +00:00
|
|
|
response_content = json.dumps(get_response("init"))
|
2021-12-28 14:08:56 +00:00
|
|
|
self.wfile.write(response_content.encode('utf-8'))
|
|
|
|
elif query.get("meta") == "siteinfo":
|
|
|
|
self.send_essentials_ok()
|
2021-12-30 14:46:18 +00:00
|
|
|
response_content = json.dumps(get_response("siteinfo"))
|
2021-12-28 14:08:56 +00:00
|
|
|
self.wfile.write(response_content.encode('utf-8'))
|
|
|
|
elif query.get("prop") == "imageinfo|revisions":
|
|
|
|
self.send_essentials_ok()
|
2021-12-30 14:46:18 +00:00
|
|
|
response_content = json.dumps(get_response("image"))
|
2021-12-28 14:08:56 +00:00
|
|
|
self.wfile.write(response_content.encode('utf-8'))
|
|
|
|
elif query.get("list") == "usercontribs":
|
|
|
|
self.send_essentials_ok()
|
2021-12-30 14:46:18 +00:00
|
|
|
response_content = json.dumps(get_response("userinfo"))
|
2021-12-28 14:08:56 +00:00
|
|
|
self.wfile.write(response_content.encode('utf-8'))
|
|
|
|
else:
|
|
|
|
self.send_response(400)
|
|
|
|
self.send_header('Content-Type', 'application/json; charset=utf-8')
|
|
|
|
self.end_headers()
|
2021-12-30 14:46:18 +00:00
|
|
|
response_content = json.dumps(get_response("error"))
|
2021-12-28 14:08:56 +00:00
|
|
|
self.wfile.write(response_content.encode('utf-8'))
|
2021-12-30 14:46:18 +00:00
|
|
|
elif query.get("action") == "compare":
|
|
|
|
self.send_essentials_ok()
|
|
|
|
name = "diff{}{}".format(query.get("fromrev"), query.get("torev"))
|
|
|
|
load_response(name)
|
|
|
|
response_content = json.dumps(get_response(name))
|
|
|
|
self.wfile.write(response_content.encode('utf-8'))
|
2021-12-28 14:08:56 +00:00
|
|
|
|
|
|
|
def do_POST(self):
|
2021-12-30 14:46:18 +00:00
|
|
|
self.read_ok_collect(method="POST")
|
2021-12-28 14:08:56 +00:00
|
|
|
|
|
|
|
def do_PATCH(self):
|
2021-12-30 14:46:18 +00:00
|
|
|
self.read_ok_collect(method="PATCH")
|
2021-12-28 14:08:56 +00:00
|
|
|
|
|
|
|
def do_DELETE(self):
|
2021-12-30 14:46:18 +00:00
|
|
|
self.read_ok_collect(method="DELETE")
|
2021-12-28 14:08:56 +00:00
|
|
|
|
2021-12-30 14:46:18 +00:00
|
|
|
def read_ok_collect(self, method: str):
|
2021-12-28 14:08:56 +00:00
|
|
|
content_length = int(self.headers['Content-Length'])
|
|
|
|
patch_data = self.rfile.read(content_length)
|
2021-12-30 14:46:18 +00:00
|
|
|
if patch_data:
|
|
|
|
messages_collector.append(json.loads(patch_data.decode('utf-8')))
|
|
|
|
else:
|
|
|
|
messages_collector.append(method + self.path)
|
2021-12-28 14:08:56 +00:00
|
|
|
self.send_essentials_ok()
|
|
|
|
self.wfile.write(json.dumps({"id": len(messages_collector)}).encode('utf-8'))
|
|
|
|
|
|
|
|
def send_essentials_ok(self):
|
|
|
|
self.send_response(requests.codes.ok)
|
|
|
|
self.send_header('Content-Type', 'application/json; charset=utf-8')
|
|
|
|
self.end_headers()
|
|
|
|
|
|
|
|
|
2021-12-30 14:46:18 +00:00
|
|
|
def start_mock_server(port, config):
|
2021-12-28 14:08:56 +00:00
|
|
|
mock_server = HTTPServer(('localhost', port), MockServerRequestHandler)
|
|
|
|
try:
|
|
|
|
print("Server started successfully at http://localhost:{}".format(port))
|
2021-12-30 14:46:18 +00:00
|
|
|
while 1:
|
2022-01-01 12:45:53 +00:00
|
|
|
if (len(messages_collector) < 13 and config.config == 1) or (len(messages_collector) < 11 and config.config == 2):
|
2021-12-30 14:46:18 +00:00
|
|
|
mock_server.handle_request()
|
|
|
|
else:
|
|
|
|
raise EndOfContent
|
2021-12-28 14:08:56 +00:00
|
|
|
except KeyboardInterrupt:
|
|
|
|
print("Shutting down...")
|
2021-12-30 14:46:18 +00:00
|
|
|
except EndOfContent:
|
|
|
|
with open("results/results{}.json".format(config.config), "r") as proper_results:
|
|
|
|
if proper_results.read() == json.dumps(messages_collector):
|
|
|
|
print("Results are correct!")
|
|
|
|
else:
|
|
|
|
print("Results are incorrect, saving failed results to resultsfailed{}.json".format(config.config))
|
|
|
|
with open("results/resultsfailed{}.json".format(config.config), "w") as file_to_write:
|
|
|
|
file_to_write.write(json.dumps(messages_collector))
|