Updates and fixes

This commit is contained in:
Frisk 2022-11-04 15:59:26 +01:00
parent ebaca40d8e
commit 090a14c6c4
No known key found for this signature in database
GPG key ID: 213F7C15068AF8AC
13 changed files with 45 additions and 76 deletions

View file

@ -105,7 +105,7 @@ class Client:
return dict() return dict()
def parse_links(self, summary: str): def parse_links(self, summary: str):
link_parser = self.LinkParser() link_parser = self.LinkParser(self.WIKI_JUST_DOMAIN)
link_parser.feed(summary) link_parser.feed(summary)
return link_parser.new_string return link_parser.new_string

View file

@ -51,7 +51,7 @@ def sanitize_to_url(text: str) -> str: # TODO ) replaces needed?
def parse_mediawiki_changes(ctx: Context, content: str, embed: DiscordMessage) -> None: def parse_mediawiki_changes(ctx: Context, content: str, embed: DiscordMessage) -> None:
"""Parses MediaWiki changes and adds them to embed as fields "Added" and "Removed" """ """Parses MediaWiki changes and adds them to embed as fields "Added" and "Removed" """
edit_diff = ctx.client.content_parser() edit_diff = ctx.client.content_parser(ctx._)
edit_diff.feed(content) edit_diff.feed(content)
if edit_diff.small_prev_del: if edit_diff.small_prev_del:
if edit_diff.small_prev_del.replace("~~", "").replace("__", "").isspace(): if edit_diff.small_prev_del.replace("~~", "").replace("__", "").isspace():

View file

@ -16,7 +16,6 @@ from src.database import db
from src.exceptions import * from src.exceptions import *
from src.queue_handler import dbmanager from src.queue_handler import dbmanager
from src.wiki import Wiki, process_cats, essential_feeds from src.wiki import Wiki, process_cats, essential_feeds
from src.wiki_ratelimiter import RateLimiter
from src.domain_manager import domains from src.domain_manager import domains

View file

@ -34,17 +34,5 @@ class db_connection:
def pool(self) -> asyncpg.Pool: def pool(self) -> asyncpg.Pool:
return self.connection_pool return self.connection_pool
# Tried to make it a decorator but tbh won't probably work
# async def in_transaction(self, func):
# async def single_transaction():
# async with self.connection.acquire() as connection:
# async with connection.transaction():
# await func()
# return single_transaction
# async def query(self, string, *arg):
# async with self.connection.acquire() as connection:
# async with connection.transaction():
# return connection.cursor(string, *arg)
db = db_connection() db = db_connection()

View file

@ -114,14 +114,12 @@ class MessageQueue:
async def pack_massages(self, messages: list[QueueEntry], current_pack=None) -> AsyncGenerator[tuple[StackedDiscordMessage, int, str], None]: async def pack_massages(self, messages: list[QueueEntry], current_pack=None) -> AsyncGenerator[tuple[StackedDiscordMessage, int, str], None]:
"""Pack messages into StackedDiscordMessage. It's an async generator""" """Pack messages into StackedDiscordMessage. It's an async generator"""
# TODO Rebuild to support DELETE and PATCH messages
for index, message in enumerate(messages): for index, message in enumerate(messages):
if message.method == "POST": if message.method == "POST":
if current_pack is None: if current_pack is None:
current_pack = StackedDiscordMessage(0 if message.discord_message.message_type == "compact" else 1, current_pack = StackedDiscordMessage(0 if message.discord_message.message_type == "compact" else 1,
message.wiki) message.wiki)
else: else:
# message.discord_message. # TODO Where do we store method?
yield message.discord_message, index, message.method yield message.discord_message, index, message.method
message = message.discord_message message = message.discord_message
try: try:
@ -152,7 +150,7 @@ class MessageQueue:
self.global_rate_limit = True self.global_rate_limit = True
await asyncio.sleep(e.remaining / 1000) await asyncio.sleep(e.remaining / 1000)
return return
for queue_message in messages[max(index-len(msg.message_list), 0):max(index, 1)]: # mark messages as delivered for queue_message in messages[max(index-len(msg.message_list), 0):index+1]: # mark messages as delivered
queue_message.confirm_sent_status(webhook_url) queue_message.confirm_sent_status(webhook_url)
if client_error is False: if client_error is False:
msg.webhook = webhook_url msg.webhook = webhook_url

View file

@ -8,7 +8,6 @@ from src.argparser import command_line_args
from functools import cache from functools import cache
# from src.discussions import Discussions # from src.discussions import Discussions
from statistics import Log, LogType from statistics import Log, LogType
import src.wiki_ratelimiter
logger = logging.getLogger("rcgcdb.domain") logger = logging.getLogger("rcgcdb.domain")
@ -22,7 +21,6 @@ class Domain:
self.name = name # This should be always in format of topname.extension for example fandom.com self.name = name # This should be always in format of topname.extension for example fandom.com
self.task: Optional[asyncio.Task] = None self.task: Optional[asyncio.Task] = None
self.wikis: OrderedDict[str, src.wiki.Wiki] = OrderedDict() self.wikis: OrderedDict[str, src.wiki.Wiki] = OrderedDict()
self.rate_limiter: src.wiki_ratelimiter = src.wiki_ratelimiter.RateLimiter()
self.irc: Optional[src.irc_feed.AioIRCCat] = None self.irc: Optional[src.irc_feed.AioIRCCat] = None
# self.discussions_handler: Optional[Discussions] = Discussions(self.wikis) if name == "fandom.com" else None # self.discussions_handler: Optional[Discussions] = Discussions(self.wikis) if name == "fandom.com" else None
@ -38,7 +36,7 @@ class Domain:
def destroy(self): def destroy(self):
"""Destroy the domain do all of the tasks that should make sure there is no leftovers before being collected by GC""" """Destroy the domain do all of the tasks that should make sure there is no leftovers before being collected by GC"""
if self.irc: if self.irc:
self.irc.connection.disconnect("Leaving") self.irc.connection.die("Leaving")
if self.discussions_handler: if self.discussions_handler:
self.discussions_handler.close() self.discussions_handler.close()
if self.task: if self.task:
@ -81,11 +79,9 @@ class Domain:
self.wikis.move_to_end(wiki.script_url, last=False) self.wikis.move_to_end(wiki.script_url, last=False)
async def run_wiki_scan(self, wiki: src.wiki.Wiki, reason: Optional[int] = None): async def run_wiki_scan(self, wiki: src.wiki.Wiki, reason: Optional[int] = None):
await self.rate_limiter.timeout_wait()
await wiki.scan() await wiki.scan()
wiki.statistics.update(Log(type=LogType.SCAN_REASON, title=str(reason))) wiki.statistics.update(Log(type=LogType.SCAN_REASON, title=str(reason)))
self.wikis.move_to_end(wiki.script_url) self.wikis.move_to_end(wiki.script_url)
self.rate_limiter.timeout_add(1.0)
async def irc_scheduler(self): async def irc_scheduler(self):
try: try:
@ -108,7 +104,7 @@ class Domain:
return # Recently scanned wikis will get at the end of the self.wikis, so we assume what is first hasn't been checked for a while return # Recently scanned wikis will get at the end of the self.wikis, so we assume what is first hasn't been checked for a while
except: except:
if command_line_args.debug: if command_line_args.debug:
logger.exception("IRC task for domain {} failed!".format(self.name)) logger.exception("IRC scheduler task for domain {} failed!".format(self.name))
else: else:
# TODO Write # TODO Write
pass pass
@ -140,7 +136,7 @@ class Domain:
except asyncio.exceptions.CancelledError: except asyncio.exceptions.CancelledError:
for wiki in self.wikis.values(): for wiki in self.wikis.values():
await wiki.session.close() await wiki.session.close()
await self.irc.connection.disconnect() self.irc.connection.disconnect()
else: else:
try: try:
await self.regular_scheduler() await self.regular_scheduler()

View file

@ -1,5 +1,5 @@
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING, Optional from typing import TYPE_CHECKING
from urllib.parse import urlparse, urlunparse from urllib.parse import urlparse, urlunparse
import logging import logging
import asyncpg import asyncpg
@ -9,9 +9,7 @@ from src.config import settings
from src.domain import Domain from src.domain import Domain
from src.irc_feed import AioIRCCat from src.irc_feed import AioIRCCat
from src.wiki import Wiki
if TYPE_CHECKING:
from src.wiki import Wiki
logger = logging.getLogger("rcgcdb.domain_manager") logger = logging.getLogger("rcgcdb.domain_manager")
@ -23,6 +21,7 @@ class DomainManager:
"""Callback for database listener. Used to update our domain cache on changes such as new wikis or removed wikis""" """Callback for database listener. Used to update our domain cache on changes such as new wikis or removed wikis"""
# TODO Write a trigger for pub/sub in database/Wiki-Bot repo # TODO Write a trigger for pub/sub in database/Wiki-Bot repo
split_payload = payload.split(" ") split_payload = payload.split(" ")
logger.debug("Received pub/sub message: {}".format(payload))
if len(split_payload) < 2: if len(split_payload) < 2:
raise ValueError("Improper pub/sub message! Pub/sub payload: {}".format(payload)) raise ValueError("Improper pub/sub message! Pub/sub payload: {}".format(payload))
if split_payload[0] == "ADD": if split_payload[0] == "ADD":
@ -30,8 +29,8 @@ class DomainManager:
elif split_payload[0] == "REMOVE": elif split_payload[0] == "REMOVE":
try: try:
results = await connection.fetch("SELECT * FROM rcgcdw WHERE wiki = $1;", split_payload[1]) results = await connection.fetch("SELECT * FROM rcgcdw WHERE wiki = $1;", split_payload[1])
if len(results) > 0: if len(results) > 0: # If there are still webhooks for this wiki - just update its targets
return await self.return_domain(self.get_domain(split_payload[1])).get_wiki(split_payload[1]).update_targets()
except asyncpg.IdleSessionTimeoutError: except asyncpg.IdleSessionTimeoutError:
logger.error("Couldn't check amount of webhooks with {} wiki!".format(split_payload[1])) logger.error("Couldn't check amount of webhooks with {} wiki!".format(split_payload[1]))
return return
@ -50,9 +49,9 @@ class DomainManager:
new_domain = await self.new_domain(wiki_domain) new_domain = await self.new_domain(wiki_domain)
await new_domain.add_wiki(wiki) await new_domain.add_wiki(wiki)
def remove_domain(self, domain): def remove_domain(self, domain: Domain):
domain.destoy() domain.destroy()
del self.domains[domain] del self.domains[domain.name]
def remove_wiki(self, script_url: str): def remove_wiki(self, script_url: str):
wiki_domain = self.get_domain(script_url) wiki_domain = self.get_domain(script_url)
@ -79,6 +78,8 @@ class DomainManager:
for irc_server in settings["irc_servers"].keys(): for irc_server in settings["irc_servers"].keys():
if name in settings["irc_servers"][irc_server]["domains"]: if name in settings["irc_servers"][irc_server]["domains"]:
domain_object.set_irc(AioIRCCat(settings["irc_servers"][irc_server]["irc_channel_mapping"], domain_object, None, None)) domain_object.set_irc(AioIRCCat(settings["irc_servers"][irc_server]["irc_channel_mapping"], domain_object, None, None))
domain_object.irc.connect(settings["irc_servers"][irc_server]["irc_host"], settings["irc_servers"][irc_server]["irc_port"],
settings["irc_servers"][irc_server]["irc_name"], ircname=settings["irc_servers"][irc_server]["irc_nickname"])
break # Allow only one IRC for a domain break # Allow only one IRC for a domain
self.domains[name] = domain_object self.domains[name] = domain_object
return self.domains[name] return self.domains[name]

View file

@ -44,7 +44,8 @@ class AioIRCCat(irc.client_aio.AioSimpleIRCClient):
c.nick(c.get_nickname() + "_") c.nick(c.get_nickname() + "_")
def on_disconnect(self, connection, event): def on_disconnect(self, connection, event):
self.connect(*self.connection_details[0], **self.connection_details[1]) # attempt to reconnect # self.connect(*self.connection_details[0], **self.connection_details[1]) # attempt to reconnect
pass
def parse_fandom_message(self, message: str): def parse_fandom_message(self, message: str):
message = message.split("\x035*\x03") message = message.split("\x035*\x03")
@ -71,8 +72,8 @@ class AioIRCCat(irc.client_aio.AioSimpleIRCClient):
if post.get('action', 'unknown') != "deleted": # ignore deletion events if post.get('action', 'unknown') != "deleted": # ignore deletion events
url = urlparse(post.get('url')) url = urlparse(post.get('url'))
full_url ="https://"+ url.netloc + recognize_langs(url.path) full_url ="https://"+ url.netloc + recognize_langs(url.path)
if full_url in self.domain: # if full_url in self.domain:
self.discussion_callback(full_url) # self.discussion_callback(full_url)
def recognize_langs(path): def recognize_langs(path):

View file

@ -2,6 +2,7 @@ from html.parser import HTMLParser
import base64, re import base64, re
import logging import logging
from typing import Callable
from urllib.parse import urlparse, urlunparse from urllib.parse import urlparse, urlunparse
from src.i18n import langs from src.i18n import langs
@ -28,7 +29,10 @@ class LinkParser(HTMLParser):
new_string = "" new_string = ""
recent_href = "" recent_href = ""
WIKI_JUST_DOMAIN = ""
def __init__(self, DOMAIN_URL):
self.WIKI_JUST_DOMAIN = DOMAIN_URL
super().__init__()
def handle_starttag(self, tag, attrs): def handle_starttag(self, tag, attrs):
for attr in attrs: for attr in attrs:
@ -59,16 +63,17 @@ class LinkParser(HTMLParser):
pass pass
LinkParse = LinkParser() # LinkParse = LinkParser()
def parse_link(domain: str, to_parse: str) -> str:
"""Because I have strange issues using the LinkParser class myself, this is a helper function # def parse_link(domain: str, to_parse: str) -> str:
to utilize the LinkParser properly""" # """Because I have strange issues using the LinkParser class myself, this is a helper function
LinkParse.WIKI_JUST_DOMAIN = domain # to utilize the LinkParser properly"""
LinkParse.new_string = "" # LinkParse.WIKI_JUST_DOMAIN = domain
LinkParse.feed(to_parse) # LinkParse.new_string = ""
LinkParse.recent_href = "" # LinkParse.feed(to_parse)
return LinkParse.new_string # LinkParse.recent_href = ""
# return LinkParse.new_string
def link_formatter(link: str) -> str: def link_formatter(link: str) -> str:
@ -117,9 +122,9 @@ class ContentParser(HTMLParser):
small_prev_ins = "" small_prev_ins = ""
small_prev_del = "" small_prev_del = ""
def __init__(self, lang): def __init__(self, lang: Callable):
super().__init__() super().__init__()
self.more = langs[lang]["misc"].gettext("\n__And more__") self.more = lang("\n__And more__")
self.ins_length = len(self.more) self.ins_length = len(self.more)
self.del_length = len(self.more) self.del_length = len(self.more)

View file

@ -28,6 +28,7 @@ class UpdateDB:
async def update_db(self): async def update_db(self):
try: try:
while True: while True:
logger.debug("Running DB check")
if self.updated: if self.updated:
async with db.pool().acquire() as connection: async with db.pool().acquire() as connection:
async with connection.transaction(): async with connection.transaction():

View file

@ -44,7 +44,7 @@ class Statistics:
self.logs: LimitedList[Log] = LimitedList() self.logs: LimitedList[Log] = LimitedList()
def update(self, *args: Log, **kwargs: dict[str, Union[float, int]]): def update(self, *args: Log, **kwargs: dict[str, Union[float, int]]):
for key, value in kwargs: for key, value in kwargs.items():
self.__setattr__(key, value) self.__setattr__(key, value)
for log in args: for log in args:
self.logs.append(log) self.logs.append(log)

View file

@ -242,7 +242,7 @@ class Wiki:
"""Synchronous function based on api_request created for compatibility reasons with RcGcDw API""" """Synchronous function based on api_request created for compatibility reasons with RcGcDw API"""
try: try:
if isinstance(params, str): if isinstance(params, str):
request = self.session_requests.get(self.script_url + "api.php?" + params + "&errorformat=raw", timeout=10, allow_redirects=allow_redirects) request = self.session_requests.get(self.script_url + "api.php" + params + "&errorformat=raw", timeout=10, allow_redirects=allow_redirects)
elif isinstance(params, OrderedDict): elif isinstance(params, OrderedDict):
request = self.session_requests.get(self.script_url + "api.php", params=params, timeout=10, allow_redirects=allow_redirects) request = self.session_requests.get(self.script_url + "api.php", params=params, timeout=10, allow_redirects=allow_redirects)
else: else:
@ -357,6 +357,8 @@ class Wiki:
message.wiki = self message.wiki = self
message_list.append(QueueEntry(message, webhooks, self)) message_list.append(QueueEntry(message, webhooks, self))
messagequeue.add_messages(message_list) messagequeue.add_messages(message_list)
self.statistics.update(last_action=highest_id)
dbmanager.add(("UPDATE rcgcdw SET rcid = $1 WHERE wiki = $2", (highest_id, self.script_url))) # If this is not enough for the future, save rcid in message sending function to make sure we always send all of the changes
return return
@ -394,7 +396,7 @@ async def rc_processor(wiki: Wiki, change: dict, changed_categories: dict, displ
"""This function takes more vital information, communicates with a formatter and constructs DiscordMessage with it. """This function takes more vital information, communicates with a formatter and constructs DiscordMessage with it.
It creates DiscordMessageMetadata object, LinkParser and Context. Prepares a comment """ It creates DiscordMessageMetadata object, LinkParser and Context. Prepares a comment """
from src.misc import LinkParser from src.misc import LinkParser
LinkParser = LinkParser() LinkParser = LinkParser(wiki.client.WIKI_ARTICLE_PATH)
metadata = DiscordMessageMetadata("POST", rev_id=change.get("revid", None), log_id=change.get("logid", None), metadata = DiscordMessageMetadata("POST", rev_id=change.get("revid", None), log_id=change.get("logid", None),
page_id=change.get("pageid", None)) page_id=change.get("pageid", None))
context = Context("embed" if display_options.display > 0 else "compact", "recentchanges", webhooks, wiki.client, langs[display_options.lang]["rc_formatters"], prepare_settings(display_options.display)) context = Context("embed" if display_options.display > 0 else "compact", "recentchanges", webhooks, wiki.client, langs[display_options.lang]["rc_formatters"], prepare_settings(display_options.display))
@ -446,7 +448,7 @@ async def rc_processor(wiki: Wiki, change: dict, changed_categories: dict, displ
else: else:
raise raise
if identification_string in ("delete/delete", "delete/delete_redir"): # TODO Move it into a hook? if identification_string in ("delete/delete", "delete/delete_redir"): # TODO Move it into a hook?
wiki.delete_messages(dict(pageid=change.get("pageid"))) wiki.delete_messages(dict(page_id=change.get("pageid")))
elif identification_string == "delete/event": elif identification_string == "delete/event":
logparams = change.get('logparams', {"ids": []}) logparams = change.get('logparams', {"ids": []})
if settings["appearance"]["mode"] == "embed": if settings["appearance"]["mode"] == "embed":

View file

@ -1,22 +0,0 @@
import logging, time, asyncio
logger = logging.getLogger("rcgcdw.ratelimiter")
class RateLimiter:
    """Keep a single "do not proceed before" deadline shared by a group of requests.

    The deadline is tracked on the monotonic clock rather than the wall clock,
    so system time adjustments (NTP steps, manual changes) can neither stretch
    nor cut short a pending timeout.
    """

    def __init__(self) -> None:
        # Monotonic timestamp before which callers should wait.
        # 0 means no timeout has been requested yet, so nothing needs to be waited for.
        self.timeout_until: float = 0

    def timeout_add(self, timeout: float) -> None:
        """Set a new timeout expiring ``timeout`` seconds from now, replacing any previous one."""
        self.timeout_until = time.monotonic() + timeout

    async def timeout_wait(self) -> None:
        """Sleep until the current timeout expires.

        Returns immediately when the timeout is already in the past, so that
        the caller does not skip a cycle.
        """
        calculated_timeout = self.timeout_raw()
        if calculated_timeout > 0:
            await asyncio.sleep(calculated_timeout)

    def timeout_raw(self) -> float:
        """Return the number of seconds left until the timeout expires (zero or negative once expired)."""
        return self.timeout_until - time.monotonic()