Progress on Discussions work

This commit is contained in:
Frisk 2022-11-10 15:16:35 +01:00
parent 879de217ed
commit be4db2a2ae
No known key found for this signature in database
GPG key ID: 213F7C15068AF8AC
9 changed files with 168 additions and 208 deletions

View file

@@ -5,7 +5,7 @@ begin
perform pg_notify('webhookupdates', concat('REMOVE ', old.wiki));
return old;
ELSIF (TG_OP = 'INSERT') then
perform pg_notify('webhookupdates', concat('ADD ', new.wiki));
perform pg_notify('webhookupdates', concat('ADD ', new.wiki, ' ', new.rcid::text, ' ', new.postid));
return new;
end if;
end;
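With the extra columns in the payload, a subscriber can seed a new wiki without a follow-up query. A minimal asyncpg listener for the webhookupdates channel, assuming the payload format above (the DSN is a placeholder; the real consumer is DomainManager further down):

import asyncio
import asyncpg

async def listen():
    connection = await asyncpg.connect(dsn="postgres://...")  # placeholder DSN

    def on_notify(conn, pid, channel, payload):
        # payload is e.g. "ADD <wiki> <rcid> <postid>" or "REMOVE <wiki>"
        print(channel, payload.split(" "))

    await connection.add_listener("webhookupdates", on_notify)
    await asyncio.Future()  # keep the connection open indefinitely

asyncio.run(listen())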

View file

@@ -54,7 +54,7 @@ async def populate_wikis():
start = time.time()
async with db.pool().acquire() as connection:
async with connection.transaction():
async for db_wiki in connection.cursor('SELECT DISTINCT wiki, rcid, postid FROM rcgcdw'):
async for db_wiki in connection.cursor('select wiki, MAX(rcid), MAX(postid) from rcgcdw group by wiki;'):
try:
await domains.new_wiki(Wiki(db_wiki["wiki"], db_wiki["rcid"], db_wiki["postid"]))
except WikiExists: # Can rarely happen when Pub/Sub registers wiki before population
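One caveat worth noting: postid appears to be stored as text in this commit (it is compared against '-1' and typed Optional[str] below), so MAX(postid) orders lexicographically rather than numerically. A quick illustration of the difference:

# MAX() over a text column compares strings, which only matches the numeric
# maximum when all IDs share the same width:
assert max(["9", "10"]) == "9"    # lexicographic
assert max([9, 10]) == 10         # numeric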

View file

@@ -1,162 +0,0 @@
import json, random, math, logging
from collections import defaultdict
from src.misc import logger
from src.config import settings
from src.database import db
from src.i18n import langs
from src.exceptions import EmbedListFull
from asyncio import TimeoutError
from math import ceil
import aiohttp
logger = logging.getLogger("rcgcdb.discord")
# General functions
default_header = settings["header"]
default_header['Content-Type'] = 'application/json'
default_header["X-RateLimit-Precision"] = "millisecond"
# User facing webhook functions
async def wiki_removal(wiki_url, status):
async with db.pool().acquire() as connection:
async with connection.transaction():
async for observer in connection.cursor('SELECT webhook, lang FROM rcgcdw WHERE wiki = $1', wiki_url):
_ = langs[observer["lang"]]["discord"].gettext
reasons = {410: _("wiki deleted"), 404: _("wiki deleted"), 401: _("wiki inaccessible"),
402: _("wiki inaccessible"), 403: _("wiki inaccessible"), 1000: _("discussions disabled")}
reason = reasons.get(status, _("unknown error"))
await send_to_discord_webhook(DiscordMessage("compact", "webhook/remove", webhook_url=[], content=_("This recent changes webhook has been removed for `{reason}`!").format(reason=reason), wiki=None), webhook_url=observer["webhook"])
header = settings["header"]
header['Content-Type'] = 'application/json'
header['X-Audit-Log-Reason'] = "Wiki becoming unavailable"
async with aiohttp.ClientSession(headers=header, timeout=aiohttp.ClientTimeout(5.0)) as session:
await session.delete("https://discord.com/api/webhooks/"+observer["webhook"])
async def webhook_removal_monitor(webhook_url: str, reason: int):
await send_to_discord_webhook_monitoring(DiscordMessage("compact", "webhook/remove", None, content="The webhook {} has been removed due to {}.".format("https://discord.com/api/webhooks/" + webhook_url, reason), wiki=None))
def stack_message_list(messages: list) -> list:
if len(messages) > 1:
if messages[0].message_type() == "embed":
# for i, msg in enumerate(messages):
# if not isinstance(msg, StackedDiscordMessage):
# break
# else: # all messages in messages are stacked, exit this if
# i += 1
removed_msgs = 0
# We split messages into groups of 10
for group_index in range(ceil((len(messages)) / 10)):
message_group_index = group_index * 10 - removed_msgs # helps us calculate which messages we need
stackable = StackedDiscordMessage(messages[message_group_index]) # treat the first message from the group as main
for message in messages[message_group_index + 1:message_group_index + 10]: # we grab messages from messages list
try:
stackable.add_embed(message) # add to our main message the ones that follow it within the same group
except EmbedListFull: # if there are too many messages in our group we simply break so another group can be made
break
messages.remove(message)
removed_msgs += 1 # helps with calculating message_group_index
messages[message_group_index] = stackable
elif messages[0].message_type() == "compact":
message_index = 0
while len(messages) > message_index+1: # as long as we have messages to stack
if (len(messages[message_index]) + len(messages[message_index+1])) < 2000: # if overall length is lower than 2000
messages[message_index].webhook_object["content"] = messages[message_index].webhook_object["content"] + "\n" + messages[message_index + 1].webhook_object["content"]
messages[message_index].length += (len(messages[message_index + 1]) + 1)
messages.remove(messages[message_index + 1])
else:
message_index += 1
return messages
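stack_message_list packs embeds into batches of at most 10 (Discord's per-message embed limit) and merges compact lines while staying under Discord's 2000-character content cap. The batch arithmetic in isolation:

from math import ceil

messages = list(range(23))           # stand-ins for 23 embed messages
groups = ceil(len(messages) / 10)    # -> 3 stacked messages (10 + 10 + 3)
assert groups == 3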
# Monitoring webhook functions
async def wiki_removal_monitor(wiki_url, status):
await send_to_discord_webhook_monitoring(DiscordMessage("compact", "webhook/remove", content="Removing {} because {}.".format(wiki_url, status), webhook_url=[None], wiki=None))
async def generic_msg_sender_exception_logger(exception: str, title: str, **kwargs):
"""Creates a Discord message reporting a crash"""
message = DiscordMessage("embed", "bot/exception", [None], wiki=None)
message["description"] = exception
message["title"] = title
for key, value in kwargs.items():
message.add_field(key, value)
message.finish_embed()
await send_to_discord_webhook_monitoring(message)
async def send_to_discord_webhook_monitoring(data: DiscordMessage):
header = settings["header"]
header['Content-Type'] = 'application/json'
async with aiohttp.ClientSession(headers=header, timeout=aiohttp.ClientTimeout(5.0)) as session:
try:
result = await session.post("https://discord.com/api/webhooks/"+settings["monitoring_webhook"], data=repr(data))
except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError):
logger.exception("Could not send the message to Discord")
return 3
async def send_to_discord_webhook(data: DiscordMessage, webhook_url: str) -> tuple:
"""Sends a message to webhook
:return tuple(status code for request, rate limit info (None if more can be sent, string with the number of seconds to wait))"""
async with aiohttp.ClientSession(headers=default_header, timeout=aiohttp.ClientTimeout(5.0)) as session:
try:
result = await session.post("https://discord.com/api/webhooks/"+webhook_url, data=repr(data))
rate_limit = None if int(result.headers.get('x-ratelimit-remaining', "-1")) > 0 else result.headers.get('x-ratelimit-reset-after', None)
except (aiohttp.ClientConnectionError, aiohttp.ServerConnectionError, TimeoutError):
logger.exception("Could not send the message to Discord")
return 3, None
status = await handle_discord_http(result.status, repr(data), result, webhook_url)
if status == 5:
return 5, await result.json()
else:
return status, rate_limit
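A sketch of how a caller might honor the returned hint, assuming message is a DiscordMessage built elsewhere and the webhook URL is a placeholder; on a rate limit the second element is the parsed JSON body, which should carry a retry_after value:

import asyncio

status, rate_limit = await send_to_discord_webhook(message, webhook_url="id/token")
if status == 5:                        # rate limited (HTTP 429)
    await asyncio.sleep(float(rate_limit.get("retry_after", 1.0)))
elif rate_limit is not None:           # bucket exhausted, back off proactively
    await asyncio.sleep(float(rate_limit))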
async def handle_discord_http(code: int, formatted_embed: str, result: aiohttp.ClientResponse, webhook_url: str):
if 300 > code > 199: # message went through
return 0
elif code == 400: # HTTP BAD REQUEST result.status_code, data, result, header
logger.error(
"Following message has been rejected by Discord, please submit a bug on our bugtracker adding it:")
logger.error(formatted_embed)
logger.error(await result.text())
return 1
elif code == 401 or code == 404: # HTTP UNAUTHORIZED AND NOT FOUND
logger.error("Webhook URL is invalid or no longer in use, please replace it with proper one.")
async with db.pool().acquire() as connection:
await connection.execute("DELETE FROM rcgcdw WHERE webhook = $1", webhook_url)
await webhook_removal_monitor(webhook_url, code)
return 1
elif code == 429:
logger.error("We are sending too many requests to the Discord, slowing down...")
return 5
elif 499 < code < 600:
logger.error(
"Discord had trouble processing the event; since the returned HTTP code is {}, the fault lies on their side.".format(
code))
return 3
else:
return 4
class DiscordMessageMetadata:
def __init__(self, method, log_id = None, page_id = None, rev_id = None, webhook_url = None, new_data = None):
self.method = method
self.page_id = page_id
self.log_id = log_id
self.rev_id = rev_id
self.webhook_url = webhook_url
self.new_data = new_data
def dump_ids(self) -> (int, int, int):
return self.page_id, self.rev_id, self.log_id

View file

@@ -1,15 +1,28 @@
from __future__ import annotations
import asyncio
import functools
import logging
import time
import traceback
import typing
from collections import OrderedDict
import aiohttp
from api.context import Context
from api.hooks import formatter_hooks
from api.util import default_message
from src.i18n import langs
from src.misc import prepare_settings
from src.exceptions import WikiError
from src.config import settings
from src.queue_handler import dbmanager
from src.argparser import command_line_args
from src.discord.message import DiscordMessageMetadata, DiscordMessage
from collections import OrderedDict, defaultdict
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from src.domain import Domain
from src.wiki import Wiki
from src.wiki import Wiki, Settings
logger = logging.getLogger("rcgcdb.discussions")
@@ -33,7 +46,7 @@ class Discussions:
await self.run_discussion_scan(wiki)
for wiki in self.filter_and_sort():
if wiki.statistics.last_checked_discussion < settings.get("irc_overtime", 3600):
if (int(time.time()) - wiki.statistics.last_checked_discussion) > settings.get("irc_overtime", 3600):
await self.run_discussion_scan(wiki)
else:
return # Recently scanned wikis get moved to the end of self.wikis, so we assume the first one hasn't been checked for a while
@@ -44,3 +57,106 @@ class Discussions:
return sorted(filter(lambda wiki: wiki.discussion_id != -1, self.domain_object.wikis.values()), key=lambda wiki: wiki.statistics.last_checked_discussion)
async def run_discussion_scan(self, wiki: Wiki):
wiki.statistics.last_checked_discussion = int(time.time())
params = {"controller": "DiscussionPost", "method": "getPosts", "includeCounters": "false",
"sortDirection": "descending", "sortKey": "creation_date", "limit": 20}
feeds_response = await wiki.fetch_discussions(params)
try:
discussion_feed_resp = await feeds_response.json(encoding="UTF-8")
if "error" in discussion_feed_resp:
error = discussion_feed_resp["error"]
if error == "NotFoundException": # Discussions disabled
await dbmanager.add("UPDATE rcgcdw SET postid = $1 WHERE wiki = $2", "-1", wiki.script_url)
await dbmanager.update_db()
await wiki.update_targets()
raise WikiError
discussion_feed = discussion_feed_resp["_embedded"]["doc:posts"]
discussion_feed.reverse()
except aiohttp.ContentTypeError:
logger.exception("Wiki seems to be resulting in non-json content.")
return
except asyncio.TimeoutError:
logger.debug("Timeout on reading JSON of discussion post feed.")
return
if wiki.statistics.last_post is None: # new wiki, just get the last post to not spam the channel
if len(discussion_feed) > 0:
dbmanager.add(("UPDATE rcgcdw SET postid = $1 WHERE wiki = $2 AND ( postid != -1 OR postid IS NULL )", (
discussion_feed[-1]["id"],
wiki.script_url)))
else:
dbmanager.add(("UPDATE rcgcdw SET postid = $1 WHERE wiki = $2 AND ( postid != -1 OR postid IS NULL )", ("0", wiki.script_url)))
await dbmanager.update_db()
return
comment_events = []
for post in discussion_feed:
if post["_embedded"]["thread"][0]["containerType"] == "ARTICLE_COMMENT" and post["id"] > wiki.discussion_id:
comment_events.append(post["forumId"])
comment_pages: Optional[dict] = {}
if comment_events:
try:
params = {"controller": "FeedsAndPosts", "method": "getArticleNamesAndUsernames",
"stablePageIds": ",".join(comment_events), "format": "json"}
comment_pages_request = await wiki.fetch_discussions(params)
comment_pages = await comment_pages_request.json()
comment_pages = comment_pages["articleNames"]
except aiohttp.ClientResponseError: # Fandom can be funny sometimes... See #30
comment_pages = None
except:
if command_line_args.debug:
logger.exception("Exception on Feeds article comment request")
else:
logger.exception("Exception on Feeds article comment request")
# TODO
message_list = defaultdict(list)
for post in discussion_feed: # Yeah, second loop since the comments require an extra request
if post["id"] > wiki.discussion_id:
for target in wiki.discussion_targets.items():
try:
message = await essential_feeds(post, comment_pages, wiki, target)
if message is not None:
message_list[target[0]].append(message)
except asyncio.CancelledError:
raise
except:
if command_line_args.debug:
logger.exception("Exception on Feeds formatter")
shutdown(loop=asyncio.get_event_loop())
else:
logger.exception("Exception on Feeds formatter")
await generic_msg_sender_exception_logger(traceback.format_exc(),
"Exception in feed formatter",
Post=str(post)[0:1000], Wiki=wiki.script_url)
# Lets stack the messages
for messages in message_list.values():
messages = stack_message_list(messages)
for message in messages:
await send_to_discord(message)
if discussion_feed:
dbmanager.add(("UPDATE rcgcdw SET postid = $1 WHERE wiki = $2 AND ( postid != -1 OR postid IS NULL )", (discussion_feed[-1]["id"], wiki.script_url)))
await asyncio.sleep(delay=2.0) # hardcoded; this really doesn't need much more
async def essential_feeds(change: dict, comment_pages: dict, wiki: Wiki, target: tuple[Settings, list[str]]) -> Optional[DiscordMessage]:
"""Prepares essential information for both embed and compact message format."""
identification_string = change["_embedded"]["thread"][0]["containerType"]
comment_page = None
if identification_string == "ARTICLE_COMMENT" and comment_pages is not None:
comment_page = comment_pages.get(change["forumId"], None)
if comment_page is not None:
comment_page["fullUrl"] = "/".join(wiki.script_url.split("/", 3)[:3]) + comment_page["relativeUrl"]
metadata = DiscordMessageMetadata("POST", rev_id=None, log_id=None, page_id=None)
context = Context("embed" if target[0].display > 0 else "compact", "recentchanges", target[1], wiki.client,
langs[target[0].lang]["rc_formatters"], prepare_settings(target[0].display))
discord_message: Optional[DiscordMessage] = None
try:
discord_message = await asyncio.get_event_loop().run_in_executor(
None, functools.partial(default_message(identification_string, context.message_type, formatter_hooks), context, change))
except:
if settings.get("error_tolerance", 1) > 0:
logger.exception("Exception on discord message creation in essential_feeds")
else:
raise
if discord_message: # TODO How to react when none? (crash in formatter), probably bad handling atm
discord_message.finish_embed()
discord_message.metadata = metadata
return discord_message
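The dispatch above resolves a formatter through default_message and runs it on the default thread-pool executor, keeping the event loop free while the synchronous formatter works. The same pattern in isolation, with a stand-in formatter:

import asyncio
import functools

def example_formatter(context, change):   # hypothetical synchronous hook
    return {"title": change["title"]}

async def render(context, change):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(
        None, functools.partial(example_formatter, context, change))

print(asyncio.run(render({}, {"title": "Example post"})))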

View file

@@ -1,6 +1,7 @@
from __future__ import annotations
import asyncio
import logging
import time
from collections import OrderedDict
from typing import TYPE_CHECKING, Optional
from functools import cache
@@ -103,7 +104,7 @@ class Domain:
await self.run_wiki_scan(wiki)
while True: # Iterate until hitting return, we don't have to iterate using for since we are sending wiki to the end anyways
wiki: src.wiki.Wiki = next(iter(self.wikis.values()))
if (wiki.statistics.last_checked_rc or 0) < settings.get("irc_overtime", 3600): # TODO This makes no sense, comparing last_checked_rc to nothing
if (int(time.time()) - (wiki.statistics.last_checked_rc or 0)) > settings.get("irc_overtime", 3600):
await self.run_wiki_scan(wiki)
else:
return # Recently scanned wikis get moved to the end of self.wikis, so we assume the first one hasn't been checked for a while
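The corrected condition measures elapsed time rather than comparing a timestamp against a duration, and the "or 0" fallback makes never-scanned wikis look maximally stale:

import time

irc_overtime = 3600                       # assumed default, mirroring settings.get()
last_checked_rc = None                    # a wiki that was never scanned
elapsed = int(time.time()) - (last_checked_rc or 0)
assert elapsed > irc_overtime             # never-scanned wikis are scanned first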

View file

@@ -24,7 +24,8 @@ class DomainManager:
if len(split_payload) < 2:
raise ValueError("Improper pub/sub message! Pub/sub payload: {}".format(payload))
if split_payload[0] == "ADD":
await self.new_wiki(Wiki(split_payload[1], None, None))
await self.new_wiki(Wiki(split_payload[1], int(split_payload[2]) if split_payload[2].isnumeric() else None,
int(split_payload[3]) if split_payload[3].isnumeric() else None))
elif split_payload[0] == "REMOVE":
try:
results = await connection.fetch("SELECT * FROM rcgcdw WHERE wiki = $1;", split_payload[1])

View file

@@ -1,3 +1,7 @@
from __future__ import annotations
import json
from functools import cache
from html.parser import HTMLParser
import base64, re
@@ -186,3 +190,11 @@ class ContentParser(HTMLParser):
self.empty = False
@cache
def prepare_settings(display_mode: int) -> dict:
"""Prepares dict of RcGcDw compatible settings based on a template and display mode of given call"""
with open("src/api/template_settings.json", "r") as template_json:
template = json.load(template_json)
template["appearance"]["embed"]["embed_images"] = True if display_mode > 1 else False
template["appearance"]["embed"]["show_edit_changes"] = True if display_mode > 2 else False
return template
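Because prepare_settings is wrapped in functools.cache, equal display modes yield the very same dict object, so callers should treat the result as read-only; mutating it would leak into every later call:

first = prepare_settings(2)
assert prepare_settings(2) is first                            # cached object, not a copy
assert first["appearance"]["embed"]["embed_images"] is True    # display_mode 2 enables images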

View file

@@ -14,6 +14,7 @@ class LogType(Enum):
VALUE_UPDATE = 4
SCAN_REASON = 5
queue_limit = settings.get("queue_limit", 30)
@@ -41,7 +42,7 @@ class Statistics:
self.last_checked_rc: Optional[int] = None
self.last_action: Optional[int] = rc_id
self.last_checked_discussion: Optional[int] = None
self.last_post: Optional[int] = discussion_id
self.last_post: Optional[str] = discussion_id
self.logs: LimitedList[Log] = LimitedList()
def update(self, *args: Log, **kwargs: dict[str, Union[float, int]]):

View file

@@ -1,15 +1,14 @@
from __future__ import annotations
import functools
import json
import time
import re
import logging, aiohttp
import asyncio
import requests
from functools import cache
from api.util import default_message
from src.misc import prepare_settings
from src.discord.queue import messagequeue, QueueEntry
from mw_messages import MWMessages
from src.exceptions import *
@@ -47,7 +46,8 @@ class Wiki:
self.tags: dict[str, Optional[str]] = {} # Tag can be None if hidden
self.first_fetch_done: bool = False
self.domain: Optional[Domain] = None
self.targets: Optional[defaultdict[Settings, list[str]]] = None
self.rc_targets: Optional[defaultdict[Settings, list[str]]] = None
self.discussion_targets: Optional[defaultdict[Settings, list[str]]] = None
self.client: Client = Client(formatter_hooks, self)
self.message_history: list[StackedDiscordMessage] = list()
self.namespaces: Optional[dict] = None
@@ -81,8 +81,8 @@ class Wiki:
def add_message(self, message: StackedDiscordMessage):
self.message_history.append(message)
if len(self.message_history) > MESSAGE_LIMIT*len(self.targets):
self.message_history = self.message_history[len(self.message_history)-MESSAGE_LIMIT*len(self.targets):]
if len(self.message_history) > MESSAGE_LIMIT*len(self.rc_targets):
self.message_history = self.message_history[len(self.message_history)-MESSAGE_LIMIT*len(self.rc_targets):]
def set_domain(self, domain: Domain):
self.domain = domain
@@ -158,9 +158,17 @@ class Wiki:
:returns defaultdict[namedtuple, list[str]] - where namedtuple is a named tuple with settings for given webhooks in list"""
Settings = namedtuple("Settings", ["lang", "display"])
target_settings: defaultdict[Settings, list[str]] = defaultdict(list)
async for webhook in dbmanager.fetch_rows("SELECT webhook, lang, display FROM rcgcdw WHERE wiki = $1 AND (rcid != -1 OR rcid IS NULL)", self.script_url):
target_settings[Settings(webhook["lang"], webhook["display"])].append(webhook["webhook"])
self.targets = target_settings
discussion_targets: defaultdict[Settings, list[str]] = defaultdict(list)
async for webhook in dbmanager.fetch_rows("SELECT webhook, lang, display, rcid, postid FROM rcgcdw WHERE wiki = $1", self.script_url):
if webhook['rcid'] == -1 and webhook['postid'] == '-1':
await self.remove_wiki_from_db(4)
if webhook['rcid'] != -1:
target_settings[Settings(webhook["lang"], webhook["display"])].append(webhook["webhook"])
if webhook['postid'] != '-1':
discussion_targets[Settings(webhook["lang"], webhook["display"])].append(webhook["webhook"])
self.rc_targets = target_settings
self.discussion_targets = discussion_targets
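Both mappings are keyed by the Settings named tuple, so webhooks that share a language and display mode end up batched under one key; the resulting shape, with illustrative values:

from collections import defaultdict, namedtuple

Settings = namedtuple("Settings", ["lang", "display"])
rc_targets = defaultdict(list)
rc_targets[Settings("en", 1)].append("123/token-a")   # hypothetical webhooks
rc_targets[Settings("en", 1)].append("456/token-b")   # same settings -> same batch
assert len(rc_targets) == 1
assert len(rc_targets[Settings("en", 1)]) == 2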
def parse_mw_request_info(self, request_data: dict, url: str):
"""A function parsing request JSON message from MediaWiki logging all warnings and raising on MediaWiki errors"""
@@ -353,7 +361,7 @@ class Wiki:
if change["rcid"] > self.rc_id:
if highest_id is None or change["rcid"] > highest_id: # make sure highest_id really is the highest rcid, while still allowing entries with lower rcids to come after it without breaking the cycle
highest_id = change["rcid"]
for combination, webhooks in self.targets.items():
for combination, webhooks in self.rc_targets.items():
message = await rc_processor(self, change, categorize_events.get(change.get("revid"), None), combination, webhooks)
if message is None:
break
@@ -361,34 +369,29 @@ class Wiki:
message_list.append(QueueEntry(message, webhooks, self))
messagequeue.add_messages(message_list)
self.statistics.update(last_action=highest_id)
dbmanager.add(("UPDATE rcgcdw SET rcid = $1 WHERE wiki = $2", (highest_id, self.script_url))) # If this is not enough for the future, save rcid in message sending function to make sure we always send all of the changes
dbmanager.add(("UPDATE rcgcdw SET rcid = $1 WHERE wiki = $2 AND ( rcid != -1 OR rcid IS NULL )", (highest_id, self.script_url))) # If this is not enough for the future, save rcid in message sending function to make sure we always send all of the changes
return
async def scan_discussions(self):
async def remove_webhook_from_db(self, reason: str):
raise NotImplementedError
async def remove_wiki_from_db(self, reason: str):
raise NotImplementedError # TODO
async def fetch_discussions(self, params: dict) -> aiohttp.ClientResponse:
header = settings["header"]
header["Accept"] = "application/hal+json"
async with aiohttp.ClientSession(headers=header,
timeout=aiohttp.ClientTimeout(6.0)) as session:
url_path = "{wiki}wikia.php".format(wiki=self.script_url)
params = {"controller": "DiscussionPost", "method": "getPosts", "includeCounters": "false",
"sortDirection": "descending", "sortKey": "creation_date", "limit": 20}
try:
feeds_response = session.get(url_path, params=params)
response.raise_for_status()
feeds_response = await session.get(url_path, params=params)
feeds_response.raise_for_status()
except (aiohttp.ClientConnectionError, aiohttp.ServerTimeoutError, asyncio.TimeoutError,
aiohttp.ClientResponseError, aiohttp.TooManyRedirects):
aiohttp.ClientResponseError, aiohttp.TooManyRedirects):
logger.error("A connection error occurred while requesting {}".format(url_path))
raise WikiServerError
@cache
def prepare_settings(display_mode: int) -> dict:
"""Prepares dict of RcGcDw compatible settings based on a template and display mode of given call"""
with open("src/api/template_settings.json", "r") as template_json:
template = json.load(template_json)
template["appearance"]["embed"]["embed_images"] = True if display_mode > 1 else False
template["appearance"]["embed"]["show_edit_changes"] = True if display_mode > 2 else False
return template
return feeds_response
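Callers catch WikiServerError for connection-level failures and decode the hal+json body themselves, as run_discussion_scan does above; a minimal usage sketch:

params = {"controller": "DiscussionPost", "method": "getPosts", "limit": 20}
try:
    response = await wiki.fetch_discussions(params)
    payload = await response.json(encoding="UTF-8")
except WikiServerError:
    pass  # already logged inside fetch_discussions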
def process_cachable(response: dict, wiki_object: Wiki) -> None:
@@ -543,15 +546,3 @@ async def process_cats(event: dict, local_wiki: Wiki, categorize_events: dict):
# key = len(mw_msgs)
# mw_msgs[key] = msgs # it may be a little bit messy for sure, however I don't expect any reason to remove mw_msgs entries by one
# local_wiki.mw_messages = key
async def essential_feeds(change: dict, comment_pages: dict, db_wiki, target: tuple) -> DiscordMessage:
"""Prepares essential information for both embed and compact message format."""
appearance_mode = feeds_embed_formatter if target[0][1] > 0 else feeds_compact_formatter
identification_string = change["_embedded"]["thread"][0]["containerType"]
comment_page = None
if identification_string == "ARTICLE_COMMENT" and comment_pages is not None:
comment_page = comment_pages.get(change["forumId"], None)
if comment_page is not None:
comment_page["fullUrl"] = "/".join(db_wiki["wiki"].split("/", 3)[:3]) + comment_page["relativeUrl"]
return await appearance_mode(identification_string, change, target, db_wiki["wiki"], article_page=comment_page)