Frisk 2022-08-16 12:50:49 +02:00
parent ef3dddfb0c
commit 573efaf7fa
6 changed files with 107 additions and 174 deletions

View file

@@ -18,20 +18,22 @@ import math
import random
from collections import defaultdict
from typing import Optional
from typing import Optional, TYPE_CHECKING
if TYPE_CHECKING:
from wiki import Wiki
with open("src/api/template_settings.json", "r") as template_json:
settings: dict = json.load(template_json)
class DiscordMessageMetadata:
def __init__(self, method, log_id = None, page_id = None, rev_id = None, webhook_url = None, new_data = None):
def __init__(self, method, log_id = None, page_id = None, rev_id = None, webhook_url = None):
self.method = method
self.page_id = page_id
self.log_id = log_id
self.rev_id = rev_id
self.webhook_url = webhook_url
self.new_data = new_data
def matches(self, other: dict):
for key, value in other.items():
@@ -49,6 +51,7 @@ class DiscordMessage:
self.webhook_object = dict(allowed_mentions={"parse": []})
self.length = 0
self.metadata: Optional[DiscordMessageMetadata] = None
self.wiki: Optional[Wiki] = None
if message_type == "embed":
self.__setup_embed()
@@ -160,11 +163,16 @@ class DiscordMessageRaw(DiscordMessage):
self.webhook_url = webhook_url
class MessageTooBig(BaseException):
pass
class StackedDiscordMessage():
def __init__(self, m_type: int):
self.message_list: list[DiscordMessage] = []
self.length = 0
self.message_type: int = m_type # 0 for compact, 1 for embed
self.discord_callback_message_ids: list[int] = []
def __len__(self):
return self.length
@@ -182,7 +190,7 @@ class StackedDiscordMessage():
return [(num, message) for num, message in enumerate(self.message_list)]
def add_message(self, message: DiscordMessage):
if len(self) + len(message) > 6000:
if len(self) + len(message) > 6000 or len(self.message_list) > 9:
raise MessageTooBig
self.length += len(message)
self.message_list.append(message)
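(The tightened guard reflects Discord's hard limits of 6000 characters and 10 embeds per webhook message. A minimal sketch of how a caller is expected to react to MessageTooBig; illustrative only, `dispatch` and `incoming_messages` are hypothetical placeholders, not part of the commit:)

stack = StackedDiscordMessage(1)  # 1 = embed mode, 0 = compact
for message in incoming_messages:  # incoming_messages: list[DiscordMessage]
    try:
        stack.add_message(message)  # raises once the stack would exceed 6000 chars or already holds 10 messages
    except MessageTooBig:
        dispatch(stack)  # hypothetical: hand the full stack to the sender
        stack = StackedDiscordMessage(1)
        stack.add_message(message)
dispatch(stack)

(The same pattern is implemented later in this commit as MessageQueue.pack_messages.)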

View file

@@ -1,4 +1,5 @@
from collections import OrderedDict
from src.config import settings
import src.wiki

View file

@@ -35,6 +35,7 @@ class Domain:
return len(self.wikis)
def destroy(self):
"""Destroy the domain do all of the tasks that should make sure there is no leftovers before being collected by GC"""
if self.irc:
self.irc.connection.disconnect("Leaving")
if self.discussions_handler:
@@ -43,9 +44,11 @@
self.task.cancel()
def get_wiki(self, item, default=None) -> Optional[src.wiki.Wiki]:
"""Return a wiki with given domain name"""
return self.wikis.get(item, default)
def set_irc(self, irc_client: src.irc_feed.AioIRCCat):
"""Sets IRC"""
self.irc = irc_client
def stop_task(self):
@@ -53,6 +56,7 @@
self.task.cancel() # Be aware that cancelling the task may take time
def run_domain(self):
"""Starts asyncio task for domain"""
if not self.task or self.task.cancelled():
self.task = asyncio.create_task(self.run_wiki_check(), name=self.name)
else:
@@ -109,6 +113,7 @@
return max((-25 * queue_length) + 150, 1)
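(For reference, this backoff is linear in the queue size: assuming queue_length counts wikis waiting in the queue, a value of 0 yields 150, 2 yields 100, 4 yields 50, and anything from 6 upward is clamped to the floor of 1.)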
async def run_wiki_check(self):
"""Runs appropriate scheduler depending on existence of IRC"""
if self.irc:
while True:
await self.irc_scheduler()

View file

@@ -55,7 +55,7 @@ class AioIRCCat(irc.client_aio.AioSimpleIRCClient):
# print(message)
url = urlparse(message)
full_url = "https://"+url.netloc + recognize_langs(url.path)
wiki = self.domain.get_wiki(full_url)
wiki = self.domain.get_wiki(full_url) # TODO Perhaps something less performance-hurting?
if wiki and wiki.rc_id != -1:
self.updated_wikis.add(full_url)
logger.debug("New website appended to the list! {}".format(full_url))

View file

@@ -1,10 +1,21 @@
import asyncio, logging, aiohttp
from src.discord import send_to_discord_webhook, DiscordMessage, StackedDiscordMessage
import typing
from src.discord.message import DiscordMessage, StackedDiscordMessage, MessageTooBig
from src.config import settings
from src.exceptions import EmbedListFull
from collections import defaultdict
logger = logging.getLogger("rcgcdw.msgqueue")
class QueueEntry:
def __init__(self, discord_message, webhooks):
self.discord_message: DiscordMessage = discord_message
self.webhooks: list[str] = webhooks
def __iter__(self):
return iter(self.webhooks)
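(A QueueEntry pairs one DiscordMessage with the webhook routes it still has to reach; iterating the entry iterates its webhook URLs. A minimal usage sketch with hypothetical URL fragments:)

entry = QueueEntry(discord_message, ["123/tokenA", "456/tokenB"])
for webhook_url in entry:  # yields "123/tokenA", then "456/tokenB"
    print(webhook_url)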
class MessageQueue:
"""Message queue class for undelivered messages"""
def __init__(self):
@@ -23,9 +34,9 @@ class MessageQueue:
def clear(self):
self._queue.clear()
def add_message(self, message):
self._queue.append(message)
logger.debug("Adding new message")
def add_messages(self, messages: list[QueueEntry]):
self._queue.extend(messages)
logger.debug("Adding new messages")
#
# def replace_message(self, to_replace: DiscordMessage, with_replace: StackedDiscordMessage):
# try:
@@ -36,7 +47,6 @@
def cut_messages(self, item_num):
self._queue = self._queue[item_num:]
async def group_by_webhook(self): # TODO Change into iterable
"""Group Discord messages in the queue by the dictionary, allowing to send multiple messages to different
webhooks at the same time avoiding ratelimits per Discord webhook route."""
@@ -48,9 +58,21 @@
message_dict[webhook].append(msg) # defaultdict{"dadibadyvbdmadgqueh23/dihjd8agdandashd": [DiscordMessage, DiscordMessage]}
return message_dict.items() # dict_items([('daosdkosakda/adkahfwegr34', [DiscordMessage]), ('daosdkosakda/adkahfwegr33', [DiscordMessage, DiscordMessage])])
async def pack_messages(self, messages: list[DiscordMessage]) -> typing.AsyncGenerator:
"""Pack messages into StackedDiscordMessage objects. This is an async generator."""
current_pack = StackedDiscordMessage(0 if messages[0].message_type == "compact" else 1) # first message
for message in messages:
try:
current_pack.add_message(message)
except MessageTooBig:
yield current_pack
current_pack = StackedDiscordMessage(0 if message.message_type == "compact" else 1) # next messages
current_pack.add_message(message)
yield current_pack
async def send_msg_set(self, msg_set: tuple):
webhook_url, messages = msg_set # e.g. ("daosdkosakda/adkahfwegr34", [DiscordMessage, DiscordMessage, DiscordMessage])
for msg in messages:
async for msg in self.pack_messages(messages):
if self.global_rate_limit:
return # if we are globally rate limited, just wait for the first globally-blocked request to finish
status = await send_to_discord_webhook(msg, webhook_url)
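(Taken together, the new queue pipeline is: group_by_webhook buckets the queued messages per webhook route, and send_msg_set drains one bucket by packing it into as few stacked messages as possible. A rough sketch of the intended flow, assuming a scheduler iterates the grouped buckets; illustrative only:)

for msg_set in await messagequeue.group_by_webhook():
    await messagequeue.send_msg_set(msg_set)  # packs and posts one webhook's backlog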

View file

@ -10,6 +10,7 @@ import logging, aiohttp
from functools import cache
from api.util import default_message
from msgqueue import QueueEntry, messagequeue
from mw_messages import MWMessages
from src.exceptions import *
from src.database import db
@@ -188,8 +189,7 @@ class Wiki:
"amenableparser": 1, "amincludelocal": 1, "siprop": "namespaces|general"})
else:
params = OrderedDict({"action": "query", "format": "json", "uselang": "content", "list": "tags|recentchanges",
"meta": "siteinfo", "utf8": 1,
"tglimit": "max", "rcshow": "!bot", "tgprop": "displayname",
"meta": "siteinfo", "utf8": 1, "rcshow": "!bot",
"rcprop": "title|redirect|timestamp|ids|loginfo|parsedcomment|sizes|flags|tags|user",
"rclimit": amount, "rctype": "edit|new|log|categorize", "siprop": "namespaces|general"})
try:
@@ -200,7 +200,7 @@ class Wiki:
return response
async def scan(self, amount=10):
"""Fetches recent changes of a wiki
"""Main track of fetching RecentChanges of a wiki.
:raises WikiServerError
"""
@@ -209,21 +209,13 @@
request = await self.fetch_wiki(amount=amount)
self.client.last_request = request
except WikiServerError as e:
# If WikiServerError comes up 2 times within the last 2 minutes, this will reraise the exception; otherwise it waits 2 seconds
self.statistics.update(Log(type=LogType.CONNECTION_ERROR, title=str(e.exception)))
if self.statistics.recent_connection_errors() > 1:
raise
await asyncio.sleep(2.0)
if not self.mw_messages:
# TODO Split into other function
mw_messages = request.get("query", {}).get("allmessages", [])
final_mw_messages = dict()
for msg in mw_messages:
if "missing" not in msg: # ignore missing strings
final_mw_messages[msg["name"]] = re.sub(r'\[\[.*?]]', '', msg["*"])
else:
logger.warning("Could not fetch the MW message translation for: {}".format(msg["name"]))
self.mw_messages = MWMessages(final_mw_messages)
# TODO Split into other function
process_cachable(request, self)
try:
recent_changes = request["query"]["recentchanges"]
recent_changes.reverse()
@@ -252,19 +244,17 @@ class Wiki:
break
await process_cats(change, self, categorize_events)
else: # adequate amount of changes
for tag in request["query"]["tags"]:
try:
self.tags[tag["name"]] = (BeautifulSoup(tag["displayname"], "lxml")).get_text()
except KeyError:
self.tags[tag["name"]] = None
message_list = defaultdict(list)
message_list = [] # Collect all messages so they can be efficiently merged in the Discord message sender
for change in recent_changes: # Second loop, since the categories all need to be loaded first
if change["rcid"] > self.rc_id:
if highest_id is None or change["rcid"] > highest_id: # make sure highest_id is really the highest rcid, but allow entries with lower rcids to come after it without breaking the cycle
highest_id = change["rcid"]
for combination, webhooks in self.targets.items():
message = await rc_processor(self, change, categorize_events, combination, webhooks)
break
message.wiki = self
message_list.append(QueueEntry(message, webhooks))
messagequeue.add_messages(message_list)
return
@cache
def prepare_settings(display_mode: int) -> dict:
@@ -276,7 +266,27 @@ def prepare_settings(display_mode: int) -> dict:
return template
def process_cachable(response: dict, wiki_object: Wiki) -> None:
"""This function processes cachable objects such as MediaWiki system messages and wiki tag display names to be used
for processing of DiscordMessages and saves them in a wiki object."""
mw_messages = response.get("query", {}).get("allmessages", [])
final_mw_messages = dict()
for msg in mw_messages:
if "missing" not in msg: # ignore missing strings
final_mw_messages[msg["name"]] = re.sub(r'\[\[.*?]]', '', msg["*"])
else:
logger.warning("Could not fetch the MW message translation for: {}".format(msg["name"]))
wiki_object.mw_messages = MWMessages(final_mw_messages)
for tag in response["query"]["tags"]:
try:
wiki_object.tags[tag["name"]] = (BeautifulSoup(tag["displayname"], "lxml")).get_text()
except KeyError:
wiki_object.tags[tag["name"]] = None
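(process_cachable only reads a couple of fields from the API response. A minimal example of the expected shape, with hypothetical values:)

response = {"query": {
    "allmessages": [{"name": "recentchanges-page-added-to-category", "*": "added to [[category]]"}],
    "tags": [{"name": "mw-undo", "displayname": "<i>Undo</i>"}],
}}
process_cachable(response, wiki)
# wiki.mw_messages now holds the message with the [[link]] stripped,
# and wiki.tags["mw-undo"] == "Undo" (display name with HTML stripped)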
async def rc_processor(wiki: Wiki, change: dict, changed_categories: dict, display_options: namedtuple("Settings", ["lang", "display"]), webhooks: list) -> DiscordMessage:
"""This function takes more vital information, communicates with a formatter and constructs DiscordMessage with it.
It creates DiscordMessageMetadata object, LinkParser and Context. Prepares a comment """
from src.misc import LinkParser
LinkParser = LinkParser()
metadata = DiscordMessageMetadata("POST", rev_id=change.get("revid", None), log_id=change.get("logid", None),
@@ -353,120 +363,6 @@ async def rc_processor(wiki: Wiki, change: dict, changed_categories: dict, displ
return discord_message
@dataclass
class Wiki_old:
mw_messages: int = None
fail_times: int = 0 # corresponding to amount of times connection with wiki failed for client reasons (400-499)
session: aiohttp.ClientSession = None
rc_active: int = 0
last_check: float = 0.0
last_discussion_check: float = 0.0
@staticmethod
async def fetch_wiki(extended, script_path, session: aiohttp.ClientSession, ratelimiter: RateLimiter, amount=20) -> aiohttp.ClientResponse:
await ratelimiter.timeout_wait()
url_path = script_path + "api.php"
if extended:
params = {"action": "query", "format": "json", "uselang": "content", "list": "tags|recentchanges",
"meta": "allmessages|siteinfo",
"utf8": 1, "tglimit": "max", "tgprop": "displayname",
"rcprop": "title|redirect|timestamp|ids|loginfo|parsedcomment|sizes|flags|tags|user",
"rclimit": amount, "rcshow": "!bot", "rctype": "edit|new|log|categorize",
"ammessages": "recentchanges-page-added-to-category|recentchanges-page-removed-from-category|recentchanges-page-added-to-category-bundled|recentchanges-page-removed-from-category-bundled",
"amenableparser": 1, "amincludelocal": 1, "siprop": "namespaces|general"}
else:
params = {"action": "query", "format": "json", "uselang": "content", "list": "tags|recentchanges",
"meta": "siteinfo", "utf8": 1,
"tglimit": "max", "rcshow": "!bot", "tgprop": "displayname",
"rcprop": "title|redirect|timestamp|ids|loginfo|parsedcomment|sizes|flags|tags|user",
"rclimit": amount, "rctype": "edit|new|log|categorize", "siprop": "namespaces|general"}
try:
response = await session.get(url_path, params=params)
ratelimiter.timeout_add(1.0)
except (aiohttp.ClientConnectionError, aiohttp.ServerTimeoutError, asyncio.TimeoutError):
logger.error("A connection error occurred while requesting {}".format(url_path))
raise WikiServerError
return response
@staticmethod
async def fetch_feeds(wiki, session: aiohttp.ClientSession) -> aiohttp.ClientResponse:
url_path = "{wiki}wikia.php".format(wiki=wiki)
params = {"controller": "DiscussionPost", "method": "getPosts", "includeCounters": "false", "sortDirection": "descending", "sortKey": "creation_date", "limit": 20}
try:
response = await session.get(url_path, params=params)
response.raise_for_status()
except (aiohttp.ClientConnectionError, aiohttp.ServerTimeoutError, asyncio.TimeoutError, aiohttp.ClientResponseError):
logger.error("A connection error occurred while requesting {}".format(url_path))
raise WikiServerError
return response
@staticmethod
async def safe_request(url, ratelimiter, *keys):
await ratelimiter.timeout_wait()
try:
async with aiohttp.ClientSession(headers=settings["header"], timeout=aiohttp.ClientTimeout(6.0)) as session:
request = await session.get(url)
ratelimiter.timeout_add(1.0)
request.raise_for_status()
json_request = await request.json(encoding="UTF-8")
except (aiohttp.ClientConnectionError, aiohttp.ServerTimeoutError, asyncio.TimeoutError):
logger.error("Reached connection error for request on link {url}".format(url=url))
else:
try:
for item in keys:
json_request = json_request[item]
except KeyError:
logger.warning(
"Failure while extracting data from request on key {key} in {change}".format(key=item, change=json_request))
return None
return json_request
async def fail_add(self, wiki_url, status):
logger.debug("Increasing fail_times to {}".format(self.fail_times+3))
self.fail_times += 3
if self.fail_times > 9:
await self.remove(wiki_url, status)
async def check_status(self, wiki_url, status):
if 199 < status < 300:
self.fail_times -= 1
pass
elif 400 < status < 500: # ignore 400 error since this might be our fault
await self.fail_add(wiki_url, status)
logger.warning("Wiki {} responded with HTTP code {}, increased fail_times to {}, skipping...".format(wiki_url, status, self.fail_times))
raise WikiError
elif 499 < status < 600:
logger.warning("Wiki {} responded with HTTP code {}, skipping...".format(wiki_url, status, self.fail_times))
raise WikiServerError
@staticmethod
async def remove(wiki_url, reason):
logger.info("Removing a wiki {}".format(wiki_url))
await discord.discord.wiki_removal(wiki_url, reason)
await discord.discord.wiki_removal_monitor(wiki_url, reason)
async with db.pool().acquire() as connection:
result = await connection.execute('DELETE FROM rcgcdw WHERE wiki = $1', wiki_url)
logger.warning('{} rows affected by DELETE FROM rcgcdw WHERE wiki = "{}"'.format(result, wiki_url))
async def pull_comment(self, comment_id, WIKI_API_PATH, rate_limiter):
try:
comment = await self.safe_request(
"{wiki}?action=comment&do=getRaw&comment_id={comment}&format=json".format(wiki=WIKI_API_PATH,
comment=comment_id), rate_limiter, "text")
logger.debug("Got the following comment from the API: {}".format(comment))
if comment is None:
raise TypeError
except (TypeError, AttributeError):
logger.exception("Could not resolve the comment text.")
except KeyError:
logger.exception("CurseProfile extension API did not respond with a valid comment content.")
else:
if len(comment) > 1000:
comment = comment[0:1000] + "…"
return comment
return ""
async def process_cats(event: dict, local_wiki: Wiki, categorize_events: dict):
"""Process categories based on local MW messages. """
if event["type"] == "categorize":
@@ -492,35 +388,36 @@ async def process_cats(event: dict, local_wiki: Wiki, categorize_events: dict):
else:
logger.debug("Log entry got suppressed, ignoring entry.")
async def process_mwmsgs(wiki_response: dict, local_wiki: Wiki, mw_msgs: dict):
"""
This function is made to parse the initial wiki extended information to update local_wiki.mw_messages that stores the key
to mw_msgs that is a dict storing id: tuple where tuple is a set of MW messages for categories.
The reason it's constructed this way is to prevent duplication of data in memory so Markus doesn't complain about
high RAM usage. It does however affect CPU performance as every wiki requires to check the list for the matching
tuples of MW messages.
:param wiki_response:
:param local_wiki:
:param mw_msgs:
:return:
"""
msgs = []
for message in wiki_response["query"]["allmessages"]:
if not "missing" in message: # ignore missing strings
msgs.append((message["name"], re.sub(r'\[\[.*?\]\]', '', message["*"])))
else:
logger.warning("Could not fetch the MW message translation for: {}".format(message["name"]))
msgs = tuple(msgs)
for key, set in mw_msgs.items():
if msgs == set:
local_wiki.mw_messages = key
return
# if same entry is not in mw_msgs
key = len(mw_msgs)
mw_msgs[key] = msgs # it may be a little bit messy for sure, however I don't expect any reason to remove mw_msgs entries by one
local_wiki.mw_messages = key
# This function has been removed. While its implementation seems sound, it should be considered only if we find performance
# concerns with RcGcDb
# async def process_mwmsgs(wiki_response: dict, local_wiki: Wiki, mw_msgs: dict):
# """
# This function is made to parse the initial wiki extended information to update local_wiki.mw_messages that stores the key
# to mw_msgs that is a dict storing id: tuple where tuple is a set of MW messages for categories.
# The reason it's constructed this way is to prevent duplication of data in memory so Markus doesn't complain about
# high RAM usage. It does however affect CPU performance as every wiki requires to check the list for the matching
# tuples of MW messages.
#
# :param wiki_response:
# :param local_wiki:
# :param mw_msgs:
# :return:
# """
# msgs = []
# for message in wiki_response["query"]["allmessages"]:
# if not "missing" in message: # ignore missing strings
# msgs.append((message["name"], re.sub(r'\[\[.*?\]\]', '', message["*"])))
# else:
# logger.warning("Could not fetch the MW message translation for: {}".format(message["name"]))
# msgs = tuple(msgs)
# for key, set in mw_msgs.items():
# if msgs == set:
# local_wiki.mw_messages = key
# return
# # if same entry is not in mw_msgs
# key = len(mw_msgs)
# mw_msgs[key] = msgs # it may be a little bit messy for sure, however I don't expect any reason to remove mw_msgs entries by one
# local_wiki.mw_messages = key
# db_wiki: webhook, wiki, lang, display, rcid, postid
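(For reference, the idea behind the retired scheme, interning identical tuples of MW messages so each wiki stores only an integer key, can be sketched in isolation. All names below are hypothetical and not part of the commit:)

# Wikis share one stored copy of identical MW-message tuples and keep only an int key.
mw_msgs: dict[int, tuple] = {}

def intern_messages(msgs: tuple) -> int:
    for key, stored in mw_msgs.items():
        if stored == msgs:
            return key  # an identical tuple already exists, reuse its key
    key = len(mw_msgs)  # entries are never removed, so len() yields a fresh key
    mw_msgs[key] = msgs
    return key

(This trades CPU, a linear scan per wiki, for memory, which is exactly the trade-off the removal note above calls out.)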