import aiohttp
import asyncio
import logging.config
import signal
import traceback
import nest_asyncio
from collections import defaultdict, namedtuple
from typing import Generator
from contextlib import asynccontextmanager

from src.argparser import command_line_args
from src.config import settings
from src.database import db_cursor, db_connection
from src.exceptions import *
from src.misc import get_paths, get_domain
from src.msgqueue import messagequeue
from src.queue_handler import DBHandler
from src.wiki import Wiki, process_cats, process_mwmsgs, essential_info, essential_feeds
from src.discord import DiscordMessage, generic_msg_sender_exception_logger
from src.wiki_ratelimiter import RateLimiter

logging.config.dictConfig(settings["logging"])
logger = logging.getLogger("rcgcdb.bot")
logger.debug("Current settings: {settings}".format(settings=settings))
logger.info("RcGcDb v{} is starting up.".format("1.0"))
if command_line_args.debug:
	logger.info("Debug mode is active!")

# Log fail states with the structure wiki_url: number of fail states
all_wikis: dict = {}
mw_msgs: dict = {}  # will have the type of id: tuple
main_tasks: dict = {}

# First populate the all_wikis list with every wiki
# Reasons for this: 1. we require the amount of wikis to calculate the cooldown between requests
# 2. Easier to code
for db_wiki in db_cursor.execute('SELECT wiki, rcid FROM rcgcdw GROUP BY wiki ORDER BY ROWID'):
	all_wikis[db_wiki["wiki"]] = Wiki()  # populate all_wikis
	all_wikis[db_wiki["wiki"]].rc_active = db_wiki["rcid"]

queue_limit = settings.get("queue_limit", 30)
QueuedWiki = namedtuple("QueuedWiki", ['url', 'amount'])


class LimitedList(list):
	def __init__(self, *args):
		list.__init__(self, *args)

	def append(self, obj: QueuedWiki) -> None:
		if len(self) < queue_limit:
			self.insert(len(self), obj)
			return
		raise ListFull

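# LimitedList.append() raises ListFull once queue_limit items are queued;
# RcQueue.update_queues() catches it to mark a domain as full and to remember,
# through last_rowid, where to resume on the next round.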

class RcQueue:
	def __init__(self):
		self.domain_list = {}
		self.to_remove = []

	async def start_group(self, group, initial_wikis):
		"""Starts a task for the given domain group"""
		if group not in self.domain_list:
			self.domain_list[group] = {"task": asyncio.create_task(scan_group(group)), "last_rowid": 0, "query": LimitedList(initial_wikis), "rate_limiter": RateLimiter()}
			logger.debug(self.domain_list[group])
		else:
			raise KeyError
	async def remove_wiki_from_group(self, wiki):
		"""Removes a wiki from the queue of its domain group"""
		logger.debug(f"Removing {wiki} from group queue.")
		group = get_domain(wiki)
		self[group]["query"] = LimitedList([x for x in self[group]["query"] if x.url != wiki])  # rebuild as LimitedList so the queue_limit cap keeps applying
		if not self[group]["query"]:  # if there is no wiki left in the queue, get rid of the task
			logger.debug(f"{group} no longer has any wikis queued!")
			all_wikis[wiki].rc_active = -1
			await self.stop_task_group(group)

	async def stop_task_group(self, group):
		self[group]["task"].cancel()
		del self.domain_list[group]

	@asynccontextmanager
	async def retrieve_next_queued(self, group) -> Generator[QueuedWiki, None, None]:
		"""Retrieves the next wiki in the queue for the given domain"""
		if len(self.domain_list[group]["query"]) == 0:
			await self.stop_task_group(group)
			return
		try:
			yield self.domain_list[group]["query"][0]
		except asyncio.CancelledError:
			raise
		except:
			if command_line_args.debug:
				logger.exception("RC Group exception")
				shutdown(asyncio.get_event_loop())
			else:
				logger.exception("Group task returned error")
				await generic_msg_sender_exception_logger(traceback.format_exc(), "Group task error logger", Group=group)
		else:
			self.domain_list[group]["query"].pop(0)
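	# Note: the queued wiki is popped only in the else branch above, i.e. once the block
	# using this context manager finished without raising, so a wiki that failed stays
	# at the head of the queue and is retried on the next pass.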

	@staticmethod
	def filter_rc_active(wiki_obj):
		return wiki_obj[1].rc_active is None or wiki_obj[1].rc_active > -1

	async def update_queues(self):
		"""Makes a round over the rcgcdw DB and looks for updates to the queues in self.domain_list"""
		try:
			fetch_all = db_cursor.execute(
				'SELECT ROWID, webhook, wiki, lang, display, wikiid, rcid FROM rcgcdw WHERE rcid != -1 OR rcid IS NULL GROUP BY wiki ORDER BY ROWID ASC')
			self.to_remove = [x[0] for x in filter(self.filter_rc_active, all_wikis.items())]  # first populate this list and remove wikis that are still in the db, clean up the rest
			full = []
			for db_wiki in fetch_all.fetchall():
				domain = get_domain(db_wiki["wiki"])
				try:
					if db_wiki["wiki"] not in all_wikis:
						raise AssertionError
					self.to_remove.remove(db_wiki["wiki"])
				except AssertionError:
					all_wikis[db_wiki["wiki"]] = Wiki()
					all_wikis[db_wiki["wiki"]].rc_active = db_wiki["rcid"]
				except ValueError:
					pass
				try:
					current_domain: dict = self[domain]
					if not db_wiki["ROWID"] < current_domain["last_rowid"]:
						current_domain["query"].append(QueuedWiki(db_wiki["wiki"], 20))
				except KeyError:
					await self.start_group(domain, [QueuedWiki(db_wiki["wiki"], 20)])
					logger.info("A new domain group ({}) has been added since last time, adding it to the domain_list and starting a task...".format(domain))
				except ListFull:
					full.append(domain)
					current_domain["last_rowid"] = db_wiki["ROWID"]
					continue
			for wiki in self.to_remove:
				await self.remove_wiki_from_group(wiki)
			for group, data in self.domain_list.items():
				if group not in full:
					self[group]["last_rowid"] = 0  # iteration reached the end without getting stuck on a full list
			logger.debug("Current domain_list structure: {}".format(self.domain_list))
		except:
			if command_line_args.debug:
				logger.exception("Queue error!")
				shutdown(asyncio.get_event_loop())
			else:
				logger.exception("Exception on queue updater")
				await generic_msg_sender_exception_logger(traceback.format_exc(), "Queue updater")
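	# last_rowid acts as a resume marker for domains whose LimitedList filled up: rows
	# with a lower ROWID are skipped on later rounds, and the marker is reset to 0 once
	# a full round completes without hitting ListFull.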

	def __getitem__(self, item):
		"""Returns the dict of the given domain group"""
		return self.domain_list[item]

	def __setitem__(self, key, value):
		self.domain_list[key] = value


rcqueue = RcQueue()


# Start queueing logic

def calculate_delay_for_group(group_length: int) -> float:
	"""Calculate the delay between fetching each wiki to avoid rate limits"""
	min_delay = 60 / settings["max_requests_per_minute"]
	if group_length == 0:
		group_length = 1
	if (group_length * min_delay) < settings["minimal_cooldown_per_wiki_in_sec"]:
		return settings["minimal_cooldown_per_wiki_in_sec"] / group_length
	else:
		return 0.0
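# Illustrative math (settings values are examples, not defaults): with
# max_requests_per_minute = 30, min_delay = 2 s; a group of 10 wikis gives
# 10 * 2 s = 20 s, which is below a minimal_cooldown_per_wiki_in_sec of 60,
# so each wiki in that group waits 60 / 10 = 6 s between fetches.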


def generate_targets(wiki_url: str, additional_requirements: str) -> defaultdict:
	"""To minimize the amount of requests, we generate a list of language/display mode combinations to create messages for;
	this way we can send the same message to multiple webhooks which have the same wiki and settings without doing another
	request to the wiki just to duplicate the message.
	"""
	combinations = defaultdict(list)
	for webhook in db_cursor.execute('SELECT webhook, lang, display FROM rcgcdw WHERE wiki = ? {}'.format(additional_requirements), (wiki_url,)):
		combination = (webhook["lang"], webhook["display"])
		combinations[combination].append(webhook["webhook"])
	return combinations
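# The returned mapping has the shape {(lang, display): [webhook, ...]}, for example
# {("en", 0): ["id1/token1", "id2/token2"]} (values purely illustrative), so each
# combination is rendered once and then delivered to every webhook sharing it.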


async def generate_domain_groups():
	"""Yield the wikis grouped per domain (fandom.com, wikipedia.org etc.)"""
	domain_wikis = defaultdict(list)
	fetch_all = db_cursor.execute('SELECT ROWID, webhook, wiki, lang, display, wikiid, rcid FROM rcgcdw WHERE rcid != -1 OR rcid IS NULL GROUP BY wiki ORDER BY ROWID ASC')
	for db_wiki in fetch_all.fetchall():
		domain_wikis[get_domain(db_wiki["wiki"])].append(QueuedWiki(db_wiki["wiki"], 20))
	for group, db_wikis in domain_wikis.items():
		yield group, db_wikis


async def scan_group(group: str):
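	"""Worker task for a single domain group: repeatedly takes the next wiki off the
	group's queue, fetches its recent changes and queues Discord messages for them."""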
	rate_limiter = rcqueue[group]["rate_limiter"]
	while True:
		try:
			async with rcqueue.retrieve_next_queued(group) as queued_wiki:  # acquire next wiki in queue
				await asyncio.sleep(calculate_delay_for_group(len(rcqueue[group]["query"])))
				logger.debug("Wiki {}".format(queued_wiki.url))
				local_wiki = all_wikis[queued_wiki.url]  # set a reference to a wiki object from memory
				extended = False
				if local_wiki.mw_messages is None:
					extended = True
				async with aiohttp.ClientSession(headers=settings["header"],
				                                 timeout=aiohttp.ClientTimeout(3.0)) as session:
					try:
						wiki_response = await local_wiki.fetch_wiki(extended, queued_wiki.url, session, rate_limiter, amount=queued_wiki.amount)
						await local_wiki.check_status(queued_wiki.url, wiki_response.status)
					except (WikiServerError, WikiError):
						logger.error("Exception when fetching the wiki")
						continue  # ignore this wiki if it throws errors
					try:
						recent_changes_resp = await wiki_response.json()
						if not isinstance(recent_changes_resp, dict):
							logger.error(f"recent_changes_resp has a bad type, found {type(recent_changes_resp)}, __repr__ here: {recent_changes_resp}.")
							raise TypeError
						if "errors" in recent_changes_resp:
							error = recent_changes_resp.get('errors')
							if error["code"] == "readapidenied":
								await local_wiki.fail_add(queued_wiki.url, 410)
								continue
							raise WikiError
						recent_changes = recent_changes_resp['query']['recentchanges']
						recent_changes.reverse()
					except aiohttp.ContentTypeError:
						logger.exception("Wiki seems to be resulting in non-json content.")
						await local_wiki.fail_add(queued_wiki.url, 410)
						continue
					except:
						logger.exception("On loading json of response.")
						continue
				if extended:
					await process_mwmsgs(recent_changes_resp, local_wiki, mw_msgs)
				if local_wiki.rc_active in (0, None, -1):  # new wiki, just get the last rc to not spam the channel, -1 for -1 to NULL changes
					if len(recent_changes) > 0:
						local_wiki.rc_active = recent_changes[-1]["rcid"]
						DBHandler.add(queued_wiki.url, recent_changes[-1]["rcid"])
					else:
						local_wiki.rc_active = 0
						DBHandler.add(queued_wiki.url, 0)
					DBHandler.update_db()
					continue
				categorize_events = {}
				targets = generate_targets(queued_wiki.url, "AND (rcid != -1 OR rcid IS NULL)")  # parentheses keep the OR scoped to rcid instead of escaping the wiki = ? filter
				paths = get_paths(queued_wiki.url, recent_changes_resp)
				new_events = 0
				for change in recent_changes:
					if change["rcid"] > local_wiki.rc_active and queued_wiki.amount != 450:
						new_events += 1
						if new_events == 20:
							# call the function again with max limit for more results, ignore the ones in this request
							logger.debug("There were too many new events, queuing wiki with 450 limit.")
							rcqueue[group]["query"].insert(1, QueuedWiki(queued_wiki.url, 450))
							break
					await process_cats(change, local_wiki, mw_msgs, categorize_events)
				else:  # if we broke out of the previous loop (too many changes), don't send messages here
					for change in recent_changes:  # Yeah, second loop since the categories require to be all loaded up
						if change["rcid"] > local_wiki.rc_active:
							for target in targets.items():
								try:
									await essential_info(change, categorize_events, local_wiki, target, paths,
									                     recent_changes_resp, rate_limiter)
								except asyncio.CancelledError:
									raise
								except:
									if command_line_args.debug:
										logger.exception("Exception on RC formatter")
										raise
									else:
										logger.exception("Exception on RC formatter")
										await generic_msg_sender_exception_logger(traceback.format_exc(), "Exception in RC formatter", Wiki=queued_wiki.url, Change=str(change)[0:1000])
					if recent_changes:
						local_wiki.rc_active = change["rcid"]
						DBHandler.add(queued_wiki.url, change["rcid"])
				DBHandler.update_db()
		except asyncio.CancelledError:
			return


async def wiki_scanner():
	"""Wiki scanner is spawned as a task whose purpose is to continuously run over the wikis in the DB, fetching recent changes
	to add messages based on the changes to the message queue, later handled by the message_sender coroutine."""
	try:
		async for group, db_wikis in generate_domain_groups():  # first scan
			await rcqueue.start_group(group, db_wikis)
		while True:
			await asyncio.sleep(20.0)
			await rcqueue.update_queues()
	except asyncio.CancelledError:
		for item in rcqueue.domain_list.values():  # cancel running tasks
			item["task"].cancel()
		raise


async def message_sender():
	"""message_sender is a coroutine responsible for handling Discord messages and sending them to Discord"""
	try:
		while True:
			await messagequeue.resend_msgs()
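			# main_loop() wraps this task in asyncio.shield() as main_tasks["msg_queue_shield"];
			# shutdown() cancels the shield rather than the task itself, so the check below
			# turns that outside cancellation into our own CancelledError and lets the
			# handler below drain the remaining queue before exiting.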
			if main_tasks["msg_queue_shield"].cancelled():
				raise asyncio.CancelledError
	except asyncio.CancelledError:
		while len(messagequeue) > 0:
			logger.info("Shutting down after sending {} more Discord messages...".format(len(messagequeue)))
			await messagequeue.resend_msgs()
	except:
		if command_line_args.debug:
			logger.exception("Exception on DC message sender")
			shutdown(loop=asyncio.get_event_loop())
		else:
			logger.exception("Exception on DC message sender")
			await generic_msg_sender_exception_logger(traceback.format_exc(), "Message sender exception")


async def discussion_handler():
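	"""Task that polls the discussion feeds (wikia.php) of every wiki with a wikiid set
	and dispatches new posts to the configured webhooks."""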
	try:
		rate_limiter = RateLimiter()  # local limiter for feed requests; the original code referenced an undefined rate_limiter name below
		while True:
			fetch_all = db_cursor.execute(
				'SELECT wiki, wikiid, rcid, postid FROM rcgcdw WHERE wikiid IS NOT NULL')
			for db_wiki in fetch_all.fetchall():
				header = settings["header"].copy()  # copy so the shared settings header is not mutated
				header["Accept"] = "application/hal+json"
				async with aiohttp.ClientSession(headers=header,
				                                 timeout=aiohttp.ClientTimeout(4.0)) as session:
					try:
						local_wiki = all_wikis[db_wiki["wiki"]]  # set a reference to a wiki object from memory
					except KeyError:
						local_wiki = all_wikis[db_wiki["wiki"]] = Wiki()
						local_wiki.rc_active = db_wiki["rcid"]
					try:
						feeds_response = await local_wiki.fetch_feeds(db_wiki["wikiid"], session)
					except (WikiServerError, WikiError):
						continue  # ignore this wiki if it throws errors
					try:
						discussion_feed_resp = await feeds_response.json(encoding="UTF-8")
						if "title" in discussion_feed_resp:
							error = discussion_feed_resp["error"]
							if error == "site doesn't exists":
								if db_wiki["rcid"] != -1:
									db_cursor.execute("UPDATE rcgcdw SET wikiid = ? WHERE wiki = ?",
									                  (None, db_wiki["wiki"],))
								else:
									await local_wiki.remove(db_wiki["wiki"], 1000)
								DBHandler.update_db()
								continue
							raise WikiError
						discussion_feed = discussion_feed_resp["_embedded"]["doc:posts"]
						discussion_feed.reverse()
					except aiohttp.ContentTypeError:
						logger.exception("Wiki seems to be resulting in non-json content.")
						continue
					except asyncio.TimeoutError:
						logger.debug("Timeout on reading JSON of discussion post feed.")
						continue
					except:
						logger.exception("On loading json of response.")
						continue
					if db_wiki["postid"] is None:  # new wiki, just get the last post to not spam the channel
						if len(discussion_feed) > 0:
							DBHandler.add(db_wiki["wikiid"], discussion_feed[-1]["id"], True)
						else:
							DBHandler.add(db_wiki["wikiid"], "0", True)
						DBHandler.update_db()
						continue
					comment_events = []
					targets = generate_targets(db_wiki["wiki"], "AND NOT wikiid IS NULL")
					for post in discussion_feed:
						if post["_embedded"]["thread"][0]["containerType"] == "ARTICLE_COMMENT" and post["id"] > db_wiki["postid"]:
							comment_events.append(post["forumId"])
					comment_pages = {}
					if len(comment_events) > 0:
						comment_pages = await local_wiki.safe_request(
							"{wiki}wikia.php?controller=FeedsAndPosts&method=getArticleNamesAndUsernames&stablePageIds={pages}&format=json".format(
								wiki=db_wiki["wiki"], pages=','.join(comment_events)
							), rate_limiter, "articleNames")
					for post in discussion_feed:  # Yeah, second loop since the comments require an extra request
						if post["id"] > db_wiki["postid"]:
							for target in targets.items():
								try:
									await essential_feeds(post, comment_pages, db_wiki, target)
								except asyncio.CancelledError:
									raise
								except:
									if command_line_args.debug:
										logger.exception("Exception on Feeds formatter")
										shutdown(loop=asyncio.get_event_loop())
									else:
										logger.exception("Exception on Feeds formatter")
										await generic_msg_sender_exception_logger(traceback.format_exc(), "Exception in feed formatter", Post=str(post)[0:1000], Wiki=db_wiki["wiki"])
					if discussion_feed:
						DBHandler.add(db_wiki["wikiid"], post["id"], True)
				await asyncio.sleep(delay=2.0)  # hardcoded, really doesn't need much more
			DBHandler.update_db()
	except asyncio.CancelledError:
		pass
	except:
		if command_line_args.debug:
			raise  # reraise the issue
		else:
			logger.exception("Exception on Discussions handler")
			await generic_msg_sender_exception_logger(traceback.format_exc(), "Discussion handler task exception", Wiki=db_wiki["wiki"])


def shutdown(loop, signal=None):
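	"""Flush the DB, cancel the main tasks and give message_sender a chance to drain
	the remaining Discord messages before stopping the loop."""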
	global main_tasks
	DBHandler.update_db()
	db_connection.close()
	loop.remove_signal_handler(signal)
	if len(messagequeue) > 0:
		logger.warning("Some messages are still queued!")
	for task in (main_tasks["wiki_scanner"], main_tasks["discussion_handler"], main_tasks["msg_queue_shield"]):
		task.cancel()
	loop.run_until_complete(main_tasks["message_sender"])
	for task in asyncio.all_tasks(loop):
		logger.debug("Killing task")
		task.cancel()
	try:
		loop.run_until_complete(asyncio.gather(*asyncio.all_tasks(loop)))
	except asyncio.CancelledError:
		loop.stop()
	logger.info("Script has shut down due to signal {}.".format(signal))
	# sys.exit(0)


# def global_exception_handler(loop, context):
# 	"""Global exception handler for asyncio, lets us know when something crashes"""
# 	msg = context.get("exception", context["message"])
# 	logger.error("Global exception handler: {}".format(msg))
# 	if command_line_args.debug is False:
# 		requests.post("https://discord.com/api/webhooks/"+settings["monitoring_webhook"], data=repr(DiscordMessage("compact", "monitoring", [settings["monitoring_webhook"]], wiki=None, content="[RcGcDb] Global exception handler: {}".format(msg))), headers={'Content-Type': 'application/json'})
# 	else:
# 		shutdown(loop)


async def main_loop():
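	"""Set up signal handlers and run the three main tasks (wiki_scanner, message_sender,
	discussion_handler) until the process is stopped."""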
	global main_tasks
	loop = asyncio.get_event_loop()
	nest_asyncio.apply(loop)
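	# nest_asyncio lets loop.run_until_complete() be called while the loop is already
	# running, which shutdown() relies on when it is invoked from a signal handler.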
	try:
		signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
		for s in signals:
			loop.add_signal_handler(
				s, lambda s=s: shutdown(loop, signal=s))
	except AttributeError:
		logger.info("Running on Windows, some things may not work as they should.")
		signals = (signal.SIGBREAK, signal.SIGTERM, signal.SIGINT)
	# loop.set_exception_handler(global_exception_handler)
	try:
		main_tasks = {"wiki_scanner": asyncio.create_task(wiki_scanner()), "message_sender": asyncio.create_task(message_sender()),
		              "discussion_handler": asyncio.create_task(discussion_handler())}
		main_tasks["msg_queue_shield"] = asyncio.shield(main_tasks["message_sender"])
		await asyncio.gather(main_tasks["wiki_scanner"], main_tasks["discussion_handler"], main_tasks["message_sender"])
	except KeyboardInterrupt:
		shutdown(loop)
	except asyncio.CancelledError:
		return


asyncio.run(main_loop(), debug=False)