Major rework of how the message sending is handled

This commit is contained in:
Frisk 2020-07-27 18:32:30 +02:00
parent 49b9e8de20
commit 391c897367
No known key found for this signature in database
GPG key ID: 213F7C15068AF8AC
6 changed files with 39 additions and 27 deletions

View file

@ -54,7 +54,6 @@ async def wiki_scanner():
while True: while True:
calc_delay = calculate_delay() calc_delay = calculate_delay()
fetch_all = db_cursor.execute('SELECT webhook, wiki, lang, display, wikiid, rcid, postid FROM rcgcdw GROUP BY wiki') fetch_all = db_cursor.execute('SELECT webhook, wiki, lang, display, wikiid, rcid, postid FROM rcgcdw GROUP BY wiki')
# webhook, wiki, lang, display, wikiid, rcid, postid
for db_wiki in fetch_all.fetchall(): for db_wiki in fetch_all.fetchall():
logger.debug("Wiki {}".format(db_wiki[1])) logger.debug("Wiki {}".format(db_wiki[1]))
extended = False extended = False

View file

@ -23,7 +23,7 @@ async def wiki_removal(wiki_url, status):
reasons = {410: _("wiki deletion"), 404: _("wiki deletion"), 401: _("wiki becoming inaccessible"), reasons = {410: _("wiki deletion"), 404: _("wiki deletion"), 401: _("wiki becoming inaccessible"),
402: _("wiki becoming inaccessible"), 403: _("wiki becoming inaccessible"), 410: _("wiki becoming inaccessible")} 402: _("wiki becoming inaccessible"), 403: _("wiki becoming inaccessible"), 410: _("wiki becoming inaccessible")}
reason = reasons.get(status, _("unknown error")) reason = reasons.get(status, _("unknown error"))
await send_to_discord_webhook(DiscordMessage("compact", "webhook/remove", webhook_url=[observer[0]], content=_("The webhook for {} has been removed due to {}.".format(wiki_url, reason)), wiki=None)) await send_to_discord_webhook(DiscordMessage("compact", "webhook/remove", webhook_url=[], content=_("The webhook for {} has been removed due to {}.".format(wiki_url, reason)), wiki=None), webhook_url=observer[0])
header = settings["header"] header = settings["header"]
header['Content-Type'] = 'application/json' header['Content-Type'] = 'application/json'
header['X-Audit-Log-Reason'] = "Wiki becoming unavailable" header['X-Audit-Log-Reason'] = "Wiki becoming unavailable"
@ -117,13 +117,12 @@ async def send_to_discord_webhook_monitoring(data: DiscordMessage):
return 3 return 3
async def send_to_discord_webhook(data: DiscordMessage): async def send_to_discord_webhook(data: DiscordMessage, webhook_url: str):
header = settings["header"] header = settings["header"]
header['Content-Type'] = 'application/json' header['Content-Type'] = 'application/json'
async with aiohttp.ClientSession(headers=header, timeout=aiohttp.ClientTimeout(5.0)) as session: async with aiohttp.ClientSession(headers=header, timeout=aiohttp.ClientTimeout(5.0)) as session:
for webhook in data.webhook_url:
try: try:
result = await session.post("https://discord.com/api/webhooks/"+webhook, data=repr(data)) result = await session.post("https://discord.com/api/webhooks/"+webhook_url, data=repr(data))
except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError): except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError):
logger.exception("Could not send the message to Discord") logger.exception("Could not send the message to Discord")
return 3 return 3

View file

@ -24,6 +24,7 @@ logger = logging.getLogger("rcgcdw.rc_formatters")
async def compact_formatter(action, change, parsed_comment, categories, recent_changes, target, _, ngettext, paths, async def compact_formatter(action, change, parsed_comment, categories, recent_changes, target, _, ngettext, paths,
additional_data=None): additional_data=None):
"""Recent Changes compact formatter, part of RcGcDw"""
if additional_data is None: if additional_data is None:
additional_data = {"namespaces": {}, "tags": {}} additional_data = {"namespaces": {}, "tags": {}}
WIKI_API_PATH = paths[0] WIKI_API_PATH = paths[0]
@ -315,6 +316,7 @@ async def compact_formatter(action, change, parsed_comment, categories, recent_c
async def embed_formatter(action, change, parsed_comment, categories, recent_changes, target, _, ngettext, paths, additional_data=None): async def embed_formatter(action, change, parsed_comment, categories, recent_changes, target, _, ngettext, paths, additional_data=None):
"""Recent Changes embed formatter, part of RcGcDw"""
if additional_data is None: if additional_data is None:
additional_data = {"namespaces": {}, "tags": {}} additional_data = {"namespaces": {}, "tags": {}}
WIKI_API_PATH = paths[0] WIKI_API_PATH = paths[0]

View file

@ -9,8 +9,9 @@ logger = logging.getLogger("rcgcdw.misc")
def get_paths(wiki: str, request) -> tuple: def get_paths(wiki: str, request) -> tuple:
"""Prepares wiki paths for the functions"""
parsed_url = urlparse(wiki) parsed_url = urlparse(wiki)
WIKI_API_PATH = wiki + request["query"]["general"]["scriptpath"] + "api.php" WIKI_API_PATH = wiki + "api.php"
WIKI_SCRIPT_PATH = wiki WIKI_SCRIPT_PATH = wiki
WIKI_ARTICLE_PATH = urlunparse((*parsed_url[0:2], "", "", "", "")) + request["query"]["general"]["articlepath"] WIKI_ARTICLE_PATH = urlunparse((*parsed_url[0:2], "", "", "", "")) + request["query"]["general"]["articlepath"]
WIKI_JUST_DOMAIN = urlunparse((*parsed_url[0:2], "", "", "", "")) WIKI_JUST_DOMAIN = urlunparse((*parsed_url[0:2], "", "", "", ""))

View file

@ -1,6 +1,7 @@
import asyncio, logging, aiohttp import asyncio, logging, aiohttp
from src.discord import send_to_discord_webhook from src.discord import send_to_discord_webhook
from src.config import settings from src.config import settings
from collections import defaultdict, ItemsView
logger = logging.getLogger("rcgcdw.msgqueue") logger = logging.getLogger("rcgcdw.msgqueue")
class MessageQueue: class MessageQueue:
@ -30,26 +31,35 @@ class MessageQueue:
async def create_session(self): async def create_session(self):
self.session = aiohttp.ClientSession(headers=settings["header"], timeout=aiohttp.ClientTimeout(5.0)) self.session = aiohttp.ClientSession(headers=settings["header"], timeout=aiohttp.ClientTimeout(5.0))
async def resend_msgs(self): async def group_by_webhook(self): # TODO Change into iterable
if self.session is None: """Group Discord messages in the queue by the dictionary, allowing to send multiple messages to different
await self.create_session() webhooks at the same time avoiding ratelimits per Discord webhook route."""
if self._queue: message_dict = defaultdict(list)
logger.info( for msg in self._queue:
"{} messages waiting to be delivered to Discord.".format(len(self._queue))) for webhook in msg.webhook_url:
for num, item in enumerate(self._queue): message_dict[webhook].append(msg)
logger.debug( return message_dict.items()
"Trying to send a message to Discord from the queue with id of {} and content {}".format(str(num),
str(item))) async def send_msg_set(self, msg_set: tuple):
if await send_to_discord_webhook(item) < 2: webhook_url, messages = msg_set
for msg in messages:
if await send_to_discord_webhook(msg, webhook_url) < 2:
logger.debug("Sending message succeeded") logger.debug("Sending message succeeded")
self._queue.remove(msg)
await asyncio.sleep(1.9) await asyncio.sleep(1.9)
else: else:
logger.debug("Sending message failed") logger.debug("Sending message failed")
break break
else:
self.clear() async def resend_msgs(self):
logger.debug("Queue emptied, all messages delivered") if self._queue:
self.cut_messages(num) logger.info(
"{} messages waiting to be delivered to Discord.".format(len(self._queue)))
tasks_to_run = []
for set_msgs in await self.group_by_webhook():
logger.debug(set_msgs)
tasks_to_run.append(self.send_msg_set(set_msgs))
await asyncio.gather(*tasks_to_run)
logger.debug(self._queue) logger.debug(self._queue)
await asyncio.sleep(4.0) await asyncio.sleep(4.0)

View file

@ -102,6 +102,7 @@ class Wiki:
async def process_cats(event: dict, local_wiki: Wiki, category_msgs: dict, categorize_events: dict): async def process_cats(event: dict, local_wiki: Wiki, category_msgs: dict, categorize_events: dict):
"""Process categories based on local MW messages. """
if event["type"] == "categorize": if event["type"] == "categorize":
if "commenthidden" not in event: if "commenthidden" not in event:
if local_wiki.mw_messages: if local_wiki.mw_messages: