Adding more statistics functionality

This commit is contained in:
Frisk 2024-07-13 14:31:01 +02:00
parent 011f23c35a
commit 0ac5f8b824
5 changed files with 48 additions and 14 deletions

View file

@ -30,9 +30,6 @@ logger.info("RcGcDb v{} is starting up.".format("2.0"))
if command_line_args.debug:
logger.info("Debug mode is active!")
# Log Fail states with structure wiki_url: number of fail states
all_wikis: dict = {}
main_tasks: dict = {}
# First populate the all_wikis list with every wiki
@ -258,7 +255,7 @@ async def main_loop():
load_extensions()
await populate_wikis()
# START LISTENER CONNECTION
domains.run_all_domains()
#domains.run_all_domains()
discussions = Discussions(domains.return_domain("fandom.com") if domains.check_for_domain("fandom.com") else None)
try:
signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
@ -270,9 +267,9 @@ async def main_loop():
signals = (signal.SIGBREAK, signal.SIGTERM, signal.SIGINT)
# loop.set_exception_handler(global_exception_handler)
try:
main_tasks = {"message_sender": asyncio.create_task(message_sender()),
"database_updates": asyncio.create_task(dbmanager.update_db()),
"fandom_discussions": asyncio.create_task(discussions.tick_discussions(), name="discussions")} # "discussion_handler": asyncio.create_task(discussion_handler()),
main_tasks = {"message_sender": asyncio.create_task(message_sender(), name="message_sender"),
"database_updates": asyncio.create_task(dbmanager.update_db(), name="database_updates"),
"fandom_discussions": asyncio.create_task(discussions.tick_discussions(), name="fandom_discussions")} # "discussion_handler": asyncio.create_task(discussion_handler()),
main_tasks["msg_queue_shield"] = asyncio.shield(main_tasks["message_sender"])
main_tasks["database_updates_shield"] = asyncio.shield(main_tasks["database_updates"])
await asyncio.gather(main_tasks["message_sender"], main_tasks["database_updates"])

View file

@ -165,9 +165,12 @@ class MessageQueue:
return
else:
if status == 0:
message = None
for message in msg.message_list:
if message.metadata.domain is not None and message.metadata.time_of_change is not None:
message.metadata.domain.register_message_timing_report(message.metadata.time_of_change)
if message and message.metadata.domain is not None:
message.metadata.domain.discord_message_registration()
for queue_message in messages[max(index-len(msg.message_list), 0):index+1]: # mark messages as delivered
queue_message.confirm_sent_status(webhook_url)
if client_error is False:

View file

@ -33,6 +33,7 @@ class Domain:
self.irc: Optional[src.irc_feed.AioIRCCat] = None
self.last_failure_report = 0
self.message_timings: LimitedList = LimitedList(limit=100)
self.total_discord_messages_sent: int = 0
# self.discussions_handler: Optional[Discussions] = Discussions(self.wikis) if name == "fandom.com" else None
def __iter__(self):
@ -50,6 +51,16 @@ class Domain:
f"calculated_delay={self.calculate_sleep_time(len(self)) if not self.irc else 'handled by IRC scheduler'} "
f"msgdelays=(min={tmin}, avg={avg}, max={tmax})>")
def json(self) -> dict:
dict_obj = {
"wikis": [x for x in self.wikis.keys()],
"irc": self.irc.connection.connected if self.irc else False,
"delay": self.calculate_sleep_time(len(self)) if not self.irc else 'handled by IRC scheduler',
"msgdelay": {"min": min(self.message_timings), "avg": int(sum(self.message_timings)/len(self.message_timings)),
"max": max(self.message_timings)},
"discord_messages": self.total_discord_messages_sent,
"last_failure_report": self.last_failure_report
}
def __repr__(self):
return self.__str__()
@ -150,6 +161,9 @@ class Domain:
send_time = datetime.datetime.now(tz=datetime.timezone.utc)
self.message_timings.append((send_time - initial_time).seconds)
def discord_message_registration(self):
self.total_discord_messages_sent += 1
async def irc_scheduler(self):
try:
while True:

View file

@ -1,5 +1,8 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Callable
import json
import time
from typing import TYPE_CHECKING, Callable, Optional
from urllib.parse import urlparse, urlunparse
import logging
import asyncpg
@ -25,6 +28,7 @@ def safe_type_for_id(unsafe_id: str, target: Callable):
class DomainManager:
def __init__(self):
self.domains: dict[str, Domain] = {}
self.start_time: float = time.time()
async def webhook_update(self, connection: asyncpg.Connection, pid: int, channel: str, payload: str):
"""Callback for database listener. Used to update our domain cache on changes such as new wikis or removed wikis"""
@ -67,9 +71,25 @@ class DomainManager:
domain = self.return_domain(self.get_domain(split_payload[2]))
logger.info("RCGCDBDEBUG Domain information for {}: {}".format(domain.name, str(domain)))
logger.info("RCGCDBDEBUG Wiki information for {}: {}".format(split_payload[2], domain.get_wiki(split_payload[2])))
elif split_payload[1] == "DUMP":
# Dump debug info JSON object into postgres pubsub channel
json_object = {"uptime": time.time() - self.start_time, "domain_count": len(self.domains),
"wiki_count": sum([len(x.wikis) for x in self.domains.values()]),
"tasks": {},
"domains": {},
"total_discord_messages_sent": sum([x.total_discord_messages_sent for x in self.domains.values()])
}
for task in asyncio.all_tasks():
json_object["tasks"][task.get_name()] = {"done": task.done(), "result": task.result() if task.done() else None}
for name, domain in self.domains.items():
json_object[name] = domain.json()
await connection.execute("""select pg_notify('webhookupdates', %(jsondump)s);""", {'jsondump': json.dumps(json_object)})
# we need: dict/list of tasks, dict of domains,
else:
raise ValueError("Unknown pub/sub command! Payload: {}".format(payload))
async def new_wiki(self, wiki: Wiki):
"""Finds a domain for the wiki and adds a wiki to the domain object.

View file

@ -1,6 +1,7 @@
from __future__ import annotations
import asyncio
import time
import types
import irc.client_aio
@ -37,7 +38,7 @@ class AioIRCCat(irc.client_aio.AioSimpleIRCClient):
self.domain = domain_object
self.connection.buffer_class.errors = "replace" # Ignore encoding errors
self.connection_details = None
self.active = True
self.last_msg = time.time()
self.activity_tester = asyncio.get_event_loop().create_task(self.testactivity())
def __str__(self):
@ -52,7 +53,7 @@ class AioIRCCat(irc.client_aio.AioSimpleIRCClient):
connection.join(channel)
def on_pubmsg(self, connection, event):
self.active = True
self.last_msg = time.time()
if event.target == self.targets["rc"]:
self.parse_fandom_message(' '.join(event.arguments))
elif event.target == self.targets["discussion"]:
@ -104,11 +105,10 @@ class AioIRCCat(irc.client_aio.AioSimpleIRCClient):
async def testactivity(self):
while True:
await asyncio.sleep(100.0)
if not self.active:
logger.error("There were no new messages in the feed!")
await asyncio.sleep(120.0)
if (time.time() - self.last_msg) > 120:
logger.error("There were no new messages in the feed for last 2 minutes! Reconnecting.")
self.on_disconnect(None, None)
self.active = False
def recognize_langs(path):
lang = ""