mirror of
https://gitlab.com/chicken-riders/RcGcDb.git
synced 2025-02-23 00:54:09 +00:00
Merge branch 'postgres' into 'master'
Change Database provider from SQLite to Postgres DB See merge request chicken-riders/RcGcDb!18
This commit is contained in:
commit
b952e62730
|
@ -3,3 +3,5 @@ aiohttp >= 3.6.2
|
||||||
lxml >= 4.2.1
|
lxml >= 4.2.1
|
||||||
nest-asyncio >= 1.4.0
|
nest-asyncio >= 1.4.0
|
||||||
irc >= 19.0.1
|
irc >= 19.0.1
|
||||||
|
beautifulsoup4>=4.9.3
|
||||||
|
asyncpg>=0.22.0
|
|
@ -4,10 +4,14 @@
|
||||||
},
|
},
|
||||||
"max_requests_per_minute": 30,
|
"max_requests_per_minute": 30,
|
||||||
"minimal_cooldown_per_wiki_in_sec": 60,
|
"minimal_cooldown_per_wiki_in_sec": 60,
|
||||||
"database_path": "rcgcdb.db",
|
|
||||||
"monitoring_webhook": "111111111111111111/AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
|
"monitoring_webhook": "111111111111111111/AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
|
||||||
"support": "https://discord.gg/v77RTk5",
|
"support": "https://discord.gg/v77RTk5",
|
||||||
"irc_overtime": 3600,
|
"irc_overtime": 3600,
|
||||||
|
"pg_user": "postgres",
|
||||||
|
"pg_host": "localhost",
|
||||||
|
"pg_db": "rcgcdb",
|
||||||
|
"pg_pass": "secret_password",
|
||||||
|
"pg_port": "5432",
|
||||||
"irc_servers": {
|
"irc_servers": {
|
||||||
"your custom name for the farm": {
|
"your custom name for the farm": {
|
||||||
"domains": ["wikipedia.org", "otherwikipedia.org"],
|
"domains": ["wikipedia.org", "otherwikipedia.org"],
|
||||||
|
|
73
src/bot.py
73
src/bot.py
|
@ -11,7 +11,7 @@ from typing import Generator
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from src.argparser import command_line_args
|
from src.argparser import command_line_args
|
||||||
from src.config import settings
|
from src.config import settings
|
||||||
from src.database import db_cursor, db_connection
|
from src.database import db
|
||||||
from src.exceptions import *
|
from src.exceptions import *
|
||||||
from src.misc import get_paths, get_domain
|
from src.misc import get_paths, get_domain
|
||||||
from src.msgqueue import messagequeue, send_to_discord
|
from src.msgqueue import messagequeue, send_to_discord
|
||||||
|
@ -35,11 +35,15 @@ all_wikis: dict = {}
|
||||||
mw_msgs: dict = {} # will have the type of id: tuple
|
mw_msgs: dict = {} # will have the type of id: tuple
|
||||||
main_tasks: dict = {}
|
main_tasks: dict = {}
|
||||||
|
|
||||||
|
|
||||||
# First populate the all_wikis list with every wiki
|
# First populate the all_wikis list with every wiki
|
||||||
# Reasons for this: 1. we require amount of wikis to calculate the cooldown between requests
|
# Reasons for this: 1. we require amount of wikis to calculate the cooldown between requests
|
||||||
# 2. Easier to code
|
# 2. Easier to code
|
||||||
|
|
||||||
for db_wiki in db_cursor.execute('SELECT wiki, rcid FROM rcgcdw GROUP BY wiki ORDER BY ROWID'):
|
async def populate_allwikis():
|
||||||
|
async with db.pool().acquire() as connection:
|
||||||
|
async with connection.transaction():
|
||||||
|
async for db_wiki in connection.cursor('SELECT DISTINCT wiki, rcid FROM rcgcdw'):
|
||||||
all_wikis[db_wiki["wiki"]] = Wiki() # populate all_wikis
|
all_wikis[db_wiki["wiki"]] = Wiki() # populate all_wikis
|
||||||
all_wikis[db_wiki["wiki"]].rc_active = db_wiki["rcid"]
|
all_wikis[db_wiki["wiki"]].rc_active = db_wiki["rcid"]
|
||||||
|
|
||||||
|
@ -92,7 +96,7 @@ class RcQueue:
|
||||||
all_wikis[wiki].rc_active = -1
|
all_wikis[wiki].rc_active = -1
|
||||||
if not self[group]["query"]: # if there is no wiki left in the queue, get rid of the task
|
if not self[group]["query"]: # if there is no wiki left in the queue, get rid of the task
|
||||||
logger.debug(f"{group} no longer has any wikis queued!")
|
logger.debug(f"{group} no longer has any wikis queued!")
|
||||||
if not self.check_if_domain_in_db(group):
|
if not await self.check_if_domain_in_db(group):
|
||||||
await self.stop_task_group(group)
|
await self.stop_task_group(group)
|
||||||
else:
|
else:
|
||||||
logger.debug(f"But there are still wikis for it in DB!")
|
logger.debug(f"But there are still wikis for it in DB!")
|
||||||
|
@ -101,10 +105,10 @@ class RcQueue:
|
||||||
self[group]["task"].cancel()
|
self[group]["task"].cancel()
|
||||||
del self.domain_list[group]
|
del self.domain_list[group]
|
||||||
|
|
||||||
def check_if_domain_in_db(self, domain):
|
async def check_if_domain_in_db(self, domain):
|
||||||
fetch_all = db_cursor.execute(
|
async with db.pool().acquire() as connection:
|
||||||
'SELECT ROWID, webhook, wiki, lang, display, rcid FROM rcgcdw WHERE rcid != -1 GROUP BY wiki ORDER BY ROWID ASC')
|
async with connection.transaction():
|
||||||
for wiki in fetch_all.fetchall():
|
async for wiki in connection.cursor('SELECT DISTINCT wiki FROM rcgcdw WHERE rcid != -1;'):
|
||||||
if get_domain(wiki["wiki"]) == domain:
|
if get_domain(wiki["wiki"]) == domain:
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
@ -143,11 +147,11 @@ class RcQueue:
|
||||||
async def update_queues(self):
|
async def update_queues(self):
|
||||||
"""Makes a round on rcgcdb DB and looks for updates to the queues in self.domain_list"""
|
"""Makes a round on rcgcdb DB and looks for updates to the queues in self.domain_list"""
|
||||||
try:
|
try:
|
||||||
fetch_all = db_cursor.execute(
|
|
||||||
'SELECT ROWID, webhook, wiki, lang, display, rcid FROM rcgcdw WHERE rcid != -1 OR rcid IS NULL GROUP BY wiki ORDER BY ROWID ASC')
|
|
||||||
self.to_remove = [x[0] for x in filter(self.filter_rc_active, all_wikis.items())] # first populate this list and remove wikis that are still in the db, clean up the rest
|
self.to_remove = [x[0] for x in filter(self.filter_rc_active, all_wikis.items())] # first populate this list and remove wikis that are still in the db, clean up the rest
|
||||||
full = set()
|
full = set()
|
||||||
for db_wiki in fetch_all.fetchall():
|
async with db.pool().acquire() as connection:
|
||||||
|
async with connection.transaction():
|
||||||
|
async for db_wiki in connection.cursor('SELECT DISTINCT wiki, row_number() OVER (ORDER BY webhook) AS rowid, webhook, lang, display, rcid FROM rcgcdw WHERE rcid != -1 OR rcid IS NULL ORDER BY webhook'):
|
||||||
domain = get_domain(db_wiki["wiki"])
|
domain = get_domain(db_wiki["wiki"])
|
||||||
try:
|
try:
|
||||||
if db_wiki["wiki"] not in all_wikis:
|
if db_wiki["wiki"] not in all_wikis:
|
||||||
|
@ -178,14 +182,14 @@ class RcQueue:
|
||||||
else: # Continue without adding
|
else: # Continue without adding
|
||||||
logger.debug("No condition fulfilled so skipping.")
|
logger.debug("No condition fulfilled so skipping.")
|
||||||
continue
|
continue
|
||||||
if not db_wiki["ROWID"] < current_domain["last_rowid"]:
|
if not db_wiki["rowid"] < current_domain["last_rowid"]:
|
||||||
current_domain["query"].append(QueuedWiki(db_wiki["wiki"], 20))
|
current_domain["query"].append(QueuedWiki(db_wiki["wiki"], 20))
|
||||||
except KeyError:
|
except KeyError:
|
||||||
await self.start_group(domain, [QueuedWiki(db_wiki["wiki"], 20)])
|
await self.start_group(domain, [QueuedWiki(db_wiki["wiki"], 20)])
|
||||||
logger.info("A new domain group ({}) has been added since last time, adding it to the domain_list and starting a task...".format(domain))
|
logger.info("A new domain group ({}) has been added since last time, adding it to the domain_list and starting a task...".format(domain))
|
||||||
except ListFull:
|
except ListFull:
|
||||||
full.add(domain)
|
full.add(domain)
|
||||||
current_domain["last_rowid"] = db_wiki["ROWID"]
|
current_domain["last_rowid"] = db_wiki["rowid"]
|
||||||
continue
|
continue
|
||||||
for wiki in self.to_remove:
|
for wiki in self.to_remove:
|
||||||
await self.remove_wiki_from_group(wiki)
|
await self.remove_wiki_from_group(wiki)
|
||||||
|
@ -226,23 +230,28 @@ def calculate_delay_for_group(group_length: int) -> float:
|
||||||
return 0.0
|
return 0.0
|
||||||
|
|
||||||
|
|
||||||
def generate_targets(wiki_url: str, additional_requirements: str) -> defaultdict:
|
async def generate_targets(wiki_url: str, additional_requirements: str) -> defaultdict:
|
||||||
"""To minimize the amount of requests, we generate a list of language/display mode combinations to create messages for
|
"""To minimize the amount of requests, we generate a list of language/display mode combinations to create messages for
|
||||||
this way we can send the same message to multiple webhooks which have the same wiki and settings without doing another
|
this way we can send the same message to multiple webhooks which have the same wiki and settings without doing another
|
||||||
request to the wiki just to duplicate the message.
|
request to the wiki just to duplicate the message.
|
||||||
"""
|
"""
|
||||||
combinations = defaultdict(list)
|
combinations = defaultdict(list)
|
||||||
for webhook in db_cursor.execute('SELECT webhook, lang, display FROM rcgcdw WHERE wiki = ? {}'.format(additional_requirements), (wiki_url,)):
|
async with db.pool().acquire() as connection:
|
||||||
|
async with connection.transaction():
|
||||||
|
async for webhook in connection.cursor('SELECT webhook, lang, display FROM rcgcdw WHERE wiki = $1 {}'.format(additional_requirements), wiki_url):
|
||||||
combination = (webhook["lang"], webhook["display"])
|
combination = (webhook["lang"], webhook["display"])
|
||||||
combinations[combination].append(webhook["webhook"])
|
combinations[combination].append(webhook["webhook"])
|
||||||
return combinations
|
return combinations
|
||||||
|
|
||||||
|
|
||||||
async def generate_domain_groups():
|
async def generate_domain_groups():
|
||||||
"""Generate a list of wikis per domain (fandom.com, wikipedia.org etc.)"""
|
"""Generate a list of wikis per domain (fandom.com, wikipedia.org etc.)
|
||||||
|
|
||||||
|
:returns tuple[str, list]"""
|
||||||
domain_wikis = defaultdict(list)
|
domain_wikis = defaultdict(list)
|
||||||
fetch_all = db_cursor.execute('SELECT ROWID, webhook, wiki, lang, display, rcid FROM rcgcdw WHERE rcid != -1 OR rcid IS NULL GROUP BY wiki ORDER BY ROWID ASC')
|
async with db.pool().acquire() as connection:
|
||||||
for db_wiki in fetch_all.fetchall():
|
async with connection.transaction():
|
||||||
|
async for db_wiki in connection.cursor('SELECT DISTINCT wiki, webhook, lang, display, rcid FROM rcgcdw WHERE rcid != -1 OR rcid IS NULL'):
|
||||||
domain_wikis[get_domain(db_wiki["wiki"])].append(QueuedWiki(db_wiki["wiki"], 20))
|
domain_wikis[get_domain(db_wiki["wiki"])].append(QueuedWiki(db_wiki["wiki"], 20))
|
||||||
for group, db_wikis in domain_wikis.items():
|
for group, db_wikis in domain_wikis.items():
|
||||||
yield group, db_wikis
|
yield group, db_wikis
|
||||||
|
@ -301,10 +310,10 @@ async def scan_group(group: str):
|
||||||
else:
|
else:
|
||||||
local_wiki.rc_active = 0
|
local_wiki.rc_active = 0
|
||||||
DBHandler.add(queued_wiki.url, 0)
|
DBHandler.add(queued_wiki.url, 0)
|
||||||
DBHandler.update_db()
|
await DBHandler.update_db()
|
||||||
continue
|
continue
|
||||||
categorize_events = {}
|
categorize_events = {}
|
||||||
targets = generate_targets(queued_wiki.url, "AND (rcid != -1 OR rcid IS NULL)")
|
targets = await generate_targets(queued_wiki.url, "AND (rcid != -1 OR rcid IS NULL)")
|
||||||
paths = get_paths(queued_wiki.url, recent_changes_resp)
|
paths = get_paths(queued_wiki.url, recent_changes_resp)
|
||||||
new_events = 0
|
new_events = 0
|
||||||
local_wiki.last_check = time.time() # on successful check, save new last check time
|
local_wiki.last_check = time.time() # on successful check, save new last check time
|
||||||
|
@ -347,7 +356,7 @@ async def scan_group(group: str):
|
||||||
if recent_changes: # we don't have to test for highest_rc being null, because if there are no RC entries recent_changes will be an empty list which will result in false in here and DO NOT save the value
|
if recent_changes: # we don't have to test for highest_rc being null, because if there are no RC entries recent_changes will be an empty list which will result in false in here and DO NOT save the value
|
||||||
local_wiki.rc_active = highest_rc
|
local_wiki.rc_active = highest_rc
|
||||||
DBHandler.add(queued_wiki.url, highest_rc)
|
DBHandler.add(queued_wiki.url, highest_rc)
|
||||||
DBHandler.update_db()
|
await DBHandler.update_db()
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
return
|
return
|
||||||
except QueueEmpty:
|
except QueueEmpty:
|
||||||
|
@ -393,9 +402,9 @@ async def message_sender():
|
||||||
async def discussion_handler():
|
async def discussion_handler():
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
fetch_all = db_cursor.execute(
|
async with db.pool().acquire() as connection:
|
||||||
"SELECT wiki, rcid, postid FROM rcgcdw WHERE postid != '-1' OR postid IS NULL GROUP BY wiki")
|
async with connection.transaction():
|
||||||
for db_wiki in fetch_all.fetchall():
|
async for db_wiki in connection.cursor("SELECT DISTINCT wiki, rcid, postid FROM rcgcdw WHERE postid != '-1' OR postid IS NULL"):
|
||||||
try:
|
try:
|
||||||
local_wiki = all_wikis[db_wiki["wiki"]] # set a reference to a wiki object from memory
|
local_wiki = all_wikis[db_wiki["wiki"]] # set a reference to a wiki object from memory
|
||||||
except KeyError:
|
except KeyError:
|
||||||
|
@ -422,11 +431,10 @@ async def discussion_handler():
|
||||||
error = discussion_feed_resp["error"]
|
error = discussion_feed_resp["error"]
|
||||||
if error == "NotFoundException": # Discussions disabled
|
if error == "NotFoundException": # Discussions disabled
|
||||||
if db_wiki["rcid"] != -1: # RC feed is disabled
|
if db_wiki["rcid"] != -1: # RC feed is disabled
|
||||||
db_cursor.execute("UPDATE rcgcdw SET postid = ? WHERE wiki = ?",
|
await connection.execute("UPDATE rcgcdw SET postid = $1 WHERE wiki = $2", "-1", db_wiki["wiki"])
|
||||||
("-1", db_wiki["wiki"],))
|
|
||||||
else:
|
else:
|
||||||
await local_wiki.remove(db_wiki["wiki"], 1000)
|
await local_wiki.remove(db_wiki["wiki"], 1000)
|
||||||
DBHandler.update_db()
|
await DBHandler.update_db()
|
||||||
continue
|
continue
|
||||||
raise WikiError
|
raise WikiError
|
||||||
discussion_feed = discussion_feed_resp["_embedded"]["doc:posts"]
|
discussion_feed = discussion_feed_resp["_embedded"]["doc:posts"]
|
||||||
|
@ -445,10 +453,10 @@ async def discussion_handler():
|
||||||
DBHandler.add(db_wiki["wiki"], discussion_feed[-1]["id"], True)
|
DBHandler.add(db_wiki["wiki"], discussion_feed[-1]["id"], True)
|
||||||
else:
|
else:
|
||||||
DBHandler.add(db_wiki["wiki"], "0", True)
|
DBHandler.add(db_wiki["wiki"], "0", True)
|
||||||
DBHandler.update_db()
|
await DBHandler.update_db()
|
||||||
continue
|
continue
|
||||||
comment_events = []
|
comment_events = []
|
||||||
targets = generate_targets(db_wiki["wiki"], "AND NOT postid = '-1'")
|
targets = await generate_targets(db_wiki["wiki"], "AND NOT postid = '-1'")
|
||||||
for post in discussion_feed:
|
for post in discussion_feed:
|
||||||
if post["_embedded"]["thread"][0]["containerType"] == "ARTICLE_COMMENT" and post["id"] > db_wiki["postid"]:
|
if post["_embedded"]["thread"][0]["containerType"] == "ARTICLE_COMMENT" and post["id"] > db_wiki["postid"]:
|
||||||
comment_events.append(post["forumId"])
|
comment_events.append(post["forumId"])
|
||||||
|
@ -496,7 +504,7 @@ async def discussion_handler():
|
||||||
DBHandler.add(db_wiki["wiki"], post["id"], True)
|
DBHandler.add(db_wiki["wiki"], post["id"], True)
|
||||||
await asyncio.sleep(delay=2.0) # hardcoded really doesn't need much more
|
await asyncio.sleep(delay=2.0) # hardcoded really doesn't need much more
|
||||||
await asyncio.sleep(delay=1.0) # Avoid lock on no wikis
|
await asyncio.sleep(delay=1.0) # Avoid lock on no wikis
|
||||||
DBHandler.update_db()
|
await DBHandler.update_db()
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
pass
|
pass
|
||||||
except:
|
except:
|
||||||
|
@ -510,8 +518,6 @@ async def discussion_handler():
|
||||||
|
|
||||||
def shutdown(loop, signal=None):
|
def shutdown(loop, signal=None):
|
||||||
global main_tasks
|
global main_tasks
|
||||||
DBHandler.update_db()
|
|
||||||
db_connection.close()
|
|
||||||
loop.remove_signal_handler(signal)
|
loop.remove_signal_handler(signal)
|
||||||
if len(messagequeue) > 0:
|
if len(messagequeue) > 0:
|
||||||
logger.warning("Some messages are still queued!")
|
logger.warning("Some messages are still queued!")
|
||||||
|
@ -543,6 +549,9 @@ async def main_loop():
|
||||||
global main_tasks
|
global main_tasks
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
nest_asyncio.apply(loop)
|
nest_asyncio.apply(loop)
|
||||||
|
await db.setup_connection()
|
||||||
|
logger.debug("Connection type: {}".format(db.connection))
|
||||||
|
await populate_allwikis()
|
||||||
try:
|
try:
|
||||||
signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
|
signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
|
||||||
for s in signals:
|
for s in signals:
|
||||||
|
@ -558,6 +567,8 @@ async def main_loop():
|
||||||
main_tasks["msg_queue_shield"] = asyncio.shield(main_tasks["message_sender"])
|
main_tasks["msg_queue_shield"] = asyncio.shield(main_tasks["message_sender"])
|
||||||
await asyncio.gather(main_tasks["wiki_scanner"], main_tasks["discussion_handler"], main_tasks["message_sender"])
|
await asyncio.gather(main_tasks["wiki_scanner"], main_tasks["discussion_handler"], main_tasks["message_sender"])
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
|
await DBHandler.update_db()
|
||||||
|
await db.shutdown_connection()
|
||||||
shutdown(loop)
|
shutdown(loop)
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
return
|
return
|
||||||
|
|
|
@ -1,6 +1,43 @@
|
||||||
import sqlite3
|
import asyncpg
|
||||||
|
import logging
|
||||||
|
from typing import Optional
|
||||||
from src.config import settings
|
from src.config import settings
|
||||||
|
|
||||||
db_connection = sqlite3.connect(settings.get("database_path", 'rcgcdb.db'))
|
logger = logging.getLogger("rcgcdb.database")
|
||||||
db_connection.row_factory = sqlite3.Row
|
# connection: Optional[asyncpg.Connection] = None
|
||||||
db_cursor = db_connection.cursor()
|
|
||||||
|
|
||||||
|
class db_connection:
|
||||||
|
connection: Optional[asyncpg.Pool] = None
|
||||||
|
|
||||||
|
async def setup_connection(self):
|
||||||
|
# Establish a connection to an existing database named "test"
|
||||||
|
# as a "postgres" user.
|
||||||
|
logger.debug("Setting up the Database connection...")
|
||||||
|
self.connection = await asyncpg.create_pool(user=settings["pg_user"], host=settings.get("pg_host", "localhost"),
|
||||||
|
database=settings.get("pg_db", "rcgcdb"), password=settings.get("pg_pass"),
|
||||||
|
port=settings.get("pg_port", 5432))
|
||||||
|
logger.debug("Database connection established! Connection: {}".format(self.connection))
|
||||||
|
|
||||||
|
async def shutdown_connection(self):
|
||||||
|
logger.debug("Shutting down database connection...")
|
||||||
|
await self.connection.close()
|
||||||
|
|
||||||
|
def pool(self) -> asyncpg.Pool:
|
||||||
|
return self.connection
|
||||||
|
|
||||||
|
# Tried to make it a decorator but tbh won't probably work
|
||||||
|
# async def in_transaction(self, func):
|
||||||
|
# async def single_transaction():
|
||||||
|
# async with self.connection.acquire() as connection:
|
||||||
|
# async with connection.transaction():
|
||||||
|
# await func()
|
||||||
|
# return single_transaction
|
||||||
|
|
||||||
|
# async def query(self, string, *arg):
|
||||||
|
# async with self.connection.acquire() as connection:
|
||||||
|
# async with connection.transaction():
|
||||||
|
# return connection.cursor(string, *arg)
|
||||||
|
|
||||||
|
|
||||||
|
db = db_connection()
|
||||||
|
|
|
@ -3,7 +3,7 @@ from collections import defaultdict
|
||||||
|
|
||||||
from src.misc import logger
|
from src.misc import logger
|
||||||
from src.config import settings
|
from src.config import settings
|
||||||
from src.database import db_cursor
|
from src.database import db
|
||||||
from src.i18n import langs
|
from src.i18n import langs
|
||||||
from src.exceptions import EmbedListFull
|
from src.exceptions import EmbedListFull
|
||||||
from asyncio import TimeoutError
|
from asyncio import TimeoutError
|
||||||
|
@ -22,7 +22,9 @@ default_header["X-RateLimit-Precision"] = "millisecond"
|
||||||
|
|
||||||
# User facing webhook functions
|
# User facing webhook functions
|
||||||
async def wiki_removal(wiki_url, status):
|
async def wiki_removal(wiki_url, status):
|
||||||
for observer in db_cursor.execute('SELECT webhook, lang FROM rcgcdw WHERE wiki = ?', (wiki_url,)):
|
async with db.pool().acquire() as connection:
|
||||||
|
async with connection.transaction():
|
||||||
|
async for observer in connection.cursor('SELECT webhook, lang FROM rcgcdw WHERE wiki = $1', wiki_url):
|
||||||
_ = langs[observer["lang"]]["discord"].gettext
|
_ = langs[observer["lang"]]["discord"].gettext
|
||||||
reasons = {410: _("wiki deleted"), 404: _("wiki deleted"), 401: _("wiki inaccessible"),
|
reasons = {410: _("wiki deleted"), 404: _("wiki deleted"), 401: _("wiki inaccessible"),
|
||||||
402: _("wiki inaccessible"), 403: _("wiki inaccessible"), 1000: _("discussions disabled")}
|
402: _("wiki inaccessible"), 403: _("wiki inaccessible"), 1000: _("discussions disabled")}
|
||||||
|
@ -238,7 +240,8 @@ async def handle_discord_http(code: int, formatted_embed: str, result: aiohttp.C
|
||||||
return 1
|
return 1
|
||||||
elif code == 401 or code == 404: # HTTP UNAUTHORIZED AND NOT FOUND
|
elif code == 401 or code == 404: # HTTP UNAUTHORIZED AND NOT FOUND
|
||||||
logger.error("Webhook URL is invalid or no longer in use, please replace it with proper one.")
|
logger.error("Webhook URL is invalid or no longer in use, please replace it with proper one.")
|
||||||
db_cursor.execute("DELETE FROM rcgcdw WHERE webhook = ?", (webhook_url,))
|
async with db.pool().acquire() as connection:
|
||||||
|
await connection.execute("DELETE FROM rcgcdw WHERE webhook = $1", webhook_url)
|
||||||
await webhook_removal_monitor(webhook_url, code)
|
await webhook_removal_monitor(webhook_url, code)
|
||||||
return 1
|
return 1
|
||||||
elif code == 429:
|
elif code == 429:
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
import logging
|
import logging
|
||||||
from src.database import db_cursor, db_connection
|
from src.database import db
|
||||||
|
|
||||||
logger = logging.getLogger("rcgcdb.queue_handler")
|
logger = logging.getLogger("rcgcdb.queue_handler")
|
||||||
|
|
||||||
|
@ -14,14 +14,15 @@ class UpdateDB:
|
||||||
def clear_list(self):
|
def clear_list(self):
|
||||||
self.updated.clear()
|
self.updated.clear()
|
||||||
|
|
||||||
def update_db(self):
|
async def update_db(self):
|
||||||
|
async with db.pool().acquire() as connection:
|
||||||
|
async with connection.transaction():
|
||||||
for update in self.updated:
|
for update in self.updated:
|
||||||
if update[2] is None:
|
if update[2] is None:
|
||||||
sql = "UPDATE rcgcdw SET rcid = ? WHERE wiki = ? AND ( rcid != -1 OR rcid IS NULL )"
|
sql = "UPDATE rcgcdw SET rcid = $2 WHERE wiki = $1 AND ( rcid != -1 OR rcid IS NULL )"
|
||||||
else:
|
else:
|
||||||
sql = "UPDATE rcgcdw SET postid = ? WHERE wiki = ? AND ( postid != '-1' OR postid IS NULL )"
|
sql = "UPDATE rcgcdw SET postid = $2 WHERE wiki = $1 AND ( postid != '-1' OR postid IS NULL )"
|
||||||
db_cursor.execute(sql, (update[1], update[0],))
|
await connection.execute(sql, update[0], update[1])
|
||||||
db_connection.commit()
|
|
||||||
self.clear_list()
|
self.clear_list()
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,7 @@ from dataclasses import dataclass
|
||||||
import re
|
import re
|
||||||
import logging, aiohttp
|
import logging, aiohttp
|
||||||
from src.exceptions import *
|
from src.exceptions import *
|
||||||
from src.database import db_cursor, db_connection
|
from src.database import db
|
||||||
from src.formatters.rc import embed_formatter, compact_formatter
|
from src.formatters.rc import embed_formatter, compact_formatter
|
||||||
from src.formatters.discussions import feeds_embed_formatter, feeds_compact_formatter
|
from src.formatters.discussions import feeds_embed_formatter, feeds_compact_formatter
|
||||||
from src.misc import parse_link
|
from src.misc import parse_link
|
||||||
|
@ -109,9 +109,9 @@ class Wiki:
|
||||||
logger.info("Removing a wiki {}".format(wiki_url))
|
logger.info("Removing a wiki {}".format(wiki_url))
|
||||||
await src.discord.wiki_removal(wiki_url, reason)
|
await src.discord.wiki_removal(wiki_url, reason)
|
||||||
await src.discord.wiki_removal_monitor(wiki_url, reason)
|
await src.discord.wiki_removal_monitor(wiki_url, reason)
|
||||||
db_cursor.execute('DELETE FROM rcgcdw WHERE wiki = ?', (wiki_url,))
|
async with db.pool().acquire() as connection:
|
||||||
logger.warning('{} rows affected by DELETE FROM rcgcdw WHERE wiki = "{}"'.format(db_cursor.rowcount, wiki_url))
|
result = await connection.execute('DELETE FROM rcgcdw WHERE wiki = $1', wiki_url)
|
||||||
db_connection.commit()
|
logger.warning('{} rows affected by DELETE FROM rcgcdw WHERE wiki = "{}"'.format(result, wiki_url))
|
||||||
|
|
||||||
async def pull_comment(self, comment_id, WIKI_API_PATH, rate_limiter):
|
async def pull_comment(self, comment_id, WIKI_API_PATH, rate_limiter):
|
||||||
try:
|
try:
|
||||||
|
|
Loading…
Reference in a new issue