diff --git a/src/bot.py b/src/bot.py index e1f7128..8d4e800 100644 --- a/src/bot.py +++ b/src/bot.py @@ -430,7 +430,6 @@ async def discussion_handler(): await connection.execute("UPDATE rcgcdw SET postid = $1 WHERE wiki = $2", "-1", db_wiki["wiki"]) else: await local_wiki.remove(db_wiki["wiki"], 1000) - await DBHandler.update_db() continue raise WikiError discussion_feed = discussion_feed_resp["_embedded"]["doc:posts"] @@ -449,7 +448,6 @@ async def discussion_handler(): DBHandler.add(db_wiki["wiki"], discussion_feed[-1]["id"], True) else: DBHandler.add(db_wiki["wiki"], "0", True) - await DBHandler.update_db() continue comment_events = [] targets = await generate_targets(db_wiki["wiki"], "AND NOT postid = '-1'") @@ -500,7 +498,6 @@ async def discussion_handler(): DBHandler.add(db_wiki["wiki"], post["id"], True) await asyncio.sleep(delay=2.0) # hardcoded really doesn't need much more await asyncio.sleep(delay=1.0) # Avoid lock on no wikis - await DBHandler.update_db() except asyncio.CancelledError: pass except: @@ -517,9 +514,10 @@ def shutdown(loop, signal=None): loop.remove_signal_handler(signal) if len(messagequeue) > 0: logger.warning("Some messages are still queued!") - for task in (main_tasks["wiki_scanner"], main_tasks["discussion_handler"], main_tasks["msg_queue_shield"]): + for task in (main_tasks["wiki_scanner"], main_tasks["discussion_handler"], main_tasks["msg_queue_shield"], main_tasks["database_updates_shield"]): task.cancel() loop.run_until_complete(main_tasks["message_sender"]) + loop.run_until_complete(main_tasks["database_updates"]) for task in asyncio.all_tasks(loop): logger.debug("Killing task") task.cancel() @@ -564,12 +562,11 @@ async def main_loop(): # loop.set_exception_handler(global_exception_handler) try: main_tasks = {"wiki_scanner": asyncio.create_task(wiki_scanner()), "message_sender": asyncio.create_task(message_sender()), - "discussion_handler": asyncio.create_task(discussion_handler())} + "discussion_handler": asyncio.create_task(discussion_handler()), "database_updates": asyncio.create_task(DBHandler.update_db())} main_tasks["msg_queue_shield"] = asyncio.shield(main_tasks["message_sender"]) - await asyncio.gather(main_tasks["wiki_scanner"], main_tasks["discussion_handler"], main_tasks["message_sender"]) + main_tasks["database_updates_shield"] = asyncio.shield(main_tasks["database_updates"]) + await asyncio.gather(main_tasks["wiki_scanner"], main_tasks["discussion_handler"], main_tasks["message_sender"], main_tasks["database_updates"]) except KeyboardInterrupt: - await DBHandler.update_db() - await db.shutdown_connection() shutdown(loop) except asyncio.CancelledError: return diff --git a/src/queue_handler.py b/src/queue_handler.py index 9e4f81e..7e2f781 100644 --- a/src/queue_handler.py +++ b/src/queue_handler.py @@ -1,3 +1,4 @@ +import asyncio import logging from src.database import db @@ -8,22 +9,30 @@ class UpdateDB: def __init__(self): self.updated = [] - def add(self, wiki, rc_id, feeds=None): - self.updated.append((wiki, rc_id, feeds)) + def add(self, sql_expression): + self.updated.append(sql_expression) def clear_list(self): self.updated.clear() async def update_db(self): - async with db.pool().acquire() as connection: - async with connection.transaction(): - for update in self.updated: - if update[2] is None: - sql = "UPDATE rcgcdw SET rcid = $2 WHERE wiki = $1 AND ( rcid != -1 OR rcid IS NULL )" - else: - sql = "UPDATE rcgcdw SET postid = $2 WHERE wiki = $1 AND ( postid != '-1' OR postid IS NULL )" - await connection.execute(sql, update[0], update[1]) - self.clear_list() + try: + while True: + if self.updated: + async with db.pool().acquire() as connection: + async with connection.transaction(): + for update in self.updated: + await connection.execute(update) + self.clear_list() + await asyncio.sleep(10.0) + except asyncio.CancelledError: + logger.info("Shutting down after updating DB with {} more entries...".format(len(self.updated))) + async with db.pool().acquire() as connection: + async with connection.transaction(): + for update in self.updated: + await connection.execute(update) + self.clear_list() + await db.shutdown_connection() DBHandler = UpdateDB() diff --git a/src/wiki.py b/src/wiki.py index 8b0e52d..69acd66 100644 --- a/src/wiki.py +++ b/src/wiki.py @@ -6,13 +6,13 @@ import logging, aiohttp from mw_messages import MWMessages from src.exceptions import * from src.database import db +from src.queue_handler import DBHandler from src.formatters.rc import embed_formatter, compact_formatter from src.formatters.discussions import feeds_embed_formatter, feeds_compact_formatter from src.misc import parse_link from src.i18n import langs from src.wiki_ratelimiter import RateLimiter from statistics import Statistics -import sqlite3 import src.discord import asyncio from src.config import settings @@ -204,8 +204,7 @@ class Wiki: self.statistics.last_action = recent_changes[-1]["rcid"] else: self.statistics.last_action = 0 - db.add(queued_wiki.url, 0) - await DBHandler.update_db() + DBHandler.add("UPDATE rcgcdw SET rcid = 0 WHERE wiki = {} AND ( rcid != -1 OR rcid IS NULL )".format(self.script_url)) @dataclass class Wiki_old: