RcGcDb/src/queue_handler.py

50 lines
1.4 KiB
Python
Raw Normal View History

2021-06-30 11:41:58 +00:00
import asyncio
import collections
2020-07-11 15:54:08 +00:00
import logging
2022-09-18 14:38:19 +00:00
from typing import Union, Optional
2022-09-29 21:10:36 +00:00
from src.database import db
import asyncpg
2020-07-11 15:54:08 +00:00
logger = logging.getLogger("rcgcdb.queue_handler")
2020-07-28 12:39:32 +00:00
class UpdateDB:
2020-07-11 15:54:08 +00:00
def __init__(self):
self.updated: list[tuple[str, tuple[Union[str, int]]]] = []
2020-07-11 15:54:08 +00:00
2021-06-30 11:41:58 +00:00
def add(self, sql_expression):
self.updated.append(sql_expression)
2020-07-11 15:54:08 +00:00
def clear_list(self):
self.updated.clear()
2022-10-09 12:10:08 +00:00
async def fetch_rows(self, SQLstatement: str, *args: Union[str, int]) -> collections.abc.AsyncIterable:
2022-09-29 21:10:36 +00:00
async with db.pool().acquire() as connection:
async with connection.transaction():
async for row in connection.cursor(SQLstatement, *args):
yield row
2021-03-18 16:00:01 +00:00
async def update_db(self):
2021-06-30 11:41:58 +00:00
try:
while True:
2022-11-04 14:59:26 +00:00
logger.debug("Running DB check")
2021-06-30 11:41:58 +00:00
if self.updated:
2022-09-29 21:10:36 +00:00
async with db.pool().acquire() as connection:
2021-06-30 11:41:58 +00:00
async with connection.transaction():
for update in self.updated:
await connection.execute(update[0], *update[1])
2021-06-30 11:41:58 +00:00
self.clear_list()
await asyncio.sleep(10.0)
except asyncio.CancelledError:
logger.info("Shutting down after updating DB with {} more entries...".format(len(self.updated)))
2022-09-29 21:10:36 +00:00
async with db.pool().acquire() as connection:
2021-06-30 11:41:58 +00:00
async with connection.transaction():
for update in self.updated:
await connection.execute(update[0], *update[1])
2021-06-30 11:41:58 +00:00
self.clear_list()
2022-09-29 21:10:36 +00:00
await db.shutdown_connection()
2020-07-18 12:12:00 +00:00
2022-09-18 14:38:19 +00:00
dbmanager = UpdateDB()