ahtcg_discord_bot/ahtcg_bot.py

133 lines
4.8 KiB
Python
Executable File

#!/usr/bin/env python3
from datetime import datetime
import json
import re
import discord
from discord.ext import commands, tasks
from arkhamdb import ArkhamDBClient
from secret import TOKEN
class ArkhamDBUpdater(commands.Bot):
channel_list: set[int]
arkhamdb_client: ArkhamDBClient
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# TODO: should really be a database
with open('channel_list.json') as f:
self.channel_list = set(json.load(f))
self.arkhamdb_client = ArkhamDBClient()
self.setup_commands()
async def close(self) -> None:
await self.arkhamdb_client.close()
await super().close()
def setup_commands(self):
@self.command(name='monitor')
async def monitor(ctx: commands.Context):
"""Watch this channel for deck links and link to latest versions."""
await ctx.message.reply('Now monitoring this channel for deck IDs', mention_author=True)
self.channel_list.add(ctx.message.channel.id)
with open('channel_list.json', 'w') as f:
json.dump(list(self.channel_list), f)
@self.command(name='forget')
async def forget(ctx: commands.Context):
"""Remove this channel from the monitor list"""
await ctx.message.reply('No longer monitoring this channel for deck IDs', mention_author=True)
self.channel_list.discard(ctx.message.channel.id)
with open('channel_list.json', 'w') as f:
json.dump(list(self.channel_list), f)
async def on_ready(self):
print(f'Logged in as {self.user} (ID: {self.user.id})')
print('Enabled on servers:')
async for guild in self.fetch_guilds(limit=150):
print(' -', guild.name)
print('------')
self.arkhamdb_monitor.start()
async def gather_deck_ids(self, channel: discord.TextChannel) -> dict[int, str]:
deck_ids: dict[int, str] = {}
url_regex = re.compile(
r'(?P<prefix>.*)' + self.arkhamdb_client.origin + r'/deck/view/(?P<deck_id>\d+)')
async for message in channel.history(limit=200, oldest_first=True):
# ignore the bot's messages
if message.author.id == self.user.id:
continue
matches = url_regex.finditer(message.content)
for match in matches:
deck_ids[int(match.group('deck_id'))] = match.group('prefix')
return deck_ids
async def clear_old_messages(self, channel: discord.TextChannel) -> None:
async for message in channel.history(limit=200):
if message.author.id == self.user.id \
and message.id != channel.last_message_id \
and len(message.embeds) == 1:
await message.delete()
async def update_channel_latest_decks(self, channel: discord.TextChannel) -> None:
deck_ids = await self.gather_deck_ids(channel)
latest_decks = await self.arkhamdb_client.get_latest_decks(deck_ids)
await self.clear_old_messages(channel)
try:
last_message = await channel.fetch_message(channel.last_message_id)
except discord.NotFound:
last_message = None
message_text = '\n'.join(
f"{prefix}[{deck['name']}]({self.arkhamdb_client.origin}/deck/view/{deck['id']}) [{deck['id']}]"
for prefix, deck in latest_decks.values())
message_embed = discord.Embed(
title=f'Updated as of {datetime.now()}',
description=message_text)
if last_message is not None \
and last_message.author.id == self.user.id \
and len(last_message.embeds) == 1:
if message_text != last_message.embeds[0].description:
await last_message.edit(embed=message_embed)
else:
await channel.send(embed=message_embed)
async def on_message(self, message: discord.Message):
await self.process_commands(message)
# we do not want the bot to reply to itself
if message.author.id == self.user.id:
return
if message.channel.id in self.channel_list:
await self.update_channel_latest_decks(message.channel)
async def on_message_edit(self, before: discord.Message, after: discord.Message):
# we do not want the bot to reply to itself
if after.author.id == self.user.id:
return
if after.channel.id in self.channel_list:
await self.update_channel_latest_decks(after.channel)
@tasks.loop(hours=1)
async def arkhamdb_monitor(self) -> None:
for channel_id in self.channel_list:
channel = self.get_channel(channel_id)
await self.update_channel_latest_decks(channel)
bot = ArkhamDBUpdater(command_prefix='!arkhamdb ')
bot.run(TOKEN, reconnect=True)