diff --git a/extensions/base/rcgcdb.py b/extensions/base/rcgcdb.py new file mode 100644 index 0000000..3c94bec --- /dev/null +++ b/extensions/base/rcgcdb.py @@ -0,0 +1,31 @@ +import logging +import json +from src.discord.message import DiscordMessage +from src.api import formatter +from src.api.context import Context +from src.api.util import embed_helper, compact_author, create_article_path, sanitize_to_markdown + + +@formatter.embed(event="generic") +def embed_generic(ctx: Context, change: dict): + embed = DiscordMessage(ctx.message_type, ctx.event) + embed_helper(ctx, embed, change) + embed["title"] = ctx._("Unknown event `{event}`").format( + event="{type}/{action}".format(type=change.get("type", ""), action=change.get("action", ""))) + embed["url"] = create_article_path("Special:RecentChanges") + change_params = "[```json\n{params}\n```]({support})".format(params=json.dumps(change, indent=2), + support=ctx.settings["support"]) + if len(change_params) > 1000: + embed.add_field(_("Report this on the support server"), ctx.settings["support"]) + else: + embed.add_field(_("Report this on the support server"), change_params) + return embed + + +@formatter.compact(event="generic") +def compact_generic(ctx: Context, change: dict): + author, author_url = compact_author(ctx, change) + content = ctx._("Unknown event `{event}` by [{author}]({author_url}), report it on the [support server](<{support}>).").format( + event="{type}/{action}".format(type=change.get("type", ""), action=change.get("action", "")), + author=author, support=ctx.settings["support"], author_url=author_url) + return DiscordMessage(ctx.message_type, ctx.event, content=content) diff --git a/src/api/template_settings.json b/src/api/template_settings.json index 36b67f4..6f142e8 100644 --- a/src/api/template_settings.json +++ b/src/api/template_settings.json @@ -22,6 +22,7 @@ "hide_ips": false, "discord_message_cooldown": 0, "datafile_path": "data.json", + "support": "https://discord.gg/v77RTk5", "auto_suppression": { "enabled": false, "db_location": ":memory:" diff --git a/src/discord/message.py b/src/discord/message.py index 23313b2..ac02025 100644 --- a/src/discord/message.py +++ b/src/discord/message.py @@ -168,11 +168,12 @@ class MessageTooBig(BaseException): class StackedDiscordMessage(): - def __init__(self, m_type: int): + def __init__(self, m_type: int, wiki: Wiki): self.message_list: list[DiscordMessage] = [] self.length = 0 self.message_type: int = m_type # 0 for compact, 1 for embed self.discord_callback_message_ids: list[int] = [] + self.wiki: Wiki = wiki def __len__(self): return self.length diff --git a/src/discord/queue.py b/src/discord/queue.py index 00faf84..1c2f6cc 100644 --- a/src/discord/queue.py +++ b/src/discord/queue.py @@ -14,30 +14,35 @@ # along with RcGcDw. If not, see . import re -import sys import time import logging import asyncio + +import aiohttp +from aiohttp import ContentTypeError, ClientResponse + +from src.exceptions import ExhaustedDiscordBucket from src.config import settings from src.discord.message import StackedDiscordMessage, MessageTooBig -from typing import Optional, Union, Tuple, AsyncGenerator +from typing import Optional, AsyncGenerator, TYPE_CHECKING from collections import defaultdict -from src.discord.message import DiscordMessage, DiscordMessageMetadata, DiscordMessageRaw +from src.discord.message import DiscordMessage, DiscordMessageMetadata -AUTO_SUPPRESSION_ENABLED = settings.get("auto_suppression", {"enabled": False}).get("enabled") -if AUTO_SUPPRESSION_ENABLED: - from src.fileio.database import add_entry as add_message_redaction_entry +if TYPE_CHECKING: + from src.wiki import Wiki rate_limit = 0 logger = logging.getLogger("rcgcdw.discord.queue") + class QueueEntry: - def __init__(self, discord_message, webhooks): - self.discord_message: DiscordMessage = discord_message + def __init__(self, discord_message, webhooks, wiki): + self.discord_message: [DiscordMessage, StackedDiscordMessage] = discord_message self.webhooks: list[str] = webhooks self._sent_webhooks: set[str] = set() + self.wiki: Wiki = wiki def check_sent_status(self, webhook: str) -> bool: """Checks sent status for given message, if True it means that the message has been sent before to given webhook, otherwise False.""" @@ -71,6 +76,10 @@ class MessageQueue: def clear(self): self._queue.clear() + def add_messages(self, messages: list[QueueEntry]): + for message in messages: + self.add_message(message) + def add_message(self, message: QueueEntry): self._queue.append(message) @@ -104,43 +113,42 @@ class MessageQueue: async def pack_massages(self, messages: list[QueueEntry]) -> AsyncGenerator[tuple[StackedDiscordMessage, int]]: """Pack messages into StackedDiscordMessage. It's an async generator""" - current_pack = StackedDiscordMessage(0 if messages[0].discord_message.message_type == "compact" else 1) # first message + current_pack = StackedDiscordMessage(0 if messages[0].discord_message.message_type == "compact" else 1, messages[0].wiki) # first message index = -1 for index, message in enumerate(messages): message = message.discord_message try: current_pack.add_message(message) except MessageTooBig: - yield current_pack - current_pack = StackedDiscordMessage(0 if message.message_type == "compact" else 1) # next messages + yield current_pack, index-1 + current_pack = StackedDiscordMessage(0 if message.message_type == "compact" else 1, message.wiki) # next messages current_pack.add_message(message) yield current_pack, index async def send_msg_set(self, msg_set: tuple[str, list[QueueEntry]]): webhook_url, messages = msg_set # str("daosdkosakda/adkahfwegr34", list(DiscordMessage, DiscordMessage, DiscordMessage) async for msg, index in self.pack_massages(messages): + client_error = False if self.global_rate_limit: return # if we are globally rate limited just wait for first gblocked request to finish # Verify that message hasn't been sent before - status = await send_to_discord_webhook(msg, webhook_url) - if status[0] < 2: - logger.debug("Sending message succeeded") - for queue_message in messages[max(index-10, 0):index]: # mark messages as delivered - queue_message.confirm_sent_status(webhook_url) - logger.debug("Current rate limit time: {}".format(status[1])) - if status[1] is not None: - await asyncio.sleep(float(status[1])) # note, the timer on the last request won't matter that much since it's separate task and for the time of sleep it will give control to other tasks - break - elif status[0] == 5: - if status[1]["global"] is True: - logger.debug( - "Global rate limit has been detected. Setting global_rate_limit to true and awaiting punishment.") + # noinspection PyTypeChecker + try: + status = await send_to_discord_webhook(msg, webhook_url) + except aiohttp.ClientError: + client_error = True + except (aiohttp.ServerConnectionError, aiohttp.ServerTimeoutError): + # Retry on next Discord message sent attempt + return + except ExhaustedDiscordBucket as e: + if e.is_global: self.global_rate_limit = True - await asyncio.sleep(status[1]["retry_after"] / 1000) - break - else: - logger.debug("Sending message failed") - break + await asyncio.sleep(e.remaining / 1000) + return + for queue_message in messages[max(index-len(msg.message_list), 0):index]: # mark messages as delivered + queue_message.confirm_sent_status(webhook_url) + if client_error is False: + msg.wiki.add_message(msg) async def resend_msgs(self): self.global_rate_limit = False @@ -160,93 +168,58 @@ class MessageQueue: messagequeue = MessageQueue() -def handle_discord_http(code, formatted_embed, result): +def handle_discord_http(code: int, formatted_embed: str, result: ClientResponse): if 300 > code > 199: # message went through return 0 elif code == 400: # HTTP BAD REQUEST result.status_code, data, result, header logger.error( "Following message has been rejected by Discord, please submit a bug on our bugtracker adding it:") logger.error(formatted_embed) - logger.error(result.text) - return 1 + logger.error(result.text()) + raise aiohttp.ClientError("Message rejected.") elif code == 401 or code == 404: # HTTP UNAUTHORIZED AND NOT FOUND - if result.request.method == "POST": # Ignore not found for DELETE and PATCH requests since the message could already be removed by admin + if result.method == "POST": # Ignore not found for DELETE and PATCH requests since the message could already be removed by admin logger.error("Webhook URL is invalid or no longer in use, please replace it with proper one.") - remove_webhook_maybe() + # TODO remove_webhook_maybe() + raise aiohttp.ClientError("Message sent to bad webhook.") else: return 0 elif code == 429: logger.error("We are sending too many requests to the Discord, slowing down...") - return 2 + if "x-ratelimit-global" in result.headers.keys(): + raise ExhaustedDiscordBucket(remaining=int(result.headers.get("x-ratelimit-reset-after")), is_global=True) + raise ExhaustedDiscordBucket(remaining=int(result.headers.get("x-ratelimit-reset-after")), is_global=False) elif 499 < code < 600: logger.error( "Discord have trouble processing the event, and because the HTTP code returned is {} it means we blame them.".format( code)) - return 3 + raise aiohttp.ServerConnectionError() else: logger.error("There was an unexpected HTTP code returned from Discord: {}".format(code)) - return 1 + raise aiohttp.ServerConnectionError() -def send_to_discord_webhook(message: [StackedDiscordMessage, DiscordMessage]): +async def send_to_discord_webhook(message: [StackedDiscordMessage, DiscordMessageMetadata], webhook_path: str): header = settings["header"] header['Content-Type'] = 'application/json' - standard_args = dict(headers=header) - if isinstance(message, StackedDiscordMessage): - req = - else: - message.metadata.method - if metadata.method == "POST": - req = requests.Request("POST", data.webhook_url+"?wait=" + ("true" if AUTO_SUPPRESSION_ENABLED else "false"), data=repr(data), **standard_args) - elif metadata.method == "DELETE": - req = requests.Request("DELETE", metadata.webhook_url, **standard_args) - elif metadata.method == "PATCH": - req = requests.Request("PATCH", data.webhook_url, data=repr(data), **standard_args) - try: - time.sleep(rate_limit) - rate_limit = 0 - req = req.prepare() - result = requests.Session().send(req, timeout=10) - update_ratelimit(result) - if AUTO_SUPPRESSION_ENABLED and metadata.method == "POST": - if 199 < result.status_code < 300: # check if positive error log + header['X-RateLimit-Precision'] = "millisecond" + async with aiohttp.ClientSession(headers=header, timeout=3.0) as session: + if isinstance(message, StackedDiscordMessage): + async with session.post(f"https://discord.com/api/webhooks/{webhook_path}?wait=true", data=repr(message)) as resp: try: - add_message_redaction_entry(*metadata.dump_ids(), repr(data), result.json().get("id")) + resp_json = await resp.json() + # Add Discord Message ID which we can later use to delete/redact messages if we want + message.discord_callback_message_ids.append(resp_json["id"]) + except KeyError: + raise aiohttp.ServerConnectionError(f"Could not get the ID from POST request with message data. Data: {await resp.text()}") + except ContentTypeError: + logger.exception("Could not receive message ID from Discord due to invalid MIME type of response.") except ValueError: - logger.error("Couldn't get json of result of sending Discord message.") - else: + logger.exception(f"Could not decode JSON response from Discord. Response: {await resp.text()}]") + return handle_discord_http(resp.status, repr(message), resp) + elif message.method == "DELETE": + async with session.request(method=message.method, url=f"https://discord.com/api/webhooks/{webhook_path}") as resp: + pass + elif message.method == "PATCH": + async with session.request(method=message.method, url=f"https://discord.com/api/webhooks/{webhook_path}", data=repr(message)) as resp: pass - except requests.exceptions.Timeout: - logger.warning("Timeouted while sending data to the webhook.") - return 3 - except requests.exceptions.ConnectionError: - logger.warning("Connection error while sending the data to a webhook") - return 3 - else: - return handle_discord_http(result.status_code, data, result) - - -def send_to_discord(data: Optional[DiscordMessage], meta: DiscordMessageMetadata): - if data is not None: - for regex in settings["disallow_regexes"]: - if data.webhook_object.get("content", None): - if re.search(re.compile(regex), data.webhook_object["content"]): - logger.info("Message {} has been rejected due to matching filter ({}).".format(data.webhook_object["content"], regex)) - return # discard the message without anything - else: - for to_check in [data.webhook_object.get("description", ""), data.webhook_object.get("title", ""), *[x["value"] for x in data["fields"]], data.webhook_object.get("author", {"name": ""}).get("name", "")]: - if re.search(re.compile(regex), to_check): - logger.info("Message \"{}\" has been rejected due to matching filter ({}).".format( - to_check, regex)) - return # discard the message without anything - if messagequeue: - messagequeue.add_message((data, meta)) - else: - code = send_to_discord_webhook(data, metadata=meta) - if code == 3: - messagequeue.add_message((data, meta)) - elif code == 2: - time.sleep(5.0) - messagequeue.add_message((data, meta)) - elif code is None or code < 2: - pass \ No newline at end of file diff --git a/src/exceptions.py b/src/exceptions.py index d7c0607..c093a61 100644 --- a/src/exceptions.py +++ b/src/exceptions.py @@ -62,4 +62,10 @@ class NoDomain(Exception): class WikiExists(Exception): """When given wiki already exists""" - pass \ No newline at end of file + pass + + +class ExhaustedDiscordBucket(BaseException): + def __init__(self, remaining: int, is_global: bool): + self.remaining = remaining + self.is_global = is_global diff --git a/src/wiki.py b/src/wiki.py index b80d887..603f356 100644 --- a/src/wiki.py +++ b/src/wiki.py @@ -1,31 +1,25 @@ from __future__ import annotations -import concurrent.futures import functools import json import time -from dataclasses import dataclass import re import logging, aiohttp +import asyncio from functools import cache from api.util import default_message from src.discord.queue import messagequeue, QueueEntry from mw_messages import MWMessages from src.exceptions import * -from src.database import db from src.queue_handler import DBHandler -from src.formatters.rc import embed_formatter, compact_formatter from src.formatters.discussions import feeds_embed_formatter, feeds_compact_formatter from src.api.hooks import formatter_hooks from src.api.client import Client from src.api.context import Context -from src.discord.message import DiscordMessage, DiscordMessageMetadata -from src.misc import parse_link +from src.discord.message import DiscordMessage, DiscordMessageMetadata, StackedDiscordMessage from src.i18n import langs -from src.wiki_ratelimiter import RateLimiter -from statistics import Statistics, Log, LogType -import asyncio +from src.statistics import Statistics, Log, LogType from src.config import settings # noinspection PyPackageRequirements from bs4 import BeautifulSoup @@ -41,6 +35,8 @@ wiki_reamoval_reasons = {410: _("wiki deleted"), 404: _("wiki deleted"), 401: _( if TYPE_CHECKING: from src.domain import Domain +MESSAGE_LIMIT = settings.get("message_limit", 30) + class Wiki: def __init__(self, script_url: str, rc_id: Optional[int], discussion_id: Optional[int]): self.script_url: str = script_url @@ -52,7 +48,7 @@ class Wiki: self.domain: Optional[Domain] = None self.targets: Optional[defaultdict[Settings, list[str]]] = None self.client: Client = Client(formatter_hooks, self) - self.message_history = + self.message_history: list[StackedDiscordMessage] = list() self.update_targets() @@ -76,6 +72,11 @@ class Wiki: # result = await connection.execute('DELETE FROM rcgcdw WHERE wiki = $1', self.script_url) # logger.warning('{} rows affected by DELETE FROM rcgcdw WHERE wiki = "{}"'.format(result, self.script_url)) + def add_message(self, message: StackedDiscordMessage): + self.message_history.append(message) + if len(self.message_history) > MESSAGE_LIMIT*len(self.targets): + self.message_history = self.message_history[len(self.message_history)-MESSAGE_LIMIT*len(self.targets):] + def set_domain(self, domain: Domain): self.domain = domain @@ -297,8 +298,6 @@ async def rc_processor(wiki: Wiki, change: dict, changed_categories: dict, displ try: discord_message: Optional[DiscordMessage] = await asyncio.get_event_loop().run_in_executor( None, functools.partial(default_message("suppressed", display_options.display, formatter_hooks), context, change)) - except NoFormatter: - return except: if settings.get("error_tolerance", 1) > 0: discord_message: Optional[DiscordMessage] = None # It's handled by send_to_discord, we still want other code to run @@ -419,44 +418,7 @@ async def process_cats(event: dict, local_wiki: Wiki, categorize_events: dict): # mw_msgs[key] = msgs # it may be a little bit messy for sure, however I don't expect any reason to remove mw_msgs entries by one # local_wiki.mw_messages = key - -# db_wiki: webhook, wiki, lang, display, rcid, postid -async def essential_info(change: dict, changed_categories, local_wiki: Wiki, target: tuple, paths: tuple, request: dict, - rate_limiter: RateLimiter) -> discord.discord.DiscordMessage: - """Prepares essential information for both embed and compact message format.""" - _ = langs[target[0][0]]["wiki"].gettext - changed_categories = changed_categories.get(change["revid"], None) - #logger.debug("List of categories in essential_info: {}".format(changed_categories)) - appearance_mode = embed_formatter if target[0][1] > 0 else compact_formatter - if "actionhidden" in change or "suppressed" in change: # if event is hidden using suppression - await appearance_mode("suppressed", change, "", changed_categories, local_wiki, target, paths, rate_limiter) - return - if "commenthidden" not in change: - parsed_comment = parse_link(paths[3], change["parsedcomment"]) - else: - parsed_comment = _("~~hidden~~") - if not parsed_comment: - parsed_comment = None - if change["type"] in ["edit", "new"]: - if "userhidden" in change: - change["user"] = _("hidden") - identification_string = change["type"] - elif change["type"] == "log": - identification_string = "{logtype}/{logaction}".format(logtype=change["logtype"], logaction=change["logaction"]) - elif change["type"] == "categorize": - return - else: - identification_string = change["type"] - additional_data = {"namespaces": request["query"]["namespaces"], "tags": {}} - for tag in request["query"]["tags"]: - try: - additional_data["tags"][tag["name"]] = (BeautifulSoup(tag["displayname"], "lxml")).get_text() - except KeyError: - additional_data["tags"][tag["name"]] = None # Tags with no displ - return await appearance_mode(identification_string, change, parsed_comment, changed_categories, local_wiki, target, paths, rate_limiter, additional_data=additional_data) - - -async def essential_feeds(change: dict, comment_pages: dict, db_wiki, target: tuple) -> discord.discord.DiscordMessage: +async def essential_feeds(change: dict, comment_pages: dict, db_wiki, target: tuple) -> discord.DiscordMessage: """Prepares essential information for both embed and compact message format.""" appearance_mode = feeds_embed_formatter if target[0][1] > 0 else feeds_compact_formatter identification_string = change["_embedded"]["thread"][0]["containerType"]