Many changes to logic of sending messages "better" exception handling

This commit is contained in:
Frisk 2020-08-01 12:45:41 +02:00
parent ecf07a0534
commit e26258edb7
No known key found for this signature in database
GPG key ID: 213F7C15068AF8AC
3 changed files with 45 additions and 11 deletions

View file

@ -16,7 +16,7 @@ from src.misc import get_paths
from src.msgqueue import messagequeue from src.msgqueue import messagequeue
from src.queue_handler import DBHandler from src.queue_handler import DBHandler
from src.wiki import Wiki, process_cats, process_mwmsgs, essential_info from src.wiki import Wiki, process_cats, process_mwmsgs, essential_info
from src.discord import DiscordMessage, formatter_exception_logger from src.discord import DiscordMessage, formatter_exception_logger, msg_sender_exception_logger
logging.config.dictConfig(settings["logging"]) logging.config.dictConfig(settings["logging"])
logger = logging.getLogger("rcgcdb.bot") logger = logging.getLogger("rcgcdb.bot")
@ -140,12 +140,22 @@ async def wiki_scanner():
async def message_sender(): async def message_sender():
"""message_sender is a coroutine responsible for handling Discord messages and their sending to Discord""" """message_sender is a coroutine responsible for handling Discord messages and their sending to Discord"""
try:
while True: while True:
await messagequeue.resend_msgs() await messagequeue.resend_msgs()
except:
if command_line_args.debug:
logger.exception("Exception on DC message sender")
raise # reraise the issue
else:
logger.exception("Exception on DC message sender")
await msg_sender_exception_logger(traceback.format_exc())
def shutdown(loop, signal=None): def shutdown(loop, signal=None):
DBHandler.update_db() DBHandler.update_db()
if len(messagequeue) > 0:
logger.warning("Some messages are still queued!")
loop.stop() loop.stop()
logger.info("Script has shut down due to signal {}.".format(signal)) logger.info("Script has shut down due to signal {}.".format(signal))
for task in asyncio.all_tasks(loop): for task in asyncio.all_tasks(loop):

View file

@ -118,6 +118,15 @@ async def formatter_exception_logger(wiki_url, change, exception):
await send_to_discord_webhook_monitoring(message) await send_to_discord_webhook_monitoring(message)
async def msg_sender_exception_logger(exception):
"""Creates a Discord message reporting a crash in RC formatter area"""
message = DiscordMessage("embed", "bot/exception", [None], wiki=None)
message["description"] = exception
message["title"] = "MSGSENDER Exception Report"
message.finish_embed()
await send_to_discord_webhook_monitoring(message)
async def send_to_discord_webhook_monitoring(data: DiscordMessage): async def send_to_discord_webhook_monitoring(data: DiscordMessage):
header = settings["header"] header = settings["header"]
header['Content-Type'] = 'application/json' header['Content-Type'] = 'application/json'
@ -135,24 +144,30 @@ async def send_to_discord_webhook(data: DiscordMessage, webhook_url: str) -> tup
:return tuple(status code for request, rate limit info (None for can send more, string for amount of seconds to wait)""" :return tuple(status code for request, rate limit info (None for can send more, string for amount of seconds to wait)"""
header = settings["header"] header = settings["header"]
header['Content-Type'] = 'application/json' header['Content-Type'] = 'application/json'
header["X-RateLimit-Precision"] = "millisecond"
async with aiohttp.ClientSession(headers=header, timeout=aiohttp.ClientTimeout(5.0)) as session: async with aiohttp.ClientSession(headers=header, timeout=aiohttp.ClientTimeout(5.0)) as session:
try: try:
result = await session.post("https://discord.com/api/webhooks/"+webhook_url, data=repr(data)) result = await session.post("https://discord.com/api/webhooks/"+webhook_url, data=repr(data))
logger.debug(result.headers)
rate_limit = None if int(result.headers.get('x-ratelimit-remaining')) > 0 else result.headers.get('x-ratelimit-reset-after') rate_limit = None if int(result.headers.get('x-ratelimit-remaining')) > 0 else result.headers.get('x-ratelimit-reset-after')
except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError, TimeoutError): except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError, TimeoutError):
logger.exception("Could not send the message to Discord") logger.exception("Could not send the message to Discord")
return 3, None return 3, None
return await handle_discord_http(result.status, repr(data), await result.text(), data), rate_limit status = await handle_discord_http(result.status, repr(data), result, data)
if status == 5:
return 5, await result.json()
else:
return status, rate_limit
async def handle_discord_http(code, formatted_embed, result, dmsg): async def handle_discord_http(code: int, formatted_embed: str, result: aiohttp.ClientResponse, dmsg: DiscordMessage):
if 300 > code > 199: # message went through if 300 > code > 199: # message went through
return 0 return 0
elif code == 400: # HTTP BAD REQUEST result.status_code, data, result, header elif code == 400: # HTTP BAD REQUEST result.status_code, data, result, header
logger.error( logger.error(
"Following message has been rejected by Discord, please submit a bug on our bugtracker adding it:") "Following message has been rejected by Discord, please submit a bug on our bugtracker adding it:")
logger.error(formatted_embed) logger.error(formatted_embed)
logger.error(result.text) logger.error(await result.text())
return 1 return 1
elif code == 401 or code == 404: # HTTP UNAUTHORIZED AND NOT FOUND elif code == 401 or code == 404: # HTTP UNAUTHORIZED AND NOT FOUND
logger.error("Webhook URL is invalid or no longer in use, please replace it with proper one.") logger.error("Webhook URL is invalid or no longer in use, please replace it with proper one.")
@ -161,7 +176,7 @@ async def handle_discord_http(code, formatted_embed, result, dmsg):
return 1 return 1
elif code == 429: elif code == 429:
logger.error("We are sending too many requests to the Discord, slowing down...") logger.error("We are sending too many requests to the Discord, slowing down...")
return 2 return 5
elif 499 < code < 600: elif 499 < code < 600:
logger.error( logger.error(
"Discord have trouble processing the event, and because the HTTP code returned is {} it means we blame them.".format( "Discord have trouble processing the event, and because the HTTP code returned is {} it means we blame them.".format(

View file

@ -8,7 +8,7 @@ class MessageQueue:
"""Message queue class for undelivered messages""" """Message queue class for undelivered messages"""
def __init__(self): def __init__(self):
self._queue = [] self._queue = []
self.session = None self.global_rate_limit = False
def __repr__(self): def __repr__(self):
return self._queue return self._queue
@ -28,8 +28,6 @@ class MessageQueue:
def cut_messages(self, item_num): def cut_messages(self, item_num):
self._queue = self._queue[item_num:] self._queue = self._queue[item_num:]
async def create_session(self):
self.session = aiohttp.ClientSession(headers=settings["header"], timeout=aiohttp.ClientTimeout(5.0))
async def group_by_webhook(self): # TODO Change into iterable async def group_by_webhook(self): # TODO Change into iterable
"""Group Discord messages in the queue by the dictionary, allowing to send multiple messages to different """Group Discord messages in the queue by the dictionary, allowing to send multiple messages to different
@ -43,6 +41,8 @@ class MessageQueue:
async def send_msg_set(self, msg_set: tuple): async def send_msg_set(self, msg_set: tuple):
webhook_url, messages = msg_set webhook_url, messages = msg_set
for msg in messages: for msg in messages:
if self.global_rate_limit:
return # if we are globally rate limited just wait for first gblocked request to finish
status = await send_to_discord_webhook(msg, webhook_url) status = await send_to_discord_webhook(msg, webhook_url)
if status[0] < 2: if status[0] < 2:
logger.debug("Sending message succeeded") logger.debug("Sending message succeeded")
@ -50,11 +50,19 @@ class MessageQueue:
logger.debug("Current rate limit time: {}".format(status[1])) logger.debug("Current rate limit time: {}".format(status[1]))
if status[1] is not None: if status[1] is not None:
await asyncio.sleep(float(status[1])) # note, the timer on the last request won't matter that much since it's separate task and for the time of sleep it will give control to other tasks await asyncio.sleep(float(status[1])) # note, the timer on the last request won't matter that much since it's separate task and for the time of sleep it will give control to other tasks
break
elif status[0] == 5:
if status[1]["global"] is True:
logger.debug("Global rate limit has been detected. Setting global_rate_limit to true and awaiting punishment.")
self.global_rate_limit = True
await asyncio.sleep(status[1]["retry_after"]/1000)
break
else: else:
logger.debug("Sending message failed") logger.debug("Sending message failed")
break break
async def resend_msgs(self): async def resend_msgs(self):
self.global_rate_limit = False
if self._queue: if self._queue:
logger.info( logger.info(
"{} messages waiting to be delivered to Discord.".format(len(self._queue))) "{} messages waiting to be delivered to Discord.".format(len(self._queue)))
@ -64,7 +72,8 @@ class MessageQueue:
tasks_to_run.append(self.send_msg_set(set_msgs)) tasks_to_run.append(self.send_msg_set(set_msgs))
await asyncio.gather(*tasks_to_run) await asyncio.gather(*tasks_to_run)
logger.debug(self._queue) logger.debug(self._queue)
await asyncio.sleep(0.1) else:
await asyncio.sleep(0.5)
messagequeue = MessageQueue() messagequeue = MessageQueue()