Further developments

This commit is contained in:
Frisk 2021-07-09 14:55:23 +02:00
parent f72c21faf9
commit fa115239a2
No known key found for this signature in database
GPG key ID: 213F7C15068AF8AC
7 changed files with 55 additions and 29 deletions

View file

@ -27,8 +27,7 @@ class Client:
""" """
A client for interacting with RcGcDw when creating formatters or hooks. A client for interacting with RcGcDw when creating formatters or hooks.
""" """
def __init__(self, hooks, wiki): def __init__(self, wiki):
self._formatters = hooks
self.__recent_changes: Wiki = wiki self.__recent_changes: Wiki = wiki
self.WIKI_API_PATH: str = src.misc.WIKI_API_PATH self.WIKI_API_PATH: str = src.misc.WIKI_API_PATH
self.WIKI_ARTICLE_PATH: str = src.misc.WIKI_ARTICLE_PATH self.WIKI_ARTICLE_PATH: str = src.misc.WIKI_ARTICLE_PATH
@ -39,10 +38,6 @@ class Client:
self.LinkParser: type(src.misc.LinkParser) = src.misc.LinkParser self.LinkParser: type(src.misc.LinkParser) = src.misc.LinkParser
#self.make_api_request: src.rc.wiki.__recent_changes.api_request = self.__recent_changes.api_request #self.make_api_request: src.rc.wiki.__recent_changes.api_request = self.__recent_changes.api_request
def refresh_internal_data(self):
    """Refreshes internal storage data for wiki tags and MediaWiki messages.

    Delegates to init_info() on the wrapped Wiki object held in
    self.__recent_changes; the refreshed data is cached there, not returned.
    """
    self.__recent_changes.init_info()
@property @property
def namespaces(self) -> dict: def namespaces(self) -> dict:
"""Return a dict of namespaces, if None return empty dict""" """Return a dict of namespaces, if None return empty dict"""
@ -88,9 +83,6 @@ class Client:
""" """
return self.__recent_changes.api_request(params, *json_path, timeout=timeout, allow_redirects=allow_redirects) return self.__recent_changes.api_request(params, *json_path, timeout=timeout, allow_redirects=allow_redirects)
def get_formatters(self):
    """Returns the formatter/hook mapping this Client was constructed with.

    Accessor for self._formatters (the `hooks` argument of __init__);
    the mapping is returned as-is, not copied.
    """
    return self._formatters
def get_ipmapper(self) -> dict: def get_ipmapper(self) -> dict:
"""Returns a dict mapping IPs with amount of their edits""" """Returns a dict mapping IPs with amount of their edits"""
return self.__recent_changes.map_ips return self.__recent_changes.map_ips

View file

@ -32,9 +32,9 @@ if TYPE_CHECKING:
logger = logging.getLogger("src.api.util") logger = logging.getLogger("src.api.util")
def default_message(event: str, formatter_hooks: dict) -> Callable: def default_message(event: str, display: str, formatter_hooks: dict) -> Callable:
"""Returns a method of a formatter responsible for the event or None if such does not exist.""" """Returns a method of a formatter responsible for the event or None if such does not exist."""
return formatter_hooks.get(event, formatter_hooks.get("generic", formatter_hooks["no_formatter"])) return formatter_hooks[display].get(event, formatter_hooks.get("generic", formatter_hooks["no_formatter"]))
def clean_link(link: str) -> str: def clean_link(link: str) -> str:

View file

@ -10,7 +10,7 @@ from typing import Generator
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from redis_connector import redis from redis_connector import Redis
from src.argparser import command_line_args from src.argparser import command_line_args
from src.config import settings from src.config import settings
from src.database import db from src.database import db
@ -47,7 +47,7 @@ async def populate_wikis():
async with db.pool().acquire() as connection: async with db.pool().acquire() as connection:
async with connection.transaction(): async with connection.transaction():
async for db_wiki in connection.cursor('SELECT DISTINCT wiki, rcid, postid FROM rcgcdw'): async for db_wiki in connection.cursor('SELECT DISTINCT wiki, rcid, postid FROM rcgcdw'):
domains.new_wiki(Wiki(db_wiki["wiki"], db_wiki["rcid"], db_wiki["postid"])) await domains.new_wiki(Wiki(db_wiki["wiki"], db_wiki["rcid"], db_wiki["postid"]))
queue_limit = settings.get("queue_limit", 30) queue_limit = settings.get("queue_limit", 30)
QueuedWiki = namedtuple("QueuedWiki", ['url', 'amount']) QueuedWiki = namedtuple("QueuedWiki", ['url', 'amount'])
@ -548,6 +548,7 @@ async def main_loop():
await db.setup_connection() await db.setup_connection()
logger.debug("Connection type: {}".format(db.connection)) logger.debug("Connection type: {}".format(db.connection))
await populate_wikis() await populate_wikis()
redis = Redis(domains)
await redis.connect() await redis.connect()
await redis.pubsub() await redis.pubsub()
domains.run_all_domains() domains.run_all_domains()
@ -562,7 +563,8 @@ async def main_loop():
# loop.set_exception_handler(global_exception_handler) # loop.set_exception_handler(global_exception_handler)
try: try:
main_tasks = {"wiki_scanner": asyncio.create_task(wiki_scanner()), "message_sender": asyncio.create_task(message_sender()), main_tasks = {"wiki_scanner": asyncio.create_task(wiki_scanner()), "message_sender": asyncio.create_task(message_sender()),
"discussion_handler": asyncio.create_task(discussion_handler()), "database_updates": asyncio.create_task(DBHandler.update_db())} "discussion_handler": asyncio.create_task(discussion_handler()), "database_updates": asyncio.create_task(DBHandler.update_db()),
"redis_updates": asyncio.create_task(redis.reader())}
main_tasks["msg_queue_shield"] = asyncio.shield(main_tasks["message_sender"]) main_tasks["msg_queue_shield"] = asyncio.shield(main_tasks["message_sender"])
main_tasks["database_updates_shield"] = asyncio.shield(main_tasks["database_updates"]) main_tasks["database_updates_shield"] = asyncio.shield(main_tasks["database_updates"])
await asyncio.gather(main_tasks["wiki_scanner"], main_tasks["discussion_handler"], main_tasks["message_sender"], main_tasks["database_updates"]) await asyncio.gather(main_tasks["wiki_scanner"], main_tasks["discussion_handler"], main_tasks["message_sender"], main_tasks["database_updates"])

View file

@ -47,7 +47,7 @@ class Domain:
logger.error(f"Tried to start a task for domain {self.name} however the task already exists!") logger.error(f"Tried to start a task for domain {self.name} however the task already exists!")
def remove_wiki(self, script_url: str): def remove_wiki(self, script_url: str):
self.wikis.pop(script_url)
def add_wiki(self, wiki: src.wiki.Wiki, first=False): def add_wiki(self, wiki: src.wiki.Wiki, first=False):
"""Adds a wiki to domain list. """Adds a wiki to domain list.

View file

@ -14,7 +14,7 @@ class DomainManager:
def __init__(self): def __init__(self):
self.domains: dict[str, Domain] = {} self.domains: dict[str, Domain] = {}
def new_wiki(self, wiki: Wiki): async def new_wiki(self, wiki: Wiki):
"""Finds a domain for the wiki and adds a wiki to the domain object. """Finds a domain for the wiki and adds a wiki to the domain object.
:parameter wiki - Wiki object to be added""" :parameter wiki - Wiki object to be added"""
@ -22,7 +22,17 @@ class DomainManager:
try: try:
self.domains[wiki_domain].add_wiki(wiki) self.domains[wiki_domain].add_wiki(wiki)
except KeyError: except KeyError:
self.new_domain(wiki_domain).add_wiki(wiki) new_domain = await self.new_domain(wiki_domain)
new_domain.add_wiki(wiki)
def remove_wiki(self, script_url: str):
    """Removes a wiki from the domain object responsible for it.

    :parameter script_url - script URL identifying the wiki to remove
    Raises NoDomain when no domain is currently tracked for this URL."""
    domain_name = self.get_domain(script_url)
    try:
        tracked_domain = self.domains[domain_name]
    except KeyError:
        raise NoDomain
    tracked_domain.remove_wiki(script_url)
@staticmethod @staticmethod
def get_domain(url: str) -> str: def get_domain(url: str) -> str:

View file

@ -2,15 +2,20 @@ import asyncio
import aioredis import aioredis
import async_timeout import async_timeout
import logging import logging
from typing import Optional from typing import Optional, TYPE_CHECKING
from src.config import settings from src.config import settings
from src.wiki import Wiki
logger = logging.getLogger("rcgcdb.redisconnector") logger = logging.getLogger("rcgcdb.redisconnector")
if TYPE_CHECKING:
from src.domain_manager import DomainManager
class Redis: class Redis:
def __init__(self): def __init__(self, domain_manager):
self.pub_connection: Optional[aioredis.connection] = None self.pub_connection: Optional[aioredis.connection] = None
self.stat_connection: Optional[aioredis.connection] = None self.stat_connection: Optional[aioredis.connection] = None
self.domain_manager: DomainManager = domain_manager
async def reader(self): async def reader(self):
"""Based on code from https://aioredis.readthedocs.io/en/latest/getting-started/#pubsub-mode""" """Based on code from https://aioredis.readthedocs.io/en/latest/getting-started/#pubsub-mode"""
@ -21,11 +26,24 @@ class Redis:
if message is not None: if message is not None:
print(f"(Reader) Message Received: {message}") print(f"(Reader) Message Received: {message}")
logger.debug(f"(Reader) Message Received: {message}") logger.debug(f"(Reader) Message Received: {message}")
await self.process_changes(message["data"])
await asyncio.sleep(1.0) await asyncio.sleep(1.0)
except asyncio.TimeoutError: # TODO Better handler except asyncio.TimeoutError: # TODO Better handler
pass pass
except aioredis.exceptions.ConnectionError: except aioredis.exceptions.ConnectionError:
pass pass
except asyncio.CancelledError:
# TODO Send a message about shutdown
raise NotImplementedError
async def process_changes(self, data: str):
    """Parses a Redis pub/sub payload and applies the requested wiki change.

    Expected space-separated message formats:
        REMOVE <script_url>
        ADD <script_url> <rc_id> <discussion_id>

    Malformed payloads (missing fields or non-integer ids — the risk the
    original TODO flagged) are logged and dropped instead of propagating,
    so one bad message cannot kill the reader task.

    :parameter data - raw message payload received from the pub/sub channel
    """
    parts = data.split(" ")
    try:
        if parts[0] == "REMOVE":
            self.domain_manager.remove_wiki(parts[1])  # TODO Add response to source
        elif parts[0] == "ADD":  # ADD https://new_wiki.somedomain.com 43 1 where 43 stands for rc_id and 1 for discussion_id
            wiki = Wiki(parts[1], int(parts[2]), int(parts[3]))
            await self.domain_manager.new_wiki(wiki)
        else:
            logger.warning("Unknown Redis update command in message: %r", data)
    except (IndexError, ValueError):
        logger.exception("Malformed Redis update message ignored: %r", data)
async def connect(self): async def connect(self):
self.stat_connection = await aioredis.from_url("redis://" + settings["redis_host"], encoding="UTF-8") self.stat_connection = await aioredis.from_url("redis://" + settings["redis_host"], encoding="UTF-8")
@ -33,7 +51,3 @@ class Redis:
async def pubsub(self): async def pubsub(self):
self.pub_connection = self.stat_connection.pubsub() self.pub_connection = self.stat_connection.pubsub()
await self.pub_connection.subscribe("rcgcdb_updates") await self.pub_connection.subscribe("rcgcdb_updates")
asyncio.create_task(self.reader())
redis = Redis()

View file

@ -12,6 +12,8 @@ from src.database import db
from src.queue_handler import DBHandler from src.queue_handler import DBHandler
from src.formatters.rc import embed_formatter, compact_formatter from src.formatters.rc import embed_formatter, compact_formatter
from src.formatters.discussions import feeds_embed_formatter, feeds_compact_formatter from src.formatters.discussions import feeds_embed_formatter, feeds_compact_formatter
from src.api.hooks import formatter_hooks
from src.api.client import Client
from src.misc import parse_link from src.misc import parse_link
from src.i18n import langs from src.i18n import langs
from src.wiki_ratelimiter import RateLimiter from src.wiki_ratelimiter import RateLimiter
@ -41,6 +43,7 @@ class Wiki:
self.mw_messages: Optional[MWMessages] = None self.mw_messages: Optional[MWMessages] = None
self.first_fetch_done: bool = False self.first_fetch_done: bool = False
self.domain: Optional[Domain] = None self.domain: Optional[Domain] = None
self.client: Client = Client(self)
@property @property
def rc_id(self): def rc_id(self):
@ -224,6 +227,11 @@ class Wiki:
break break
await process_cats(change, self, categorize_events) await process_cats(change, self, categorize_events)
else: # adequate amount of changes else: # adequate amount of changes
for tag in request["query"]["tags"]:
try:
self.tags[tag["name"]] = (BeautifulSoup(tag["displayname"], "lxml")).get_text()
except KeyError:
self.tags[tag["name"]] = None
targets = await self.generate_targets() targets = await self.generate_targets()
message_list = defaultdict(list) message_list = defaultdict(list)
for change in recent_changes: # Yeah, second loop since the categories require to be all loaded up for change in recent_changes: # Yeah, second loop since the categories require to be all loaded up
@ -231,7 +239,7 @@ class Wiki:
if highest_id is None or change["rcid"] > highest_id: # make sure that the highest_rc is really highest rcid but do allow other entries with potentially lesser rcids come after without breaking the cycle if highest_id is None or change["rcid"] > highest_id: # make sure that the highest_rc is really highest rcid but do allow other entries with potentially lesser rcids come after without breaking the cycle
highest_id = change["rcid"] highest_id = change["rcid"]
for combination, webhooks in targets.items(): for combination, webhooks in targets.items():
message = await rc_processor(self, change, categorize_events, ) message = await rc_processor(self, change, categorize_events, combination, webhooks)
async def rc_processor(wiki: Wiki, change: dict, changed_categories: dict, display_options: namedtuple("Settings", ["lang", "display"]), webhooks: list): async def rc_processor(wiki: Wiki, change: dict, changed_categories: dict, display_options: namedtuple("Settings", ["lang", "display"]), webhooks: list):
@ -239,12 +247,12 @@ async def rc_processor(wiki: Wiki, change: dict, changed_categories: dict, displ
LinkParser = LinkParser() LinkParser = LinkParser()
metadata = src.discord.DiscordMessageMetadata("POST", rev_id=change.get("revid", None), log_id=change.get("logid", None), metadata = src.discord.DiscordMessageMetadata("POST", rev_id=change.get("revid", None), log_id=change.get("logid", None),
page_id=change.get("pageid", None)) page_id=change.get("pageid", None))
context = Context(display_options, webhooks, client) context = Context(display_options, webhooks, wiki.client)
if ("actionhidden" in change or "suppressed" in change) and "suppressed" not in settings[ if ("actionhidden" in change or "suppressed" in change) and "suppressed" not in settings[
"ignored"]: # if event is hidden using suppression "ignored"]: # if event is hidden using suppression
context.event = "suppressed" context.event = "suppressed"
try: try:
discord_message: Optional[src.discord.DiscordMessage] = default_message("suppressed", formatter_hooks)(context, change) discord_message: Optional[src.discord.DiscordMessage] = default_message("suppressed", display_options.display, formatter_hooks)(context, change)
except NoFormatter: except NoFormatter:
return return
except: except:
@ -290,16 +298,16 @@ async def rc_processor(wiki: Wiki, change: dict, changed_categories: dict, displ
else: else:
raise raise
if identification_string in ( if identification_string in (
"delete/delete", "delete/delete_redir") and AUTO_SUPPRESSION_ENABLED: # TODO Move it into a hook? "delete/delete", "delete/delete_redir"): # TODO Move it into a hook?
delete_messages(dict(pageid=change.get("pageid"))) delete_messages(dict(pageid=change.get("pageid")))
elif identification_string == "delete/event" and AUTO_SUPPRESSION_ENABLED: elif identification_string == "delete/event":
logparams = change.get('logparams', {"ids": []}) logparams = change.get('logparams', {"ids": []})
if settings["appearance"]["mode"] == "embed": if settings["appearance"]["mode"] == "embed":
redact_messages(logparams.get("ids", []), 1, logparams.get("new", {})) redact_messages(logparams.get("ids", []), 1, logparams.get("new", {}))
else: else:
for logid in logparams.get("ids", []): for logid in logparams.get("ids", []):
delete_messages(dict(logid=logid)) delete_messages(dict(logid=logid))
elif identification_string == "delete/revision" and AUTO_SUPPRESSION_ENABLED: elif identification_string == "delete/revision":
logparams = change.get('logparams', {"ids": []}) logparams = change.get('logparams', {"ids": []})
if settings["appearance"]["mode"] == "embed": if settings["appearance"]["mode"] == "embed":
redact_messages(logparams.get("ids", []), 0, logparams.get("new", {})) redact_messages(logparams.get("ids", []), 0, logparams.get("new", {}))