diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d6f0d56 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +token.txt +guild.txt +courses.json \ No newline at end of file diff --git a/bot.py b/bot.py new file mode 100644 index 0000000..1fb79ca --- /dev/null +++ b/bot.py @@ -0,0 +1,460 @@ +from bs4 import * +import discord, requests +from discord.ext import commands, tasks +from discord import ui +from collections import defaultdict +import json, logging, asyncio, random, re + +#for getting members from to dm, otherwise use intents.members +GUILD = int(open('guild.txt').read()) + + +intents = discord.Intents.default() +intents.message_content = True +#intents.members = True #makes things way too slow so we bind to guild instead +bot = commands.Bot(command_prefix='.pinger ', intents=intents) + +logger = logging.getLogger('discord').getChild('ubcpinger') +logger.setLevel(logging.DEBUG) + + + +# helper for easier access of data in json +class coursedict(defaultdict): + __getattr__ = dict.get + __setattr__ = dict.__setitem__ + __delattr__ = dict.__delitem__ + + #recursively convert all dicts/list into defattrdicts + def convert(self, data): + iter = data.items() if isinstance(data, dict) else enumerate(data) + for key, val in iter: + if type(val) in [dict, list]: + if type(val) == dict: + data[key] = coursedict(val) + self.convert(data[key]) + + def __init__(self, data: dict): + super().__init__(lambda: '', data) + self.convert(self) + + def __str__(self) -> str: + if self.subj: + s = f'{self.subj} {self.crsno}' + if self.section: + s += f' (section mode)' + else: + #funny ljust needed in actv so strip is needed to revert it on show + s += f', search:' + ' '.join([f'{k}={str(v).strip()}' for k,v in self if v and k not in ['metadata', 'subj', 'crsno']]) + return s + else: + return dict.__repr__(self) + + #remove the default factory print + __repr__ = dict.__repr__ + + #implicitly allow iter on items() + def __iter__(self): + return self.items().__iter__() + + + +def get_data(): + with open('courses.json') as f: + data = f.read() + return coursedict(json.loads(data)) if data else {} + +def set_data(obj): + with open('courses.json', 'w') as data: + data.write(json.dumps(obj, indent=2)) + + + +def try_ping(pinger: coursedict): + optional = lambda fmtstr, *data: fmtstr.format(*data) if all(d for d in data) else '' + #must include user agent, accept, content type (if post), otherwise ssc will complain + #if no pname+tname specified, session cookie is needed to avoid "Please wait while your request is processed - this may take up to a minute to complete." + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/114.0", + "Accept": "text/html", + } + + if pinger.section: #section mode + doc = requests.get('https://courses.students.ubc.ca/cs/courseschedule?pname=subjarea&tname=subj-course' + f'&dept={pinger.subj}' + f'&course={pinger.crsno}' + + optional('&sessyr={}&sesscd={}', pinger.sessyr, pinger.sesscd), headers=headers).text + else: #search mode + reqdata = dict(pinger) #make a copy and remove metadata + del reqdata['metadata'] + #no need to explicitly set content type if we set data= alr since requests does that for us + doc = requests.post('https://courses.students.ubc.ca/cs/courseschedule?pname=subjarea&tname=sectsearch', + data=reqdata, + headers=headers).text + + #they both use the same system to display sections + soup = BeautifulSoup(doc, 'html.parser') + + ret = {} + #remove thead, since we cant search directly with tbody as its doesnt necessarily exist in the doc + soup.select_one('.section-summary thead').clear() + for child in soup.select('.section-summary tr'): + section = child.select_one("td:nth-of-type(2) a") + status = child.select_one("td:nth-of-type(1)").text + ret[section.text] = 'https://courses.students.ubc.ca' + section['href'], (status if status != '\xa0' and status.strip() else 'PING') + + return ret + + +#done make this a dict instead so its unique? this would remove the need of a "add myself to existing courses" button +#it should be pretty simple to just match the basics but more optimization (like catching CPSC 110 101 == CPSC 110 1*) might be hard +#and its practically impossible to actually figure out whether a search includes a specific section programmatically +#actually it might be possible if i do a ping once and check if theres a strict superset/subset of the current object +#still gonna be hard to optimize for pingers that span across multiple pingers +def add_course(obj): + data = get_data() + + #obj.metadata.courses contains the entire list of sections that can be pinged through this pinger + #whereas obj.metadata.user[0].list contains the wanted list of sections (same for search mode, likely smaller for section mode) + + #ping, check against existing pingers' course list (in metadata), if matches merge + + resp = try_ping(obj) + if resp: + obj.metadata.courses = list(resp.keys()) + #populate user.list to signify which part of the pinger should trigger a ping + obj.metadata.user[0].list = list(obj.metadata.courses) + if obj.section: #do filtering (only needed in section mode, search mode is already filtered by ssc) + check = re.compile(f'{obj.subj} {obj.crsno} {obj.section.replace("*", ".*")}') + obj.metadata.user[0].list = [c for c in obj.metadata.user[0].list if check.match(c)] + #after we sort through that section should not be bound to a specific section now since the data is in user.list + #we do need to set data though to allow other things (e.g. try_ping) to match on whether its in search or section mode + obj.section = True + + courses = set(obj.metadata.user[0].list) + + + #now we have the proper name for the pinger + name = str(obj) + logger.info(f'Adding {name} to the pingers...') + + merged = False + for pinger_name, pinger_obj in data: #defattrdict not only for easier editing but also for copying for iteration + against = set(pinger_obj.metadata.courses) + #only check merging if they are the same session + if pinger_obj.sessyr == obj.sessyr and pinger_obj.sesscd == obj.sesscd: + #there should only be one of the cases below: either existing pinger(s) are subsets of the current one, or this pinger is a subset of an existing pinger + #otherwise it shouldve been optimized out since it is sequentially added and this optimzation runs every time + if courses.issuperset(against): + logger.info(f' Optimization: merging {pinger_name} into {name}') + obj.metadata.user += pinger_obj.metadata.user + del data[pinger_name] + elif against.issuperset(courses): + #this should be a terminating action to avoid adding it into multiple pingers when one is already handling it + #(think of a case where existing courses are [1234, 1256] and our pinger is [12] - 2 pingers match, but only 1 is needed) + pinger_obj.metadata.user += obj.metadata.user + logger.info(f' Optimization: merging {name} into {pinger_name}') + name = pinger_name + merged = True + break + + if not merged: + data[name] = obj + set_data(data) + + #returns the list of courses this pinger will ping, and the name (changed if merged) + return name, "\n".join(obj.metadata.user[0].list) + else: + raise ValueError('No sections found!') + +#done add ppl to existing pingers? see above +#done optimization for when multiple people are looking at the same course but want different sections pinged (and not all of them) + +#TODO currently the way courses in section mode are handled means that they basically dont get optimized into search modes +#it would be better if pinger.metadata.courses tracks the total set of courses its users are pinging instead, +#and then have the optimizer do special code on reading section mode to not match on the courses in the list, but on the course names +#aka if everything a user is pinging has the same prefix of CPSC 110 for example, +#then just throw it into CPSC 110 section mode regardless even if currently the CPSC 110 pinger.metadata.courses does not have it +#this means that the following has to be changed: +# - instead of obj.metadata.courses = list(resp.keys()), do obj.metadata.courses = +# - at the superset checks, add check on whether everything being added is in the same course, and if there is a course pinger in section mode for that already + +def get_pingers_with_user(id): + ret = [] + for name, pinger in get_data(): + for user in pinger.metadata.user: + if user.id == id: + ret.append(name) + break + return ret + + + +def opt(label, default=False): + return discord.SelectOption(label=label, default=default) + + +class AddCourseDuration(ui.Modal, title='Enter the time range to search for'): + duration = ui.TextInput(label='Duration', placeholder='class duration (from-to, HHMM), e.g. 1000-1300, 1000-, -1300', min_length=5, max_length=9, required=False) + + async def on_submit(self, interaction: discord.Interaction): + #set default val for duration for next open + self.duration.default = self.duration.value + await interaction.response.defer() + +class AddCourseSearch(discord.ui.View): + + #put in class to allow default changing on callback + #type opts is way too long, so some of the little used ones are truncated (opt("Reserved Section"),opt("Optional Section"),opt("Independent Study") + #TODO reorder to make it easier to search for the main ones? rn its alph order + type_opts = [opt("Directed Studies"),opt("Discussion"),opt("Essay/Report"),opt("Exchange Program"),opt("Experiential"),opt("Field Trip"),opt("Flexible Learning"),opt("Lab-Seminar"),opt("Laboratory"),opt("Lecture"),opt("Lecture-Discussion"),opt("Lecture-Laboratory"),opt("Lecture-Seminar"),opt("Practicum"),opt("Problem Session"),opt("Project"),opt("Rehearsal"),opt("Research"),opt("Seminar"),opt("Studio"),opt("Thesis"),opt("Tutorial"),opt("Waiting List"),opt("Work Placement"),opt("Workshop")] + term_opts = [opt("Term 1"), opt("Term 2"), opt("Term 1-2")] + #TODO remove sunday and saturday? oh actually there are a lot of courses on those days huh + days_opts = [opt("Sunday"), opt("Monday"), opt("Tuesday"), opt("Wednesday"), opt("Thursday"), opt("Friday"), opt("Saturday")] + cred_opts = [opt(str(i+1)) for i in range(6)] #pretty sure only 6 is allowed + + term_menu = ui.Select(placeholder='Term', options=term_opts, min_values=0, row=3, custom_id='ubcpinger-search-days-term-menu') + days_menu = ui.Select(placeholder='Days', options=days_opts, min_values=0, max_values=7, row=2, custom_id='ubcpinger-search-days-menu') + type_menu = ui.Select(placeholder='Type', options=type_opts, min_values=0, row=3, custom_id='ubcpinger-search-days-type-menu') + term_menu = ui.Select(placeholder='Term', options=term_opts, min_values=0, row=3, custom_id='ubcpinger-search-days-term-menu') + cred_menu = ui.Select(placeholder='Credits', options=cred_opts, min_values=0, row=3, custom_id='ubcpinger-search-days-cred-menu') + + + def __init__(self, base): + super().__init__() + self.base = base + self.dur = AddCourseDuration() + + async def change_menu(self, item: discord.ui.Select, interaction: discord.Interaction): + #remove opened menus + for i in self.children: + if isinstance(i, discord.ui.Select): + self.remove_item(i) + if item: + self.add_item(item) + await interaction.response.edit_message(view=self) + + async def interaction_check(self, interaction: discord.Interaction): + #set default and close menu as response; we can extract the data later on done + if 'custom_id' in interaction.data and '-menu' in interaction.data['custom_id']: + #really hacky but im also too sleepy to actually figure this out lmao + for opt in getattr(self, interaction.data['custom_id'].split('-')[2] + '_opts'): + #seems like setting default doesnt really work for the single option dropdowns which kinda checks out but hmmm + if opt.label in interaction.data['values']: + opt.default = True + else: + #remember to reset too + opt.default = False + await self.change_menu(None, interaction) + return False + return True + + + @discord.ui.button(label='Days', custom_id='ubcpinger-search-days', style = discord.ButtonStyle.primary) + async def days(self, interaction: discord.Interaction, button: discord.ui.Button): + await self.change_menu(self.days_menu, interaction) + + @discord.ui.button(label='Type', custom_id='ubcpinger-search-type', style = discord.ButtonStyle.primary) + async def type(self, interaction: discord.Interaction, button: discord.ui.Button): + await self.change_menu(self.type_menu, interaction) + + @discord.ui.button(label='Term', custom_id='ubcpinger-search-term', style = discord.ButtonStyle.primary) + async def term(self, interaction: discord.Interaction, button: discord.ui.Button): + await self.change_menu(self.term_menu, interaction) + + @discord.ui.button(label='Credits', custom_id='ubcpinger-search-cred', row=1, style = discord.ButtonStyle.primary) + async def cred(self, interaction: discord.Interaction, button: discord.ui.Button): + await self.change_menu(self.cred_menu, interaction) + + @discord.ui.button(label='Done', custom_id='ubcpinger-search-done', row=1, style = discord.ButtonStyle.success) + async def done(self, interaction: discord.Interaction, button: discord.ui.Button): + try: + optional = lambda data: data.values[0] if data.values else '' + + #most of these values are selected for us already so no need to parse + + #since we set default on callback that is basically the canonical value of whether the day is selected + #also make it a dict for easier shoving in through kwargs in the pinger loop + days = {f'DAY{i+1}': True for i, o in enumerate(self.days_opts) if o.default} + + + stime, etime = self.dur.duration.value.split('-') if self.dur.duration.value else '', '' + + obj = coursedict({ + 'subj': self.base.subj.value, + 'crsno': self.base.course.value, + #note how theres no section + 'credit': optional(self.cred_menu), + 'stime': stime, + 'etime': etime, + **days, + **self.base.sesobj, + 'actv': optional(self.type_menu).ljust(50), + 'term': optional(self.term_menu), + 'metadata': {'user': [{'id': interaction.user.id, 'list': None}]} + }) + + #delegate name setting to defattrdict formatting + name, added = add_course(obj) + + #edit the menu instead of sending another ephemeral msg since we cant delete the old one + #TODO figure out a better way to print since ill be using that to show a dropdown menu for the removal ui too + await interaction.response.edit_message(content=f'The pinger `{name}` is now tracking the requested classes! This includes the following:\n```\n{added}\n```\nYou will now get pinged when it opens up.', view=None) + except Exception as e: + logger.error('AddCourseSearch fail:', exc_info=e) + await interaction.response.edit_message(content=f'Something went wrong (likely wrong params): {type(e)}\n{e}', view=None) + + #at the end to wrap it around nicer + @discord.ui.button(label='Time range', custom_id='ubcpinger-search-duration', row=1, style = discord.ButtonStyle.primary) + async def duration(self, interaction: discord.Interaction, button: discord.ui.Button): + await interaction.response.send_modal(self.dur) + +class AddCourseBase(ui.Modal, title='Add a course to ping'): + + subj = ui.TextInput(label='Subject', placeholder='e.g. CPSC, CP*', min_length=1, max_length=4) + course = ui.TextInput(label='Course no.', placeholder='e.g. 110, 1*', min_length=1, max_length=3) + + section = ui.TextInput(label='Section', placeholder='e.g. 101, T2*, 10*/201/T1D, leave blank to open search menu', min_length=1, max_length=3, required=False) + + session = ui.TextInput(label='Session', placeholder='e.g. 2022S, 2023W, defaults to coming session', min_length=5, max_length=5, required=False) + + async def on_submit(self, interaction: discord.Interaction): + try: + #response in this case is the same msg as the button, dont do anything to it + await interaction.response.defer() + + #parse session + self.sesobj = {} + if self.session.value: + self.sesobj = { + 'sessyr': self.session.value[:4], + 'sesscd': self.session.value[-1], + } + + if not self.section.value: + #discord doesnt allow us to send another modal right away? (yep check InteractionResponseType, 9 is modal and not in the list) + await interaction.followup.send('Configure the search parameters below:', view=AddCourseSearch(self), ephemeral=True) + else: + if '*' in self.subj.value or '*' in self.course.value: + await interaction.response.send_message('Wildcards for subject and courses are not allowed in section mode! Leave section blank to do a broad search.', ephemeral=True) + return + + obj = coursedict({ + 'subj': self.subj.value, + 'crsno': self.course.value, + #even though we dont actually use the wildcard we need to differentiate between no section and a fully wildcard section so dont remove yet + 'section': self.section.value, + **self.sesobj, + 'metadata': {'user': [{'id': interaction.user.id, 'list': None}]} + }) + + name, added = add_course(obj) + await interaction.followup.send(f'The pinger `{name}` is now tracking the requested classes! This includes the following:\n```\n{added}\n```\nYou will now get pinged when it opens up.', ephemeral=True) + except Exception as e: + logger.error('AddCourseBase fail:', exc_info=e) + await interaction.followup.send(content=f'Something went wrong (likely wrong params): {type(e)}\n{e}', ephemeral=True) + +#TODO dm option? or maybe just only allow dms (iirc the reason why i dont dm is coz theres no mechanism to dm multiple ppl or sth so i just send a single msg to ping everyone) + + +class RemoveCourseList(ui.View): + #i love python asyncio : ) why the fuck + @classmethod + async def create(self, user): + if not await bot.is_owner(user): + pingers = [opt(s) for s in get_pingers_with_user(user.id)] + else: + pingers = [opt(s) for s in get_data().keys()] + return RemoveCourseList(user, pingers) + + def __init__(self, user, pingers): + super().__init__() + self.user = user + self.menu = ui.Select(placeholder='Pinger', options=pingers, min_values=1, max_values=len(pingers), custom_id='ubcpinger-remove-menu') + self.add_item(self.menu) + + async def interaction_check(self, interaction: discord.Interaction): + if 'custom_id' in interaction.data and interaction.data['custom_id'] == 'ubcpinger-remove-menu': + data = get_data() + for name in interaction.data['values']: + #if bot owner, on remove of pinger with own id added, remove normally; else remove the entire thing + removed = False + for user in data[name].metadata.user: + if user.id == self.user.id: + data[name].metadata.user.remove(user) + removed = True + #if empty pinger, remove the entire pinger + if not data[name].metadata.user: + logger.info(f"{data[name]} is now empty, removing...") + del data[name] + break + if not removed: + if self.id == bot.owner_id: + del data[name] + else: + logger.error(f'Something pretty wrong happened: {data} does not have {self.id} even though on setup it had') + set_data(data) + + pinger_names = "\n".join(interaction.data['values']) + await interaction.response.edit_message(content=f"Successfully removed you from these pingers:\n```\n{pinger_names}\n```", view=None) + + +class CourseButtons(discord.ui.View): + @discord.ui.button(label="Add course", style=discord.ButtonStyle.green, custom_id='ubcpinger-add') + async def add(self, interaction: discord.Interaction, button: discord.ui.Button): + await interaction.response.send_modal(AddCourseBase()) + + @discord.ui.button(label="Remove course", style=discord.ButtonStyle.red, custom_id='ubcpinger-remove') + async def remove(self, interaction: discord.Interaction, button: discord.ui.Button): + await interaction.response.defer() + await interaction.followup.send("Choose a course to remove:", view=await RemoveCourseList.create(interaction.user), ephemeral=True) + +#done remove button (only allow ppl to remove their own, except if user id matches me so i can actually do cleanup and stuff without going into the server) + +#TODO button for viewing what courses you get pinged for in a pinger? rn its not too descriptive since the optimizer makes it the broadest possible + +@tasks.loop(seconds=1) +async def pinger(): + try: + obj = get_data() + + if not obj: + return + + #obj is a mapping of pingers against its name + for pinger_name, pinger_obj in obj: + logger.debug(f'In {pinger_name}:') + courses = try_ping(pinger_obj) + for user in pinger_obj.metadata.user: + for course in user.list: + url, status = courses[course] + logger.debug(f' {course}: {status} ({user.id})') + if status == 'PING': + #create dm short circuits if dm is found + dm = await bot.get_guild(GUILD).get_member(user.id).create_dm() + #done button to ssc + view = ui.View() + view.add_item(discord.ui.Button(label='Get me to SSC', url=url, style=discord.ButtonStyle.primary)) + await dm.send(f'## wake up `{course}` is currently open', view=view) + + await asyncio.sleep(random.uniform(4, 7)) #total 4-7 secs + except Exception as e: + logger.error("Something went wrong during pinging:", exc_info=e) + #hope that 10s later something fixes itself + asyncio.sleep(10) + + + +@bot.event +async def on_ready(): + logger.info(f'Logged on as {bot.user}') + + buttons = CourseButtons(timeout=None) + bot.add_view(buttons) + + pinger.start() + +#touch if not exist +open('courses.json', 'a').close() + +bot.run(open('token.txt').read())