mirror of
https://gitlab.com/chicken-riders/RcGcDb.git
synced 2025-02-23 00:54:09 +00:00
Merge branch 'master' into 'master'
Add support for IRC feeds Closes #37 See merge request chicken-riders/RcGcDb!17
This commit is contained in:
commit
b798250a19
|
@ -1,4 +1,5 @@
|
||||||
beautifulsoup4 >= 4.6.0; python_version >= '3.6'
|
beautifulsoup4 >= 4.6.0; python_version >= '3.6'
|
||||||
aiohttp >= 3.6.2
|
aiohttp >= 3.6.2
|
||||||
lxml >= 4.2.1
|
lxml >= 4.2.1
|
||||||
nest-asyncio >= 1.4.0
|
nest-asyncio >= 1.4.0
|
||||||
|
irc >= 19.0.1
|
|
@ -7,6 +7,17 @@
|
||||||
"database_path": "rcgcdb.db",
|
"database_path": "rcgcdb.db",
|
||||||
"monitoring_webhook": "111111111111111111/AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
|
"monitoring_webhook": "111111111111111111/AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA",
|
||||||
"support": "https://discord.gg/v77RTk5",
|
"support": "https://discord.gg/v77RTk5",
|
||||||
|
"irc_overtime": 3600,
|
||||||
|
"irc_servers": {
|
||||||
|
"your custom name for the farm": {
|
||||||
|
"domains": ["wikipedia.org", "otherwikipedia.org"],
|
||||||
|
"irc_host": "randomIRC.domain.com",
|
||||||
|
"irc_port": "6667",
|
||||||
|
"irc_nickname": "BotIRCNickname",
|
||||||
|
"irc_name": "BotIRCName",
|
||||||
|
"irc_channel_mapping": {"rc": "#rcchannel", "discussion": "#discussionchannel"}
|
||||||
|
}
|
||||||
|
},
|
||||||
"logging": {
|
"logging": {
|
||||||
"version": 1,
|
"version": 1,
|
||||||
"disable_existing_loggers": false,
|
"disable_existing_loggers": false,
|
||||||
|
|
60
src/bot.py
60
src/bot.py
|
@ -4,6 +4,7 @@ import logging.config
|
||||||
import signal
|
import signal
|
||||||
import traceback
|
import traceback
|
||||||
import nest_asyncio
|
import nest_asyncio
|
||||||
|
import time
|
||||||
from collections import defaultdict, namedtuple
|
from collections import defaultdict, namedtuple
|
||||||
from typing import Generator
|
from typing import Generator
|
||||||
|
|
||||||
|
@ -18,6 +19,7 @@ from src.queue_handler import DBHandler
|
||||||
from src.wiki import Wiki, process_cats, process_mwmsgs, essential_info, essential_feeds
|
from src.wiki import Wiki, process_cats, process_mwmsgs, essential_info, essential_feeds
|
||||||
from src.discord import DiscordMessage, generic_msg_sender_exception_logger, stack_message_list
|
from src.discord import DiscordMessage, generic_msg_sender_exception_logger, stack_message_list
|
||||||
from src.wiki_ratelimiter import RateLimiter
|
from src.wiki_ratelimiter import RateLimiter
|
||||||
|
from src.irc_feed import AioIRCCat
|
||||||
|
|
||||||
|
|
||||||
logging.config.dictConfig(settings["logging"])
|
logging.config.dictConfig(settings["logging"])
|
||||||
|
@ -60,11 +62,24 @@ class RcQueue:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.domain_list = {}
|
self.domain_list = {}
|
||||||
self.to_remove = []
|
self.to_remove = []
|
||||||
|
self.irc_mapping = {}
|
||||||
|
|
||||||
async def start_group(self, group, initial_wikis):
|
async def start_group(self, group, initial_wikis):
|
||||||
"""Starts a task for given domain group"""
|
"""Starts a task for given domain group"""
|
||||||
if group not in self.domain_list:
|
if group not in self.domain_list:
|
||||||
self.domain_list[group] = {"task": asyncio.create_task(scan_group(group)), "last_rowid": 0, "query": LimitedList(initial_wikis), "rate_limiter": RateLimiter()}
|
if group in self.irc_mapping: # Hopefully there are no race conditions....
|
||||||
|
irc_connection = self.irc_mapping[group]
|
||||||
|
else:
|
||||||
|
for irc_server in settings["irc_servers"].keys():
|
||||||
|
if group in settings["irc_servers"][irc_server]["domains"]:
|
||||||
|
irc_connection = AioIRCCat(settings["irc_servers"][irc_server]["irc_channel_mapping"], all_wikis)
|
||||||
|
for domain in settings["irc_servers"][irc_server]["domains"]:
|
||||||
|
self.irc_mapping[domain] = irc_connection
|
||||||
|
irc_connection.connect(settings["irc_servers"][irc_server]["irc_host"], settings["irc_servers"][irc_server]["irc_port"], settings["irc_servers"][irc_server]["irc_name"])
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
irc_connection = None
|
||||||
|
self.domain_list[group] = {"task": asyncio.create_task(scan_group(group)), "last_rowid": 0, "query": LimitedList(initial_wikis), "rate_limiter": RateLimiter(), "irc": irc_connection}
|
||||||
logger.debug(self.domain_list[group])
|
logger.debug(self.domain_list[group])
|
||||||
else:
|
else:
|
||||||
raise KeyError
|
raise KeyError
|
||||||
|
@ -77,7 +92,10 @@ class RcQueue:
|
||||||
all_wikis[wiki].rc_active = -1
|
all_wikis[wiki].rc_active = -1
|
||||||
if not self[group]["query"]: # if there is no wiki left in the queue, get rid of the task
|
if not self[group]["query"]: # if there is no wiki left in the queue, get rid of the task
|
||||||
logger.debug(f"{group} no longer has any wikis queued!")
|
logger.debug(f"{group} no longer has any wikis queued!")
|
||||||
await self.stop_task_group(group)
|
if not self.check_if_domain_in_db(group):
|
||||||
|
await self.stop_task_group(group)
|
||||||
|
else:
|
||||||
|
logger.debug(f"But there are still wikis for it in DB!")
|
||||||
|
|
||||||
async def stop_task_group(self, group):
|
async def stop_task_group(self, group):
|
||||||
self[group]["task"].cancel()
|
self[group]["task"].cancel()
|
||||||
|
@ -87,7 +105,7 @@ class RcQueue:
|
||||||
fetch_all = db_cursor.execute(
|
fetch_all = db_cursor.execute(
|
||||||
'SELECT ROWID, webhook, wiki, lang, display, rcid FROM rcgcdw WHERE rcid != -1 GROUP BY wiki ORDER BY ROWID ASC')
|
'SELECT ROWID, webhook, wiki, lang, display, rcid FROM rcgcdw WHERE rcid != -1 GROUP BY wiki ORDER BY ROWID ASC')
|
||||||
for wiki in fetch_all.fetchall():
|
for wiki in fetch_all.fetchall():
|
||||||
if get_domain(db_wiki["wiki"]) == domain:
|
if get_domain(wiki["wiki"]) == domain:
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@ -143,6 +161,19 @@ class RcQueue:
|
||||||
continue
|
continue
|
||||||
try:
|
try:
|
||||||
current_domain: dict = self[domain]
|
current_domain: dict = self[domain]
|
||||||
|
if current_domain["irc"]:
|
||||||
|
logger.debug('CURRENT STATUS:')
|
||||||
|
logger.debug("DOMAIN LIST FOR IRC: {}".format(current_domain["irc"].updated))
|
||||||
|
logger.debug("CURRENT DOMAIN INFO: {}".format(domain))
|
||||||
|
logger.debug("IS WIKI IN A LIST?: {}".format(db_wiki["wiki"] in current_domain["irc"].updated))
|
||||||
|
logger.debug("LAST CHECK FOR THE WIKI {} IS {}".format(db_wiki["wiki"], all_wikis[db_wiki["wiki"]].last_check))
|
||||||
|
if db_wiki["wiki"] not in current_domain["irc"].updated and all_wikis[db_wiki["wiki"]].last_check+settings["irc_overtime"] > time.time():
|
||||||
|
continue # if domain has IRC, has not been updated, and it was updated less than an hour ago
|
||||||
|
else: # otherwise remove it from the list
|
||||||
|
try:
|
||||||
|
current_domain["irc"].updated.remove(db_wiki["wiki"])
|
||||||
|
except KeyError:
|
||||||
|
pass # this is to be expected when third condition is not met above
|
||||||
if not db_wiki["ROWID"] < current_domain["last_rowid"]:
|
if not db_wiki["ROWID"] < current_domain["last_rowid"]:
|
||||||
current_domain["query"].append(QueuedWiki(db_wiki["wiki"], 20))
|
current_domain["query"].append(QueuedWiki(db_wiki["wiki"], 20))
|
||||||
except KeyError:
|
except KeyError:
|
||||||
|
@ -218,7 +249,8 @@ async def scan_group(group: str):
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
async with rcqueue.retrieve_next_queued(group) as queued_wiki: # acquire next wiki in queue
|
async with rcqueue.retrieve_next_queued(group) as queued_wiki: # acquire next wiki in queue
|
||||||
await asyncio.sleep(calculate_delay_for_group(len(rcqueue[group]["query"])))
|
if "irc" not in rcqueue[group]:
|
||||||
|
await asyncio.sleep(calculate_delay_for_group(len(rcqueue[group]["query"])))
|
||||||
logger.debug("Wiki {}".format(queued_wiki.url))
|
logger.debug("Wiki {}".format(queued_wiki.url))
|
||||||
local_wiki = all_wikis[queued_wiki.url] # set a reference to a wiki object from memory
|
local_wiki = all_wikis[queued_wiki.url] # set a reference to a wiki object from memory
|
||||||
extended = False
|
extended = False
|
||||||
|
@ -271,6 +303,7 @@ async def scan_group(group: str):
|
||||||
targets = generate_targets(queued_wiki.url, "AND (rcid != -1 OR rcid IS NULL)")
|
targets = generate_targets(queued_wiki.url, "AND (rcid != -1 OR rcid IS NULL)")
|
||||||
paths = get_paths(queued_wiki.url, recent_changes_resp)
|
paths = get_paths(queued_wiki.url, recent_changes_resp)
|
||||||
new_events = 0
|
new_events = 0
|
||||||
|
local_wiki.last_check = time.time() # on successful check, save new last check time
|
||||||
for change in recent_changes:
|
for change in recent_changes:
|
||||||
if change["rcid"] > local_wiki.rc_active and queued_wiki.amount != 450:
|
if change["rcid"] > local_wiki.rc_active and queued_wiki.amount != 450:
|
||||||
new_events += 1
|
new_events += 1
|
||||||
|
@ -314,7 +347,7 @@ async def scan_group(group: str):
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
return
|
return
|
||||||
except QueueEmpty:
|
except QueueEmpty:
|
||||||
await asyncio.sleep(21.0)
|
await asyncio.sleep(10.0)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
|
||||||
|
@ -359,15 +392,22 @@ async def discussion_handler():
|
||||||
fetch_all = db_cursor.execute(
|
fetch_all = db_cursor.execute(
|
||||||
"SELECT wiki, rcid, postid FROM rcgcdw WHERE postid != '-1' OR postid IS NULL GROUP BY wiki")
|
"SELECT wiki, rcid, postid FROM rcgcdw WHERE postid != '-1' OR postid IS NULL GROUP BY wiki")
|
||||||
for db_wiki in fetch_all.fetchall():
|
for db_wiki in fetch_all.fetchall():
|
||||||
|
try:
|
||||||
|
local_wiki = all_wikis[db_wiki["wiki"]] # set a reference to a wiki object from memory
|
||||||
|
except KeyError:
|
||||||
|
local_wiki = all_wikis[db_wiki["wiki"]] = Wiki()
|
||||||
|
local_wiki.rc_active = db_wiki["rcid"]
|
||||||
|
if db_wiki["wiki"] not in rcqueue.irc_mapping["fandom.com"].updated_discussions and local_wiki.last_discussion_check+settings["irc_overtime"] > time.time(): # I swear if another wiki farm ever starts using Fandom discussions I'm gonna use explosion magic
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
rcqueue.irc_mapping["fandom.com"].updated_discussions.remove(db_wiki["wiki"])
|
||||||
|
except KeyError:
|
||||||
|
pass # to be expected
|
||||||
header = settings["header"]
|
header = settings["header"]
|
||||||
header["Accept"] = "application/hal+json"
|
header["Accept"] = "application/hal+json"
|
||||||
async with aiohttp.ClientSession(headers=header,
|
async with aiohttp.ClientSession(headers=header,
|
||||||
timeout=aiohttp.ClientTimeout(6.0)) as session:
|
timeout=aiohttp.ClientTimeout(6.0)) as session:
|
||||||
try:
|
|
||||||
local_wiki = all_wikis[db_wiki["wiki"]] # set a reference to a wiki object from memory
|
|
||||||
except KeyError:
|
|
||||||
local_wiki = all_wikis[db_wiki["wiki"]] = Wiki()
|
|
||||||
local_wiki.rc_active = db_wiki["rcid"]
|
|
||||||
try:
|
try:
|
||||||
feeds_response = await local_wiki.fetch_feeds(db_wiki["wiki"], session)
|
feeds_response = await local_wiki.fetch_feeds(db_wiki["wiki"], session)
|
||||||
except (WikiServerError, WikiError):
|
except (WikiServerError, WikiError):
|
||||||
|
|
62
src/irc_feed.py
Normal file
62
src/irc_feed.py
Normal file
|
@ -0,0 +1,62 @@
|
||||||
|
import irc.client_aio
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
from urllib.parse import urlparse, quote
|
||||||
|
|
||||||
|
logger = logging.getLogger("rcgcdw.irc_feed")
|
||||||
|
|
||||||
|
|
||||||
|
class AioIRCCat(irc.client_aio.AioSimpleIRCClient):
|
||||||
|
def __init__(self, targets, all_wikis):
|
||||||
|
irc.client_aio.SimpleIRCClient.__init__(self)
|
||||||
|
self.targets = targets
|
||||||
|
self.updated = set() # Storage for edited wikis
|
||||||
|
self.updated_discussions = set()
|
||||||
|
self.wikis = all_wikis
|
||||||
|
|
||||||
|
def on_welcome(self, connection, event): # Join IRC channels
|
||||||
|
for channel in self.targets.values():
|
||||||
|
connection.join(channel)
|
||||||
|
|
||||||
|
def on_pubmsg(self, connection, event):
|
||||||
|
if event.target == self.targets["rc"]:
|
||||||
|
self.parse_fandom_message(' '.join(event.arguments))
|
||||||
|
elif event.target == self.targets["discussion"]:
|
||||||
|
self.parse_fandom_discussion(' '.join(event.arguments))
|
||||||
|
|
||||||
|
def on_nicknameinuse(self, c, e):
|
||||||
|
c.nick(c.get_nickname() + "_")
|
||||||
|
|
||||||
|
def parse_fandom_message(self, message):
|
||||||
|
message = message.split("\x035*\x03")
|
||||||
|
# print(asyncio.all_tasks())
|
||||||
|
half = message[0].find("\x0302http")
|
||||||
|
if half == -1:
|
||||||
|
return
|
||||||
|
message = message[0][half + 3:].strip()
|
||||||
|
# print(message)
|
||||||
|
url = urlparse(message)
|
||||||
|
full_url = "https://"+url.netloc + recognize_langs(url.path)
|
||||||
|
if full_url in self.wikis and self.wikis[full_url].rc_active != -1:
|
||||||
|
self.updated.add(full_url)
|
||||||
|
logger.debug("New website appended to the list! {}".format(full_url))
|
||||||
|
|
||||||
|
def parse_fandom_discussion(self, message):
|
||||||
|
post = json.loads(message)
|
||||||
|
if post.get('action', 'unknown') != "deleted": # ignore deletion events
|
||||||
|
url = urlparse(post.get('url'))
|
||||||
|
full_url ="https://"+ url.netloc + recognize_langs(url.path)
|
||||||
|
if full_url in self.wikis: # POSSIBLE MEMORY LEAK AS WE DON'T HAVE A WAY TO CHECK IF WIKI IS LOOKING FOR DISCUSSIONS OR NOT
|
||||||
|
self.updated_discussions.add("https://"+full_url)
|
||||||
|
logger.debug("New website appended to the list! {}".format(full_url))
|
||||||
|
|
||||||
|
|
||||||
|
def recognize_langs(path):
|
||||||
|
lang = ""
|
||||||
|
new_path = path.split("/")
|
||||||
|
if len(new_path)>2:
|
||||||
|
if new_path[1] not in ("wiki", "f"):
|
||||||
|
lang = "/"+new_path[1]
|
||||||
|
return lang+"/"
|
||||||
|
|
||||||
|
|
|
@ -24,6 +24,8 @@ class Wiki:
|
||||||
fail_times: int = 0 # corresponding to amount of times connection with wiki failed for client reasons (400-499)
|
fail_times: int = 0 # corresponding to amount of times connection with wiki failed for client reasons (400-499)
|
||||||
session: aiohttp.ClientSession = None
|
session: aiohttp.ClientSession = None
|
||||||
rc_active: int = 0
|
rc_active: int = 0
|
||||||
|
last_check: float = 0.0
|
||||||
|
last_discussion_check: float = 0.0
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def fetch_wiki(extended, script_path, session: aiohttp.ClientSession, ratelimiter: RateLimiter, amount=20) -> aiohttp.ClientResponse:
|
async def fetch_wiki(extended, script_path, session: aiohttp.ClientSession, ratelimiter: RateLimiter, amount=20) -> aiohttp.ClientResponse:
|
||||||
|
@ -188,6 +190,7 @@ async def process_mwmsgs(wiki_response: dict, local_wiki: Wiki, mw_msgs: dict):
|
||||||
mw_msgs[key] = msgs # it may be a little bit messy for sure, however I don't expect any reason to remove mw_msgs entries by one
|
mw_msgs[key] = msgs # it may be a little bit messy for sure, however I don't expect any reason to remove mw_msgs entries by one
|
||||||
local_wiki.mw_messages = key
|
local_wiki.mw_messages = key
|
||||||
|
|
||||||
|
|
||||||
# db_wiki: webhook, wiki, lang, display, rcid, postid
|
# db_wiki: webhook, wiki, lang, display, rcid, postid
|
||||||
async def essential_info(change: dict, changed_categories, local_wiki: Wiki, target: tuple, paths: tuple, request: dict,
|
async def essential_info(change: dict, changed_categories, local_wiki: Wiki, target: tuple, paths: tuple, request: dict,
|
||||||
rate_limiter: RateLimiter) -> src.discord.DiscordMessage:
|
rate_limiter: RateLimiter) -> src.discord.DiscordMessage:
|
||||||
|
|
Loading…
Reference in a new issue