mirror of
https://gitlab.com/chicken-riders/RcGcDb.git
synced 2025-02-23 00:54:09 +00:00
Further work on Discord message sending code
This commit is contained in:
parent
573efaf7fa
commit
af3cf6440e
|
@ -17,11 +17,12 @@ import re
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
import logging
|
import logging
|
||||||
from typing import Optional, Union, Tuple
|
import asyncio
|
||||||
|
from src.config import settings
|
||||||
|
from src.discord.message import StackedDiscordMessage, MessageTooBig
|
||||||
|
from typing import Optional, Union, Tuple, AsyncGenerator
|
||||||
|
from collections import defaultdict
|
||||||
|
|
||||||
import requests
|
|
||||||
|
|
||||||
from src.configloader import settings
|
|
||||||
from src.discord.message import DiscordMessage, DiscordMessageMetadata, DiscordMessageRaw
|
from src.discord.message import DiscordMessage, DiscordMessageMetadata, DiscordMessageRaw
|
||||||
|
|
||||||
AUTO_SUPPRESSION_ENABLED = settings.get("auto_suppression", {"enabled": False}).get("enabled")
|
AUTO_SUPPRESSION_ENABLED = settings.get("auto_suppression", {"enabled": False}).get("enabled")
|
||||||
|
@ -32,10 +33,31 @@ rate_limit = 0
|
||||||
|
|
||||||
logger = logging.getLogger("rcgcdw.discord.queue")
|
logger = logging.getLogger("rcgcdw.discord.queue")
|
||||||
|
|
||||||
|
class QueueEntry:
|
||||||
|
def __init__(self, discord_message, webhooks):
|
||||||
|
self.discord_message: DiscordMessage = discord_message
|
||||||
|
self.webhooks: list[str] = webhooks
|
||||||
|
self._sent_webhooks: set[str] = set()
|
||||||
|
|
||||||
|
def check_sent_status(self, webhook: str) -> bool:
|
||||||
|
"""Checks sent status for given message, if True it means that the message has been sent before to given webhook, otherwise False."""
|
||||||
|
return webhook in self._sent_webhooks
|
||||||
|
|
||||||
|
def confirm_sent_status(self, webhook: str):
|
||||||
|
"""Confirms sent status for a webhook. Returns True if sending to all webhooks has been completed, otherwise False."""
|
||||||
|
self._sent_webhooks.add(webhook)
|
||||||
|
|
||||||
|
def complete(self) -> bool:
|
||||||
|
return len(self._sent_webhooks) == len(self.webhooks)
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
return iter(self.webhooks)
|
||||||
|
|
||||||
|
|
||||||
class MessageQueue:
|
class MessageQueue:
|
||||||
"""Message queue class for undelivered messages"""
|
"""Message queue class for undelivered messages"""
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._queue: list[Tuple[Union[DiscordMessage, DiscordMessageRaw], DiscordMessageMetadata]] = []
|
self._queue: list[QueueEntry] = []
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return self._queue
|
return self._queue
|
||||||
|
@ -49,7 +71,7 @@ class MessageQueue:
|
||||||
def clear(self):
|
def clear(self):
|
||||||
self._queue.clear()
|
self._queue.clear()
|
||||||
|
|
||||||
def add_message(self, message: Tuple[Union[DiscordMessage, DiscordMessageRaw], DiscordMessageMetadata]):
|
def add_message(self, message: QueueEntry):
|
||||||
self._queue.append(message)
|
self._queue.append(message)
|
||||||
|
|
||||||
def cut_messages(self, item_num: int):
|
def cut_messages(self, item_num: int):
|
||||||
|
@ -63,31 +85,76 @@ class MessageQueue:
|
||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
async def group_by_webhook(self):
|
||||||
|
"""Group Discord messages in the queue by the dictionary, allowing to send multiple messages to different
|
||||||
|
webhooks at the same time avoiding ratelimits per Discord webhook route."""
|
||||||
|
message_dict = defaultdict(list)
|
||||||
|
for msg in self._queue:
|
||||||
|
if not isinstance(msg.webhooks, list):
|
||||||
|
raise TypeError('msg.webhook_url in _queue is not a list')
|
||||||
|
for webhook in msg.webhooks:
|
||||||
|
message_dict[webhook].append(msg) # defaultdict{"dadibadyvbdmadgqueh23/dihjd8agdandashd": [DiscordMessage, DiscordMessage]}
|
||||||
|
return message_dict.items()
|
||||||
|
|
||||||
def delete_all_with_matching_metadata(self, **properties):
|
def delete_all_with_matching_metadata(self, **properties):
|
||||||
"""Deletes all of the messages that have matching metadata properties (useful for message redaction)"""
|
"""Deletes all of the messages that have matching metadata properties (useful for message redaction)"""
|
||||||
for index, item in reversed(list(enumerate(self._queue))):
|
for index, item in reversed(list(enumerate(self._queue))):
|
||||||
if self.compare_message_to_dict(item[1], properties):
|
if self.compare_message_to_dict(item[1], properties):
|
||||||
self._queue.pop(index)
|
self._queue.pop(index)
|
||||||
|
|
||||||
def resend_msgs(self):
|
async def pack_massages(self, messages: list[QueueEntry]) -> AsyncGenerator[tuple[StackedDiscordMessage, int]]:
|
||||||
|
"""Pack messages into StackedDiscordMessage. It's an async generator"""
|
||||||
|
current_pack = StackedDiscordMessage(0 if messages[0].discord_message.message_type == "compact" else 1) # first message
|
||||||
|
index = -1
|
||||||
|
for index, message in enumerate(messages):
|
||||||
|
message = message.discord_message
|
||||||
|
try:
|
||||||
|
current_pack.add_message(message)
|
||||||
|
except MessageTooBig:
|
||||||
|
yield current_pack
|
||||||
|
current_pack = StackedDiscordMessage(0 if message.message_type == "compact" else 1) # next messages
|
||||||
|
current_pack.add_message(message)
|
||||||
|
yield current_pack, index
|
||||||
|
|
||||||
|
async def send_msg_set(self, msg_set: tuple[str, list[QueueEntry]]):
|
||||||
|
webhook_url, messages = msg_set # str("daosdkosakda/adkahfwegr34", list(DiscordMessage, DiscordMessage, DiscordMessage)
|
||||||
|
async for msg, index in self.pack_massages(messages):
|
||||||
|
if self.global_rate_limit:
|
||||||
|
return # if we are globally rate limited just wait for first gblocked request to finish
|
||||||
|
# Verify that message hasn't been sent before
|
||||||
|
status = await send_to_discord_webhook(msg, webhook_url)
|
||||||
|
if status[0] < 2:
|
||||||
|
logger.debug("Sending message succeeded")
|
||||||
|
for queue_message in messages[max(index-10, 0):index]: # mark messages as delivered
|
||||||
|
queue_message.confirm_sent_status(webhook_url)
|
||||||
|
logger.debug("Current rate limit time: {}".format(status[1]))
|
||||||
|
if status[1] is not None:
|
||||||
|
await asyncio.sleep(float(status[1])) # note, the timer on the last request won't matter that much since it's separate task and for the time of sleep it will give control to other tasks
|
||||||
|
break
|
||||||
|
elif status[0] == 5:
|
||||||
|
if status[1]["global"] is True:
|
||||||
|
logger.debug(
|
||||||
|
"Global rate limit has been detected. Setting global_rate_limit to true and awaiting punishment.")
|
||||||
|
self.global_rate_limit = True
|
||||||
|
await asyncio.sleep(status[1]["retry_after"] / 1000)
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
logger.debug("Sending message failed")
|
||||||
|
break
|
||||||
|
|
||||||
|
async def resend_msgs(self):
|
||||||
|
self.global_rate_limit = False
|
||||||
if self._queue:
|
if self._queue:
|
||||||
logger.info(
|
logger.info(
|
||||||
"{} messages waiting to be delivered to Discord due to Discord throwing errors/no connection to Discord servers.".format(
|
"{} messages waiting to be delivered to Discord.".format(len(self._queue)))
|
||||||
len(self._queue)))
|
tasks_to_run = []
|
||||||
for num, item in enumerate(self._queue):
|
for set_msgs in await self.group_by_webhook():
|
||||||
logger.debug(
|
# logger.debug(set_msgs)
|
||||||
"Trying to send a message to Discord from the queue with id of {} and content {}".format(str(num),
|
tasks_to_run.append(self.send_msg_set(set_msgs))
|
||||||
str(item)))
|
await asyncio.gather(*tasks_to_run) # we wait for all send_msg_set functions to finish
|
||||||
if send_to_discord_webhook(item[0], metadata=item[1]) < 2:
|
self._queue = [x for x in self._queue if x.complete() is False] # get rid of sent messages
|
||||||
logger.debug("Sending message succeeded")
|
else:
|
||||||
else:
|
await asyncio.sleep(0.5)
|
||||||
logger.debug("Sending message failed")
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
self.clear()
|
|
||||||
logger.debug("Queue emptied, all messages delivered")
|
|
||||||
self.cut_messages(num)
|
|
||||||
logger.debug(self._queue)
|
|
||||||
|
|
||||||
|
|
||||||
messagequeue = MessageQueue()
|
messagequeue = MessageQueue()
|
||||||
|
@ -105,7 +172,7 @@ def handle_discord_http(code, formatted_embed, result):
|
||||||
elif code == 401 or code == 404: # HTTP UNAUTHORIZED AND NOT FOUND
|
elif code == 401 or code == 404: # HTTP UNAUTHORIZED AND NOT FOUND
|
||||||
if result.request.method == "POST": # Ignore not found for DELETE and PATCH requests since the message could already be removed by admin
|
if result.request.method == "POST": # Ignore not found for DELETE and PATCH requests since the message could already be removed by admin
|
||||||
logger.error("Webhook URL is invalid or no longer in use, please replace it with proper one.")
|
logger.error("Webhook URL is invalid or no longer in use, please replace it with proper one.")
|
||||||
sys.exit(1)
|
remove_webhook_maybe()
|
||||||
else:
|
else:
|
||||||
return 0
|
return 0
|
||||||
elif code == 429:
|
elif code == 429:
|
||||||
|
@ -121,19 +188,14 @@ def handle_discord_http(code, formatted_embed, result):
|
||||||
return 1
|
return 1
|
||||||
|
|
||||||
|
|
||||||
def update_ratelimit(request):
|
def send_to_discord_webhook(message: [StackedDiscordMessage, DiscordMessage]):
|
||||||
"""Updates rate limit time"""
|
|
||||||
global rate_limit
|
|
||||||
rate_limit = 0 if int(request.headers.get('x-ratelimit-remaining', "-1")) > 0 else int(request.headers.get(
|
|
||||||
'x-ratelimit-reset-after', 0))
|
|
||||||
rate_limit += settings.get("discord_message_cooldown", 0)
|
|
||||||
|
|
||||||
|
|
||||||
def send_to_discord_webhook(data: Optional[DiscordMessage], metadata: DiscordMessageMetadata):
|
|
||||||
global rate_limit
|
|
||||||
header = settings["header"]
|
header = settings["header"]
|
||||||
header['Content-Type'] = 'application/json'
|
header['Content-Type'] = 'application/json'
|
||||||
standard_args = dict(headers=header)
|
standard_args = dict(headers=header)
|
||||||
|
if isinstance(message, StackedDiscordMessage):
|
||||||
|
req =
|
||||||
|
else:
|
||||||
|
message.metadata.method
|
||||||
if metadata.method == "POST":
|
if metadata.method == "POST":
|
||||||
req = requests.Request("POST", data.webhook_url+"?wait=" + ("true" if AUTO_SUPPRESSION_ENABLED else "false"), data=repr(data), **standard_args)
|
req = requests.Request("POST", data.webhook_url+"?wait=" + ("true" if AUTO_SUPPRESSION_ENABLED else "false"), data=repr(data), **standard_args)
|
||||||
elif metadata.method == "DELETE":
|
elif metadata.method == "DELETE":
|
||||||
|
|
125
src/msgqueue.py
125
src/msgqueue.py
|
@ -1,125 +0,0 @@
|
||||||
import asyncio, logging, aiohttp
|
|
||||||
import typing
|
|
||||||
|
|
||||||
from src.discord.message import DiscordMessage, StackedDiscordMessage, MessageTooBig
|
|
||||||
from src.config import settings
|
|
||||||
from src.exceptions import EmbedListFull
|
|
||||||
from collections import defaultdict
|
|
||||||
logger = logging.getLogger("rcgcdw.msgqueue")
|
|
||||||
|
|
||||||
class QueueEntry:
|
|
||||||
def __init__(self, discord_message, webhooks):
|
|
||||||
self.discord_message: DiscordMessage = discord_message
|
|
||||||
self.webhooks: list[str] = webhooks
|
|
||||||
|
|
||||||
def __iter__(self):
|
|
||||||
return iter(self.webhooks)
|
|
||||||
|
|
||||||
|
|
||||||
class MessageQueue:
|
|
||||||
"""Message queue class for undelivered messages"""
|
|
||||||
def __init__(self):
|
|
||||||
self._queue = []
|
|
||||||
self.global_rate_limit = False
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return self._queue
|
|
||||||
|
|
||||||
def __len__(self):
|
|
||||||
return len(self._queue)
|
|
||||||
|
|
||||||
def __iter__(self):
|
|
||||||
return iter(self._queue)
|
|
||||||
|
|
||||||
def clear(self):
|
|
||||||
self._queue.clear()
|
|
||||||
|
|
||||||
def add_messages(self, messages: list[QueueEntry]):
|
|
||||||
self._queue.extend(messages)
|
|
||||||
logger.debug("Adding new messages")
|
|
||||||
#
|
|
||||||
# def replace_message(self, to_replace: DiscordMessage, with_replace: StackedDiscordMessage):
|
|
||||||
# try:
|
|
||||||
# self._queue[self._queue.index(to_replace)] = with_replace
|
|
||||||
# except ValueError:
|
|
||||||
# raise
|
|
||||||
|
|
||||||
def cut_messages(self, item_num):
|
|
||||||
self._queue = self._queue[item_num:]
|
|
||||||
|
|
||||||
async def group_by_webhook(self): # TODO Change into iterable
|
|
||||||
"""Group Discord messages in the queue by the dictionary, allowing to send multiple messages to different
|
|
||||||
webhooks at the same time avoiding ratelimits per Discord webhook route."""
|
|
||||||
message_dict = defaultdict(list)
|
|
||||||
for msg in self._queue:
|
|
||||||
if not isinstance(msg.webhook_url, list):
|
|
||||||
raise TypeError('msg.webhook_url in _queue is not a list')
|
|
||||||
for webhook in msg.webhook_url:
|
|
||||||
message_dict[webhook].append(msg) # defaultdict{"dadibadyvbdmadgqueh23/dihjd8agdandashd": [DiscordMessage, DiscordMessage]}
|
|
||||||
return message_dict.items() # dict_items([('daosdkosakda/adkahfwegr34', [DiscordMessage]), ('daosdkosakda/adkahfwegr33', [DiscordMessage, DiscordMessage])])
|
|
||||||
|
|
||||||
async def pack_massages(self, messages: list[DiscordMessage]) -> typing.AsyncGenerator:
|
|
||||||
"""Pack messages into StackedDiscordMessage. It's an async generator"""
|
|
||||||
current_pack = StackedDiscordMessage(0 if messages[0].message_type == "compact" else 1) # first message
|
|
||||||
for message in messages:
|
|
||||||
try:
|
|
||||||
current_pack.add_message(message)
|
|
||||||
except MessageTooBig:
|
|
||||||
yield current_pack
|
|
||||||
current_pack = StackedDiscordMessage(0 if message.message_type == "compact" else 1) # next messages
|
|
||||||
current_pack.add_message(message)
|
|
||||||
yield current_pack
|
|
||||||
|
|
||||||
async def send_msg_set(self, msg_set: tuple):
|
|
||||||
webhook_url, messages = msg_set # str("daosdkosakda/adkahfwegr34", list(DiscordMessage, DiscordMessage, DiscordMessage)
|
|
||||||
async for msg in self.pack_massages(messages):
|
|
||||||
if self.global_rate_limit:
|
|
||||||
return # if we are globally rate limited just wait for first gblocked request to finish
|
|
||||||
status = await send_to_discord_webhook(msg, webhook_url)
|
|
||||||
if status[0] < 2:
|
|
||||||
logger.debug("Sending message succeeded")
|
|
||||||
try:
|
|
||||||
if len(msg.webhook_url) > 1:
|
|
||||||
msg.webhook_url.remove(webhook_url)
|
|
||||||
else:
|
|
||||||
self._queue.remove(msg)
|
|
||||||
except ValueError:
|
|
||||||
# For the love of god I cannot figure why can it return ValueError: list.remove(x): x not in list, however considering it's not in the list, somehow, anymore we can just not care about it I guess
|
|
||||||
pass
|
|
||||||
logger.debug("Current rate limit time: {}".format(status[1]))
|
|
||||||
if status[1] is not None:
|
|
||||||
await asyncio.sleep(float(status[1])) # note, the timer on the last request won't matter that much since it's separate task and for the time of sleep it will give control to other tasks
|
|
||||||
break
|
|
||||||
elif status[0] == 5:
|
|
||||||
if status[1]["global"] is True:
|
|
||||||
logger.debug("Global rate limit has been detected. Setting global_rate_limit to true and awaiting punishment.")
|
|
||||||
self.global_rate_limit = True
|
|
||||||
await asyncio.sleep(status[1]["retry_after"]/1000)
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
logger.debug("Sending message failed")
|
|
||||||
break
|
|
||||||
|
|
||||||
async def resend_msgs(self):
|
|
||||||
self.global_rate_limit = False
|
|
||||||
if self._queue:
|
|
||||||
logger.info(
|
|
||||||
"{} messages waiting to be delivered to Discord.".format(len(self._queue)))
|
|
||||||
tasks_to_run = []
|
|
||||||
for set_msgs in await self.group_by_webhook():
|
|
||||||
# logger.debug(set_msgs)
|
|
||||||
tasks_to_run.append(self.send_msg_set(set_msgs))
|
|
||||||
await asyncio.gather(*tasks_to_run) # we wait for all send_msg_set functions to finish
|
|
||||||
else:
|
|
||||||
await asyncio.sleep(0.5)
|
|
||||||
|
|
||||||
|
|
||||||
messagequeue = MessageQueue()
|
|
||||||
|
|
||||||
|
|
||||||
async def send_to_discord(msg):
|
|
||||||
messagequeue.add_message(msg)
|
|
||||||
# webhooks = msg.webhook_url.copy()
|
|
||||||
# for webhook in webhooks:
|
|
||||||
# msg.webhook_url = [webhook] # Doing it just so it doesn't store reference but value
|
|
||||||
# messagequeue.add_message(msg.copy())
|
|
|
@ -10,7 +10,7 @@ import logging, aiohttp
|
||||||
from functools import cache
|
from functools import cache
|
||||||
|
|
||||||
from api.util import default_message
|
from api.util import default_message
|
||||||
from msgqueue import QueueEntry, messagequeue
|
from src.discord.queue import messagequeue, QueueEntry
|
||||||
from mw_messages import MWMessages
|
from mw_messages import MWMessages
|
||||||
from src.exceptions import *
|
from src.exceptions import *
|
||||||
from src.database import db
|
from src.database import db
|
||||||
|
|
Loading…
Reference in a new issue