mirror of
https://gitlab.com/chicken-riders/RcGcDb.git
synced 2025-02-23 00:54:09 +00:00
Added "request price" to stop rapidly requesting wiki data and add some timeout, fixed Discord sending
This commit is contained in:
parent
09fcfcb070
commit
5e2a0f3028
|
@ -29,7 +29,7 @@ def embed_approval_approvefile(ctx: Context, change: dict):
|
||||||
embed_helper(ctx, embed, change)
|
embed_helper(ctx, embed, change)
|
||||||
embed["url"] = ctx.client.create_article_path(sanitize_to_url(change["title"]))
|
embed["url"] = ctx.client.create_article_path(sanitize_to_url(change["title"]))
|
||||||
embed["title"] = ctx._("Approved a file revision of {file}").format(file=sanitize_to_markdown(change["title"]))
|
embed["title"] = ctx._("Approved a file revision of {file}").format(file=sanitize_to_markdown(change["title"]))
|
||||||
link_parser_object = LinkParser(ctx.client.WIKI_JUST_DOMAIN)
|
link_parser_object = ctx.client.LinkParser(ctx.client.WIKI_JUST_DOMAIN)
|
||||||
link_parser_object.feed(change.get("logparams", {}).get("0", ""))
|
link_parser_object.feed(change.get("logparams", {}).get("0", ""))
|
||||||
embed["description"] = ctx._("File version from {time} got approved").format(name=change["title"], time=link_parser_object.new_string)
|
embed["description"] = ctx._("File version from {time} got approved").format(name=change["title"], time=link_parser_object.new_string)
|
||||||
# TODO Make timestamp more user friendly? Getting user who uploaded will be a pain though, same with approval/approve
|
# TODO Make timestamp more user friendly? Getting user who uploaded will be a pain though, same with approval/approve
|
||||||
|
@ -55,7 +55,7 @@ def embed_approval_approve(ctx: Context, change: dict):
|
||||||
embed_helper(ctx, embed, change)
|
embed_helper(ctx, embed, change)
|
||||||
embed["url"] = ctx.client.create_article_path(sanitize_to_url(change["title"]))
|
embed["url"] = ctx.client.create_article_path(sanitize_to_url(change["title"]))
|
||||||
embed["title"] = ctx._("Approved a revision of {article}").format(article=sanitize_to_markdown(change["title"]))
|
embed["title"] = ctx._("Approved a revision of {article}").format(article=sanitize_to_markdown(change["title"]))
|
||||||
link_parser_object = LinkParser(ctx.client.WIKI_JUST_DOMAIN)
|
link_parser_object = ctx.client.LinkParser(ctx.client.WIKI_JUST_DOMAIN)
|
||||||
link_parser_object.feed(change.get("logparams", {}).get("0", ""))
|
link_parser_object.feed(change.get("logparams", {}).get("0", ""))
|
||||||
embed["description"] = ctx._("Revision number {revision_id} got approved").format(name=change["title"], time=link_parser_object.new_string)
|
embed["description"] = ctx._("Revision number {revision_id} got approved").format(name=change["title"], time=link_parser_object.new_string)
|
||||||
return embed
|
return embed
|
||||||
|
|
|
@ -66,6 +66,7 @@ class MessageQueue:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._queue: list[QueueEntry] = []
|
self._queue: list[QueueEntry] = []
|
||||||
self.webhook_suspensions: dict[str, asyncio.Task] = {} # Storing tasks counting one hour since last 404
|
self.webhook_suspensions: dict[str, asyncio.Task] = {} # Storing tasks counting one hour since last 404
|
||||||
|
self.global_rate_limit = False
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return self._queue
|
return self._queue
|
||||||
|
@ -91,6 +92,7 @@ class MessageQueue:
|
||||||
|
|
||||||
async def suspension_check(self, webhook_url: str):
|
async def suspension_check(self, webhook_url: str):
|
||||||
"""Check after an hour if suspended webhook still returns ClientError"""
|
"""Check after an hour if suspended webhook still returns ClientError"""
|
||||||
|
logger.debug(f"Putting webhook {webhook_url} into suspension. Checking status in 2 hours.")
|
||||||
await asyncio.sleep(7200) # 2 hours
|
await asyncio.sleep(7200) # 2 hours
|
||||||
unsent_messages = await self.group_by_webhook()
|
unsent_messages = await self.group_by_webhook()
|
||||||
unsent_messages = dict(unsent_messages)
|
unsent_messages = dict(unsent_messages)
|
||||||
|
@ -174,17 +176,18 @@ class MessageQueue:
|
||||||
# noinspection PyTypeChecker
|
# noinspection PyTypeChecker
|
||||||
try:
|
try:
|
||||||
status = await send_to_discord_webhook(msg, webhook_url, method)
|
status = await send_to_discord_webhook(msg, webhook_url, method)
|
||||||
except aiohttp.ClientError:
|
except ExhaustedDiscordBucket as e:
|
||||||
client_error = True
|
if e.is_global:
|
||||||
|
self.global_rate_limit = True
|
||||||
|
await asyncio.sleep(e.remaining)
|
||||||
|
return
|
||||||
except (aiohttp.ServerConnectionError, aiohttp.ServerTimeoutError, asyncio.TimeoutError):
|
except (aiohttp.ServerConnectionError, aiohttp.ServerTimeoutError, asyncio.TimeoutError):
|
||||||
# Retry on next Discord message sent attempt
|
# Retry on next Discord message sent attempt
|
||||||
logger.debug(f"Received timeout or connection error when sending a Discord message for {msg.wiki.script_url if hasattr(msg, "wiki") else "PATCH OR DELETE MESSAGE"}.")
|
logger.debug(f"Received timeout or connection error when sending a Discord message for {msg.wiki.script_url if hasattr(msg, "wiki") else "PATCH OR DELETE MESSAGE"}.")
|
||||||
return
|
return
|
||||||
except ExhaustedDiscordBucket as e:
|
except aiohttp.ClientError as e:
|
||||||
if e.is_global:
|
client_error = True
|
||||||
self.global_rate_limit = True
|
logger.exception("Client error has been reported on Discord message sending send_msg_set function.")
|
||||||
await asyncio.sleep(e.remaining / 1000)
|
|
||||||
return
|
|
||||||
else:
|
else:
|
||||||
if status == 0 and method == "POST":
|
if status == 0 and method == "POST":
|
||||||
message = None
|
message = None
|
||||||
|
@ -216,14 +219,13 @@ class MessageQueue:
|
||||||
if self._queue:
|
if self._queue:
|
||||||
logger.info(
|
logger.info(
|
||||||
"{} messages waiting to be delivered to Discord.".format(len(self._queue)))
|
"{} messages waiting to be delivered to Discord.".format(len(self._queue)))
|
||||||
tasks_to_run = []
|
tasks_to_run = []
|
||||||
for set_msgs in await self.group_by_webhook():
|
for set_msgs in await self.group_by_webhook():
|
||||||
# logger.debug(set_msgs)
|
# logger.debug(set_msgs)
|
||||||
tasks_to_run.append(self.send_msg_set(set_msgs))
|
tasks_to_run.append(self.send_msg_set(set_msgs))
|
||||||
await asyncio.gather(*tasks_to_run) # we wait for all send_msg_set functions to finish
|
await asyncio.gather(*tasks_to_run) # we wait for all send_msg_set functions to finish
|
||||||
self._queue = [x for x in self._queue if x.complete() is False] # get rid of sent messages
|
self._queue = [x for x in self._queue if x.complete() is False] # get rid of sent messages
|
||||||
else:
|
await asyncio.sleep(1)
|
||||||
await asyncio.sleep(0.5)
|
|
||||||
|
|
||||||
|
|
||||||
messagequeue = MessageQueue()
|
messagequeue = MessageQueue()
|
||||||
|
@ -250,8 +252,8 @@ async def handle_discord_http(code: int, formatted_embed: str, result: ClientRes
|
||||||
elif code == 429:
|
elif code == 429:
|
||||||
logger.error("We are sending too many requests to the Discord, slowing down...")
|
logger.error("We are sending too many requests to the Discord, slowing down...")
|
||||||
if "x-ratelimit-global" in result.headers.keys():
|
if "x-ratelimit-global" in result.headers.keys():
|
||||||
raise ExhaustedDiscordBucket(remaining=int(result.headers.get("x-ratelimit-reset-after")), is_global=True)
|
raise ExhaustedDiscordBucket(remaining=result.headers.get("x-ratelimit-reset-after", 1), is_global=True)
|
||||||
raise ExhaustedDiscordBucket(remaining=int(result.headers.get("x-ratelimit-reset-after")), is_global=False)
|
raise ExhaustedDiscordBucket(remaining=result.headers.get("x-ratelimit-reset-after"), is_global=False)
|
||||||
elif 499 < code < 600:
|
elif 499 < code < 600:
|
||||||
logger.error(
|
logger.error(
|
||||||
"Discord have trouble processing the event, and because the HTTP code returned is {} it means we blame them.".format(
|
"Discord have trouble processing the event, and because the HTTP code returned is {} it means we blame them.".format(
|
||||||
|
@ -271,11 +273,12 @@ async def send_to_discord_webhook(message: [StackedDiscordMessage, DiscordMessag
|
||||||
if method == "POST":
|
if method == "POST":
|
||||||
async with session.post(f"https://discord.com/api/webhooks/{webhook_path}?wait=true", data=repr(message)) as resp: # TODO Detect Invalid Webhook Token
|
async with session.post(f"https://discord.com/api/webhooks/{webhook_path}?wait=true", data=repr(message)) as resp: # TODO Detect Invalid Webhook Token
|
||||||
try:
|
try:
|
||||||
|
logger.debug(f"repr_json reached")
|
||||||
resp_json = await resp.json()
|
resp_json = await resp.json()
|
||||||
# Add Discord Message ID which we can later use to delete/redact messages if we want
|
# Add Discord Message ID which we can later use to delete/redact messages if we want
|
||||||
message.discord_callback_message_id = resp_json["id"]
|
message.discord_callback_message_id = resp_json["id"]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
raise aiohttp.ServerConnectionError(f"Could not get the ID from POST request with message data. Data: {await resp.text()}")
|
logger.exception(f"Could not get the ID from POST request with message data. Data: {await resp.text()}]")
|
||||||
except ContentTypeError:
|
except ContentTypeError:
|
||||||
logger.exception("Could not receive message ID from Discord due to invalid MIME type of response.")
|
logger.exception("Could not receive message ID from Discord due to invalid MIME type of response.")
|
||||||
except ValueError:
|
except ValueError:
|
||||||
|
|
|
@ -71,6 +71,6 @@ class WikiExists(Exception):
|
||||||
|
|
||||||
|
|
||||||
class ExhaustedDiscordBucket(BaseException):
|
class ExhaustedDiscordBucket(BaseException):
|
||||||
def __init__(self, remaining: int, is_global: bool):
|
def __init__(self, remaining: int | float, is_global: bool):
|
||||||
self.remaining = remaining
|
self.remaining = remaining
|
||||||
self.is_global = is_global
|
self.is_global = is_global
|
||||||
|
|
|
@ -130,6 +130,7 @@ class Wiki:
|
||||||
self.recache_requested: bool = False
|
self.recache_requested: bool = False
|
||||||
self.session_requests = requests.Session()
|
self.session_requests = requests.Session()
|
||||||
self.session_requests.headers.update(settings["header"])
|
self.session_requests.headers.update(settings["header"])
|
||||||
|
self.request_cost = 0 # For tracking amount of times wiki has been requested in given context
|
||||||
logger.debug("Creating new wiki object for {}".format(script_url))
|
logger.debug("Creating new wiki object for {}".format(script_url))
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
|
@ -140,6 +141,9 @@ class Wiki:
|
||||||
f"<statistics={self.statistics} tags={self.tags} rc_targets={self.rc_targets}, discussion_targets={self.discussion_targets},"
|
f"<statistics={self.statistics} tags={self.tags} rc_targets={self.rc_targets}, discussion_targets={self.discussion_targets},"
|
||||||
f"recache_requested={self.recache_requested}>")
|
f"recache_requested={self.recache_requested}>")
|
||||||
|
|
||||||
|
def request_made(self):
|
||||||
|
self.request_cost += 1
|
||||||
|
|
||||||
def json(self) -> dict:
|
def json(self) -> dict:
|
||||||
dict_obj = {
|
dict_obj = {
|
||||||
"wiki_url": self.script_url,
|
"wiki_url": self.script_url,
|
||||||
|
@ -364,6 +368,7 @@ class Wiki:
|
||||||
def sync_api_request(self, params: Union[str, OrderedDict], *json_path: str, timeout: int = 10,
|
def sync_api_request(self, params: Union[str, OrderedDict], *json_path: str, timeout: int = 10,
|
||||||
allow_redirects: bool = False) -> dict:
|
allow_redirects: bool = False) -> dict:
|
||||||
"""Synchronous function based on api_request created for compatibility reasons with RcGcDw API"""
|
"""Synchronous function based on api_request created for compatibility reasons with RcGcDw API"""
|
||||||
|
self.request_made()
|
||||||
try:
|
try:
|
||||||
if isinstance(params, str):
|
if isinstance(params, str):
|
||||||
request = self.session_requests.get(self.script_url + "api.php" + params + "&errorformat=raw", timeout=10, allow_redirects=allow_redirects)
|
request = self.session_requests.get(self.script_url + "api.php" + params + "&errorformat=raw", timeout=10, allow_redirects=allow_redirects)
|
||||||
|
@ -620,6 +625,8 @@ async def rc_processor(wiki: Wiki, change: dict, changed_categories: dict, displ
|
||||||
await wiki.redact_messages(context, logparams.get("ids", []), "rev_id", logparams.get("new", {}), page_id=change.get("pageid", -1))
|
await wiki.redact_messages(context, logparams.get("ids", []), "rev_id", logparams.get("new", {}), page_id=change.get("pageid", -1))
|
||||||
await wiki.delete_messages(dict(rev_id=[int(x) for x in logparams.get("ids", [])], message_display=0))
|
await wiki.delete_messages(dict(rev_id=[int(x) for x in logparams.get("ids", [])], message_display=0))
|
||||||
run_hooks(post_hooks, discord_message, metadata, context, change)
|
run_hooks(post_hooks, discord_message, metadata, context, change)
|
||||||
|
await asyncio.sleep(wiki.request_cost*0.5) # Await amount of time due to additional requests made in the formatters
|
||||||
|
wiki.request_cost = 0
|
||||||
if discord_message: # TODO How to react when none? (crash in formatter), probably bad handling atm
|
if discord_message: # TODO How to react when none? (crash in formatter), probably bad handling atm
|
||||||
discord_message.finish_embed()
|
discord_message.finish_embed()
|
||||||
discord_message.metadata = metadata
|
discord_message.metadata = metadata
|
||||||
|
|
Loading…
Reference in a new issue