Added PostgreSQL compatibility, dropped SQLite compatibility

This commit is contained in:
Frisk 2021-03-20 13:42:54 +01:00
parent 2c8574445c
commit a2f1d54f39
No known key found for this signature in database
GPG key ID: 213F7C15068AF8AC
6 changed files with 227 additions and 193 deletions

View file

@ -8,6 +8,10 @@
"monitoring_webhook": "111111111111111111/AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", "monitoring_webhook": "111111111111111111/AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
"support": "https://discord.gg/v77RTk5", "support": "https://discord.gg/v77RTk5",
"irc_overtime": 3600, "irc_overtime": 3600,
"pg_user": "postgres",
"pg_host": "localhost",
"pg_db": "rcgcdb",
"pg_pass": "secret_password",
"irc_servers": { "irc_servers": {
"your custom name for the farm": { "your custom name for the farm": {
"domains": ["wikipedia.org", "otherwikipedia.org"], "domains": ["wikipedia.org", "otherwikipedia.org"],

View file

@ -11,7 +11,7 @@ from typing import Generator
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from src.argparser import command_line_args from src.argparser import command_line_args
from src.config import settings from src.config import settings
from src.database import connection, setup_connection, shutdown_connection from src.database import db
from src.exceptions import * from src.exceptions import *
from src.misc import get_paths, get_domain from src.misc import get_paths, get_domain
from src.msgqueue import messagequeue, send_to_discord from src.msgqueue import messagequeue, send_to_discord
@ -39,11 +39,13 @@ main_tasks: dict = {}
# First populate the all_wikis list with every wiki # First populate the all_wikis list with every wiki
# Reasons for this: 1. we require amount of wikis to calculate the cooldown between requests # Reasons for this: 1. we require amount of wikis to calculate the cooldown between requests
# 2. Easier to code # 2. Easier to code
async def populate_allwikis(): async def populate_allwikis():
async with connection.transaction(): async with db.pool().acquire() as connection:
async for db_wiki in connection.cursor('SELECT DISTINCT wiki, rcid FROM rcgcdw'): async with connection.transaction():
all_wikis[db_wiki["wiki"]] = Wiki() # populate all_wikis async for db_wiki in connection.cursor('SELECT DISTINCT wiki, rcid FROM rcgcdw'):
all_wikis[db_wiki["wiki"]].rc_active = db_wiki["rcid"] all_wikis[db_wiki["wiki"]] = Wiki() # populate all_wikis
all_wikis[db_wiki["wiki"]].rc_active = db_wiki["rcid"]
queue_limit = settings.get("queue_limit", 30) queue_limit = settings.get("queue_limit", 30)
QueuedWiki = namedtuple("QueuedWiki", ['url', 'amount']) QueuedWiki = namedtuple("QueuedWiki", ['url', 'amount'])
@ -104,10 +106,11 @@ class RcQueue:
del self.domain_list[group] del self.domain_list[group]
async def check_if_domain_in_db(self, domain): async def check_if_domain_in_db(self, domain):
async with connection.transaction(): async with db.pool().acquire() as connection:
async for wiki in connection.cursor('SELECT DISTINCT wiki FROM rcgcdw WHERE rcid != -1;'): async with connection.transaction():
if get_domain(wiki["wiki"]) == domain: async for wiki in connection.cursor('SELECT DISTINCT wiki FROM rcgcdw WHERE rcid != -1;'):
return True if get_domain(wiki["wiki"]) == domain:
return True
return False return False
@asynccontextmanager @asynccontextmanager
@ -146,47 +149,48 @@ class RcQueue:
try: try:
self.to_remove = [x[0] for x in filter(self.filter_rc_active, all_wikis.items())] # first populate this list and remove wikis that are still in the db, clean up the rest self.to_remove = [x[0] for x in filter(self.filter_rc_active, all_wikis.items())] # first populate this list and remove wikis that are still in the db, clean up the rest
full = set() full = set()
async with connection.transaction(): async with db.pool().acquire() as connection:
async for db_wiki in connection.cursor('SELECT DISTINCT wiki, row_number() over (ORDER BY webhook) AS ROWID, webhook, lang, display, rcid FROM rcgcdw WHERE rcid != -1 OR rcid IS NULL order by webhook'): async with connection.transaction():
domain = get_domain(db_wiki["wiki"]) async for db_wiki in connection.cursor('SELECT DISTINCT wiki, row_number() over (ORDER BY webhook) AS ROWID, webhook, lang, display, rcid FROM rcgcdw WHERE rcid != -1 OR rcid IS NULL order by webhook'):
try: domain = get_domain(db_wiki["wiki"])
if db_wiki["wiki"] not in all_wikis: try:
raise AssertionError if db_wiki["wiki"] not in all_wikis:
self.to_remove.remove(db_wiki["wiki"]) raise AssertionError
except AssertionError: self.to_remove.remove(db_wiki["wiki"])
all_wikis[db_wiki["wiki"]] = Wiki() except AssertionError:
all_wikis[db_wiki["wiki"]].rc_active = db_wiki["rcid"] all_wikis[db_wiki["wiki"]] = Wiki()
except ValueError: all_wikis[db_wiki["wiki"]].rc_active = db_wiki["rcid"]
pass except ValueError:
if domain in full: pass
continue if domain in full:
try: continue
current_domain: dict = self[domain] try:
if current_domain["irc"]: current_domain: dict = self[domain]
logger.debug("DOMAIN LIST FOR IRC: {}".format(current_domain["irc"].updated)) if current_domain["irc"]:
logger.debug("CURRENT DOMAIN INFO: {}".format(domain)) logger.debug("DOMAIN LIST FOR IRC: {}".format(current_domain["irc"].updated))
logger.debug("IS WIKI IN A LIST?: {}".format(db_wiki["wiki"] in current_domain["irc"].updated)) logger.debug("CURRENT DOMAIN INFO: {}".format(domain))
logger.debug("LAST CHECK FOR THE WIKI {} IS {}".format(db_wiki["wiki"], all_wikis[db_wiki["wiki"]].last_check)) logger.debug("IS WIKI IN A LIST?: {}".format(db_wiki["wiki"] in current_domain["irc"].updated))
if db_wiki["wiki"] in current_domain["irc"].updated: # Priority wikis are the ones with IRC, if they get updated forcefully add them to queue logger.debug("LAST CHECK FOR THE WIKI {} IS {}".format(db_wiki["wiki"], all_wikis[db_wiki["wiki"]].last_check))
current_domain["irc"].updated.remove(db_wiki["wiki"]) if db_wiki["wiki"] in current_domain["irc"].updated: # Priority wikis are the ones with IRC, if they get updated forcefully add them to queue
current_domain["query"].append(QueuedWiki(db_wiki["wiki"], 20), forced=True) current_domain["irc"].updated.remove(db_wiki["wiki"])
logger.debug("Updated in IRC so adding to queue.") current_domain["query"].append(QueuedWiki(db_wiki["wiki"], 20), forced=True)
continue logger.debug("Updated in IRC so adding to queue.")
elif all_wikis[db_wiki["wiki"]].last_check+settings["irc_overtime"] < time.time(): # if time went by and wiki should be updated now use default mechanics continue
logger.debug("Overtime so adding to queue.") elif all_wikis[db_wiki["wiki"]].last_check+settings["irc_overtime"] < time.time(): # if time went by and wiki should be updated now use default mechanics
pass logger.debug("Overtime so adding to queue.")
else: # Continue without adding pass
logger.debug("No condition fulfilled so skipping.") else: # Continue without adding
continue logger.debug("No condition fulfilled so skipping.")
if not db_wiki["rowid"] < current_domain["last_rowid"]: continue
current_domain["query"].append(QueuedWiki(db_wiki["wiki"], 20)) if not db_wiki["rowid"] < current_domain["last_rowid"]:
except KeyError: current_domain["query"].append(QueuedWiki(db_wiki["wiki"], 20))
await self.start_group(domain, [QueuedWiki(db_wiki["wiki"], 20)]) except KeyError:
logger.info("A new domain group ({}) has been added since last time, adding it to the domain_list and starting a task...".format(domain)) await self.start_group(domain, [QueuedWiki(db_wiki["wiki"], 20)])
except ListFull: logger.info("A new domain group ({}) has been added since last time, adding it to the domain_list and starting a task...".format(domain))
full.add(domain) except ListFull:
current_domain["last_rowid"] = db_wiki["rowid"] full.add(domain)
continue current_domain["last_rowid"] = db_wiki["rowid"]
continue
for wiki in self.to_remove: for wiki in self.to_remove:
await self.remove_wiki_from_group(wiki) await self.remove_wiki_from_group(wiki)
for group, data in self.domain_list.items(): for group, data in self.domain_list.items():
@ -232,10 +236,11 @@ async def generate_targets(wiki_url: str, additional_requirements: str) -> defau
request to the wiki just to duplicate the message. request to the wiki just to duplicate the message.
""" """
combinations = defaultdict(list) combinations = defaultdict(list)
async with connection.transaction(): async with db.pool().acquire() as connection:
async for webhook in connection.cursor('SELECT webhook, lang, display FROM rcgcdw WHERE wiki = $1 {}'.format(additional_requirements), wiki_url): async with connection.transaction():
combination = (webhook["lang"], webhook["display"]) async for webhook in connection.cursor('SELECT webhook, lang, display FROM rcgcdw WHERE wiki = $1 {}'.format(additional_requirements), wiki_url):
combinations[combination].append(webhook["webhook"]) combination = (webhook["lang"], webhook["display"])
combinations[combination].append(webhook["webhook"])
return combinations return combinations
@ -244,9 +249,10 @@ async def generate_domain_groups():
:returns tuple[str, list]""" :returns tuple[str, list]"""
domain_wikis = defaultdict(list) domain_wikis = defaultdict(list)
async with connection.transaction(): async with db.pool().acquire() as connection:
async for db_wiki in connection.cursor('SELECT DISTINCT wiki, webhook, lang, display, rcid FROM rcgcdw WHERE rcid != -1 OR rcid IS NULL'): async with connection.transaction():
domain_wikis[get_domain(db_wiki["wiki"])].append(QueuedWiki(db_wiki["wiki"], 20)) async for db_wiki in connection.cursor('SELECT DISTINCT wiki, webhook, lang, display, rcid FROM rcgcdw WHERE rcid != -1 OR rcid IS NULL'):
domain_wikis[get_domain(db_wiki["wiki"])].append(QueuedWiki(db_wiki["wiki"], 20))
for group, db_wikis in domain_wikis.items(): for group, db_wikis in domain_wikis.items():
yield group, db_wikis yield group, db_wikis
@ -396,107 +402,108 @@ async def message_sender():
async def discussion_handler(): async def discussion_handler():
try: try:
while True: while True:
async with connection.transaction(): async with db.pool().acquire() as connection:
async for db_wiki in connection.cursor("SELECT DISTINCT wiki, rcid, postid FROM rcgcdw WHERE postid != '-1' OR postid IS NULL"): async with connection.transaction():
try: async for db_wiki in connection.cursor("SELECT DISTINCT wiki, rcid, postid FROM rcgcdw WHERE postid != '-1' OR postid IS NULL"):
local_wiki = all_wikis[db_wiki["wiki"]] # set a reference to a wiki object from memory
except KeyError:
local_wiki = all_wikis[db_wiki["wiki"]] = Wiki()
local_wiki.rc_active = db_wiki["rcid"]
if db_wiki["wiki"] not in rcqueue.irc_mapping["fandom.com"].updated_discussions and local_wiki.last_discussion_check+settings["irc_overtime"] > time.time(): # I swear if another wiki farm ever starts using Fandom discussions I'm gonna use explosion magic
continue
else:
try: try:
rcqueue.irc_mapping["fandom.com"].updated_discussions.remove(db_wiki["wiki"]) local_wiki = all_wikis[db_wiki["wiki"]] # set a reference to a wiki object from memory
except KeyError: except KeyError:
pass # to be expected local_wiki = all_wikis[db_wiki["wiki"]] = Wiki()
header = settings["header"] local_wiki.rc_active = db_wiki["rcid"]
header["Accept"] = "application/hal+json" if db_wiki["wiki"] not in rcqueue.irc_mapping["fandom.com"].updated_discussions and local_wiki.last_discussion_check+settings["irc_overtime"] > time.time(): # I swear if another wiki farm ever starts using Fandom discussions I'm gonna use explosion magic
async with aiohttp.ClientSession(headers=header,
timeout=aiohttp.ClientTimeout(6.0)) as session:
try:
feeds_response = await local_wiki.fetch_feeds(db_wiki["wiki"], session)
except (WikiServerError, WikiError):
continue # ignore this wiki if it throws errors
try:
discussion_feed_resp = await feeds_response.json(encoding="UTF-8")
if "error" in discussion_feed_resp:
error = discussion_feed_resp["error"]
if error == "NotFoundException": # Discussions disabled
if db_wiki["rcid"] != -1: # RC feed is disabled
await connection.execute("UPDATE rcgcdw SET postid = ? WHERE wiki = ?",
("-1", db_wiki["wiki"],))
else:
await local_wiki.remove(db_wiki["wiki"], 1000)
await DBHandler.update_db()
continue
raise WikiError
discussion_feed = discussion_feed_resp["_embedded"]["doc:posts"]
discussion_feed.reverse()
except aiohttp.ContentTypeError:
logger.exception("Wiki seems to be resulting in non-json content.")
continue continue
except asyncio.TimeoutError:
logger.debug("Timeout on reading JSON of discussion post feeed.")
continue
except:
logger.exception("On loading json of response.")
continue
if db_wiki["postid"] is None: # new wiki, just get the last post to not spam the channel
if len(discussion_feed) > 0:
DBHandler.add(db_wiki["wiki"], discussion_feed[-1]["id"], True)
else: else:
DBHandler.add(db_wiki["wiki"], "0", True) try:
await DBHandler.update_db() rcqueue.irc_mapping["fandom.com"].updated_discussions.remove(db_wiki["wiki"])
continue except KeyError:
comment_events = [] pass # to be expected
targets = await generate_targets(db_wiki["wiki"], "AND NOT postid = '-1'") header = settings["header"]
for post in discussion_feed: header["Accept"] = "application/hal+json"
if post["_embedded"]["thread"][0]["containerType"] == "ARTICLE_COMMENT" and post["id"] > db_wiki["postid"]: async with aiohttp.ClientSession(headers=header,
comment_events.append(post["forumId"]) timeout=aiohttp.ClientTimeout(6.0)) as session:
comment_pages: dict = {} try:
if comment_events: feeds_response = await local_wiki.fetch_feeds(db_wiki["wiki"], session)
try: except (WikiServerError, WikiError):
comment_pages = await local_wiki.safe_request( continue # ignore this wiki if it throws errors
"{wiki}wikia.php?controller=FeedsAndPosts&method=getArticleNamesAndUsernames&stablePageIds={pages}&format=json".format( try:
wiki=db_wiki["wiki"], pages=",".join(comment_events) discussion_feed_resp = await feeds_response.json(encoding="UTF-8")
), RateLimiter(), "articleNames") if "error" in discussion_feed_resp:
except aiohttp.ClientResponseError: # Fandom can be funny sometimes... See #30 error = discussion_feed_resp["error"]
comment_pages = None if error == "NotFoundException": # Discussions disabled
except: if db_wiki["rcid"] != -1: # RC feed is disabled
if command_line_args.debug: await connection.execute("UPDATE rcgcdw SET postid = $1 WHERE wiki = $2",
logger.exception("Exception on Feeds article comment request") ("-1", db_wiki["wiki"],))
shutdown(loop=asyncio.get_event_loop()) else:
await local_wiki.remove(db_wiki["wiki"], 1000)
await DBHandler.update_db()
continue
raise WikiError
discussion_feed = discussion_feed_resp["_embedded"]["doc:posts"]
discussion_feed.reverse()
except aiohttp.ContentTypeError:
logger.exception("Wiki seems to be resulting in non-json content.")
continue
except asyncio.TimeoutError:
logger.debug("Timeout on reading JSON of discussion post feeed.")
continue
except:
logger.exception("On loading json of response.")
continue
if db_wiki["postid"] is None: # new wiki, just get the last post to not spam the channel
if len(discussion_feed) > 0:
DBHandler.add(db_wiki["wiki"], discussion_feed[-1]["id"], True)
else: else:
logger.exception("Exception on Feeds article comment request") DBHandler.add(db_wiki["wiki"], "0", True)
await generic_msg_sender_exception_logger(traceback.format_exc(), await DBHandler.update_db()
"Exception on Feeds article comment request", continue
Post=str(post)[0:1000], Wiki=db_wiki["wiki"]) comment_events = []
message_list = defaultdict(list) targets = await generate_targets(db_wiki["wiki"], "AND NOT postid = '-1'")
for post in discussion_feed: # Yeah, second loop since the comments require an extra request for post in discussion_feed:
if post["id"] > db_wiki["postid"]: if post["_embedded"]["thread"][0]["containerType"] == "ARTICLE_COMMENT" and post["id"] > db_wiki["postid"]:
for target in targets.items(): comment_events.append(post["forumId"])
try: comment_pages: dict = {}
message = await essential_feeds(post, comment_pages, db_wiki, target) if comment_events:
if message is not None: try:
message_list[target[0]].append(message) comment_pages = await local_wiki.safe_request(
except asyncio.CancelledError: "{wiki}wikia.php?controller=FeedsAndPosts&method=getArticleNamesAndUsernames&stablePageIds={pages}&format=json".format(
raise wiki=db_wiki["wiki"], pages=",".join(comment_events)
except: ), RateLimiter(), "articleNames")
if command_line_args.debug: except aiohttp.ClientResponseError: # Fandom can be funny sometimes... See #30
logger.exception("Exception on Feeds formatter") comment_pages = None
shutdown(loop=asyncio.get_event_loop()) except:
else: if command_line_args.debug:
logger.exception("Exception on Feeds formatter") logger.exception("Exception on Feeds article comment request")
await generic_msg_sender_exception_logger(traceback.format_exc(), "Exception in feed formatter", Post=str(post)[0:1000], Wiki=db_wiki["wiki"]) shutdown(loop=asyncio.get_event_loop())
# Lets stack the messages else:
for messages in message_list.values(): logger.exception("Exception on Feeds article comment request")
messages = stack_message_list(messages) await generic_msg_sender_exception_logger(traceback.format_exc(),
for message in messages: "Exception on Feeds article comment request",
await send_to_discord(message) Post=str(post)[0:1000], Wiki=db_wiki["wiki"])
if discussion_feed: message_list = defaultdict(list)
DBHandler.add(db_wiki["wiki"], post["id"], True) for post in discussion_feed: # Yeah, second loop since the comments require an extra request
await asyncio.sleep(delay=2.0) # hardcoded really doesn't need much more if post["id"] > db_wiki["postid"]:
for target in targets.items():
try:
message = await essential_feeds(post, comment_pages, db_wiki, target)
if message is not None:
message_list[target[0]].append(message)
except asyncio.CancelledError:
raise
except:
if command_line_args.debug:
logger.exception("Exception on Feeds formatter")
shutdown(loop=asyncio.get_event_loop())
else:
logger.exception("Exception on Feeds formatter")
await generic_msg_sender_exception_logger(traceback.format_exc(), "Exception in feed formatter", Post=str(post)[0:1000], Wiki=db_wiki["wiki"])
# Lets stack the messages
for messages in message_list.values():
messages = stack_message_list(messages)
for message in messages:
await send_to_discord(message)
if discussion_feed:
DBHandler.add(db_wiki["wiki"], post["id"], True)
await asyncio.sleep(delay=2.0) # hardcoded really doesn't need much more
await asyncio.sleep(delay=1.0) # Avoid lock on no wikis await asyncio.sleep(delay=1.0) # Avoid lock on no wikis
await DBHandler.update_db() await DBHandler.update_db()
except asyncio.CancelledError: except asyncio.CancelledError:
@ -543,8 +550,8 @@ async def main_loop():
global main_tasks global main_tasks
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
nest_asyncio.apply(loop) nest_asyncio.apply(loop)
await setup_connection() await db.setup_connection()
logger.debug("Connection type: {}".format(connection)) logger.debug("Connection type: {}".format(db.connection))
await populate_allwikis() await populate_allwikis()
try: try:
signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT) signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
@ -562,7 +569,7 @@ async def main_loop():
await asyncio.gather(main_tasks["wiki_scanner"], main_tasks["discussion_handler"], main_tasks["message_sender"]) await asyncio.gather(main_tasks["wiki_scanner"], main_tasks["discussion_handler"], main_tasks["message_sender"])
except KeyboardInterrupt: except KeyboardInterrupt:
await DBHandler.update_db() await DBHandler.update_db()
await shutdown_connection() await db.shutdown_connection()
shutdown(loop) shutdown(loop)
except asyncio.CancelledError: except asyncio.CancelledError:
return return

View file

@ -4,19 +4,38 @@ from typing import Optional
from src.config import settings from src.config import settings
logger = logging.getLogger("rcgcdb.database") logger = logging.getLogger("rcgcdb.database")
connection: Optional[asyncpg.Connection] = None # connection: Optional[asyncpg.Connection] = None
async def setup_connection(): class db_connection:
global connection connection: Optional[asyncpg.Pool] = None
# Establish a connection to an existing database named "test"
# as a "postgres" user. async def setup_connection(self):
logger.debug("Setting up the Database connection...") # Establish a connection to an existing database named "test"
connection = await asyncpg.connect(user=settings["pg_user"], host=settings.get("pg_host", "localhost"), # as a "postgres" user.
database=settings.get("pg_db", "rcgcdb"), password=settings.get("pg_pass")) logger.debug("Setting up the Database connection...")
logger.debug("Database connection established! Connection: {}".format(connection)) self.connection = await asyncpg.create_pool(user=settings["pg_user"], host=settings.get("pg_host", "localhost"),
database=settings.get("pg_db", "rcgcdb"), password=settings.get("pg_pass"))
logger.debug("Database connection established! Connection: {}".format(self.connection))
async def shutdown_connection(self):
await self.connection.close()
def pool(self) -> asyncpg.Pool:
return self.connection
# Tried to make it a decorator but tbh won't probably work
# async def in_transaction(self, func):
# async def single_transaction():
# async with self.connection.acquire() as connection:
# async with connection.transaction():
# await func()
# return single_transaction
# async def query(self, string, *arg):
# async with self.connection.acquire() as connection:
# async with connection.transaction():
# return connection.cursor(string, *arg)
async def shutdown_connection(): db = db_connection()
global connection
await connection.close()

View file

@ -3,7 +3,7 @@ from collections import defaultdict
from src.misc import logger from src.misc import logger
from src.config import settings from src.config import settings
from src.database import connection from src.database import db
from src.i18n import langs from src.i18n import langs
from src.exceptions import EmbedListFull from src.exceptions import EmbedListFull
from asyncio import TimeoutError from asyncio import TimeoutError
@ -22,18 +22,19 @@ default_header["X-RateLimit-Precision"] = "millisecond"
# User facing webhook functions # User facing webhook functions
async def wiki_removal(wiki_url, status): async def wiki_removal(wiki_url, status):
async with connection.transaction(): async with db.pool().acquire() as connection:
async for observer in connection.cursor('SELECT webhook, lang FROM rcgcdw WHERE wiki = ?', wiki_url): async with connection.transaction():
_ = langs[observer["lang"]]["discord"].gettext async for observer in connection.cursor('SELECT webhook, lang FROM rcgcdw WHERE wiki = $1', wiki_url):
reasons = {410: _("wiki deleted"), 404: _("wiki deleted"), 401: _("wiki inaccessible"), _ = langs[observer["lang"]]["discord"].gettext
402: _("wiki inaccessible"), 403: _("wiki inaccessible"), 1000: _("discussions disabled")} reasons = {410: _("wiki deleted"), 404: _("wiki deleted"), 401: _("wiki inaccessible"),
reason = reasons.get(status, _("unknown error")) 402: _("wiki inaccessible"), 403: _("wiki inaccessible"), 1000: _("discussions disabled")}
await send_to_discord_webhook(DiscordMessage("compact", "webhook/remove", webhook_url=[], content=_("This recent changes webhook has been removed for `{reason}`!").format(reason=reason), wiki=None), webhook_url=observer["webhook"]) reason = reasons.get(status, _("unknown error"))
header = settings["header"] await send_to_discord_webhook(DiscordMessage("compact", "webhook/remove", webhook_url=[], content=_("This recent changes webhook has been removed for `{reason}`!").format(reason=reason), wiki=None), webhook_url=observer["webhook"])
header['Content-Type'] = 'application/json' header = settings["header"]
header['X-Audit-Log-Reason'] = "Wiki becoming unavailable" header['Content-Type'] = 'application/json'
async with aiohttp.ClientSession(headers=header, timeout=aiohttp.ClientTimeout(5.0)) as session: header['X-Audit-Log-Reason'] = "Wiki becoming unavailable"
await session.delete("https://discord.com/api/webhooks/"+observer["webhook"]) async with aiohttp.ClientSession(headers=header, timeout=aiohttp.ClientTimeout(5.0)) as session:
await session.delete("https://discord.com/api/webhooks/"+observer["webhook"])
async def webhook_removal_monitor(webhook_url: str, reason: int): async def webhook_removal_monitor(webhook_url: str, reason: int):
@ -239,7 +240,8 @@ async def handle_discord_http(code: int, formatted_embed: str, result: aiohttp.C
return 1 return 1
elif code == 401 or code == 404: # HTTP UNAUTHORIZED AND NOT FOUND elif code == 401 or code == 404: # HTTP UNAUTHORIZED AND NOT FOUND
logger.error("Webhook URL is invalid or no longer in use, please replace it with proper one.") logger.error("Webhook URL is invalid or no longer in use, please replace it with proper one.")
await connection.execute("DELETE FROM rcgcdw WHERE webhook = ?", (webhook_url,)) async with db.pool().acquire() as connection:
await connection.execute("DELETE FROM rcgcdw WHERE webhook = $1", (webhook_url,))
await webhook_removal_monitor(webhook_url, code) await webhook_removal_monitor(webhook_url, code)
return 1 return 1
elif code == 429: elif code == 429:

View file

@ -1,5 +1,5 @@
import logging import logging
from src.database import connection from src.database import db
logger = logging.getLogger("rcgcdb.queue_handler") logger = logging.getLogger("rcgcdb.queue_handler")
@ -15,14 +15,15 @@ class UpdateDB:
self.updated.clear() self.updated.clear()
async def update_db(self): async def update_db(self):
async with connection.transaction(): async with db.pool().acquire() as connection:
for update in self.updated: async with connection.transaction():
if update[2] is None: for update in self.updated:
sql = "UPDATE rcgcdw SET rcid = ? WHERE wiki = ? AND ( rcid != -1 OR rcid IS NULL )" if update[2] is None:
else: sql = "UPDATE rcgcdw SET rcid = $2 WHERE wiki = $1 AND ( rcid != -1 OR rcid IS NULL )"
sql = "UPDATE rcgcdw SET postid = ? WHERE wiki = ? AND ( postid != '-1' OR postid IS NULL )" else:
await connection.execute(sql) sql = "UPDATE rcgcdw SET postid = $2 WHERE wiki = $1 AND ( postid != '-1' OR postid IS NULL )"
self.clear_list() await connection.execute(sql, update[0], update[1])
self.clear_list()
DBHandler = UpdateDB() DBHandler = UpdateDB()

View file

@ -2,7 +2,7 @@ from dataclasses import dataclass
import re import re
import logging, aiohttp import logging, aiohttp
from src.exceptions import * from src.exceptions import *
from src.database import connection from src.database import db
from src.formatters.rc import embed_formatter, compact_formatter from src.formatters.rc import embed_formatter, compact_formatter
from src.formatters.discussions import feeds_embed_formatter, feeds_compact_formatter from src.formatters.discussions import feeds_embed_formatter, feeds_compact_formatter
from src.misc import parse_link from src.misc import parse_link
@ -109,7 +109,8 @@ class Wiki:
logger.info("Removing a wiki {}".format(wiki_url)) logger.info("Removing a wiki {}".format(wiki_url))
await src.discord.wiki_removal(wiki_url, reason) await src.discord.wiki_removal(wiki_url, reason)
await src.discord.wiki_removal_monitor(wiki_url, reason) await src.discord.wiki_removal_monitor(wiki_url, reason)
result = await connection.execute('DELETE FROM rcgcdw WHERE wiki = ?', wiki_url) async with db.pool().acquire() as connection:
result = await connection.execute('DELETE FROM rcgcdw WHERE wiki = $1', wiki_url)
logger.warning('{} rows affected by DELETE FROM rcgcdw WHERE wiki = "{}"'.format(result, wiki_url)) logger.warning('{} rows affected by DELETE FROM rcgcdw WHERE wiki = "{}"'.format(result, wiki_url))
async def pull_comment(self, comment_id, WIKI_API_PATH, rate_limiter): async def pull_comment(self, comment_id, WIKI_API_PATH, rate_limiter):