Updates

commit 2b09e7019a
parent eb9312a637

requirements.txt
@@ -1,4 +1,4 @@
-python_version >= '3.6'
+python_version >= '3.9'
 aiohttp >= 3.6.2
 lxml >= 4.2.1
 nest-asyncio >= 1.4.0
src/bot.py (351 lines changed)
@@ -10,10 +10,9 @@ from typing import Generator
 
 from contextlib import asynccontextmanager
 
-from redis_connector import Redis
 from src.argparser import command_line_args
 from src.config import settings
-from src.database import db
+from src.database import db_connection
 from src.exceptions import *
 from src.misc import get_paths, get_domain
 from src.msgqueue import messagequeue, send_to_discord
@@ -28,7 +27,7 @@ from src.domain_manager import domains
 logging.config.dictConfig(settings["logging"])
 logger = logging.getLogger("rcgcdb.bot")
 logger.debug("Current settings: {settings}".format(settings=settings))
-logger.info("RcGcDb v{} is starting up.".format("1.1"))
+logger.info("RcGcDb v{} is starting up.".format("2.0"))
 
 if command_line_args.debug:
 	logger.info("Debug mode is active!")
@@ -38,338 +37,23 @@ all_wikis: dict = {}
 
 main_tasks: dict = {}
 
+db = db_connection()
 
 # First populate the all_wikis list with every wiki
 # Reasons for this: 1. we require amount of wikis to calculate the cooldown between requests
 # 2. Easier to code
 
 async def populate_wikis():
+	logger.info("Populating domain manager with wikis...")
+	start = time.time()
 	async with db.pool().acquire() as connection:
 		async with connection.transaction():
 			async for db_wiki in connection.cursor('SELECT DISTINCT wiki, rcid, postid FROM rcgcdw'):
-				await domains.new_wiki(Wiki(db_wiki["wiki"], db_wiki["rcid"], db_wiki["postid"]))
-
-queue_limit = settings.get("queue_limit", 30)
-QueuedWiki = namedtuple("QueuedWiki", ['url', 'amount'])
-
-
-class RcQueue:
-	def __init__(self):
-		self.domain_list = {}
-		self.to_remove = []
-		self.irc_mapping = {}
-
-	async def start_group(self, group, initial_wikis):
-		"""Starts a task for given domain group"""
-		if group not in self.domain_list:
-			if group in self.irc_mapping:  # Hopefully there are no race conditions....
-				irc_connection = self.irc_mapping[group]
-			else:
-				for irc_server in settings["irc_servers"].keys():
-					if group in settings["irc_servers"][irc_server]["domains"]:
-						irc_connection = AioIRCCat(settings["irc_servers"][irc_server]["irc_channel_mapping"], all_wikis)
-						for domain in settings["irc_servers"][irc_server]["domains"]:
-							self.irc_mapping[domain] = irc_connection
-						irc_connection.connect(settings["irc_servers"][irc_server]["irc_host"], settings["irc_servers"][irc_server]["irc_port"], settings["irc_servers"][irc_server]["irc_name"])
-						break
-				else:
-					irc_connection = None
-			self.domain_list[group] = {"task": asyncio.create_task(scan_group(group)), "last_rowid": 0, "query": LimitedList(initial_wikis), "rate_limiter": RateLimiter(), "irc": irc_connection}
-			logger.debug(self.domain_list[group])
-		else:
-			raise KeyError
-
-	async def remove_wiki_from_group(self, wiki):
-		"""Removes a wiki from query of given domain group"""
-		logger.debug(f"Removing {wiki} from group queue.")
-		group = get_domain(wiki)
-		self[group]["query"] = LimitedList([x for x in self[group]["query"] if x.url != wiki])
-		all_wikis[wiki].rc_active = -1
-		if not self[group]["query"]:  # if there is no wiki left in the queue, get rid of the task
-			logger.debug(f"{group} no longer has any wikis queued!")
-			if not await self.check_if_domain_in_db(group):
-				await self.stop_task_group(group)
-			else:
-				logger.debug(f"But there are still wikis for it in DB!")
-
-	async def stop_task_group(self, group):
-		self[group]["task"].cancel()
-		del self.domain_list[group]
-
-	async def check_if_domain_in_db(self, domain):
-		async with db.pool().acquire() as connection:
-			async with connection.transaction():
-				async for wiki in connection.cursor('SELECT DISTINCT wiki FROM rcgcdw WHERE rcid != -1;'):
-					if get_domain(wiki["wiki"]) == domain:
-						return True
-		return False
-
-	@asynccontextmanager
-	async def retrieve_next_queued(self, group) -> Generator[QueuedWiki, None, None]:
-		"""Retrives next wiki in the queue for given domain"""
-		if len(self.domain_list[group]["query"]) == 0:
-			# make sure we are not removing the group because entire domain group went down, it's expensive - yes, but could theoretically cause issues
-			raise QueueEmpty
-			# if self.check_if_domain_in_db(group):
-			# 	#logger.warning("Domain group {} has 0 elements yet there are still wikis in the db of the same domain group! This may indicate we ran over the list too fast. We are waiting...".format(group))
-			# 	raise QueueEmpty
-			# else:
-			# 	await self.stop_task_group(group)
-			# 	return
-		try:
-			yield self.domain_list[group]["query"][0]
-		except asyncio.CancelledError:
-			raise
-		except:
-			if command_line_args.debug:
-				logger.exception("RC Group exception")
-				shutdown(asyncio.get_event_loop())
-			else:
-				logger.exception("Group task returned error")
-				await generic_msg_sender_exception_logger(traceback.format_exc(), "Group task error logger", Group=group)
-		else:
-			self.domain_list[group]["query"].pop(0)
-
-
-	@staticmethod
-	def filter_rc_active(wiki_obj):
-		return wiki_obj[1].rc_active is None or wiki_obj[1].rc_active > -1
-
-	async def update_queues(self):
-		"""Makes a round on rcgcdb DB and looks for updates to the queues in self.domain_list"""
-		try:
-			self.to_remove = [x[0] for x in filter(self.filter_rc_active, all_wikis.items())]  # first populate this list and remove wikis that are still in the db, clean up the rest
-			full = set()
-			async with db.pool().acquire() as connection:
-				async with connection.transaction():
-					async for db_wiki in connection.cursor('SELECT DISTINCT wiki, row_number() OVER (ORDER BY webhook) AS rowid, webhook, lang, display, rcid FROM rcgcdw WHERE rcid != -1 OR rcid IS NULL ORDER BY webhook'):
-						domain = get_domain(db_wiki["wiki"])
-						try:
-							if db_wiki["wiki"] not in all_wikis:
-								raise AssertionError
-							self.to_remove.remove(db_wiki["wiki"])
-						except AssertionError:
-							all_wikis[db_wiki["wiki"]] = Wiki()
-							all_wikis[db_wiki["wiki"]].rc_active = db_wiki["rcid"]
-						except ValueError:
-							pass
-						if domain in full:
-							continue
-						try:
-							current_domain: dict = self[domain]
-							if current_domain["irc"]:
-								logger.debug("DOMAIN LIST FOR IRC: {}".format(current_domain["irc"].updated))
-								logger.debug("CURRENT DOMAIN INFO: {}".format(domain))
-								logger.debug("IS WIKI IN A LIST?: {}".format(db_wiki["wiki"] in current_domain["irc"].updated))
-								logger.debug("LAST CHECK FOR THE WIKI {} IS {}".format(db_wiki["wiki"], all_wikis[db_wiki["wiki"]].last_check))
-								if db_wiki["wiki"] in current_domain["irc"].updated:  # Priority wikis are the ones with IRC, if they get updated forcefully add them to queue
-									current_domain["irc"].updated.remove(db_wiki["wiki"])
-									current_domain["query"].append(QueuedWiki(db_wiki["wiki"], 20), forced=True)
-									logger.debug("Updated in IRC so adding to queue.")
-									continue
-								elif all_wikis[db_wiki["wiki"]].last_check+settings["irc_overtime"] < time.time():  # if time went by and wiki should be updated now use default mechanics
-									logger.debug("Overtime so adding to queue.")
-									pass
-								else:  # Continue without adding
-									logger.debug("No condition fulfilled so skipping.")
-									continue
-							if not db_wiki["rowid"] < current_domain["last_rowid"]:
-								current_domain["query"].append(QueuedWiki(db_wiki["wiki"], 20))
-						except KeyError:
-							await self.start_group(domain, [QueuedWiki(db_wiki["wiki"], 20)])
-							logger.info("A new domain group ({}) has been added since last time, adding it to the domain_list and starting a task...".format(domain))
-						except ListFull:
-							full.add(domain)
-							current_domain["last_rowid"] = db_wiki["rowid"]
-							continue
-			for wiki in self.to_remove:
-				await self.remove_wiki_from_group(wiki)
-			for group, data in self.domain_list.items():
-				if group not in full:
-					self[group]["last_rowid"] = 0  # iter reached the end without being stuck on full list
-			logger.debug("Current domain_list structure: {}".format(self.domain_list))
-		except:
-			if command_line_args.debug:
-				logger.exception("Queue error!")
-				shutdown(asyncio.get_event_loop())
-			else:
-				logger.exception("Exception on queue updater")
-				await generic_msg_sender_exception_logger(traceback.format_exc(), "Queue updator")
-
-
-	def __getitem__(self, item):
-		"""Returns the query of given domain group"""
-		return self.domain_list[item]
-
-	def __setitem__(self, key, value):
-		self.domain_list[key] = value
-
-
-rcqueue = RcQueue()
-
-
-# Start queueing logic
-
-def calculate_delay_for_group(group_length: int) -> float:
-	"""Calculate the delay between fetching each wiki to avoid rate limits"""
-	min_delay = 60 / settings["max_requests_per_minute"]
-	if group_length == 0:
-		group_length = 1
-	if (group_length * min_delay) < settings["minimal_cooldown_per_wiki_in_sec"]:
-		return settings["minimal_cooldown_per_wiki_in_sec"] / group_length
-	else:
-		return 0.0
-
-
-async def generate_targets(wiki_url: str, additional_requirements: str) -> defaultdict:
-	"""To minimize the amount of requests, we generate a list of language/display mode combinations to create messages for
-	this way we can send the same message to multiple webhooks which have the same wiki and settings without doing another
-	request to the wiki just to duplicate the message.
-	"""
-	combinations = defaultdict(list)
-	async with db.pool().acquire() as connection:
-		async with connection.transaction():
-			async for webhook in connection.cursor('SELECT webhook, lang, display FROM rcgcdw WHERE wiki = $1 {}'.format(additional_requirements), wiki_url):
-				combination = (webhook["lang"], webhook["display"])
-				combinations[combination].append(webhook["webhook"])
-	return combinations
-
-
-async def generate_domain_groups():
-	"""Generate a list of wikis per domain (fandom.com, wikipedia.org etc.)
-
-	:returns tuple[str, list]"""
-	domain_wikis = defaultdict(list)
-	async with db.pool().acquire() as connection:
-		async with connection.transaction():
-			async for db_wiki in connection.cursor('SELECT DISTINCT wiki, webhook, lang, display, rcid FROM rcgcdw WHERE rcid != -1 OR rcid IS NULL'):
-				domain_wikis[get_domain(db_wiki["wiki"])].append(QueuedWiki(db_wiki["wiki"], 20))
-	for group, db_wikis in domain_wikis.items():
-		yield group, db_wikis
-
-
-async def scan_group(group: str):
-	rate_limiter = rcqueue[group]["rate_limiter"]
-	while True:
-		try:
-			async with rcqueue.retrieve_next_queued(group) as queued_wiki:  # acquire next wiki in queue
-				if "irc" not in rcqueue[group]:
-					await asyncio.sleep(calculate_delay_for_group(len(rcqueue[group]["query"])))
-				logger.debug("Wiki {}".format(queued_wiki.url))
-				local_wiki = all_wikis[queued_wiki.url]  # set a reference to a wiki object from memory
-				extended = False
-				if local_wiki.mw_messages is None:
-					extended = True
-				async with aiohttp.ClientSession(headers=settings["header"],
-						timeout=aiohttp.ClientTimeout(6.0)) as session:
-					try:
-						wiki_response = await local_wiki.fetch_wiki(extended, queued_wiki.url, session, rate_limiter, amount=queued_wiki.amount)
-						await local_wiki.check_status(queued_wiki.url, wiki_response.status)
-					except (WikiServerError, WikiError):
-						logger.error("Exeption when fetching the wiki")
-						continue  # ignore this wiki if it throws errors
-					try:
-						recent_changes_resp = await wiki_response.json()
-						if not isinstance(recent_changes_resp, dict):
-							logger.error(f"recent_changes_resp has a bad type, found {type(recent_changes_resp)}, __repr__ here: {recent_changes_resp}.")
-							raise TypeError
-						if "errors" in recent_changes_resp:
-							error = recent_changes_resp.get('errors')
-							if error["code"] == "readapidenied":
-								await local_wiki.fail_add(queued_wiki.url, 410)
-								continue
-							raise WikiError
-					except aiohttp.ContentTypeError:
-						logger.exception("Wiki seems to be resulting in non-json content.")
-						await local_wiki.fail_add(queued_wiki.url, 410)
-						continue
-					except:
-						logger.exception("On loading json of response.")
-						continue
-				try:
-					recent_changes = recent_changes_resp['query']['recentchanges']
-					recent_changes.reverse()
-				except KeyError:
-					logger.error("recent_changes_resp returned KeyError. skipping this check. (usually this happens when the wiki doesn't respond properly, it's pretty normal)")
-					continue
-				if extended:
-					await process_mwmsgs(recent_changes_resp, local_wiki, mw_msgs)
-				if local_wiki.rc_active in (0, None, -1):  # new wiki, just get the last rc to not spam the channel, -1 for -1 to NULL changes
-					if len(recent_changes) > 0:
-						local_wiki.rc_active = recent_changes[-1]["rcid"]
-						DBHandler.add(queued_wiki.url, recent_changes[-1]["rcid"])
-					else:
-						local_wiki.rc_active = 0
-						DBHandler.add(queued_wiki.url, 0)
-					await DBHandler.update_db()
-					continue
-				categorize_events = {}
-				targets = await generate_targets(queued_wiki.url, "AND (rcid != -1 OR rcid IS NULL)")
-				paths = get_paths(queued_wiki.url, recent_changes_resp)
-				new_events = 0
-				local_wiki.last_check = time.time()  # on successful check, save new last check time
-				for change in recent_changes:
-					if change["rcid"] > local_wiki.rc_active and queued_wiki.amount != 450:
-						new_events += 1
-						if new_events == 20:
-							# call the function again with max limit for more results, ignore the ones in this request
-							logger.debug("There were too many new events, queuing wiki with 450 limit.")
-							rcqueue[group]["query"].insert(1, QueuedWiki(queued_wiki.url, 450))
-							break
-					await process_cats(change, local_wiki, mw_msgs, categorize_events)
-				else:  # If we broke from previous loop (too many changes) don't execute sending messages here
-					highest_rc = local_wiki.rc_active  # setup var for later use
-					message_list = defaultdict(list)
-					for change in recent_changes:  # Yeah, second loop since the categories require to be all loaded up
-						if change["rcid"] > local_wiki.rc_active:
-							if highest_rc is None or change["rcid"] > highest_rc:  # make sure that the highest_rc is really highest rcid but do allow other entries with potentially lesser rcids come after without breaking the cycle
-								highest_rc = change["rcid"]
-							for target in targets.items():
-								try:
-									message = await essential_info(change, categorize_events, local_wiki, target, paths,
-											recent_changes_resp, rate_limiter)
-									if message is not None:
-										message_list[target[0]].append(message)
-								except asyncio.CancelledError:
-									raise
-								except:
-									if command_line_args.debug:
-										logger.exception("Exception on RC formatter")
-										raise
-									else:
-										logger.exception("Exception on RC formatter")
-										await generic_msg_sender_exception_logger(traceback.format_exc(), "Exception in RC formatter", Wiki=queued_wiki.url, Change=str(change)[0:1000])
-					# Lets stack the messages
-					for messages in message_list.values():
-						messages = stack_message_list(messages)
-						for message in messages:
-							await send_to_discord(message)
-					if recent_changes:  # we don't have to test for highest_rc being null, because if there are no RC entries recent_changes will be an empty list which will result in false in here and DO NOT save the value
-						local_wiki.rc_active = highest_rc
-						DBHandler.add(queued_wiki.url, highest_rc)
-				await DBHandler.update_db()
-		except asyncio.CancelledError:
-			return
-		except QueueEmpty:
-			await asyncio.sleep(10.0)
-			continue
-
-
-async def wiki_scanner():
-	"""Wiki scanner is spawned as a task which purpose is to continuously run over wikis in the DB, fetching recent changes
-	to add messages based on the changes to message queue later handled by message_sender coroutine."""
-	try:
-		async for group, db_wikis in generate_domain_groups():  # First scan
-			await rcqueue.start_group(group, db_wikis)
-		while True:
-			await asyncio.sleep(20.0)
-			await rcqueue.update_queues()
-	except asyncio.CancelledError:
-		for item in rcqueue.domain_list.values():  # cancel running tasks
-			item["task"].cancel()
-		raise
+				try:
+					await domains.new_wiki(Wiki(db_wiki["wiki"], db_wiki["rcid"], db_wiki["postid"]))
+				except WikiExists:  # Can rarely happen when Pub/Sub registers wiki before population
+					pass
+	logger.info("Populating domain manager with wikis took {} seconds".format(time.time()-start))
 
 
 async def message_sender():
@@ -508,7 +192,6 @@ async def discussion_handler():
 			await generic_msg_sender_exception_logger(traceback.format_exc(), "Discussion handler task exception", Wiki=db_wiki["wiki"])
 
 
-
 def shutdown(loop, signal=None):
 	global main_tasks
 	loop.remove_signal_handler(signal)
@@ -546,11 +229,10 @@ async def main_loop():
 	nest_asyncio.apply(loop)
 	# Setup database connection
 	await db.setup_connection()
-	logger.debug("Connection type: {}".format(db.connection))
+	await db.create_pubsub_interface(domains.webhook_update)
+	logger.debug("Connection type: {}".format(db.connection_pool))
 	await populate_wikis()
-	redis = Redis(domains)
-	await redis.connect()
-	await redis.pubsub()
+	# START LISTENER CONNECTION
 	domains.run_all_domains()
 	try:
 		signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
@@ -562,9 +244,8 @@ async def main_loop():
 		signals = (signal.SIGBREAK, signal.SIGTERM, signal.SIGINT)
 	# loop.set_exception_handler(global_exception_handler)
 	try:
-		main_tasks = {"wiki_scanner": asyncio.create_task(wiki_scanner()), "message_sender": asyncio.create_task(message_sender()),
-					  "discussion_handler": asyncio.create_task(discussion_handler()), "database_updates": asyncio.create_task(DBHandler.update_db()),
-					  "redis_updates": asyncio.create_task(redis.reader())}
+		main_tasks = {"message_sender": asyncio.create_task(message_sender()),
+					  "discussion_handler": asyncio.create_task(discussion_handler()), "database_updates": asyncio.create_task(DBHandler.update_db())}
 		main_tasks["msg_queue_shield"] = asyncio.shield(main_tasks["message_sender"])
 		main_tasks["database_updates_shield"] = asyncio.shield(main_tasks["database_updates"])
 		await asyncio.gather(main_tasks["wiki_scanner"], main_tasks["discussion_handler"], main_tasks["message_sender"], main_tasks["database_updates"])
@@ -573,4 +254,4 @@ async def main_loop():
 	except asyncio.CancelledError:
 		return
 
-asyncio.run(main_loop(), debug=False)
+asyncio.run(main_loop(), debug=command_line_args.debug)
src/database.py
@@ -1,6 +1,6 @@
 import asyncpg
 import logging
-from typing import Optional
+from typing import Optional, Callable
 from src.config import settings
 
 logger = logging.getLogger("rcgcdb.database")
@@ -8,23 +8,31 @@ logger = logging.getLogger("rcgcdb.database")
 
 
 class db_connection:
-	connection: Optional[asyncpg.Pool] = None
+	listener_connection: Optional[asyncpg.Connection] = None
+	connection_pool: Optional[asyncpg.Pool] = None
 
+	async def create_pubsub_interface(self, callback: Callable):
+		await self.listener_connection.add_listener("webhookupdates", callback)
+
 	async def setup_connection(self):
-		# Establish a connection to an existing database named "test"
-		# as a "postgres" user.
-		logger.debug("Setting up the Database connection...")
-		self.connection = await asyncpg.create_pool(user=settings["pg_user"], host=settings.get("pg_host", "localhost"),
+		logger.debug("Setting up the Database connections...")
+		# First, setup a separate connection for pub/sub listener
+		# It's mainly because I'm afraid that connection pool will be aggressive about inactive connections
+		self.listener_connection = await asyncpg.connect(user=settings["pg_user"], host=settings.get("pg_host", "localhost"),
											database=settings.get("pg_db", "rcgcdb"), password=settings.get("pg_pass"),
											port=settings.get("pg_port", 5432))
-		logger.debug("Database connection established! Connection: {}".format(self.connection))
+		self.connection_pool = await asyncpg.create_pool(user=settings["pg_user"], host=settings.get("pg_host", "localhost"),
+											database=settings.get("pg_db", "rcgcdb"), password=settings.get("pg_pass"),
+											port=settings.get("pg_port", 5432))
+		logger.debug("Database connection established! Connection: {}".format(self.connection_pool))
 
 	async def shutdown_connection(self):
 		logger.debug("Shutting down database connection...")
-		await self.connection.close()
+		await self.listener_connection.close()
+		await self.connection_pool.close()
 
 	def pool(self) -> asyncpg.Pool:
-		return self.connection
+		return self.connection_pool
 
 	# Tried to make it a decorator but tbh won't probably work
 	# async def in_transaction(self, func):
@@ -37,7 +45,4 @@ class db_connection:
 	# async def query(self, string, *arg):
 	# 	async with self.connection.acquire() as connection:
 	# 		async with connection.transaction():
 	# 			return connection.cursor(string, *arg)
-
-
-db = db_connection()
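
The new create_pubsub_interface hangs cache updates off PostgreSQL LISTEN/NOTIFY through asyncpg's Connection.add_listener. A minimal standalone sketch of that mechanism, not taken from this commit: the credentials and the handle_update callback below are placeholders, and recent asyncpg releases also accept coroutine callbacks, which is what domains.webhook_update is.

import asyncio
import asyncpg


def handle_update(connection, pid, channel, payload):
	# asyncpg invokes listeners with (connection, pid, channel, payload);
	# payload is the string handed to NOTIFY / pg_notify on the server side.
	print(f"{channel} from backend {pid}: {payload}")


async def main():
	conn = await asyncpg.connect(user="postgres", database="rcgcdb")  # placeholder credentials
	await conn.add_listener("webhookupdates", handle_update)  # same channel name the commit listens on
	try:
		await asyncio.sleep(3600)  # keep the connection open so notifications can be delivered
	finally:
		await conn.remove_listener("webhookupdates", handle_update)
		await conn.close()


asyncio.run(main())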
src/domain.py
@@ -55,6 +55,8 @@ class Domain:
 		:parameter wiki - Wiki object
 		:parameter first (optional) - bool indicating if wikis should be added as first or last in the ordered dict"""
 		wiki.set_domain(self)
+		if wiki.script_url in self.wikis:
+			raise WikiExists("Wiki {} exists in domain {}".format(wiki.script_url, self.name))
 		self.wikis[wiki.script_url] = wiki
 		if first:
 			self.wikis.move_to_end(wiki.script_url, last=False)
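
This guard is the counterpart of the except WikiExists added to populate_wikis in src/bot.py: if a Pub/Sub ADD registers a wiki before startup population reaches it, the duplicate registration is rejected instead of silently overwriting the earlier object. A rough sketch of that interplay, assuming WikiExists is exported from src.exceptions (the diff only shows the wildcard import in bot.py):

from src.exceptions import WikiExists  # assumed location of the exception


async def register(domain, wiki):
	try:
		await domain.new_wiki(wiki)  # raises WikiExists when wiki.script_url is already tracked
	except WikiExists:
		pass  # already added through the Pub/Sub listener; keep the existing object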
src/domain_manager.py
@@ -1,6 +1,9 @@
 from __future__ import annotations
 from typing import TYPE_CHECKING, Optional
 from urllib.parse import urlparse, urlunparse
+
+import asyncpg
+
 from src.config import settings
 from src.domain import Domain
 from src.irc_feed import AioIRCCat
@@ -14,6 +17,19 @@ class DomainManager:
 	def __init__(self):
 		self.domains: dict[str, Domain] = {}
 
+	async def webhook_update(self, connection: asyncpg.Connection, pid: int, channel: str, payload: str):
+		"""Callback for database listener. Used to update our domain cache on changes such as new wikis or removed wikis"""
+		# TODO Write a trigger for pub/sub in database/Wiki-Bot repo
+		split_payload = payload.split(" ")
+		if len(split_payload) < 2:
+			raise ValueError("Improper pub/sub message! Pub/sub payload: {}".format(payload))
+		if split_payload[0] == "ADD":
+			await self.new_wiki(Wiki(split_payload[1], None, None))
+		elif split_payload[0] == "REMOVE":
+			self.remove_wiki(split_payload[1])
+		else:
+			raise ValueError("Unknown pub/sub command! Payload: {}".format(payload))
+
 	async def new_wiki(self, wiki: Wiki):
 		"""Finds a domain for the wiki and adds a wiki to the domain object.
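
webhook_update expects a space-separated payload, "ADD <script_url>" or "REMOVE <script_url>", on the webhookupdates channel; the trigger mentioned in the TODO is not part of this commit. A hedged sketch of what the notifying side could look like, with placeholder credentials and an illustrative helper name:

import asyncio
import asyncpg


async def announce_wiki_change(command: str, script_url: str):
	conn = await asyncpg.connect(user="postgres", database="rcgcdb")  # placeholder credentials
	try:
		# pg_notify() is the function form of NOTIFY; the payload has to match what
		# DomainManager.webhook_update parses: "<COMMAND> <script_url>".
		await conn.execute("SELECT pg_notify($1, $2)", "webhookupdates", f"{command} {script_url}")
	finally:
		await conn.close()


asyncio.run(announce_wiki_change("ADD", "https://example.fandom.com/"))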
src/redis_connector.py (deleted)
@@ -1,53 +0,0 @@
-import asyncio
-import aioredis
-import async_timeout
-import logging
-from typing import Optional, TYPE_CHECKING
-from src.config import settings
-from src.wiki import Wiki
-
-logger = logging.getLogger("rcgcdb.redisconnector")
-
-if TYPE_CHECKING:
-	from src.domain_manager import DomainManager
-
-class Redis:
-	def __init__(self, domain_manager):
-		self.pub_connection: Optional[aioredis.connection] = None
-		self.stat_connection: Optional[aioredis.connection] = None
-		self.domain_manager: DomainManager = domain_manager
-
-	async def reader(self):
-		"""Based on code from https://aioredis.readthedocs.io/en/latest/getting-started/#pubsub-mode"""
-		while True:
-			try:
-				async with async_timeout.timeout(1):
-					message = await self.pub_connection.get_message(ignore_subscribe_messages=True)
-					if message is not None:
-						print(f"(Reader) Message Received: {message}")
-						logger.debug(f"(Reader) Message Received: {message}")
-						await self.process_changes(message["data"])
-					await asyncio.sleep(1.0)
-			except asyncio.TimeoutError:  # TODO Better handler
-				pass
-			except aioredis.exceptions.ConnectionError:
-				pass
-			except asyncio.CancelledError:
-				# TODO Send a message about shutdown
-				raise NotImplementedError
-
-	async def process_changes(self, data: str):
-		data = data.split(" ")
-		if data[0] == "REMOVE":
-			self.domain_manager.remove_wiki(data[1])  # TODO Add response to source
-		elif data[0] == "ADD":  # ADD https://new_wiki.somedamain.com 43 1 where 43 stands for rc_id and 1 for discussion_id
-			wiki = Wiki(data[1], int(data[2]), int(data[3]))  # TODO This might raise an issue if non-int value
-			await self.domain_manager.new_wiki(wiki)
-
-
-	async def connect(self):
-		self.stat_connection = await aioredis.from_url("redis://" + settings["redis_host"], encoding="UTF-8")
-
-	async def pubsub(self):
-		self.pub_connection = self.stat_connection.pubsub()
-		await self.pub_connection.subscribe("rcgcdb_updates")
src/statistics.py
@@ -1,12 +1,25 @@
+import time
 from src.config import settings
 from typing import Union, Optional
+from enum import Enum
+
+
+class LogType(Enum):
+	CONNECTION_ERROR: 1
+	HTTP_ERROR: 2
+	MEDIAWIKI_ERROR: 3
+	VALUE_UPDATE: 4
+
 queue_limit = settings.get("queue_limit", 30)
 
 
 class Log:
+	"""Log class represents an event that happened to a wiki fetch. Main purpose of those logs is debug and error-tracking."""
 	def __init__(self, **kwargs):
+		self.type: LogType = kwargs["type"]
+		self.time: int = int(time.time())
+		self.title: str = kwargs["title"]
+		self.details: Optional[str] = kwargs.get("details", None)
+
 
 class LimitedList(list):
 	def __init__(self, *args):
@@ -18,7 +31,7 @@ class LimitedList(list):
 
 
 class Statistics:
-	def __init__(self, rc_id: int, discussion_id: int):
+	def __init__(self, rc_id: Optional[int], discussion_id: Optional[int]):
 		self.last_checked_rc: Optional[int] = None
 		self.last_action: Optional[int] = rc_id
 		self.last_checked_discussion: Optional[int] = None
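
A minimal usage sketch for the new Log/LogType pair, mirroring the self.statistics.update(Log(...)) call sites added to src/wiki.py later in this commit. Statistics.update itself is not shown in the diff, and the sketch assumes the LogType members are bound with "=" (Enum values), since the ":" annotations in the hunk above would not create members:

import time
from enum import Enum
from typing import Optional


class LogType(Enum):
	CONNECTION_ERROR = 1  # assumed "= value" form
	HTTP_ERROR = 2
	MEDIAWIKI_ERROR = 3
	VALUE_UPDATE = 4


class Log:
	def __init__(self, **kwargs):
		self.type: LogType = kwargs["type"]
		self.time: int = int(time.time())
		self.title: str = kwargs["title"]
		self.details: Optional[str] = kwargs.get("details", None)


# Shaped like the call added in src/wiki.py's check_status:
entry = Log(type=LogType.HTTP_ERROR, title="404 error", details="https://example.fandom.com/api.php")
print(entry.type, entry.time, entry.title, entry.details)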
src/wiki.py (47 lines changed)
@@ -18,7 +18,7 @@ from src.api.context import Context
 from src.misc import parse_link
 from src.i18n import langs
 from src.wiki_ratelimiter import RateLimiter
-from statistics import Statistics
+from statistics import Statistics, Log, LogType
 import src.discord
 import asyncio
 from src.config import settings
@@ -36,11 +36,10 @@ if TYPE_CHECKING:
 	from src.domain import Domain
 
 class Wiki:
-	def __init__(self, script_url: str, rc_id: int, discussion_id: int):
+	def __init__(self, script_url: str, rc_id: Optional[int], discussion_id: Optional[int]):
 		self.script_url: str = script_url
 		self.session = aiohttp.ClientSession(headers=settings["header"], timeout=aiohttp.ClientTimeout(6.0))
 		self.statistics: Statistics = Statistics(rc_id, discussion_id)
-		self.fail_times: int = 0
 		self.mw_messages: Optional[MWMessages] = None
 		self.first_fetch_done: bool = False
 		self.domain: Optional[Domain] = None
@@ -50,24 +49,24 @@ class Wiki:
 	def rc_id(self):
 		return self.statistics.last_action
 
-	async def remove(self, reason):
-		logger.info("Removing a wiki {}".format(self.script_url))
-		await src.discord.wiki_removal(self.script_url, reason)
-		await src.discord.wiki_removal_monitor(self.script_url, reason)
-		async with db.pool().acquire() as connection:
-			result = await connection.execute('DELETE FROM rcgcdw WHERE wiki = $1', self.script_url)
-		logger.warning('{} rows affected by DELETE FROM rcgcdw WHERE wiki = "{}"'.format(result, self.script_url))
+	# async def remove(self, reason):
+	# 	logger.info("Removing a wiki {}".format(self.script_url))
+	# 	await src.discord.wiki_removal(self.script_url, reason)
+	# 	await src.discord.wiki_removal_monitor(self.script_url, reason)
+	# 	async with db.pool().acquire() as connection:
+	# 		result = await connection.execute('DELETE FROM rcgcdw WHERE wiki = $1', self.script_url)
+	# 	logger.warning('{} rows affected by DELETE FROM rcgcdw WHERE wiki = "{}"'.format(result, self.script_url))
 
 	def set_domain(self, domain: Domain):
 		self.domain = domain
 
-	async def downtime_controller(self, down, reason=None):
-		if down:
-			self.fail_times += 1
-			if self.fail_times > 20:
-				await self.remove(reason)
-		else:
-			self.fail_times -= 1
+	# async def downtime_controller(self, down, reason=None):
+	# 	if down:
+	# 		self.fail_times += 1
+	# 		if self.fail_times > 20:
+	# 			await self.remove(reason)
+	# 	else:
+	# 		self.fail_times -= 1
 
 	async def generate_targets(self) -> defaultdict[namedtuple, list[str]]:
 		"""This function generates all possible varations of outputs that we need to generate messages for.
@@ -141,7 +140,7 @@
 		elif 399 < request.status < 500:
 			logger.error("Request returned ClientError status code on {url}".format(url=request.url))
 			if request.status in wiki_reamoval_reasons:
-				await self.downtime_controller(True, reason=request.status)
+				self.statistics.update(Log(type=LogType.HTTP_ERROR, title="{} error".format(request.status), details=str(request.headers) + "\n" + str(request.url)))
 			raise ClientError(request)
 		else:
 			# JSON Extraction
@@ -158,11 +157,10 @@
 			except KeyError:
 				logger.exception("KeyError while iterating over json_path, full response: {}".format(request.json()))
 				raise
-		self.first_fetch_done = True
 		return request_json
 
 	async def fetch_wiki(self, amount=10) -> dict:
-		if self.first_fetch_done is False:
+		if self.mw_messages is None:
 			params = OrderedDict({"action": "query", "format": "json", "uselang": "content", "list": "tags|recentchanges",
								  "meta": "allmessages|siteinfo",
								  "utf8": 1, "tglimit": "max", "tgprop": "displayname",
@@ -188,10 +186,8 @@
 		try:
 			request = await self.fetch_wiki(amount=amount)
 			self.client.last_request = request
-		except WikiServerError:
-			return  # TODO Add a log entry?
-		else:
-			await self.downtime_controller(False)
+		except WikiServerError as e:
+			self.statistics.update(Log(type=LogType.CONNECTION_ERROR, title=e.))  # We need more details in WIkiServerError exception
 		if not self.mw_messages:
 			mw_messages = request.get("query", {}).get("allmessages", [])
 			final_mw_messages = dict()
@@ -234,7 +230,7 @@
 					self.tags[tag["name"]] = (BeautifulSoup(tag["displayname"], "lxml")).get_text()
 				except KeyError:
 					self.tags[tag["name"]] = None
-		targets = await self.generate_targets()
+		targets = await self.generate_targets()  # TODO Cache this in Wiki and update based on Redis updates
 		message_list = defaultdict(list)
 		for change in recent_changes:  # Yeah, second loop since the categories require to be all loaded up
 			if change["rcid"] > self.rc_id:
@@ -242,6 +238,7 @@
 					highest_id = change["rcid"]
 				for combination, webhooks in targets.items():
 					message, metadata = await rc_processor(self, change, categorize_events, combination, webhooks)
+			break
 
 
 async def rc_processor(wiki: Wiki, change: dict, changed_categories: dict, display_options: namedtuple("Settings", ["lang", "display"]), webhooks: list) -> tuple[src.discord.DiscordMessage, src.discord.DiscordMessageMetadata]: