Converted all of the queries; however, there is still some work to be done

This commit is contained in:
Frisk 2021-03-19 16:26:19 +01:00
parent 79ea1a6a8b
commit 2c8574445c
No known key found for this signature in database
GPG key ID: 213F7C15068AF8AC
4 changed files with 188 additions and 179 deletions

View file

@ -35,11 +35,13 @@ all_wikis: dict = {}
mw_msgs: dict = {} # will have the type of id: tuple mw_msgs: dict = {} # will have the type of id: tuple
main_tasks: dict = {} main_tasks: dict = {}
# First populate the all_wikis list with every wiki # First populate the all_wikis list with every wiki
# Reasons for this: 1. we require amount of wikis to calculate the cooldown between requests # Reasons for this: 1. we require amount of wikis to calculate the cooldown between requests
# 2. Easier to code # 2. Easier to code
async def populate_allwikis():
for db_wiki in db_cursor.execute('SELECT wiki, rcid FROM rcgcdw GROUP BY wiki ORDER BY ROWID'): async with connection.transaction():
async for db_wiki in connection.cursor('SELECT DISTINCT wiki, rcid FROM rcgcdw'):
all_wikis[db_wiki["wiki"]] = Wiki() # populate all_wikis all_wikis[db_wiki["wiki"]] = Wiki() # populate all_wikis
all_wikis[db_wiki["wiki"]].rc_active = db_wiki["rcid"] all_wikis[db_wiki["wiki"]].rc_active = db_wiki["rcid"]
@ -92,7 +94,7 @@ class RcQueue:
all_wikis[wiki].rc_active = -1 all_wikis[wiki].rc_active = -1
if not self[group]["query"]: # if there is no wiki left in the queue, get rid of the task if not self[group]["query"]: # if there is no wiki left in the queue, get rid of the task
logger.debug(f"{group} no longer has any wikis queued!") logger.debug(f"{group} no longer has any wikis queued!")
if not self.check_if_domain_in_db(group): if not await self.check_if_domain_in_db(group):
await self.stop_task_group(group) await self.stop_task_group(group)
else: else:
logger.debug(f"But there are still wikis for it in DB!") logger.debug(f"But there are still wikis for it in DB!")
@ -101,10 +103,9 @@ class RcQueue:
self[group]["task"].cancel() self[group]["task"].cancel()
del self.domain_list[group] del self.domain_list[group]
def check_if_domain_in_db(self, domain): async def check_if_domain_in_db(self, domain):
fetch_all = db_cursor.execute( async with connection.transaction():
'SELECT ROWID, webhook, wiki, lang, display, rcid FROM rcgcdw WHERE rcid != -1 GROUP BY wiki ORDER BY ROWID ASC') async for wiki in connection.cursor('SELECT DISTINCT wiki FROM rcgcdw WHERE rcid != -1;'):
for wiki in fetch_all.fetchall():
if get_domain(wiki["wiki"]) == domain: if get_domain(wiki["wiki"]) == domain:
return True return True
return False return False
@ -143,11 +144,10 @@ class RcQueue:
async def update_queues(self): async def update_queues(self):
"""Makes a round on rcgcdb DB and looks for updates to the queues in self.domain_list""" """Makes a round on rcgcdb DB and looks for updates to the queues in self.domain_list"""
try: try:
fetch_all = db_cursor.execute(
'SELECT ROWID, webhook, wiki, lang, display, rcid FROM rcgcdw WHERE rcid != -1 OR rcid IS NULL GROUP BY wiki ORDER BY ROWID ASC')
self.to_remove = [x[0] for x in filter(self.filter_rc_active, all_wikis.items())] # first populate this list and remove wikis that are still in the db, clean up the rest self.to_remove = [x[0] for x in filter(self.filter_rc_active, all_wikis.items())] # first populate this list and remove wikis that are still in the db, clean up the rest
full = set() full = set()
for db_wiki in fetch_all.fetchall(): async with connection.transaction():
async for db_wiki in connection.cursor('SELECT DISTINCT wiki, row_number() over (ORDER BY webhook) AS ROWID, webhook, lang, display, rcid FROM rcgcdw WHERE rcid != -1 OR rcid IS NULL order by webhook'):
domain = get_domain(db_wiki["wiki"]) domain = get_domain(db_wiki["wiki"])
try: try:
if db_wiki["wiki"] not in all_wikis: if db_wiki["wiki"] not in all_wikis:
@ -178,14 +178,14 @@ class RcQueue:
else: # Continue without adding else: # Continue without adding
logger.debug("No condition fulfilled so skipping.") logger.debug("No condition fulfilled so skipping.")
continue continue
if not db_wiki["ROWID"] < current_domain["last_rowid"]: if not db_wiki["rowid"] < current_domain["last_rowid"]:
current_domain["query"].append(QueuedWiki(db_wiki["wiki"], 20)) current_domain["query"].append(QueuedWiki(db_wiki["wiki"], 20))
except KeyError: except KeyError:
await self.start_group(domain, [QueuedWiki(db_wiki["wiki"], 20)]) await self.start_group(domain, [QueuedWiki(db_wiki["wiki"], 20)])
logger.info("A new domain group ({}) has been added since last time, adding it to the domain_list and starting a task...".format(domain)) logger.info("A new domain group ({}) has been added since last time, adding it to the domain_list and starting a task...".format(domain))
except ListFull: except ListFull:
full.add(domain) full.add(domain)
current_domain["last_rowid"] = db_wiki["ROWID"] current_domain["last_rowid"] = db_wiki["rowid"]
continue continue
for wiki in self.to_remove: for wiki in self.to_remove:
await self.remove_wiki_from_group(wiki) await self.remove_wiki_from_group(wiki)
@ -226,23 +226,26 @@ def calculate_delay_for_group(group_length: int) -> float:
return 0.0 return 0.0
def generate_targets(wiki_url: str, additional_requirements: str) -> defaultdict: async def generate_targets(wiki_url: str, additional_requirements: str) -> defaultdict:
"""To minimize the amount of requests, we generate a list of language/display mode combinations to create messages for """To minimize the amount of requests, we generate a list of language/display mode combinations to create messages for
this way we can send the same message to multiple webhooks which have the same wiki and settings without doing another this way we can send the same message to multiple webhooks which have the same wiki and settings without doing another
request to the wiki just to duplicate the message. request to the wiki just to duplicate the message.
""" """
combinations = defaultdict(list) combinations = defaultdict(list)
for webhook in db_cursor.execute('SELECT webhook, lang, display FROM rcgcdw WHERE wiki = ? {}'.format(additional_requirements), (wiki_url,)): async with connection.transaction():
async for webhook in connection.cursor('SELECT webhook, lang, display FROM rcgcdw WHERE wiki = $1 {}'.format(additional_requirements), wiki_url):
combination = (webhook["lang"], webhook["display"]) combination = (webhook["lang"], webhook["display"])
combinations[combination].append(webhook["webhook"]) combinations[combination].append(webhook["webhook"])
return combinations return combinations
async def generate_domain_groups(): async def generate_domain_groups():
"""Generate a list of wikis per domain (fandom.com, wikipedia.org etc.)""" """Generate a list of wikis per domain (fandom.com, wikipedia.org etc.)
:returns tuple[str, list]"""
domain_wikis = defaultdict(list) domain_wikis = defaultdict(list)
fetch_all = db_cursor.execute('SELECT ROWID, webhook, wiki, lang, display, rcid FROM rcgcdw WHERE rcid != -1 OR rcid IS NULL GROUP BY wiki ORDER BY ROWID ASC') async with connection.transaction():
for db_wiki in fetch_all.fetchall(): async for db_wiki in connection.cursor('SELECT DISTINCT wiki, webhook, lang, display, rcid FROM rcgcdw WHERE rcid != -1 OR rcid IS NULL'):
domain_wikis[get_domain(db_wiki["wiki"])].append(QueuedWiki(db_wiki["wiki"], 20)) domain_wikis[get_domain(db_wiki["wiki"])].append(QueuedWiki(db_wiki["wiki"], 20))
for group, db_wikis in domain_wikis.items(): for group, db_wikis in domain_wikis.items():
yield group, db_wikis yield group, db_wikis
@ -304,7 +307,7 @@ async def scan_group(group: str):
await DBHandler.update_db() await DBHandler.update_db()
continue continue
categorize_events = {} categorize_events = {}
targets = generate_targets(queued_wiki.url, "AND (rcid != -1 OR rcid IS NULL)") targets = await generate_targets(queued_wiki.url, "AND (rcid != -1 OR rcid IS NULL)")
paths = get_paths(queued_wiki.url, recent_changes_resp) paths = get_paths(queued_wiki.url, recent_changes_resp)
new_events = 0 new_events = 0
local_wiki.last_check = time.time() # on successful check, save new last check time local_wiki.last_check = time.time() # on successful check, save new last check time
@ -393,9 +396,8 @@ async def message_sender():
async def discussion_handler(): async def discussion_handler():
try: try:
while True: while True:
fetch_all = db_cursor.execute( async with connection.transaction():
"SELECT wiki, rcid, postid FROM rcgcdw WHERE postid != '-1' OR postid IS NULL GROUP BY wiki") async for db_wiki in connection.cursor("SELECT DISTINCT wiki, rcid, postid FROM rcgcdw WHERE postid != '-1' OR postid IS NULL"):
for db_wiki in fetch_all.fetchall():
try: try:
local_wiki = all_wikis[db_wiki["wiki"]] # set a reference to a wiki object from memory local_wiki = all_wikis[db_wiki["wiki"]] # set a reference to a wiki object from memory
except KeyError: except KeyError:
@ -422,7 +424,7 @@ async def discussion_handler():
error = discussion_feed_resp["error"] error = discussion_feed_resp["error"]
if error == "NotFoundException": # Discussions disabled if error == "NotFoundException": # Discussions disabled
if db_wiki["rcid"] != -1: # RC feed is disabled if db_wiki["rcid"] != -1: # RC feed is disabled
db_cursor.execute("UPDATE rcgcdw SET postid = ? WHERE wiki = ?", await connection.execute("UPDATE rcgcdw SET postid = ? WHERE wiki = ?",
("-1", db_wiki["wiki"],)) ("-1", db_wiki["wiki"],))
else: else:
await local_wiki.remove(db_wiki["wiki"], 1000) await local_wiki.remove(db_wiki["wiki"], 1000)
@ -448,7 +450,7 @@ async def discussion_handler():
await DBHandler.update_db() await DBHandler.update_db()
continue continue
comment_events = [] comment_events = []
targets = generate_targets(db_wiki["wiki"], "AND NOT postid = '-1'") targets = await generate_targets(db_wiki["wiki"], "AND NOT postid = '-1'")
for post in discussion_feed: for post in discussion_feed:
if post["_embedded"]["thread"][0]["containerType"] == "ARTICLE_COMMENT" and post["id"] > db_wiki["postid"]: if post["_embedded"]["thread"][0]["containerType"] == "ARTICLE_COMMENT" and post["id"] > db_wiki["postid"]:
comment_events.append(post["forumId"]) comment_events.append(post["forumId"])
@ -541,6 +543,9 @@ async def main_loop():
global main_tasks global main_tasks
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
nest_asyncio.apply(loop) nest_asyncio.apply(loop)
await setup_connection()
logger.debug("Connection type: {}".format(connection))
await populate_allwikis()
try: try:
signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT) signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
for s in signals: for s in signals:

View file

@ -1,7 +1,9 @@
import asyncpg import asyncpg
from typing import Any, Union, Optional import logging
from typing import Optional
from src.config import settings from src.config import settings
logger = logging.getLogger("rcgcdb.database")
connection: Optional[asyncpg.Connection] = None connection: Optional[asyncpg.Connection] = None
@ -9,8 +11,10 @@ async def setup_connection():
global connection global connection
# Establish a connection to an existing database named "test" # Establish a connection to an existing database named "test"
# as a "postgres" user. # as a "postgres" user.
connection: asyncpg.connection = await asyncpg.connect(user=settings["pg_user"], host=settings.get("pg_host", "localhost"), logger.debug("Setting up the Database connection...")
database=settings.get("pg_db", "RcGcDb"), password=settings.get("pg_pass")) connection = await asyncpg.connect(user=settings["pg_user"], host=settings.get("pg_host", "localhost"),
database=settings.get("pg_db", "rcgcdb"), password=settings.get("pg_pass"))
logger.debug("Database connection established! Connection: {}".format(connection))
async def shutdown_connection(): async def shutdown_connection():

View file

@ -3,7 +3,7 @@ from collections import defaultdict
from src.misc import logger from src.misc import logger
from src.config import settings from src.config import settings
from src.database import db_cursor from src.database import connection
from src.i18n import langs from src.i18n import langs
from src.exceptions import EmbedListFull from src.exceptions import EmbedListFull
from asyncio import TimeoutError from asyncio import TimeoutError
@ -22,7 +22,8 @@ default_header["X-RateLimit-Precision"] = "millisecond"
# User facing webhook functions # User facing webhook functions
async def wiki_removal(wiki_url, status): async def wiki_removal(wiki_url, status):
for observer in db_cursor.execute('SELECT webhook, lang FROM rcgcdw WHERE wiki = ?', (wiki_url,)): async with connection.transaction():
async for observer in connection.cursor('SELECT webhook, lang FROM rcgcdw WHERE wiki = ?', wiki_url):
_ = langs[observer["lang"]]["discord"].gettext _ = langs[observer["lang"]]["discord"].gettext
reasons = {410: _("wiki deleted"), 404: _("wiki deleted"), 401: _("wiki inaccessible"), reasons = {410: _("wiki deleted"), 404: _("wiki deleted"), 401: _("wiki inaccessible"),
402: _("wiki inaccessible"), 403: _("wiki inaccessible"), 1000: _("discussions disabled")} 402: _("wiki inaccessible"), 403: _("wiki inaccessible"), 1000: _("discussions disabled")}
@ -238,7 +239,7 @@ async def handle_discord_http(code: int, formatted_embed: str, result: aiohttp.C
return 1 return 1
elif code == 401 or code == 404: # HTTP UNAUTHORIZED AND NOT FOUND elif code == 401 or code == 404: # HTTP UNAUTHORIZED AND NOT FOUND
logger.error("Webhook URL is invalid or no longer in use, please replace it with proper one.") logger.error("Webhook URL is invalid or no longer in use, please replace it with proper one.")
db_cursor.execute("DELETE FROM rcgcdw WHERE webhook = ?", (webhook_url,)) await connection.execute("DELETE FROM rcgcdw WHERE webhook = ?", (webhook_url,))
await webhook_removal_monitor(webhook_url, code) await webhook_removal_monitor(webhook_url, code)
return 1 return 1
elif code == 429: elif code == 429:

View file

@ -2,7 +2,7 @@ from dataclasses import dataclass
import re import re
import logging, aiohttp import logging, aiohttp
from src.exceptions import * from src.exceptions import *
from src.database import db_cursor, db_connection from src.database import connection
from src.formatters.rc import embed_formatter, compact_formatter from src.formatters.rc import embed_formatter, compact_formatter
from src.formatters.discussions import feeds_embed_formatter, feeds_compact_formatter from src.formatters.discussions import feeds_embed_formatter, feeds_compact_formatter
from src.misc import parse_link from src.misc import parse_link
@ -109,9 +109,8 @@ class Wiki:
logger.info("Removing a wiki {}".format(wiki_url)) logger.info("Removing a wiki {}".format(wiki_url))
await src.discord.wiki_removal(wiki_url, reason) await src.discord.wiki_removal(wiki_url, reason)
await src.discord.wiki_removal_monitor(wiki_url, reason) await src.discord.wiki_removal_monitor(wiki_url, reason)
db_cursor.execute('DELETE FROM rcgcdw WHERE wiki = ?', (wiki_url,)) result = await connection.execute('DELETE FROM rcgcdw WHERE wiki = ?', wiki_url)
logger.warning('{} rows affected by DELETE FROM rcgcdw WHERE wiki = "{}"'.format(db_cursor.rowcount, wiki_url)) logger.warning('{} rows affected by DELETE FROM rcgcdw WHERE wiki = "{}"'.format(result, wiki_url))
db_connection.commit()
async def pull_comment(self, comment_id, WIKI_API_PATH, rate_limiter): async def pull_comment(self, comment_id, WIKI_API_PATH, rate_limiter):
try: try: