Page MenuHomedesp's stash

bot.py
No OneTemporary

from bs4 import *
import discord, requests
from discord.ext import commands, tasks
from discord import ui
from collections import defaultdict
import json, logging, asyncio, random, re
# The guild id is used to look members up for DMs; the alternative would be
# enabling intents.members.
# Read it through a context manager so the file handle is closed right away
# instead of being left open until garbage collection.
with open('guild.txt') as _guild_file:
    GUILD = int(_guild_file.read())

intents = discord.Intents.default()
intents.message_content = True
#intents.members = True #makes things way too slow so we bind to guild instead
bot = commands.Bot(command_prefix='.pinger ', intents=intents)

# Child of discord's logger so it shares whatever handlers discord.py sets up.
logger = logging.getLogger('discord').getChild('ubcpinger')
logger.setLevel(logging.DEBUG)
# helper for easier access of data in json
class coursedict(defaultdict):
    """Dict with attribute-style access, used for the pinger JSON data.

    * ``obj.key`` reads go through ``dict.get`` and therefore give ``None``
      for a missing key; ``obj['key']`` uses the default factory and gives
      ``''`` (and inserts the key).
    * ``obj.key = v`` / ``del obj.key`` set / delete the item ``'key'``.
    * Plain ``dict`` values (also inside lists) are wrapped in ``coursedict``
      when the instance is constructed.
    * Iterating an instance yields ``(key, value)`` pairs, not keys.
    """

    __getattr__ = dict.get
    __setattr__ = dict.__setitem__
    __delattr__ = dict.__delitem__

    def convert(self, data):
        """Recursively wrap every plain dict reachable from *data* (a dict or list), in place."""
        if isinstance(data, dict):
            pairs = data.items()
        else:
            pairs = enumerate(data)
        for key, val in pairs:
            kind = type(val)
            if kind is dict:
                # exact-type check on purpose: existing coursedicts are left alone
                data[key] = coursedict(val)
                self.convert(data[key])
            elif kind is list:
                self.convert(val)

    def __init__(self, data: dict):
        # missing items default to the empty string
        super().__init__(lambda: '', data)
        self.convert(self)

    def __str__(self) -> str:
        """Human-readable pinger name, or the plain dict repr when there is no subject."""
        if not self.subj:
            return dict.__repr__(self)
        label = f'{self.subj} {self.crsno}'
        if self.section:
            return label + f' (section mode)'
        # values such as actv are padded with ljust, so strip them for display
        shown = [
            f'{k}={str(v).strip()}'
            for k, v in self
            if v and k not in ['metadata', 'subj', 'crsno']
        ]
        return label + f', search:' + ' '.join(shown)

    # hide the default factory that defaultdict would otherwise print
    __repr__ = dict.__repr__

    def __iter__(self):
        """Iterate over ``(key, value)`` pairs instead of bare keys."""
        return iter(self.items())
def get_data():
    """Load ``courses.json`` and return it as a coursedict.

    Returns a plain empty dict when the file has no content.
    """
    with open('courses.json') as source:
        raw = source.read()
    if not raw:
        return {}
    return coursedict(json.loads(raw))
def set_data(obj):
    """Write *obj* to ``courses.json`` as indented JSON.

    The object is serialized BEFORE the file is opened for writing: opening
    with ``'w'`` truncates immediately, so serializing afterwards would wipe
    every stored pinger whenever ``json.dumps`` raised.

    Raises:
        TypeError / ValueError: if *obj* is not JSON serializable (the file
        is left untouched in that case).
    """
    serialized = json.dumps(obj, indent=2)
    with open('courses.json', 'w') as data:
        data.write(serialized)
def try_ping(pinger: coursedict):
    """Query the UBC course schedule for *pinger* and report section statuses.

    Section mode (``pinger.section`` truthy) GETs the course page for
    ``pinger.subj`` / ``pinger.crsno``; search mode POSTs every key of the
    pinger except ``metadata`` to the section search.

    Returns:
        dict mapping each section's link text to ``(url, status)``, where
        ``status`` is the text of the row's first cell, or ``'PING'`` when
        that cell is blank. An empty dict is returned when the response
        contains no section rows, so callers can treat "nothing found" as a
        falsy result.

    Raises:
        requests.RequestException: on network failure or timeout.
    """
    optional = lambda fmtstr, *data: fmtstr.format(*data) if all(d for d in data) else ''
    #must include user agent, accept, content type (if post), otherwise ssc will complain
    #if no pname+tname specified, session cookie is needed to avoid "Please wait while your request is processed - this may take up to a minute to complete."
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/114.0",
        "Accept": "text/html",
    }
    # Without a timeout a stalled connection would block the caller forever.
    timeout = 30
    if pinger.section: #section mode
        doc = requests.get('https://courses.students.ubc.ca/cs/courseschedule?pname=subjarea&tname=subj-course'
                           f'&dept={pinger.subj}'
                           f'&course={pinger.crsno}'
                           + optional('&sessyr={}&sesscd={}', pinger.sessyr, pinger.sesscd),
                           headers=headers, timeout=timeout).text
    else: #search mode
        reqdata = dict(pinger) #make a copy and remove metadata
        # pop instead of del: a pinger without metadata must not raise KeyError
        reqdata.pop('metadata', None)
        #no need to explicitly set content type if we set data= alr since requests does that for us
        doc = requests.post('https://courses.students.ubc.ca/cs/courseschedule?pname=subjarea&tname=sectsearch',
                            data=reqdata,
                            headers=headers, timeout=timeout).text
    #they both use the same system to display sections
    soup = BeautifulSoup(doc, 'html.parser')
    ret = {}
    #empty out thead, since we cant search directly with tbody as it doesnt necessarily exist in the doc
    header = soup.select_one('.section-summary thead')
    if header is not None:
        header.clear()
    for child in soup.select('.section-summary tr'):
        section = child.select_one("td:nth-of-type(2) a")
        status_cell = child.select_one("td:nth-of-type(1)")
        if section is None or status_cell is None:
            # not a section row (e.g. spacer or comment row) - nothing to report
            continue
        status = status_cell.text
        ret[section.text] = 'https://courses.students.ubc.ca' + section['href'], (status if status != '\xa0' and status.strip() else 'PING')
    return ret
#done make this a dict instead so its unique? this would remove the need of a "add myself to existing courses" button
#it should be pretty simple to just match the basics but more optimization (like catching CPSC 110 101 == CPSC 110 1*) might be hard
#and its practically impossible to actually figure out whether a search includes a specific section programmatically
#actually it might be possible if i do a ping once and check if theres a strict superset/subset of the current object
#still gonna be hard to optimize for pingers that span across multiple pingers
def add_course(obj):
    """Register the pinger *obj*, merging it with overlapping stored pingers.

    ``obj.metadata.courses`` is filled with every section the pinger's query
    returns, and ``obj.metadata.user[0].list`` with the sections that user
    wants (in section mode, those matching ``obj.section`` where ``*`` is a
    wildcard). Stored pingers of the same session whose course set is a
    subset of the new one are merged into it; if the new one is a subset of
    a stored pinger, its users are appended to that pinger instead.

    Returns:
        ``(name, sections)``: the name of the pinger now handling the
        request (the existing pinger's name if merged into it), and the
        newline-joined list of sections the user will be pinged for.

    Raises:
        ValueError: if the query returned no sections.
    """
    data = get_data()
    #ping, check against existing pingers' course list (in metadata), if matches merge
    resp = try_ping(obj)
    if not resp:
        raise ValueError('No sections found!')

    obj.metadata.courses = list(resp.keys())
    #populate user.list to signify which part of the pinger should trigger a ping
    obj.metadata.user[0].list = list(obj.metadata.courses)
    if obj.section: #do filtering (only needed in section mode, search mode is already filtered by ssc)
        # Escape user input so only '*' acts as a wildcard; other regex
        # metacharacters would otherwise change the match or raise re.error.
        section_pattern = '.*'.join(re.escape(part) for part in obj.section.split('*'))
        check = re.compile(f'{re.escape(obj.subj)} {re.escape(obj.crsno)} {section_pattern}')
        obj.metadata.user[0].list = [c for c in obj.metadata.user[0].list if check.match(c)]
        #the wanted sections now live in user.list, so section only needs to flag section mode
        #(try_ping and __str__ branch on its truthiness)
        obj.section = True
    courses = set(obj.metadata.user[0].list)
    #now we have the proper name for the pinger
    name = str(obj)
    logger.info(f'Adding {name} to the pingers...')
    merged = False
    # Iterate over a snapshot: entries are deleted from data inside the loop,
    # which raises RuntimeError when iterating the live items view.
    for pinger_name, pinger_obj in list(data.items()):
        #only check merging if they are the same session
        if pinger_obj.sessyr != obj.sessyr or pinger_obj.sesscd != obj.sesscd:
            continue
        against = set(pinger_obj.metadata.courses)
        if courses.issuperset(against):
            logger.info(f' Optimization: merging {pinger_name} into {name}')
            obj.metadata.user += pinger_obj.metadata.user
            del data[pinger_name]
        elif against.issuperset(courses):
            #terminating action: one existing pinger handling it is enough
            #(think of a case where existing courses are [1234, 1256] and our pinger is [12] - 2 pingers match, but only 1 is needed)
            pinger_obj.metadata.user += obj.metadata.user
            logger.info(f' Optimization: merging {name} into {pinger_name}')
            name = pinger_name
            merged = True
            break
    if not merged:
        data[name] = obj
    # persist in both cases: a merge mutated an existing pinger's user list
    set_data(data)
    #returns the name (changed if merged) and the list of courses this user will be pinged for
    return name, "\n".join(obj.metadata.user[0].list)
#done add ppl to existing pingers? see above
#done optimization for when multiple people are looking at the same course but want different sections pinged (and not all of them)
#TODO currently the way courses in section mode are handled means that they basically dont get optimized into search modes
#it would be better if pinger.metadata.courses tracks the total set of courses its users are pinging instead,
#and then have the optimizer do special code on reading section mode to not match on the courses in the list, but on the course names
#aka if everything a user is pinging has the same prefix of CPSC 110 for example,
#then just throw it into CPSC 110 section mode regardless even if currently the CPSC 110 pinger.metadata.courses does not have it
#this means that the following has to be changed:
# - instead of obj.metadata.courses = list(resp.keys()), do obj.metadata.courses = <total of all user lists>
# - at the superset checks, add check on whether everything being added is in the same course, and if there is a course pinger in section mode for that already
def get_pingers_with_user(id):
    """Return the names of all stored pingers that list a user with this id."""
    return [
        pinger_name
        for pinger_name, entry in get_data()
        if any(member.id == id for member in entry.metadata.user)
    ]
def opt(label, default=False):
    """Shorthand for building a ``discord.SelectOption`` from a label and default flag."""
    option = discord.SelectOption(label=label, default=default)
    return option
class AddCourseDuration(ui.Modal, title='Enter the time range to search for'):
    """Modal with a single optional text field for the class time range."""

    duration = ui.TextInput(
        label='Duration',
        placeholder='class duration (from-to, HHMM), e.g. 1000-1300, 1000-, -1300',
        min_length=5,
        max_length=9,
        required=False,
    )

    async def on_submit(self, interaction: discord.Interaction):
        """Remember the entered text as the field default, then acknowledge silently."""
        entered = self.duration.value
        # pre-fill the field with the last entry the next time the modal opens
        self.duration.default = entered
        await interaction.response.defer()
class AddCourseSearch(discord.ui.View):
    """Search-mode configuration view: pick days/type/term/credits/time range, then add the pinger.

    Each button swaps in the matching select menu. A selection is recorded by
    flipping the ``default`` flag on that menu's options, which also makes the
    choice show up when the menu is reopened. ``done`` builds the pinger from
    those flags.

    ``base`` is the submitted ``AddCourseBase`` modal; its ``subj``, ``course``
    and ``sesobj`` are read when the pinger is built.
    """
    #class-level option lists are templates only; __init__ makes per-view copies
    #type opts is way too long, so some of the little used ones are truncated (opt("Reserved Section"),opt("Optional Section"),opt("Independent Study")
    #TODO reorder to make it easier to search for the main ones? rn its alph order
    type_opts = [
        opt("Directed Studies"), opt("Discussion"), opt("Essay/Report"), opt("Exchange Program"),
        opt("Experiential"), opt("Field Trip"), opt("Flexible Learning"), opt("Lab-Seminar"),
        opt("Laboratory"), opt("Lecture"), opt("Lecture-Discussion"), opt("Lecture-Laboratory"),
        opt("Lecture-Seminar"), opt("Practicum"), opt("Problem Session"), opt("Project"),
        opt("Rehearsal"), opt("Research"), opt("Seminar"), opt("Studio"), opt("Thesis"),
        opt("Tutorial"), opt("Waiting List"), opt("Work Placement"), opt("Workshop"),
    ]
    term_opts = [opt("Term 1"), opt("Term 2"), opt("Term 1-2")]
    #TODO remove sunday and saturday? oh actually there are a lot of courses on those days huh
    days_opts = [opt("Sunday"), opt("Monday"), opt("Tuesday"), opt("Wednesday"), opt("Thursday"), opt("Friday"), opt("Saturday")]
    cred_opts = [opt(str(i+1)) for i in range(6)] #pretty sure only 6 is allowed

    def __init__(self, base):
        super().__init__()
        self.base = base
        self.dur = AddCourseDuration()
        # Fresh option objects and menus per view. Selections are stored by
        # mutating the options' default flags, so sharing the class-level
        # objects would leak one user's choices into every other open view.
        self.type_opts = [opt(o.label) for o in self.type_opts]
        self.term_opts = [opt(o.label) for o in self.term_opts]
        self.days_opts = [opt(o.label) for o in self.days_opts]
        self.cred_opts = [opt(o.label) for o in self.cred_opts]
        self.days_menu = ui.Select(placeholder='Days', options=self.days_opts, min_values=0, max_values=7, row=2, custom_id='ubcpinger-search-days-menu')
        self.type_menu = ui.Select(placeholder='Type', options=self.type_opts, min_values=0, row=3, custom_id='ubcpinger-search-days-type-menu')
        self.term_menu = ui.Select(placeholder='Term', options=self.term_opts, min_values=0, row=3, custom_id='ubcpinger-search-days-term-menu')
        self.cred_menu = ui.Select(placeholder='Credits', options=self.cred_opts, min_values=0, row=3, custom_id='ubcpinger-search-days-cred-menu')

    async def change_menu(self, item: discord.ui.Select, interaction: discord.Interaction):
        """Remove any select currently shown, add *item* if given, and refresh the message."""
        #remove opened menus
        for i in self.children:
            if isinstance(i, discord.ui.Select):
                self.remove_item(i)
        if item:
            self.add_item(item)
        await interaction.response.edit_message(view=self)

    async def interaction_check(self, interaction: discord.Interaction):
        """Handle select-menu interactions here; let button interactions through.

        Returns False for menu interactions (fully handled in this method) and
        True otherwise so the button callbacks run.
        """
        #set default and close menu as response; we can extract the data later on done
        if 'custom_id' in interaction.data and '-menu' in interaction.data['custom_id']:
            # Menu ids end in '-<kind>-menu'. Use the second-to-last part:
            # index 2 is 'days' for every menu id, which made type/term/credit
            # selections overwrite the day flags.
            kind = interaction.data['custom_id'].split('-')[-2]
            for option in getattr(self, kind + '_opts'):
                #seems like setting default doesnt really work for the single option dropdowns which kinda checks out but hmmm
                #assigning the comparison also resets options that were deselected
                option.default = option.label in interaction.data['values']
            await self.change_menu(None, interaction)
            return False
        return True

    @discord.ui.button(label='Days', custom_id='ubcpinger-search-days', style = discord.ButtonStyle.primary)
    async def days(self, interaction: discord.Interaction, button: discord.ui.Button):
        await self.change_menu(self.days_menu, interaction)

    @discord.ui.button(label='Type', custom_id='ubcpinger-search-type', style = discord.ButtonStyle.primary)
    async def type(self, interaction: discord.Interaction, button: discord.ui.Button):
        await self.change_menu(self.type_menu, interaction)

    @discord.ui.button(label='Term', custom_id='ubcpinger-search-term', style = discord.ButtonStyle.primary)
    async def term(self, interaction: discord.Interaction, button: discord.ui.Button):
        await self.change_menu(self.term_menu, interaction)

    @discord.ui.button(label='Credits', custom_id='ubcpinger-search-cred', row=1, style = discord.ButtonStyle.primary)
    async def cred(self, interaction: discord.Interaction, button: discord.ui.Button):
        await self.change_menu(self.cred_menu, interaction)

    @discord.ui.button(label='Done', custom_id='ubcpinger-search-done', row=1, style = discord.ButtonStyle.success)
    async def done(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Build the search-mode pinger from the recorded choices and register it."""
        try:
            def optional(menu, options):
                # Prefer the values reported by the menu; fall back to the
                # default flags set in interaction_check, which do not depend
                # on which interaction is being handled.
                # NOTE(review): whether Select.values is still populated
                # inside a later button callback depends on the discord.py
                # version - confirm.
                if menu.values:
                    return menu.values[0]
                return next((o.label for o in options if o.default), '')
            #since we set default on callback that is basically the canonical value of whether the day is selected
            #also make it a dict for easier shoving in through kwargs in the pinger loop
            days = {f'DAY{i+1}': True for i, o in enumerate(self.days_opts) if o.default}
            # partition handles 'from-to', 'from-' and '-to', and leaves both
            # parts empty when nothing was entered. The previous
            # "a, b = x.split('-') if x else '', ''" parsed as a 2-tuple whose
            # second element was always '', so etime was never set.
            stime, _, etime = (self.dur.duration.value or '').partition('-')
            obj = coursedict({
                'subj': self.base.subj.value,
                'crsno': self.base.course.value,
                #note how theres no section
                'credit': optional(self.cred_menu, self.cred_opts),
                'stime': stime,
                'etime': etime,
                **days,
                **self.base.sesobj,
                'actv': optional(self.type_menu, self.type_opts).ljust(50),
                'term': optional(self.term_menu, self.term_opts),
                'metadata': {'user': [{'id': interaction.user.id, 'list': None}]}
            })
            #the pinger name comes from coursedict.__str__ inside add_course
            name, added = add_course(obj)
            #edit the menu instead of sending another ephemeral msg since we cant delete the old one
            #TODO figure out a better way to print since ill be using that to show a dropdown menu for the removal ui too
            await interaction.response.edit_message(content=f'The pinger `{name}` is now tracking the requested classes! This includes the following:\n```\n{added}\n```\nYou will now get pinged when it opens up.', view=None)
        except Exception as e:
            logger.error('AddCourseSearch fail:', exc_info=e)
            await interaction.response.edit_message(content=f'Something went wrong (likely wrong params): {type(e)}\n{e}', view=None)

    #at the end to wrap it around nicer
    @discord.ui.button(label='Time range', custom_id='ubcpinger-search-duration', row=1, style = discord.ButtonStyle.primary)
    async def duration(self, interaction: discord.Interaction, button: discord.ui.Button):
        await interaction.response.send_modal(self.dur)
class AddCourseBase(ui.Modal, title='Add a course to ping'):
    """First step of adding a pinger: subject, course number, optional section and session.

    With a section the pinger is created immediately (section mode); without
    one, the ``AddCourseSearch`` view is sent so the user can configure a
    search-mode pinger.
    """
    subj = ui.TextInput(label='Subject', placeholder='e.g. CPSC, CP*', min_length=1, max_length=4)
    course = ui.TextInput(label='Course no.', placeholder='e.g. 110, 1*', min_length=1, max_length=3)
    section = ui.TextInput(label='Section', placeholder='e.g. 101, T2*, 10*/201/T1D, leave blank to open search menu', min_length=1, max_length=3, required=False)
    session = ui.TextInput(label='Session', placeholder='e.g. 2022S, 2023W, defaults to coming session', min_length=5, max_length=5, required=False)

    async def on_submit(self, interaction: discord.Interaction):
        """Parse the form and either create a section-mode pinger or open the search menu."""
        try:
            #response in this case is the same msg as the button, dont do anything to it
            await interaction.response.defer()
            #parse session: first four chars are the year, last char the session code
            self.sesobj = {}
            if self.session.value:
                self.sesobj = {
                    'sessyr': self.session.value[:4],
                    'sesscd': self.session.value[-1],
                }
            if not self.section.value:
                #discord doesnt allow us to send another modal right away? (yep check InteractionResponseType, 9 is modal and not in the list)
                await interaction.followup.send('Configure the search parameters below:', view=AddCourseSearch(self), ephemeral=True)
                return
            if '*' in self.subj.value or '*' in self.course.value:
                # The interaction was already answered by defer() above, so a
                # second response.send_message would raise; use the followup
                # webhook like every other message in this handler.
                await interaction.followup.send('Wildcards for subject and courses are not allowed in section mode! Leave section blank to do a broad search.', ephemeral=True)
                return
            obj = coursedict({
                'subj': self.subj.value,
                'crsno': self.course.value,
                #even though we dont actually use the wildcard we need to differentiate between no section and a fully wildcard section so dont remove yet
                'section': self.section.value,
                **self.sesobj,
                'metadata': {'user': [{'id': interaction.user.id, 'list': None}]}
            })
            name, added = add_course(obj)
            await interaction.followup.send(f'The pinger `{name}` is now tracking the requested classes! This includes the following:\n```\n{added}\n```\nYou will now get pinged when it opens up.', ephemeral=True)
        except Exception as e:
            logger.error('AddCourseBase fail:', exc_info=e)
            await interaction.followup.send(content=f'Something went wrong (likely wrong params): {type(e)}\n{e}', ephemeral=True)
#TODO dm option? or maybe just only allow dms (iirc the reason why i dont dm is coz theres no mechanism to dm multiple ppl or sth so i just send a single msg to ping everyone)
class RemoveCourseList(ui.View):
    """Select menu for removing the invoking user from pingers.

    The bot owner is offered every pinger; for a pinger the owner is not
    subscribed to, selecting it deletes the whole pinger. Build instances with
    the async factory ``create`` (the owner check has to be awaited).
    """
    # Discord accepts between 1 and 25 options per select menu.
    MAX_OPTIONS = 25

    @classmethod
    async def create(cls, user):
        """Build the view for *user*, listing the pingers they are allowed to remove."""
        if not await bot.is_owner(user):
            pingers = [opt(s) for s in get_pingers_with_user(user.id)]
        else:
            pingers = [opt(s) for s in get_data().keys()]
        return cls(user, pingers)

    def __init__(self, user, pingers):
        super().__init__()
        self.user = user
        if pingers:
            # Anything beyond the option limit is not shown; the user can
            # reopen the menu after removing some entries.
            shown = pingers[:self.MAX_OPTIONS]
            self.menu = ui.Select(placeholder='Pinger', options=shown, min_values=1, max_values=len(shown), custom_id='ubcpinger-remove-menu')
        else:
            # A select without options is rejected, so show a disabled
            # placeholder menu instead of letting the send fail.
            self.menu = ui.Select(placeholder='No pingers to remove', options=[opt('(none)')], min_values=1, max_values=1, disabled=True, custom_id='ubcpinger-remove-menu')
        self.add_item(self.menu)

    async def interaction_check(self, interaction: discord.Interaction):
        """Process the menu selection here; always returns False since no item callback is needed."""
        if interaction.data.get('custom_id') != 'ubcpinger-remove-menu':
            return False
        data = get_data()
        # self.id is the view's own identifier, not a user id; ask the bot
        # whether the invoking user is an owner instead.
        is_owner = await bot.is_owner(self.user)
        for name in interaction.data['values']:
            if name not in data:
                # Already removed elsewhere. Indexing would insert '' through
                # the default factory and then fail on attribute access.
                logger.warning(f'{name} no longer exists, skipping removal')
                continue
            users = data[name].metadata.user
            mine = next((u for u in users if u.id == self.user.id), None)
            if mine is not None:
                users.remove(mine)
                #if empty pinger, remove the entire pinger
                if not users:
                    logger.info(f"{data[name]} is now empty, removing...")
                    del data[name]
            elif is_owner:
                #owner removing a pinger they are not part of: drop the entire thing
                del data[name]
            else:
                logger.error(f'Something pretty wrong happened: {data} does not have {self.user.id} even though on setup it had')
        set_data(data)
        pinger_names = "\n".join(interaction.data['values'])
        await interaction.response.edit_message(content=f"Successfully removed you from these pingers:\n```\n{pinger_names}\n```", view=None)
        return False
class CourseButtons(discord.ui.View):
    """Entry-point view with the "Add course" and "Remove course" buttons."""

    @discord.ui.button(label="Add course", style=discord.ButtonStyle.green, custom_id='ubcpinger-add')
    async def add(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Open the add-course modal."""
        modal = AddCourseBase()
        await interaction.response.send_modal(modal)

    @discord.ui.button(label="Remove course", style=discord.ButtonStyle.red, custom_id='ubcpinger-remove')
    async def remove(self, interaction: discord.Interaction, button: discord.ui.Button):
        """Acknowledge the click, then send the removal menu as an ephemeral followup."""
        await interaction.response.defer()
        removal_view = await RemoveCourseList.create(interaction.user)
        await interaction.followup.send("Choose a course to remove:", view=removal_view, ephemeral=True)
#done remove button (only allow ppl to remove their own, except if user id matches me so i can actually do cleanup and stuff without going into the server)
#TODO button for viewing what courses you get pinged for in a pinger? rn its not too descriptive since the optimizer makes it the broadest possible
@tasks.loop(seconds=1)
async def pinger():
    """Poll every stored pinger once and DM users whose wanted sections are open.

    For each pinger the schedule is fetched with ``try_ping``; every user is
    then DMed for each section in their list whose status is ``'PING'``.
    Any exception is logged and followed by a 10 second pause.
    """
    try:
        obj = get_data()
        if not obj:
            return
        loop = asyncio.get_running_loop()
        #obj is a mapping of pingers against its name
        for pinger_name, pinger_obj in obj.items():
            logger.debug(f'In {pinger_name}:')
            # try_ping does blocking HTTP; run it in the default executor so
            # the event loop stays responsive while waiting.
            courses = await loop.run_in_executor(None, try_ping, pinger_obj)
            for user in pinger_obj.metadata.user:
                for course in user.list:
                    if course not in courses:
                        # section vanished from the listing: skip it rather
                        # than aborting the whole cycle with a KeyError
                        logger.warning(f' {course}: not in the schedule anymore ({user.id})')
                        continue
                    url, status = courses[course]
                    logger.debug(f' {course}: {status} ({user.id})')
                    if status != 'PING':
                        continue
                    guild = bot.get_guild(GUILD)
                    member = guild.get_member(user.id) if guild is not None else None
                    if member is None:
                        logger.warning(f' cannot find member {user.id} in guild {GUILD}, skipping DM')
                        continue
                    #create dm short circuits if dm is found
                    dm = await member.create_dm()
                    view = ui.View()
                    view.add_item(discord.ui.Button(label='Get me to SSC', url=url, style=discord.ButtonStyle.primary))
                    await dm.send(f'## wake up `{course}` is currently open', view=view)
            # pause between pingers, 4-7 secs
            # NOTE(review): the extracted source lost its indentation; this
            # placement (once per pinger) is assumed - confirm.
            await asyncio.sleep(random.uniform(4, 7))
    except Exception as e:
        logger.error("Something went wrong during pinging:", exc_info=e)
        #hope that 10s later something fixes itself
        # must be awaited - a bare asyncio.sleep(10) only creates a coroutine
        # object and never pauses
        await asyncio.sleep(10)
@bot.event
async def on_ready():
    """Register the button view and start the polling loop once the bot is ready."""
    logger.info(f'Logged on as {bot.user}')
    buttons = CourseButtons(timeout=None)
    bot.add_view(buttons)
    # on_ready can fire again after a reconnect; starting a loop that is
    # already running raises RuntimeError.
    if not pinger.is_running():
        pinger.start()
#touch if not exist
open('courses.json', 'a').close()
# Read the token through a context manager so the handle is closed, and strip
# it because token files usually end with a newline.
with open('token.txt') as _token_file:
    _token = _token_file.read().strip()
bot.run(_token)

File Metadata

Mime Type
text/x-python
Expires
Sat, Jul 5, 3:18 PM (23 h, 29 m)
Storage Engine
local-disk
Storage Format
Raw Data
Storage Handle
c0/4c/25b54ece8131c0386b32bb72786d

Event Timeline