Fixed edge case where RcGcDb spammed the feed
Before Width: | Height: | Size: 2.3 KiB After Width: | Height: | Size: 0 B |
Before Width: | Height: | Size: 3.6 KiB After Width: | Height: | Size: 0 B |
Before Width: | Height: | Size: 2.6 KiB After Width: | Height: | Size: 0 B |
Before Width: | Height: | Size: 3.6 KiB After Width: | Height: | Size: 0 B |
Before Width: | Height: | Size: 3.6 KiB After Width: | Height: | Size: 0 B |
Before Width: | Height: | Size: 3.6 KiB After Width: | Height: | Size: 0 B |
Before Width: | Height: | Size: 3.1 KiB After Width: | Height: | Size: 0 B |
Before Width: | Height: | Size: 2.3 KiB After Width: | Height: | Size: 0 B |
Before Width: | Height: | Size: 2.4 KiB After Width: | Height: | Size: 0 B |
Before Width: | Height: | Size: 2.3 KiB After Width: | Height: | Size: 0 B |
Before Width: | Height: | Size: 2.4 KiB After Width: | Height: | Size: 0 B |
Before Width: | Height: | Size: 3.4 KiB After Width: | Height: | Size: 0 B |
Before Width: | Height: | Size: 3.6 KiB After Width: | Height: | Size: 0 B |
Before Width: | Height: | Size: 3.6 KiB After Width: | Height: | Size: 0 B |
Before Width: | Height: | Size: 2.4 KiB After Width: | Height: | Size: 0 B |
Before Width: | Height: | Size: 2.2 KiB After Width: | Height: | Size: 0 B |
Before Width: | Height: | Size: 3.5 KiB After Width: | Height: | Size: 0 B |
Before Width: | Height: | Size: 3.6 KiB After Width: | Height: | Size: 0 B |
Before Width: | Height: | Size: 3.6 KiB After Width: | Height: | Size: 0 B |
|
@ -54,6 +54,13 @@ class QueueEntry:
|
||||||
"""Confirms sent status for a webhook. Returns True if sending to all webhooks has been completed, otherwise False."""
|
"""Confirms sent status for a webhook. Returns True if sending to all webhooks has been completed, otherwise False."""
|
||||||
self._sent_webhooks.add(webhook)
|
self._sent_webhooks.add(webhook)
|
||||||
|
|
||||||
|
def clear_webhook_send_requirement_for(self, webhook: str):
|
||||||
|
"""In case webhook gets removed, this function is called to remove the webhook from the list or required recipients."""
|
||||||
|
try:
|
||||||
|
self.webhooks.remove(webhook)
|
||||||
|
except ValueError: # best effort
|
||||||
|
pass
|
||||||
|
|
||||||
def complete(self) -> bool:
|
def complete(self) -> bool:
|
||||||
return len(self._sent_webhooks) == len(self.webhooks)
|
return len(self._sent_webhooks) == len(self.webhooks)
|
||||||
|
|
||||||
|
@ -91,6 +98,15 @@ class MessageQueue:
|
||||||
def cut_messages(self, item_num: int):
|
def cut_messages(self, item_num: int):
|
||||||
self._queue = self._queue[item_num:]
|
self._queue = self._queue[item_num:]
|
||||||
|
|
||||||
|
def nuke_all_messages_to_webhook(self, webhook: str):
|
||||||
|
"""Dumb and quick way to make sure no message is intended for removed webhook in case it gets erased from
|
||||||
|
position of domain_manager as well as kill any pending suspensions"""
|
||||||
|
for message in self._queue:
|
||||||
|
message.clear_webhook_send_requirement_for(webhook)
|
||||||
|
for suspension_webhook, suspension_task in self.webhook_suspensions.items():
|
||||||
|
if webhook.split("/")[0] == suspension_webhook.split("/")[0]:
|
||||||
|
suspension_task.cancel()
|
||||||
|
|
||||||
def track_discord_error_rate(self, number: float):
|
def track_discord_error_rate(self, number: float):
|
||||||
self.discord_error_rate_tracker = max(0, min(self.discord_error_rate_tracker+number, settings["max_discord_additional_await_time"]))
|
self.discord_error_rate_tracker = max(0, min(self.discord_error_rate_tracker+number, settings["max_discord_additional_await_time"]))
|
||||||
|
|
||||||
|
@ -126,6 +142,7 @@ class MessageQueue:
|
||||||
if not isinstance(msg.webhooks, list):
|
if not isinstance(msg.webhooks, list):
|
||||||
raise TypeError('msg.webhook_url in _queue is not a list')
|
raise TypeError('msg.webhook_url in _queue is not a list')
|
||||||
for webhook in msg.webhooks:
|
for webhook in msg.webhooks:
|
||||||
|
if msg.check_sent_status(webhook) is False:
|
||||||
message_dict[webhook].append(msg) # defaultdict{"dadibadyvbdmadgqueh23/dihjd8agdandashd": [DiscordMessage, DiscordMessage]}
|
message_dict[webhook].append(msg) # defaultdict{"dadibadyvbdmadgqueh23/dihjd8agdandashd": [DiscordMessage, DiscordMessage]}
|
||||||
return message_dict.items()
|
return message_dict.items()
|
||||||
|
|
||||||
|
@ -206,7 +223,7 @@ class MessageQueue:
|
||||||
if method == "POST":
|
if method == "POST":
|
||||||
msg.webhook = webhook_url
|
msg.webhook = webhook_url
|
||||||
msg.wiki.add_message(msg)
|
msg.wiki.add_message(msg)
|
||||||
for queue_message in messages[max(index-len(msg.message_list), 0):index+1]:
|
for queue_message in messages[max(index-len(msg.message_list), 0):index+1]: # This likely breaks when there are messages from suspended webhooks awaiting sending and a new working webhook is added
|
||||||
queue_message.confirm_sent_status(webhook_url)
|
queue_message.confirm_sent_status(webhook_url)
|
||||||
else:
|
else:
|
||||||
if hasattr(msg, "wiki"): # PATCH and DELETE can not have wiki attribute
|
if hasattr(msg, "wiki"): # PATCH and DELETE can not have wiki attribute
|
||||||
|
@ -215,10 +232,7 @@ class MessageQueue:
|
||||||
logger.debug("Found webhook ID in webhook_suspensions, nuking it.")
|
logger.debug("Found webhook ID in webhook_suspensions, nuking it.")
|
||||||
await msg.wiki.remove_webhook_from_db(webhook_url, "Attempts to send a message to a webhook result in client error.", send_reason=False)
|
await msg.wiki.remove_webhook_from_db(webhook_url, "Attempts to send a message to a webhook result in client error.", send_reason=False)
|
||||||
for message in messages:
|
for message in messages:
|
||||||
try:
|
message.clear_webhook_send_requirement_for(webhook_url)
|
||||||
self._queue.remove(message)
|
|
||||||
except ValueError:
|
|
||||||
logger.exception("Message could not be found in the queue")
|
|
||||||
self.webhook_suspensions[webhook_id].cancel()
|
self.webhook_suspensions[webhook_id].cancel()
|
||||||
else:
|
else:
|
||||||
self.webhook_suspensions[webhook_id] = asyncio.create_task(self.suspension_check(webhook_url), name="DC Sus Check for {}".format(webhook_id))
|
self.webhook_suspensions[webhook_id] = asyncio.create_task(self.suspension_check(webhook_url), name="DC Sus Check for {}".format(webhook_id))
|
||||||
|
|
|
@ -10,6 +10,7 @@ import logging
|
||||||
import asyncpg
|
import asyncpg
|
||||||
import asyncio
|
import asyncio
|
||||||
|
|
||||||
|
from src.misc import flatten_lists
|
||||||
from src.discord.queue import messagequeue
|
from src.discord.queue import messagequeue
|
||||||
from src.exceptions import NoDomain
|
from src.exceptions import NoDomain
|
||||||
from src.config import settings
|
from src.config import settings
|
||||||
|
@ -76,12 +77,19 @@ class DomainManager:
|
||||||
if split_payload[0] == "ADD":
|
if split_payload[0] == "ADD":
|
||||||
await self.new_wiki(Wiki(split_payload[1], safe_type_for_id(split_payload[2], int), safe_type_for_id(split_payload[3], str)))
|
await self.new_wiki(Wiki(split_payload[1], safe_type_for_id(split_payload[2], int), safe_type_for_id(split_payload[3], str)))
|
||||||
elif split_payload[0] == "REMOVE":
|
elif split_payload[0] == "REMOVE":
|
||||||
|
webhook_diff = set()
|
||||||
try:
|
try:
|
||||||
results = await connection.fetch("SELECT * FROM rcgcdb WHERE wiki = $1;", split_payload[1])
|
results = await connection.fetch("SELECT * FROM rcgcdb WHERE wiki = $1;", split_payload[1])
|
||||||
if len(results) > 0: # If there are still webhooks for this wiki - just update its targets
|
if len(results) > 0: # If there are still webhooks for this wiki - just update its targets
|
||||||
await self.return_domain(self.get_domain(split_payload[1])).get_wiki(split_payload[1]).update_targets()
|
wiki_obj: Wiki = self.return_domain(self.get_domain(split_payload[1])).get_wiki(split_payload[1], None)
|
||||||
|
if wiki_obj is not None:
|
||||||
|
flattened_set_of_webhooks = set(flatten_lists(wiki_obj.rc_targets.values())).union(set(flatten_lists(wiki_obj.discussion_targets.values())))
|
||||||
|
await wiki_obj.update_targets()
|
||||||
|
webhook_diff = flattened_set_of_webhooks - set(flatten_lists(wiki_obj.rc_targets.values())).union(set(flatten_lists(wiki_obj.discussion_targets.values())))
|
||||||
else:
|
else:
|
||||||
self.remove_wiki(split_payload[1])
|
self.remove_wiki(split_payload[1])
|
||||||
|
for removed_webhook in webhook_diff:
|
||||||
|
messagequeue.nuke_all_messages_to_webhook(removed_webhook)
|
||||||
except asyncpg.IdleSessionTimeoutError:
|
except asyncpg.IdleSessionTimeoutError:
|
||||||
logger.error("Couldn't check amount of webhooks with {} wiki!".format(split_payload[1]))
|
logger.error("Couldn't check amount of webhooks with {} wiki!".format(split_payload[1]))
|
||||||
return
|
return
|
||||||
|
|
|
@ -2,6 +2,7 @@ from __future__ import annotations
|
||||||
|
|
||||||
import functools
|
import functools
|
||||||
import json
|
import json
|
||||||
|
import typing
|
||||||
from functools import cache
|
from functools import cache
|
||||||
from html.parser import HTMLParser
|
from html.parser import HTMLParser
|
||||||
import base64, re
|
import base64, re
|
||||||
|
@ -140,6 +141,11 @@ def class_searcher(attribs: list) -> str:
|
||||||
return ""
|
return ""
|
||||||
|
|
||||||
|
|
||||||
|
def flatten_lists(iterable_to_be_flattened: typing.Iterable):
|
||||||
|
"""Flatten lists"""
|
||||||
|
return [item for sub_list in iterable_to_be_flattened for item in sub_list]
|
||||||
|
|
||||||
|
|
||||||
class ContentParser(HTMLParser):
|
class ContentParser(HTMLParser):
|
||||||
"""ContentPerser is an implementation of HTMLParser that parses output of action=compare&prop=diff API request
|
"""ContentPerser is an implementation of HTMLParser that parses output of action=compare&prop=diff API request
|
||||||
for two MediaWiki revisions. It extracts the following:
|
for two MediaWiki revisions. It extracts the following:
|
||||||
|
|