This commit is contained in:
Frisk 2022-08-31 14:30:41 +02:00
parent af3cf6440e
commit 0b59ba8b5c
No known key found for this signature in database
GPG key ID: 213F7C15068AF8AC
6 changed files with 120 additions and 146 deletions

31
extensions/base/rcgcdb.py Normal file
View file

@ -0,0 +1,31 @@
import logging
import json
from src.discord.message import DiscordMessage
from src.api import formatter
from src.api.context import Context
from src.api.util import embed_helper, compact_author, create_article_path, sanitize_to_markdown
@formatter.embed(event="generic")
def embed_generic(ctx: Context, change: dict):
    """Build the fallback embed for an event that has no dedicated formatter.

    The embed is titled with the unrecognised ``type/action`` pair, links to
    Special:RecentChanges and carries one field asking the reader to report
    the event. The raw change is attached to that field as JSON when the
    rendered text is short enough; otherwise only the support link is used.

    Args:
        ctx: Formatting context; supplies the message type, the event name,
            the translation function (``ctx._``) and the settings mapping.
        change: Raw change dictionary describing the event.

    Returns:
        The populated DiscordMessage.
    """
    embed = DiscordMessage(ctx.message_type, ctx.event)
    embed_helper(ctx, embed, change)
    event_name = "{type}/{action}".format(type=change.get("type", ""), action=change.get("action", ""))
    embed["title"] = ctx._("Unknown event `{event}`").format(event=event_name)
    embed["url"] = create_article_path("Special:RecentChanges")
    support_url = ctx.settings["support"]
    change_params = "[```json\n{params}\n```]({support})".format(
        params=json.dumps(change, indent=2), support=support_url)
    # Anything over 1000 characters is replaced by the bare support link.
    field_value = support_url if len(change_params) > 1000 else change_params
    # Use the context's translator here as well: a bare `_` is not defined in
    # this module, and the title above already goes through ctx._.
    embed.add_field(ctx._("Report this on the support server"), field_value)
    return embed
@formatter.compact(event="generic")
def compact_generic(ctx: Context, change: dict):
    """Build the one-line fallback message for an event without a formatter.

    Args:
        ctx: Formatting context; supplies the message type, the event name,
            the translation function and the settings mapping.
        change: Raw change dictionary describing the event.

    Returns:
        A DiscordMessage whose content names the unknown ``type/action`` pair,
        credits the author and points at the support server.
    """
    author, author_url = compact_author(ctx, change)
    template = ctx._("Unknown event `{event}` by [{author}]({author_url}), report it on the [support server](<{support}>).")
    event_name = "{type}/{action}".format(type=change.get("type", ""), action=change.get("action", ""))
    placeholders = {
        "event": event_name,
        "author": author,
        "support": ctx.settings["support"],
        "author_url": author_url,
    }
    content = template.format(**placeholders)
    return DiscordMessage(ctx.message_type, ctx.event, content=content)

View file

@ -22,6 +22,7 @@
"hide_ips": false,
"discord_message_cooldown": 0,
"datafile_path": "data.json",
"support": "https://discord.gg/v77RTk5",
"auto_suppression": {
"enabled": false,
"db_location": ":memory:"

View file

@ -168,11 +168,12 @@ class MessageTooBig(BaseException):
class StackedDiscordMessage():
def __init__(self, m_type: int):
def __init__(self, m_type: int, wiki: Wiki):
# NOTE(review): two constructor signatures appear back to back here; only the
# second one accepts the `wiki` value that is stored on the instance below.
# Messages collected in this stack so far.
self.message_list: list[DiscordMessage] = []
# Running size counter; this is the value reported by __len__.
self.length = 0
self.message_type: int = m_type # 0 for compact, 1 for embed
# IDs appended after Discord acknowledges a POST of this stack.
self.discord_callback_message_ids: list[int] = []
# Wiki object this stack was created for.
self.wiki: Wiki = wiki
def __len__(self):
# Reports the tracked `length` counter rather than len(self.message_list).
return self.length

View file

@ -14,30 +14,35 @@
# along with RcGcDw. If not, see <http://www.gnu.org/licenses/>.
import re
import sys
import time
import logging
import asyncio
import aiohttp
from aiohttp import ContentTypeError, ClientResponse
from src.exceptions import ExhaustedDiscordBucket
from src.config import settings
from src.discord.message import StackedDiscordMessage, MessageTooBig
from typing import Optional, Union, Tuple, AsyncGenerator
from typing import Optional, AsyncGenerator, TYPE_CHECKING
from collections import defaultdict
from src.discord.message import DiscordMessage, DiscordMessageMetadata, DiscordMessageRaw
from src.discord.message import DiscordMessage, DiscordMessageMetadata
AUTO_SUPPRESSION_ENABLED = settings.get("auto_suppression", {"enabled": False}).get("enabled")
if AUTO_SUPPRESSION_ENABLED:
from src.fileio.database import add_entry as add_message_redaction_entry
if TYPE_CHECKING:
from src.wiki import Wiki
rate_limit = 0
logger = logging.getLogger("rcgcdw.discord.queue")
class QueueEntry:
def __init__(self, discord_message, webhooks):
self.discord_message: DiscordMessage = discord_message
def __init__(self, discord_message, webhooks, wiki):
# NOTE(review): two constructor signatures appear back to back here; only the
# second one accepts the `wiki` value stored at the end.
# NOTE(review): `[DiscordMessage, StackedDiscordMessage]` is a list literal, not
# a type expression; typing.Union[...] looks like what was intended.
self.discord_message: [DiscordMessage, StackedDiscordMessage] = discord_message
# Webhooks this entry is addressed to.
self.webhooks: list[str] = webhooks
# Starts empty; holds the webhooks recorded as already sent.
self._sent_webhooks: set[str] = set()
self.wiki: Wiki = wiki
def check_sent_status(self, webhook: str) -> bool:
"""Checks sent status for given message, if True it means that the message has been sent before to given webhook, otherwise False."""
@ -71,6 +76,10 @@ class MessageQueue:
def clear(self):
self._queue.clear()
def add_messages(self, messages: list[QueueEntry]):
    """Enqueue every entry of *messages* in order, one add_message call each."""
    for entry in messages:
        self.add_message(entry)
def add_message(self, message: QueueEntry):
"""Append a single entry to the end of the internal queue."""
self._queue.append(message)
@ -104,43 +113,42 @@ class MessageQueue:
async def pack_massages(self, messages: list[QueueEntry]) -> AsyncGenerator[tuple[StackedDiscordMessage, int]]:
"""Pack messages into StackedDiscordMessage. It's an async generator"""
# NOTE(review): typing.AsyncGenerator is documented with two parameters (yield
# type and send type); only one is given in the annotation above.
# NOTE(review): several statements in this span appear twice in slightly
# different forms (with and without the wiki argument / the yielded index).
# NOTE(review): messages[0] raises IndexError when `messages` is empty.
current_pack = StackedDiscordMessage(0 if messages[0].discord_message.message_type == "compact" else 1) # first message
current_pack = StackedDiscordMessage(0 if messages[0].discord_message.message_type == "compact" else 1, messages[0].wiki) # first message
index = -1
for index, message in enumerate(messages):
# `message` is rebound from the QueueEntry to the message object it wraps.
message = message.discord_message
try:
current_pack.add_message(message)
except MessageTooBig:
yield current_pack
current_pack = StackedDiscordMessage(0 if message.message_type == "compact" else 1) # next messages
# The finished pack is yielded with the index of the last entry that fit.
yield current_pack, index-1
# NOTE(review): because of the rebinding above, `message.wiki` is read from the
# wrapped message object, not from the QueueEntry - confirm it exposes `wiki`.
current_pack = StackedDiscordMessage(0 if message.message_type == "compact" else 1, message.wiki) # next messages
current_pack.add_message(message)
# Final pack, paired with the index of the last entry in `messages`.
yield current_pack, index
async def send_msg_set(self, msg_set: tuple[str, list[QueueEntry]]):
# Sends the packs produced by pack_massages for one webhook and marks the
# corresponding queue entries as delivered.
# NOTE(review): this span mixes a status-tuple based flow (`status[0]`,
# `status[1]`) with an exception based flow (try/except); read with care.
webhook_url, messages = msg_set # str("daosdkosakda/adkahfwegr34", list(DiscordMessage, DiscordMessage, DiscordMessage)
async for msg, index in self.pack_massages(messages):
client_error = False
if self.global_rate_limit:
return # if we are globally rate limited just wait for first gblocked request to finish
# Verify that message hasn't been sent before
status = await send_to_discord_webhook(msg, webhook_url)
if status[0] < 2:
logger.debug("Sending message succeeded")
for queue_message in messages[max(index-10, 0):index]: # mark messages as delivered
queue_message.confirm_sent_status(webhook_url)
logger.debug("Current rate limit time: {}".format(status[1]))
if status[1] is not None:
await asyncio.sleep(float(status[1])) # note, the timer on the last request won't matter that much since it's separate task and for the time of sleep it will give control to other tasks
break
elif status[0] == 5:
if status[1]["global"] is True:
logger.debug(
"Global rate limit has been detected. Setting global_rate_limit to true and awaiting punishment.")
# noinspection PyTypeChecker
try:
status = await send_to_discord_webhook(msg, webhook_url)
# NOTE(review): in aiohttp's documented hierarchy ServerConnectionError and
# ServerTimeoutError derive from ClientError, so with ClientError listed first
# the handler after it can never be selected - confirm the intended order.
except aiohttp.ClientError:
client_error = True
except (aiohttp.ServerConnectionError, aiohttp.ServerTimeoutError):
# Retry on next Discord message sent attempt
return
except ExhaustedDiscordBucket as e:
if e.is_global:
self.global_rate_limit = True
await asyncio.sleep(status[1]["retry_after"] / 1000)
break
else:
logger.debug("Sending message failed")
break
# NOTE(review): dividing by 1000 presumes `remaining` is in milliseconds -
# confirm against the code that raises ExhaustedDiscordBucket.
await asyncio.sleep(e.remaining / 1000)
return
# NOTE(review): pack_massages yields `index` as the position of the last entry
# in the pack, and the slice end is exclusive, so messages[index] itself is not
# marked as delivered here - confirm whether `index + 1` was intended.
for queue_message in messages[max(index-len(msg.message_list), 0):index]: # mark messages as delivered
queue_message.confirm_sent_status(webhook_url)
if client_error is False:
msg.wiki.add_message(msg)
async def resend_msgs(self):
self.global_rate_limit = False
@ -160,93 +168,58 @@ class MessageQueue:
messagequeue = MessageQueue()
def handle_discord_http(code, formatted_embed, result):
def handle_discord_http(code: int, formatted_embed: str, result: ClientResponse):
# Maps the HTTP status of a Discord response to a return value or an exception.
# NOTE(review): this span shows integer return codes (0/1/2/3) next to raised
# exceptions for the same branches; two signature lines are present as well.
if 300 > code > 199: # message went through
return 0
elif code == 400: # HTTP BAD REQUEST result.status_code, data, result, header
logger.error(
"Following message has been rejected by Discord, please submit a bug on our bugtracker adding it:")
logger.error(formatted_embed)
logger.error(result.text)
return 1
# NOTE(review): for an aiohttp ClientResponse, text() is a coroutine; called
# without await from this non-async function it logs the coroutine object, not
# the response body.
logger.error(result.text())
raise aiohttp.ClientError("Message rejected.")
elif code == 401 or code == 404: # HTTP UNAUTHORIZED AND NOT FOUND
if result.request.method == "POST": # Ignore not found for DELETE and PATCH requests since the message could already be removed by admin
if result.method == "POST": # Ignore not found for DELETE and PATCH requests since the message could already be removed by admin
logger.error("Webhook URL is invalid or no longer in use, please replace it with proper one.")
remove_webhook_maybe()
# TODO remove_webhook_maybe()
raise aiohttp.ClientError("Message sent to bad webhook.")
else:
return 0
elif code == 429:
logger.error("We are sending too many requests to the Discord, slowing down...")
return 2
# NOTE(review): headers.get() returns None when the header is absent and int()
# rejects strings such as "0.35"; Discord documents reset-after as seconds that
# may be fractional - confirm the parsing and the unit expected by callers.
if "x-ratelimit-global" in result.headers.keys():
raise ExhaustedDiscordBucket(remaining=int(result.headers.get("x-ratelimit-reset-after")), is_global=True)
raise ExhaustedDiscordBucket(remaining=int(result.headers.get("x-ratelimit-reset-after")), is_global=False)
elif 499 < code < 600:
logger.error(
"Discord have trouble processing the event, and because the HTTP code returned is {} it means we blame them.".format(
code))
return 3
raise aiohttp.ServerConnectionError()
else:
logger.error("There was an unexpected HTTP code returned from Discord: {}".format(code))
return 1
raise aiohttp.ServerConnectionError()
def send_to_discord_webhook(message: [StackedDiscordMessage, DiscordMessage]):
async def send_to_discord_webhook(message: [StackedDiscordMessage, DiscordMessageMetadata], webhook_path: str):
# NOTE(review): this span interleaves a requests-based implementation with an
# aiohttp-based one (two signatures, a dangling `req =`, two sets of handlers);
# it is not a single coherent function body as shown.
# NOTE(review): `header` is the same dict object as settings["header"], so the
# item assignments below also modify the shared settings value.
header = settings["header"]
header['Content-Type'] = 'application/json'
standard_args = dict(headers=header)
if isinstance(message, StackedDiscordMessage):
req =
else:
message.metadata.method
if metadata.method == "POST":
req = requests.Request("POST", data.webhook_url+"?wait=" + ("true" if AUTO_SUPPRESSION_ENABLED else "false"), data=repr(data), **standard_args)
elif metadata.method == "DELETE":
req = requests.Request("DELETE", metadata.webhook_url, **standard_args)
elif metadata.method == "PATCH":
req = requests.Request("PATCH", data.webhook_url, data=repr(data), **standard_args)
try:
time.sleep(rate_limit)
rate_limit = 0
req = req.prepare()
result = requests.Session().send(req, timeout=10)
update_ratelimit(result)
if AUTO_SUPPRESSION_ENABLED and metadata.method == "POST":
if 199 < result.status_code < 300: # check if positive error log
header['X-RateLimit-Precision'] = "millisecond"
# NOTE(review): aiohttp documents ClientSession's `timeout` as a ClientTimeout
# instance - confirm that a bare float is accepted by the pinned version.
async with aiohttp.ClientSession(headers=header, timeout=3.0) as session:
if isinstance(message, StackedDiscordMessage):
async with session.post(f"https://discord.com/api/webhooks/{webhook_path}?wait=true", data=repr(message)) as resp:
try:
add_message_redaction_entry(*metadata.dump_ids(), repr(data), result.json().get("id"))
resp_json = await resp.json()
# Add Discord Message ID which we can later use to delete/redact messages if we want
message.discord_callback_message_ids.append(resp_json["id"])
# NOTE(review): the id is read before the HTTP status is inspected, so a
# response whose JSON lacks "id" raises here instead of reaching
# handle_discord_http - confirm that is intended for error responses.
except KeyError:
raise aiohttp.ServerConnectionError(f"Could not get the ID from POST request with message data. Data: {await resp.text()}")
except ContentTypeError:
logger.exception("Could not receive message ID from Discord due to invalid MIME type of response.")
except ValueError:
logger.error("Couldn't get json of result of sending Discord message.")
else:
logger.exception(f"Could not decode JSON response from Discord. Response: {await resp.text()}]")
return handle_discord_http(resp.status, repr(message), resp)
# NOTE(review): the DELETE and PATCH branches below open the request and then
# `pass`; the response status is never handed to handle_discord_http.
elif message.method == "DELETE":
async with session.request(method=message.method, url=f"https://discord.com/api/webhooks/{webhook_path}") as resp:
pass
elif message.method == "PATCH":
async with session.request(method=message.method, url=f"https://discord.com/api/webhooks/{webhook_path}", data=repr(message)) as resp:
pass
except requests.exceptions.Timeout:
logger.warning("Timeouted while sending data to the webhook.")
return 3
except requests.exceptions.ConnectionError:
logger.warning("Connection error while sending the data to a webhook")
return 3
else:
return handle_discord_http(result.status_code, data, result)
def send_to_discord(data: Optional[DiscordMessage], meta: DiscordMessageMetadata):
# Drops messages matching any pattern in settings["disallow_regexes"], then
# either queues the (data, meta) pair or sends it right away.
if data is not None:
for regex in settings["disallow_regexes"]:
# Messages with a "content" value are checked on that text alone.
if data.webhook_object.get("content", None):
if re.search(re.compile(regex), data.webhook_object["content"]):
logger.info("Message {} has been rejected due to matching filter ({}).".format(data.webhook_object["content"], regex))
return # discard the message without anything
else:
# Otherwise description, title, every field value and the author name are checked.
for to_check in [data.webhook_object.get("description", ""), data.webhook_object.get("title", ""), *[x["value"] for x in data["fields"]], data.webhook_object.get("author", {"name": ""}).get("name", "")]:
if re.search(re.compile(regex), to_check):
logger.info("Message \"{}\" has been rejected due to matching filter ({}).".format(
to_check, regex))
return # discard the message without anything
# A truthy queue gets the pair appended instead of sending it directly.
if messagequeue:
messagequeue.add_message((data, meta))
else:
code = send_to_discord_webhook(data, metadata=meta)
# Codes 3 and 2 put the pair on the queue (2 after a 5 second sleep);
# None and codes below 2 need no further action.
if code == 3:
messagequeue.add_message((data, meta))
elif code == 2:
time.sleep(5.0)
messagequeue.add_message((data, meta))
elif code is None or code < 2:
pass

View file

@ -62,4 +62,10 @@ class NoDomain(Exception):
class WikiExists(Exception):
"""When given wiki already exists"""
pass
pass
class ExhaustedDiscordBucket(BaseException):
    """Raised when Discord reports that a rate-limit bucket is exhausted.

    Args:
        remaining: Wait value reported for the bucket. The unit is decided by
            the code raising the exception - TODO confirm (seconds vs. ms).
        is_global: True when the limit applies globally rather than to a
            single bucket.
    """

    # NOTE(review): this derives from BaseException, so ``except Exception``
    # handlers do not catch it - presumably deliberate; confirm.
    def __init__(self, remaining: int, is_global: bool):
        # Give the base class a message so str(exc), exc.args and tracebacks
        # are informative instead of empty.
        super().__init__(
            f"Discord rate limit bucket exhausted (remaining={remaining}, is_global={is_global})")
        self.remaining = remaining
        self.is_global = is_global

View file

@ -1,31 +1,25 @@
from __future__ import annotations
import concurrent.futures
import functools
import json
import time
from dataclasses import dataclass
import re
import logging, aiohttp
import asyncio
from functools import cache
from api.util import default_message
from src.discord.queue import messagequeue, QueueEntry
from mw_messages import MWMessages
from src.exceptions import *
from src.database import db
from src.queue_handler import DBHandler
from src.formatters.rc import embed_formatter, compact_formatter
from src.formatters.discussions import feeds_embed_formatter, feeds_compact_formatter
from src.api.hooks import formatter_hooks
from src.api.client import Client
from src.api.context import Context
from src.discord.message import DiscordMessage, DiscordMessageMetadata
from src.misc import parse_link
from src.discord.message import DiscordMessage, DiscordMessageMetadata, StackedDiscordMessage
from src.i18n import langs
from src.wiki_ratelimiter import RateLimiter
from statistics import Statistics, Log, LogType
import asyncio
from src.statistics import Statistics, Log, LogType
from src.config import settings
# noinspection PyPackageRequirements
from bs4 import BeautifulSoup
@ -41,6 +35,8 @@ wiki_reamoval_reasons = {410: _("wiki deleted"), 404: _("wiki deleted"), 401: _(
if TYPE_CHECKING:
from src.domain import Domain
MESSAGE_LIMIT = settings.get("message_limit", 30)
class Wiki:
def __init__(self, script_url: str, rc_id: Optional[int], discussion_id: Optional[int]):
self.script_url: str = script_url
@ -52,7 +48,7 @@ class Wiki:
self.domain: Optional[Domain] = None
self.targets: Optional[defaultdict[Settings, list[str]]] = None
self.client: Client = Client(formatter_hooks, self)
self.message_history =
self.message_history: list[StackedDiscordMessage] = list()
self.update_targets()
@ -76,6 +72,11 @@ class Wiki:
# result = await connection.execute('DELETE FROM rcgcdw WHERE wiki = $1', self.script_url)
# logger.warning('{} rows affected by DELETE FROM rcgcdw WHERE wiki = "{}"'.format(result, self.script_url))
def add_message(self, message: StackedDiscordMessage):
    """Record *message* in the history and drop the oldest surplus entries.

    The history is capped at ``MESSAGE_LIMIT`` entries per target.
    """
    self.message_history.append(message)
    history_cap = MESSAGE_LIMIT * len(self.targets)
    overflow = len(self.message_history) - history_cap
    if overflow > 0:
        # Keep only the newest `history_cap` entries.
        self.message_history = self.message_history[overflow:]
def set_domain(self, domain: Domain):
self.domain = domain
@ -297,8 +298,6 @@ async def rc_processor(wiki: Wiki, change: dict, changed_categories: dict, displ
try:
discord_message: Optional[DiscordMessage] = await asyncio.get_event_loop().run_in_executor(
None, functools.partial(default_message("suppressed", display_options.display, formatter_hooks), context, change))
except NoFormatter:
return
except:
if settings.get("error_tolerance", 1) > 0:
discord_message: Optional[DiscordMessage] = None # It's handled by send_to_discord, we still want other code to run
@ -419,44 +418,7 @@ async def process_cats(event: dict, local_wiki: Wiki, categorize_events: dict):
# mw_msgs[key] = msgs # it may be a little bit messy for sure, however I don't expect any reason to remove mw_msgs entries by one
# local_wiki.mw_messages = key
# db_wiki: webhook, wiki, lang, display, rcid, postid
async def essential_info(change: dict, changed_categories, local_wiki: Wiki, target: tuple, paths: tuple, request: dict,
rate_limiter: RateLimiter) -> discord.discord.DiscordMessage:
"""Prepares essential information for both embed and compact message format.

Picks the embed or compact formatter from target[0][1], derives the event
identifier and the parsed comment from `change`, collects namespace and tag
data from `request`, and awaits the chosen formatter with all of it.

NOTE(review): the suppressed-event and "categorize" paths end in a bare
`return`, so the function yields None there despite the return annotation.
"""
# Translation function for the language stored in target[0][0].
_ = langs[target[0][0]]["wiki"].gettext
changed_categories = changed_categories.get(change["revid"], None)
#logger.debug("List of categories in essential_info: {}".format(changed_categories))
# A display value above 0 selects the embed formatter, anything else the compact one.
appearance_mode = embed_formatter if target[0][1] > 0 else compact_formatter
if "actionhidden" in change or "suppressed" in change: # if event is hidden using suppression
await appearance_mode("suppressed", change, "", changed_categories, local_wiki, target, paths, rate_limiter)
return
if "commenthidden" not in change:
parsed_comment = parse_link(paths[3], change["parsedcomment"])
else:
parsed_comment = _("~~hidden~~")
# Falsy comments are replaced with None.
if not parsed_comment:
parsed_comment = None
if change["type"] in ["edit", "new"]:
if "userhidden" in change:
change["user"] = _("hidden")
identification_string = change["type"]
elif change["type"] == "log":
identification_string = "{logtype}/{logaction}".format(logtype=change["logtype"], logaction=change["logaction"])
elif change["type"] == "categorize":
return
else:
identification_string = change["type"]
additional_data = {"namespaces": request["query"]["namespaces"], "tags": {}}
for tag in request["query"]["tags"]:
try:
# Tag display names are reduced to plain text via BeautifulSoup.
additional_data["tags"][tag["name"]] = (BeautifulSoup(tag["displayname"], "lxml")).get_text()
except KeyError:
additional_data["tags"][tag["name"]] = None # Tags with no display name
return await appearance_mode(identification_string, change, parsed_comment, changed_categories, local_wiki, target, paths, rate_limiter, additional_data=additional_data)
async def essential_feeds(change: dict, comment_pages: dict, db_wiki, target: tuple) -> discord.discord.DiscordMessage:
async def essential_feeds(change: dict, comment_pages: dict, db_wiki, target: tuple) -> discord.DiscordMessage:
"""Prepares essential information for both embed and compact message format."""
appearance_mode = feeds_embed_formatter if target[0][1] > 0 else feeds_compact_formatter
identification_string = change["_embedded"]["thread"][0]["containerType"]