diff --git a/src/bot.py b/src/bot.py index 8679a15..4c339ff 100644 --- a/src/bot.py +++ b/src/bot.py @@ -30,9 +30,6 @@ logger.info("RcGcDb v{} is starting up.".format("2.0")) if command_line_args.debug: logger.info("Debug mode is active!") -# Log Fail states with structure wiki_url: number of fail states -all_wikis: dict = {} - main_tasks: dict = {} # First populate the all_wikis list with every wiki @@ -258,7 +255,7 @@ async def main_loop(): load_extensions() await populate_wikis() # START LISTENER CONNECTION - domains.run_all_domains() + #domains.run_all_domains() discussions = Discussions(domains.return_domain("fandom.com") if domains.check_for_domain("fandom.com") else None) try: signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT) @@ -270,9 +267,9 @@ async def main_loop(): signals = (signal.SIGBREAK, signal.SIGTERM, signal.SIGINT) # loop.set_exception_handler(global_exception_handler) try: - main_tasks = {"message_sender": asyncio.create_task(message_sender()), - "database_updates": asyncio.create_task(dbmanager.update_db()), - "fandom_discussions": asyncio.create_task(discussions.tick_discussions(), name="discussions")} # "discussion_handler": asyncio.create_task(discussion_handler()), + main_tasks = {"message_sender": asyncio.create_task(message_sender(), name="message_sender"), + "database_updates": asyncio.create_task(dbmanager.update_db(), name="database_updates"), + "fandom_discussions": asyncio.create_task(discussions.tick_discussions(), name="fandom_discussions")} # "discussion_handler": asyncio.create_task(discussion_handler()), main_tasks["msg_queue_shield"] = asyncio.shield(main_tasks["message_sender"]) main_tasks["database_updates_shield"] = asyncio.shield(main_tasks["database_updates"]) await asyncio.gather(main_tasks["message_sender"], main_tasks["database_updates"]) diff --git a/src/discord/queue.py b/src/discord/queue.py index 01c2e4b..bc78479 100644 --- a/src/discord/queue.py +++ b/src/discord/queue.py @@ -165,9 +165,12 @@ class MessageQueue: return else: if status == 0: + message = None for message in msg.message_list: if message.metadata.domain is not None and message.metadata.time_of_change is not None: message.metadata.domain.register_message_timing_report(message.metadata.time_of_change) + if message and message.metadata.domain is not None: + message.metadata.domain.discord_message_registration() for queue_message in messages[max(index-len(msg.message_list), 0):index+1]: # mark messages as delivered queue_message.confirm_sent_status(webhook_url) if client_error is False: diff --git a/src/domain.py b/src/domain.py index bcb38ab..e74ce38 100644 --- a/src/domain.py +++ b/src/domain.py @@ -33,6 +33,7 @@ class Domain: self.irc: Optional[src.irc_feed.AioIRCCat] = None self.last_failure_report = 0 self.message_timings: LimitedList = LimitedList(limit=100) + self.total_discord_messages_sent: int = 0 # self.discussions_handler: Optional[Discussions] = Discussions(self.wikis) if name == "fandom.com" else None def __iter__(self): @@ -50,6 +51,16 @@ class Domain: f"calculated_delay={self.calculate_sleep_time(len(self)) if not self.irc else 'handled by IRC scheduler'} " f"msgdelays=(min={tmin}, avg={avg}, max={tmax})>") + def json(self) -> dict: + dict_obj = { + "wikis": [x for x in self.wikis.keys()], + "irc": self.irc.connection.connected if self.irc else False, + "delay": self.calculate_sleep_time(len(self)) if not self.irc else 'handled by IRC scheduler', + "msgdelay": {"min": min(self.message_timings), "avg": int(sum(self.message_timings)/len(self.message_timings)), + "max": max(self.message_timings)}, + "discord_messages": self.total_discord_messages_sent, + "last_failure_report": self.last_failure_report + } def __repr__(self): return self.__str__() @@ -150,6 +161,9 @@ class Domain: send_time = datetime.datetime.now(tz=datetime.timezone.utc) self.message_timings.append((send_time - initial_time).seconds) + def discord_message_registration(self): + self.total_discord_messages_sent += 1 + async def irc_scheduler(self): try: while True: diff --git a/src/domain_manager.py b/src/domain_manager.py index be96404..2991b82 100644 --- a/src/domain_manager.py +++ b/src/domain_manager.py @@ -1,5 +1,8 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Callable + +import json +import time +from typing import TYPE_CHECKING, Callable, Optional from urllib.parse import urlparse, urlunparse import logging import asyncpg @@ -25,6 +28,7 @@ def safe_type_for_id(unsafe_id: str, target: Callable): class DomainManager: def __init__(self): self.domains: dict[str, Domain] = {} + self.start_time: float = time.time() async def webhook_update(self, connection: asyncpg.Connection, pid: int, channel: str, payload: str): """Callback for database listener. Used to update our domain cache on changes such as new wikis or removed wikis""" @@ -67,9 +71,25 @@ class DomainManager: domain = self.return_domain(self.get_domain(split_payload[2])) logger.info("RCGCDBDEBUG Domain information for {}: {}".format(domain.name, str(domain))) logger.info("RCGCDBDEBUG Wiki information for {}: {}".format(split_payload[2], domain.get_wiki(split_payload[2]))) + elif split_payload[1] == "DUMP": + # Dump debug info JSON object into postgres pubsub channel + json_object = {"uptime": time.time() - self.start_time, "domain_count": len(self.domains), + "wiki_count": sum([len(x.wikis) for x in self.domains.values()]), + "tasks": {}, + "domains": {}, + "total_discord_messages_sent": sum([x.total_discord_messages_sent for x in self.domains.values()]) + } + for task in asyncio.all_tasks(): + json_object["tasks"][task.get_name()] = {"done": task.done(), "result": task.result() if task.done() else None} + for name, domain in self.domains.items(): + json_object[name] = domain.json() + await connection.execute("""select pg_notify('webhookupdates', %(jsondump)s);""", {'jsondump': json.dumps(json_object)}) + # we need: dict/list of tasks, dict of domains, + else: raise ValueError("Unknown pub/sub command! Payload: {}".format(payload)) + async def new_wiki(self, wiki: Wiki): """Finds a domain for the wiki and adds a wiki to the domain object. diff --git a/src/irc_feed.py b/src/irc_feed.py index b50505c..3af2f81 100644 --- a/src/irc_feed.py +++ b/src/irc_feed.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import time import types import irc.client_aio @@ -37,7 +38,7 @@ class AioIRCCat(irc.client_aio.AioSimpleIRCClient): self.domain = domain_object self.connection.buffer_class.errors = "replace" # Ignore encoding errors self.connection_details = None - self.active = True + self.last_msg = time.time() self.activity_tester = asyncio.get_event_loop().create_task(self.testactivity()) def __str__(self): @@ -52,7 +53,7 @@ class AioIRCCat(irc.client_aio.AioSimpleIRCClient): connection.join(channel) def on_pubmsg(self, connection, event): - self.active = True + self.last_msg = time.time() if event.target == self.targets["rc"]: self.parse_fandom_message(' '.join(event.arguments)) elif event.target == self.targets["discussion"]: @@ -104,11 +105,10 @@ class AioIRCCat(irc.client_aio.AioSimpleIRCClient): async def testactivity(self): while True: - await asyncio.sleep(100.0) - if not self.active: - logger.error("There were no new messages in the feed!") + await asyncio.sleep(120.0) + if (time.time() - self.last_msg) > 120: + logger.error("There were no new messages in the feed for last 2 minutes! Reconnecting.") self.on_disconnect(None, None) - self.active = False def recognize_langs(path): lang = ""