diff --git a/.gitignore b/.gitignore | |
new file mode 100644 | |
index 0000000..d6f0d56 | |
--- /dev/null | |
+++ b/.gitignore | |
@@ -0,0 +1,3 @@ | |
+token.txt | |
+guild.txt | |
+courses.json | |
\ No newline at end of file | |
diff --git a/bot.py b/bot.py | |
new file mode 100644 | |
index 0000000..1fb79ca | |
--- /dev/null | |
+++ b/bot.py | |
@@ -0,0 +1,460 @@ | |
+from bs4 import * | |
+import discord, requests | |
+from discord.ext import commands, tasks | |
+from discord import ui | |
+from collections import defaultdict | |
+import json, logging, asyncio, random, re | |
+ | |
+#for getting members from to dm, otherwise use intents.members | |
+GUILD = int(open('guild.txt').read()) | |
+ | |
+ | |
+intents = discord.Intents.default() | |
+intents.message_content = True | |
+#intents.members = True #makes things way too slow so we bind to guild instead | |
+bot = commands.Bot(command_prefix='.pinger ', intents=intents) | |
+ | |
+logger = logging.getLogger('discord').getChild('ubcpinger') | |
+logger.setLevel(logging.DEBUG) | |
+ | |
+ | |
+ | |
+# helper for easier access of data in json | |
+class coursedict(defaultdict): | |
+ __getattr__ = dict.get | |
+ __setattr__ = dict.__setitem__ | |
+ __delattr__ = dict.__delitem__ | |
+ | |
+ #recursively convert all dicts/list into defattrdicts | |
+ def convert(self, data): | |
+ iter = data.items() if isinstance(data, dict) else enumerate(data) | |
+ for key, val in iter: | |
+ if type(val) in [dict, list]: | |
+ if type(val) == dict: | |
+ data[key] = coursedict(val) | |
+ self.convert(data[key]) | |
+ | |
+ def __init__(self, data: dict): | |
+ super().__init__(lambda: '', data) | |
+ self.convert(self) | |
+ | |
+ def __str__(self) -> str: | |
+ if self.subj: | |
+ s = f'{self.subj} {self.crsno}' | |
+ if self.section: | |
+ s += f' (section mode)' | |
+ else: | |
+ #funny ljust needed in actv so strip is needed to revert it on show | |
+ s += f', search:' + ' '.join([f'{k}={str(v).strip()}' for k,v in self if v and k not in ['metadata', 'subj', 'crsno']]) | |
+ return s | |
+ else: | |
+ return dict.__repr__(self) | |
+ | |
+ #remove the default factory print | |
+ __repr__ = dict.__repr__ | |
+ | |
+ #implicitly allow iter on items() | |
+ def __iter__(self): | |
+ return self.items().__iter__() | |
+ | |
+ | |
+ | |
+def get_data(): | |
+ with open('courses.json') as f: | |
+ data = f.read() | |
+ return coursedict(json.loads(data)) if data else {} | |
+ | |
+def set_data(obj): | |
+ with open('courses.json', 'w') as data: | |
+ data.write(json.dumps(obj, indent=2)) | |
+ | |
+ | |
+ | |
+def try_ping(pinger: coursedict): | |
+ optional = lambda fmtstr, *data: fmtstr.format(*data) if all(d for d in data) else '' | |
+ #must include user agent, accept, content type (if post), otherwise ssc will complain | |
+ #if no pname+tname specified, session cookie is needed to avoid "Please wait while your request is processed - this may take up to a minute to complete." | |
+ headers = { | |
+ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/114.0", | |
+ "Accept": "text/html", | |
+ } | |
+ | |
+ if pinger.section: #section mode | |
+ doc = requests.get('https://courses.students.ubc.ca/cs/courseschedule?pname=subjarea&tname=subj-course' | |
+ f'&dept={pinger.subj}' | |
+ f'&course={pinger.crsno}' | |
+ + optional('&sessyr={}&sesscd={}', pinger.sessyr, pinger.sesscd), headers=headers).text | |
+ else: #search mode | |
+ reqdata = dict(pinger) #make a copy and remove metadata | |
+ del reqdata['metadata'] | |
+ #no need to explicitly set content type if we set data= alr since requests does that for us | |
+ doc = requests.post('https://courses.students.ubc.ca/cs/courseschedule?pname=subjarea&tname=sectsearch', | |
+ data=reqdata, | |
+ headers=headers).text | |
+ | |
+ #they both use the same system to display sections | |
+ soup = BeautifulSoup(doc, 'html.parser') | |
+ | |
+ ret = {} | |
+ #remove thead, since we cant search directly with tbody as its doesnt necessarily exist in the doc | |
+ soup.select_one('.section-summary thead').clear() | |
+ for child in soup.select('.section-summary tr'): | |
+ section = child.select_one("td:nth-of-type(2) a") | |
+ status = child.select_one("td:nth-of-type(1)").text | |
+ ret[section.text] = 'https://courses.students.ubc.ca' + section['href'], (status if status != '\xa0' and status.strip() else 'PING') | |
+ | |
+ return ret | |
+ | |
+ | |
+#done make this a dict instead so its unique? this would remove the need of a "add myself to existing courses" button | |
+#it should be pretty simple to just match the basics but more optimization (like catching CPSC 110 101 == CPSC 110 1*) might be hard | |
+#and its practically impossible to actually figure out whether a search includes a specific section programmatically | |
+#actually it might be possible if i do a ping once and check if theres a strict superset/subset of the current object | |
+#still gonna be hard to optimize for pingers that span across multiple pingers | |
+def add_course(obj): | |
+ data = get_data() | |
+ | |
+ #obj.metadata.courses contains the entire list of sections that can be pinged through this pinger | |
+ #whereas obj.metadata.user[0].list contains the wanted list of sections (same for search mode, likely smaller for section mode) | |
+ | |
+ #ping, check against existing pingers' course list (in metadata), if matches merge | |
+ | |
+ resp = try_ping(obj) | |
+ if resp: | |
+ obj.metadata.courses = list(resp.keys()) | |
+ #populate user.list to signify which part of the pinger should trigger a ping | |
+ obj.metadata.user[0].list = list(obj.metadata.courses) | |
+ if obj.section: #do filtering (only needed in section mode, search mode is already filtered by ssc) | |
+ check = re.compile(f'{obj.subj} {obj.crsno} {obj.section.replace("*", ".*")}') | |
+ obj.metadata.user[0].list = [c for c in obj.metadata.user[0].list if check.match(c)] | |
+ #after we sort through that section should not be bound to a specific section now since the data is in user.list | |
+ #we do need to set data though to allow other things (e.g. try_ping) to match on whether its in search or section mode | |
+ obj.section = True | |
+ | |
+ courses = set(obj.metadata.user[0].list) | |
+ | |
+ | |
+ #now we have the proper name for the pinger | |
+ name = str(obj) | |
+ logger.info(f'Adding {name} to the pingers...') | |
+ | |
+ merged = False | |
+ for pinger_name, pinger_obj in data: #defattrdict not only for easier editing but also for copying for iteration | |
+ against = set(pinger_obj.metadata.courses) | |
+ #only check merging if they are the same session | |
+ if pinger_obj.sessyr == obj.sessyr and pinger_obj.sesscd == obj.sesscd: | |
+ #there should only be one of the cases below: either existing pinger(s) are subsets of the current one, or this pinger is a subset of an existing pinger | |
+ #otherwise it shouldve been optimized out since it is sequentially added and this optimzation runs every time | |
+ if courses.issuperset(against): | |
+ logger.info(f' Optimization: merging {pinger_name} into {name}') | |
+ obj.metadata.user += pinger_obj.metadata.user | |
+ del data[pinger_name] | |
+ elif against.issuperset(courses): | |
+ #this should be a terminating action to avoid adding it into multiple pingers when one is already handling it | |
+ #(think of a case where existing courses are [1234, 1256] and our pinger is [12] - 2 pingers match, but only 1 is needed) | |
+ pinger_obj.metadata.user += obj.metadata.user | |
+ logger.info(f' Optimization: merging {name} into {pinger_name}') | |
+ name = pinger_name | |
+ merged = True | |
+ break | |
+ | |
+ if not merged: | |
+ data[name] = obj | |
+ set_data(data) | |
+ | |
+ #returns the list of courses this pinger will ping, and the name (changed if merged) | |
+ return name, "\n".join(obj.metadata.user[0].list) | |
+ else: | |
+ raise ValueError('No sections found!') | |
+ | |
+#done add ppl to existing pingers? see above | |
+#done optimization for when multiple people are looking at the same course but want different sections pinged (and not all of them) | |
+ | |
+#TODO currently the way courses in section mode are handled means that they basically dont get optimized into search modes | |
+#it would be better if pinger.metadata.courses tracks the total set of courses its users are pinging instead, | |
+#and then have the optimizer do special code on reading section mode to not match on the courses in the list, but on the course names | |
+#aka if everything a user is pinging has the same prefix of CPSC 110 for example, | |
+#then just throw it into CPSC 110 section mode regardless even if currently the CPSC 110 pinger.metadata.courses does not have it | |
+#this means that the following has to be changed: | |
+# - instead of obj.metadata.courses = list(resp.keys()), do obj.metadata.courses = <total of all user lists> | |
+# - at the superset checks, add check on whether everything being added is in the same course, and if there is a course pinger in section mode for that already | |
+ | |
+def get_pingers_with_user(id): | |
+ ret = [] | |
+ for name, pinger in get_data(): | |
+ for user in pinger.metadata.user: | |
+ if user.id == id: | |
+ ret.append(name) | |
+ break | |
+ return ret | |
+ | |
+ | |
+ | |
+def opt(label, default=False): | |
+ return discord.SelectOption(label=label, default=default) | |
+ | |
+ | |
+class AddCourseDuration(ui.Modal, title='Enter the time range to search for'): | |
+ duration = ui.TextInput(label='Duration', placeholder='class duration (from-to, HHMM), e.g. 1000-1300, 1000-, -1300', min_length=5, max_length=9, required=False) | |
+ | |
+ async def on_submit(self, interaction: discord.Interaction): | |
+ #set default val for duration for next open | |
+ self.duration.default = self.duration.value | |
+ await interaction.response.defer() | |
+ | |
+class AddCourseSearch(discord.ui.View): | |
+ | |
+ #put in class to allow default changing on callback | |
+ #type opts is way too long, so some of the little used ones are truncated (opt("Reserved Section"),opt("Optional Section"),opt("Independent Study") | |
+ #TODO reorder to make it easier to search for the main ones? rn its alph order | |
+ type_opts = [opt("Directed Studies"),opt("Discussion"),opt("Essay/Report"),opt("Exchange Program"),opt("Experiential"),opt("Field Trip"),opt("Flexible Learning"),opt("Lab-Seminar"),opt("Laboratory"),opt("Lecture"),opt("Lecture-Discussion"),opt("Lecture-Laboratory"),opt("Lecture-Seminar"),opt("Practicum"),opt("Problem Session"),opt("Project"),opt("Rehearsal"),opt("Research"),opt("Seminar"),opt("Studio"),opt("Thesis"),opt("Tutorial"),opt("Waiting List"),opt("Work Placement"),opt("Workshop")] | |
+ term_opts = [opt("Term 1"), opt("Term 2"), opt("Term 1-2")] | |
+ #TODO remove sunday and saturday? oh actually there are a lot of courses on those days huh | |
+ days_opts = [opt("Sunday"), opt("Monday"), opt("Tuesday"), opt("Wednesday"), opt("Thursday"), opt("Friday"), opt("Saturday")] | |
+ cred_opts = [opt(str(i+1)) for i in range(6)] #pretty sure only 6 is allowed | |
+ | |
+ term_menu = ui.Select(placeholder='Term', options=term_opts, min_values=0, row=3, custom_id='ubcpinger-search-days-term-menu') | |
+ days_menu = ui.Select(placeholder='Days', options=days_opts, min_values=0, max_values=7, row=2, custom_id='ubcpinger-search-days-menu') | |
+ type_menu = ui.Select(placeholder='Type', options=type_opts, min_values=0, row=3, custom_id='ubcpinger-search-days-type-menu') | |
+ term_menu = ui.Select(placeholder='Term', options=term_opts, min_values=0, row=3, custom_id='ubcpinger-search-days-term-menu') | |
+ cred_menu = ui.Select(placeholder='Credits', options=cred_opts, min_values=0, row=3, custom_id='ubcpinger-search-days-cred-menu') | |
+ | |
+ | |
+ def __init__(self, base): | |
+ super().__init__() | |
+ self.base = base | |
+ self.dur = AddCourseDuration() | |
+ | |
+ async def change_menu(self, item: discord.ui.Select, interaction: discord.Interaction): | |
+ #remove opened menus | |
+ for i in self.children: | |
+ if isinstance(i, discord.ui.Select): | |
+ self.remove_item(i) | |
+ if item: | |
+ self.add_item(item) | |
+ await interaction.response.edit_message(view=self) | |
+ | |
+ async def interaction_check(self, interaction: discord.Interaction): | |
+ #set default and close menu as response; we can extract the data later on done | |
+ if 'custom_id' in interaction.data and '-menu' in interaction.data['custom_id']: | |
+ #really hacky but im also too sleepy to actually figure this out lmao | |
+ for opt in getattr(self, interaction.data['custom_id'].split('-')[2] + '_opts'): | |
+ #seems like setting default doesnt really work for the single option dropdowns which kinda checks out but hmmm | |
+ if opt.label in interaction.data['values']: | |
+ opt.default = True | |
+ else: | |
+ #remember to reset too | |
+ opt.default = False | |
+ await self.change_menu(None, interaction) | |
+ return False | |
+ return True | |
+ | |
+ | |
+ @discord.ui.button(label='Days', custom_id='ubcpinger-search-days', style = discord.ButtonStyle.primary) | |
+ async def days(self, interaction: discord.Interaction, button: discord.ui.Button): | |
+ await self.change_menu(self.days_menu, interaction) | |
+ | |
+ @discord.ui.button(label='Type', custom_id='ubcpinger-search-type', style = discord.ButtonStyle.primary) | |
+ async def type(self, interaction: discord.Interaction, button: discord.ui.Button): | |
+ await self.change_menu(self.type_menu, interaction) | |
+ | |
+ @discord.ui.button(label='Term', custom_id='ubcpinger-search-term', style = discord.ButtonStyle.primary) | |
+ async def term(self, interaction: discord.Interaction, button: discord.ui.Button): | |
+ await self.change_menu(self.term_menu, interaction) | |
+ | |
+ @discord.ui.button(label='Credits', custom_id='ubcpinger-search-cred', row=1, style = discord.ButtonStyle.primary) | |
+ async def cred(self, interaction: discord.Interaction, button: discord.ui.Button): | |
+ await self.change_menu(self.cred_menu, interaction) | |
+ | |
+ @discord.ui.button(label='Done', custom_id='ubcpinger-search-done', row=1, style = discord.ButtonStyle.success) | |
+ async def done(self, interaction: discord.Interaction, button: discord.ui.Button): | |
+ try: | |
+ optional = lambda data: data.values[0] if data.values else '' | |
+ | |
+ #most of these values are selected for us already so no need to parse | |
+ | |
+ #since we set default on callback that is basically the canonical value of whether the day is selected | |
+ #also make it a dict for easier shoving in through kwargs in the pinger loop | |
+ days = {f'DAY{i+1}': True for i, o in enumerate(self.days_opts) if o.default} | |
+ | |
+ | |
+ stime, etime = self.dur.duration.value.split('-') if self.dur.duration.value else '', '' | |
+ | |
+ obj = coursedict({ | |
+ 'subj': self.base.subj.value, | |
+ 'crsno': self.base.course.value, | |
+ #note how theres no section | |
+ 'credit': optional(self.cred_menu), | |
+ 'stime': stime, | |
+ 'etime': etime, | |
+ **days, | |
+ **self.base.sesobj, | |
+ 'actv': optional(self.type_menu).ljust(50), | |
+ 'term': optional(self.term_menu), | |
+ 'metadata': {'user': [{'id': interaction.user.id, 'list': None}]} | |
+ }) | |
+ | |
+ #delegate name setting to defattrdict formatting | |
+ name, added = add_course(obj) | |
+ | |
+ #edit the menu instead of sending another ephemeral msg since we cant delete the old one | |
+ #TODO figure out a better way to print since ill be using that to show a dropdown menu for the removal ui too | |
+ await interaction.response.edit_message(content=f'The pinger `{name}` is now tracking the requested classes! This includes the following:\n```\n{added}\n```\nYou will now get pinged when it opens up.', view=None) | |
+ except Exception as e: | |
+ logger.error('AddCourseSearch fail:', exc_info=e) | |
+ await interaction.response.edit_message(content=f'Something went wrong (likely wrong params): {type(e)}\n{e}', view=None) | |
+ | |
+ #at the end to wrap it around nicer | |
+ @discord.ui.button(label='Time range', custom_id='ubcpinger-search-duration', row=1, style = discord.ButtonStyle.primary) | |
+ async def duration(self, interaction: discord.Interaction, button: discord.ui.Button): | |
+ await interaction.response.send_modal(self.dur) | |
+ | |
+class AddCourseBase(ui.Modal, title='Add a course to ping'): | |
+ | |
+ subj = ui.TextInput(label='Subject', placeholder='e.g. CPSC, CP*', min_length=1, max_length=4) | |
+ course = ui.TextInput(label='Course no.', placeholder='e.g. 110, 1*', min_length=1, max_length=3) | |
+ | |
+ section = ui.TextInput(label='Section', placeholder='e.g. 101, T2*, 10*/201/T1D, leave blank to open search menu', min_length=1, max_length=3, required=False) | |
+ | |
+ session = ui.TextInput(label='Session', placeholder='e.g. 2022S, 2023W, defaults to coming session', min_length=5, max_length=5, required=False) | |
+ | |
+ async def on_submit(self, interaction: discord.Interaction): | |
+ try: | |
+ #response in this case is the same msg as the button, dont do anything to it | |
+ await interaction.response.defer() | |
+ | |
+ #parse session | |
+ self.sesobj = {} | |
+ if self.session.value: | |
+ self.sesobj = { | |
+ 'sessyr': self.session.value[:4], | |
+ 'sesscd': self.session.value[-1], | |
+ } | |
+ | |
+ if not self.section.value: | |
+ #discord doesnt allow us to send another modal right away? (yep check InteractionResponseType, 9 is modal and not in the list) | |
+ await interaction.followup.send('Configure the search parameters below:', view=AddCourseSearch(self), ephemeral=True) | |
+ else: | |
+ if '*' in self.subj.value or '*' in self.course.value: | |
+ await interaction.response.send_message('Wildcards for subject and courses are not allowed in section mode! Leave section blank to do a broad search.', ephemeral=True) | |
+ return | |
+ | |
+ obj = coursedict({ | |
+ 'subj': self.subj.value, | |
+ 'crsno': self.course.value, | |
+ #even though we dont actually use the wildcard we need to differentiate between no section and a fully wildcard section so dont remove yet | |
+ 'section': self.section.value, | |
+ **self.sesobj, | |
+ 'metadata': {'user': [{'id': interaction.user.id, 'list': None}]} | |
+ }) | |
+ | |
+ name, added = add_course(obj) | |
+ await interaction.followup.send(f'The pinger `{name}` is now tracking the requested classes! This includes the following:\n```\n{added}\n```\nYou will now get pinged when it opens up.', ephemeral=True) | |
+ except Exception as e: | |
+ logger.error('AddCourseBase fail:', exc_info=e) | |
+ await interaction.followup.send(content=f'Something went wrong (likely wrong params): {type(e)}\n{e}', ephemeral=True) | |
+ | |
+#TODO dm option? or maybe just only allow dms (iirc the reason why i dont dm is coz theres no mechanism to dm multiple ppl or sth so i just send a single msg to ping everyone) | |
+ | |
+ | |
+class RemoveCourseList(ui.View): | |
+ #i love python asyncio : ) why the fuck | |
+ @classmethod | |
+ async def create(self, user): | |
+ if not await bot.is_owner(user): | |
+ pingers = [opt(s) for s in get_pingers_with_user(user.id)] | |
+ else: | |
+ pingers = [opt(s) for s in get_data().keys()] | |
+ return RemoveCourseList(user, pingers) | |
+ | |
+ def __init__(self, user, pingers): | |
+ super().__init__() | |
+ self.user = user | |
+ self.menu = ui.Select(placeholder='Pinger', options=pingers, min_values=1, max_values=len(pingers), custom_id='ubcpinger-remove-menu') | |
+ self.add_item(self.menu) | |
+ | |
+ async def interaction_check(self, interaction: discord.Interaction): | |
+ if 'custom_id' in interaction.data and interaction.data['custom_id'] == 'ubcpinger-remove-menu': | |
+ data = get_data() | |
+ for name in interaction.data['values']: | |
+ #if bot owner, on remove of pinger with own id added, remove normally; else remove the entire thing | |
+ removed = False | |
+ for user in data[name].metadata.user: | |
+ if user.id == self.user.id: | |
+ data[name].metadata.user.remove(user) | |
+ removed = True | |
+ #if empty pinger, remove the entire pinger | |
+ if not data[name].metadata.user: | |
+ logger.info(f"{data[name]} is now empty, removing...") | |
+ del data[name] | |
+ break | |
+ if not removed: | |
+ if self.id == bot.owner_id: | |
+ del data[name] | |
+ else: | |
+ logger.error(f'Something pretty wrong happened: {data} does not have {self.id} even though on setup it had') | |
+ set_data(data) | |
+ | |
+ pinger_names = "\n".join(interaction.data['values']) | |
+ await interaction.response.edit_message(content=f"Successfully removed you from these pingers:\n```\n{pinger_names}\n```", view=None) | |
+ | |
+ | |
+class CourseButtons(discord.ui.View): | |
+ @discord.ui.button(label="Add course", style=discord.ButtonStyle.green, custom_id='ubcpinger-add') | |
+ async def add(self, interaction: discord.Interaction, button: discord.ui.Button): | |
+ await interaction.response.send_modal(AddCourseBase()) | |
+ | |
+ @discord.ui.button(label="Remove course", style=discord.ButtonStyle.red, custom_id='ubcpinger-remove') | |
+ async def remove(self, interaction: discord.Interaction, button: discord.ui.Button): | |
+ await interaction.response.defer() | |
+ await interaction.followup.send("Choose a course to remove:", view=await RemoveCourseList.create(interaction.user), ephemeral=True) | |
+ | |
+#done remove button (only allow ppl to remove their own, except if user id matches me so i can actually do cleanup and stuff without going into the server) | |
+ | |
+#TODO button for viewing what courses you get pinged for in a pinger? rn its not too descriptive since the optimizer makes it the broadest possible | |
+ | |
[email protected](seconds=1) | |
+async def pinger(): | |
+ try: | |
+ obj = get_data() | |
+ | |
+ if not obj: | |
+ return | |
+ | |
+ #obj is a mapping of pingers against its name | |
+ for pinger_name, pinger_obj in obj: | |
+ logger.debug(f'In {pinger_name}:') | |
+ courses = try_ping(pinger_obj) | |
+ for user in pinger_obj.metadata.user: | |
+ for course in user.list: | |
+ url, status = courses[course] | |
+ logger.debug(f' {course}: {status} ({user.id})') | |
+ if status == 'PING': | |
+ #create dm short circuits if dm is found | |
+ dm = await bot.get_guild(GUILD).get_member(user.id).create_dm() | |
+ #done button to ssc | |
+ view = ui.View() | |
+ view.add_item(discord.ui.Button(label='Get me to SSC', url=url, style=discord.ButtonStyle.primary)) | |
+ await dm.send(f'## wake up `{course}` is currently open', view=view) | |
+ | |
+ await asyncio.sleep(random.uniform(4, 7)) #total 4-7 secs | |
+ except Exception as e: | |
+ logger.error("Something went wrong during pinging:", exc_info=e) | |
+ #hope that 10s later something fixes itself | |
+ asyncio.sleep(10) | |
+ | |
+ | |
+ | |
[email protected] | |
+async def on_ready(): | |
+ logger.info(f'Logged on as {bot.user}') | |
+ | |
+ buttons = CourseButtons(timeout=None) | |
+ bot.add_view(buttons) | |
+ | |
+ pinger.start() | |
+ | |
+#touch if not exist | |
+open('courses.json', 'a').close() | |
+ | |
+bot.run(open('token.txt').read()) |
File Metadata
File Metadata
- Mime Type
- text/x-diff
- Expires
- Tue, Apr 15, 2:55 AM (22 h, 29 m)
- Storage Engine
- local-disk
- Storage Format
- Raw Data
- Storage Handle
- e4/10/8fb9fe81dfa735b02292a97567b3