mirror of
https://gitlab.com/chicken-riders/RcGcDb.git
synced 2025-02-23 00:54:09 +00:00
Changes to DB handling
This commit is contained in:
parent
db50490e56
commit
726a376b06
13
src/bot.py
13
src/bot.py
|
@ -430,7 +430,6 @@ async def discussion_handler():
|
||||||
await connection.execute("UPDATE rcgcdw SET postid = $1 WHERE wiki = $2", "-1", db_wiki["wiki"])
|
await connection.execute("UPDATE rcgcdw SET postid = $1 WHERE wiki = $2", "-1", db_wiki["wiki"])
|
||||||
else:
|
else:
|
||||||
await local_wiki.remove(db_wiki["wiki"], 1000)
|
await local_wiki.remove(db_wiki["wiki"], 1000)
|
||||||
await DBHandler.update_db()
|
|
||||||
continue
|
continue
|
||||||
raise WikiError
|
raise WikiError
|
||||||
discussion_feed = discussion_feed_resp["_embedded"]["doc:posts"]
|
discussion_feed = discussion_feed_resp["_embedded"]["doc:posts"]
|
||||||
|
@ -449,7 +448,6 @@ async def discussion_handler():
|
||||||
DBHandler.add(db_wiki["wiki"], discussion_feed[-1]["id"], True)
|
DBHandler.add(db_wiki["wiki"], discussion_feed[-1]["id"], True)
|
||||||
else:
|
else:
|
||||||
DBHandler.add(db_wiki["wiki"], "0", True)
|
DBHandler.add(db_wiki["wiki"], "0", True)
|
||||||
await DBHandler.update_db()
|
|
||||||
continue
|
continue
|
||||||
comment_events = []
|
comment_events = []
|
||||||
targets = await generate_targets(db_wiki["wiki"], "AND NOT postid = '-1'")
|
targets = await generate_targets(db_wiki["wiki"], "AND NOT postid = '-1'")
|
||||||
|
@ -500,7 +498,6 @@ async def discussion_handler():
|
||||||
DBHandler.add(db_wiki["wiki"], post["id"], True)
|
DBHandler.add(db_wiki["wiki"], post["id"], True)
|
||||||
await asyncio.sleep(delay=2.0) # hardcoded really doesn't need much more
|
await asyncio.sleep(delay=2.0) # hardcoded really doesn't need much more
|
||||||
await asyncio.sleep(delay=1.0) # Avoid lock on no wikis
|
await asyncio.sleep(delay=1.0) # Avoid lock on no wikis
|
||||||
await DBHandler.update_db()
|
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
pass
|
pass
|
||||||
except:
|
except:
|
||||||
|
@ -517,9 +514,10 @@ def shutdown(loop, signal=None):
|
||||||
loop.remove_signal_handler(signal)
|
loop.remove_signal_handler(signal)
|
||||||
if len(messagequeue) > 0:
|
if len(messagequeue) > 0:
|
||||||
logger.warning("Some messages are still queued!")
|
logger.warning("Some messages are still queued!")
|
||||||
for task in (main_tasks["wiki_scanner"], main_tasks["discussion_handler"], main_tasks["msg_queue_shield"]):
|
for task in (main_tasks["wiki_scanner"], main_tasks["discussion_handler"], main_tasks["msg_queue_shield"], main_tasks["database_updates_shield"]):
|
||||||
task.cancel()
|
task.cancel()
|
||||||
loop.run_until_complete(main_tasks["message_sender"])
|
loop.run_until_complete(main_tasks["message_sender"])
|
||||||
|
loop.run_until_complete(main_tasks["database_updates"])
|
||||||
for task in asyncio.all_tasks(loop):
|
for task in asyncio.all_tasks(loop):
|
||||||
logger.debug("Killing task")
|
logger.debug("Killing task")
|
||||||
task.cancel()
|
task.cancel()
|
||||||
|
@ -564,12 +562,11 @@ async def main_loop():
|
||||||
# loop.set_exception_handler(global_exception_handler)
|
# loop.set_exception_handler(global_exception_handler)
|
||||||
try:
|
try:
|
||||||
main_tasks = {"wiki_scanner": asyncio.create_task(wiki_scanner()), "message_sender": asyncio.create_task(message_sender()),
|
main_tasks = {"wiki_scanner": asyncio.create_task(wiki_scanner()), "message_sender": asyncio.create_task(message_sender()),
|
||||||
"discussion_handler": asyncio.create_task(discussion_handler())}
|
"discussion_handler": asyncio.create_task(discussion_handler()), "database_updates": asyncio.create_task(DBHandler.update_db())}
|
||||||
main_tasks["msg_queue_shield"] = asyncio.shield(main_tasks["message_sender"])
|
main_tasks["msg_queue_shield"] = asyncio.shield(main_tasks["message_sender"])
|
||||||
await asyncio.gather(main_tasks["wiki_scanner"], main_tasks["discussion_handler"], main_tasks["message_sender"])
|
main_tasks["database_updates_shield"] = asyncio.shield(main_tasks["database_updates"])
|
||||||
|
await asyncio.gather(main_tasks["wiki_scanner"], main_tasks["discussion_handler"], main_tasks["message_sender"], main_tasks["database_updates"])
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
await DBHandler.update_db()
|
|
||||||
await db.shutdown_connection()
|
|
||||||
shutdown(loop)
|
shutdown(loop)
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
return
|
return
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
from src.database import db
|
from src.database import db
|
||||||
|
|
||||||
|
@ -8,22 +9,30 @@ class UpdateDB:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.updated = []
|
self.updated = []
|
||||||
|
|
||||||
def add(self, wiki, rc_id, feeds=None):
|
def add(self, sql_expression):
|
||||||
self.updated.append((wiki, rc_id, feeds))
|
self.updated.append(sql_expression)
|
||||||
|
|
||||||
def clear_list(self):
|
def clear_list(self):
|
||||||
self.updated.clear()
|
self.updated.clear()
|
||||||
|
|
||||||
async def update_db(self):
|
async def update_db(self):
|
||||||
async with db.pool().acquire() as connection:
|
try:
|
||||||
async with connection.transaction():
|
while True:
|
||||||
for update in self.updated:
|
if self.updated:
|
||||||
if update[2] is None:
|
async with db.pool().acquire() as connection:
|
||||||
sql = "UPDATE rcgcdw SET rcid = $2 WHERE wiki = $1 AND ( rcid != -1 OR rcid IS NULL )"
|
async with connection.transaction():
|
||||||
else:
|
for update in self.updated:
|
||||||
sql = "UPDATE rcgcdw SET postid = $2 WHERE wiki = $1 AND ( postid != '-1' OR postid IS NULL )"
|
await connection.execute(update)
|
||||||
await connection.execute(sql, update[0], update[1])
|
self.clear_list()
|
||||||
self.clear_list()
|
await asyncio.sleep(10.0)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
logger.info("Shutting down after updating DB with {} more entries...".format(len(self.updated)))
|
||||||
|
async with db.pool().acquire() as connection:
|
||||||
|
async with connection.transaction():
|
||||||
|
for update in self.updated:
|
||||||
|
await connection.execute(update)
|
||||||
|
self.clear_list()
|
||||||
|
await db.shutdown_connection()
|
||||||
|
|
||||||
|
|
||||||
DBHandler = UpdateDB()
|
DBHandler = UpdateDB()
|
||||||
|
|
|
@ -6,13 +6,13 @@ import logging, aiohttp
|
||||||
from mw_messages import MWMessages
|
from mw_messages import MWMessages
|
||||||
from src.exceptions import *
|
from src.exceptions import *
|
||||||
from src.database import db
|
from src.database import db
|
||||||
|
from src.queue_handler import DBHandler
|
||||||
from src.formatters.rc import embed_formatter, compact_formatter
|
from src.formatters.rc import embed_formatter, compact_formatter
|
||||||
from src.formatters.discussions import feeds_embed_formatter, feeds_compact_formatter
|
from src.formatters.discussions import feeds_embed_formatter, feeds_compact_formatter
|
||||||
from src.misc import parse_link
|
from src.misc import parse_link
|
||||||
from src.i18n import langs
|
from src.i18n import langs
|
||||||
from src.wiki_ratelimiter import RateLimiter
|
from src.wiki_ratelimiter import RateLimiter
|
||||||
from statistics import Statistics
|
from statistics import Statistics
|
||||||
import sqlite3
|
|
||||||
import src.discord
|
import src.discord
|
||||||
import asyncio
|
import asyncio
|
||||||
from src.config import settings
|
from src.config import settings
|
||||||
|
@ -204,8 +204,7 @@ class Wiki:
|
||||||
self.statistics.last_action = recent_changes[-1]["rcid"]
|
self.statistics.last_action = recent_changes[-1]["rcid"]
|
||||||
else:
|
else:
|
||||||
self.statistics.last_action = 0
|
self.statistics.last_action = 0
|
||||||
db.add(queued_wiki.url, 0)
|
DBHandler.add("UPDATE rcgcdw SET rcid = 0 WHERE wiki = {} AND ( rcid != -1 OR rcid IS NULL )".format(self.script_url))
|
||||||
await DBHandler.update_db()
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class Wiki_old:
|
class Wiki_old:
|
||||||
|
|
Loading…
Reference in a new issue