diff --git a/cogs/__init__.py b/cogs/__init__.py
index ce6bff2a..6282c242 100644
--- a/cogs/__init__.py
+++ b/cogs/__init__.py
@@ -37,6 +37,7 @@
 from .remind_me import ClearRemindersBacklogTaskCog, RemindMeCommandCog
 from .send_get_roles_reminders import SendGetRolesRemindersTaskCog
 from .send_introduction_reminders import SendIntroductionRemindersTaskCog
+from .society_events import SocietyEventsSlashCommandsCog
 from .source import SourceCommandCog
 from .startup import StartupCog
 from .stats import StatsCommandsCog
@@ -77,6 +78,7 @@
     "RemindMeCommandCog",
     "SendGetRolesRemindersTaskCog",
     "SendIntroductionRemindersTaskCog",
+    "SocietyEventsSlashCommandsCog",
     "SourceCommandCog",
     "StartupCog",
     "StatsCommandsCog",
@@ -119,6 +121,7 @@ def setup(bot: "TeXBot") -> None:
         SendGetRolesRemindersTaskCog,
         SendIntroductionRemindersTaskCog,
+        SocietyEventsSlashCommandsCog,
         SourceCommandCog,
         StartupCog,
         StatsCommandsCog,
         StrikeCommandCog,
diff --git a/cogs/make_member.py b/cogs/make_member.py
index 16a5cf8e..5bc9df9e 100644
--- a/cogs/make_member.py
+++ b/cogs/make_member.py
@@ -4,10 +4,7 @@
 import re
 from typing import TYPE_CHECKING
 
-import aiohttp
-import bs4
 import discord
-from bs4 import BeautifulSoup
 from django.core.exceptions import ValidationError
 
 from config import settings
@@ -22,6 +19,8 @@
     from utils import TeXBotApplicationContext
 
+from utils.msl import get_membership_count, is_student_id_member
+
 __all__: "Sequence[str]" = ("MakeMemberCommandCog", "MemberCountCommandCog")
 
 logger: "Final[Logger]" = logging.getLogger("TeX-Bot")
@@ -157,92 +156,32 @@ async def make_member(self, ctx: "TeXBotApplicationContext", group_member_id: st
             )
             return
 
-        guild_member_ids: set[str] = set()
-
-        http_session: aiohttp.ClientSession = aiohttp.ClientSession(
-            headers=REQUEST_HEADERS,
-            cookies=REQUEST_COOKIES,
-        )
-        async with http_session, http_session.get(GROUPED_MEMBERS_URL) as http_response:
-            response_html: str = await http_response.text()
-
-        MEMBER_HTML_TABLE_IDS: Final[frozenset[str]] = frozenset(
-            {
-                "ctl00_Main_rptGroups_ctl05_gvMemberships",
-                "ctl00_Main_rptGroups_ctl03_gvMemberships",
-                "ctl00_ctl00_Main_AdminPageContent_rptGroups_ctl03_gvMemberships",
-                "ctl00_ctl00_Main_AdminPageContent_rptGroups_ctl05_gvMemberships",
-            },
-        )
+        if not await is_student_id_member(student_id=group_member_id):
+            await self.command_send_error(
+                ctx=ctx,
+                message=(
+                    f"You must be a member of {self.bot.group_full_name} "
+                    "to use this command.\n"
+                    f"The provided {_GROUP_MEMBER_ID_ARGUMENT_NAME} must match "
+                    f"the {self.bot.group_member_id_type} ID "
+                    f"that you purchased your {self.bot.group_short_name} membership with."
+                ),
+            )
+            return
 
-        table_id: str
-        for table_id in MEMBER_HTML_TABLE_IDS:
-            parsed_html: bs4.Tag | bs4.NavigableString | None = BeautifulSoup(
-                response_html,
-                "html.parser",
-            ).find(
-                "table",
-                {"id": table_id},
-            )
-
-            if parsed_html is None or isinstance(parsed_html, bs4.NavigableString):
-                continue
-            guild_member_ids.update(
-                row.contents[2].text
-                for row in parsed_html.find_all(
-                    "tr",
-                    {"class": ["msl_row", "msl_altrow"]},
-                )
-            )
+        try:
+            await GroupMadeMember.objects.acreate(group_member_id=group_member_id)  # type: ignore[misc]
+        except ValidationError as create_group_made_member_error:
+            error_is_already_exists: bool = (
+                "hashed_group_member_id" in create_group_made_member_error.message_dict
+                and any(
+                    "already exists" in error
+                    for error in create_group_made_member_error.message_dict[
+                        "hashed_group_member_id"
+                    ]
+                )
+            )
-
-        guild_member_ids.discard("")
-        guild_member_ids.discard("\n")
-        guild_member_ids.discard(" ")
-
-        if not guild_member_ids:
-            await self.command_send_error(
-                ctx,
-                error_code="E1041",
-                logging_message=OSError(
-                    "The guild member IDs could not be retrieved from "
-                    "the MEMBERS_LIST_URL.",
-                ),
-            )
-            return
-
-        if group_member_id not in guild_member_ids:
-            await self.command_send_error(
-                ctx,
-                message=(
-                    f"You must be a member of {self.bot.group_full_name} "
-                    "to use this command.\n"
-                    f"The provided {_GROUP_MEMBER_ID_ARGUMENT_NAME} must match "
-                    f"the {self.bot.group_member_id_type} ID "
-                    f"that you purchased your {self.bot.group_short_name} membership with."
-                ),
-            )
-            return
-
-        # NOTE: The "Member" role must be added to the user **before** the "Guest" role to ensure that the welcome message does not include the suggestion to purchase membership
-        await interaction_member.add_roles(
-            member_role,
-            reason=f'{ctx.user} used TeX Bot slash-command: "/makemember"',
-        )
-
-        try:
-            await GroupMadeMember.objects.acreate(group_member_id=group_member_id)  # type: ignore[misc]
-        except ValidationError as create_group_made_member_error:
-            error_is_already_exists: bool = (
-                "hashed_group_member_id" in create_group_made_member_error.message_dict
-                and any(
-                    "already exists" in error
-                    for error in create_group_made_member_error.message_dict[
-                        "hashed_group_member_id"
-                    ]
-                )
-            )
-            if not error_is_already_exists:
-                raise
+            if not error_is_already_exists:
+                raise
 
         await ctx.followup.send(content="Successfully made you a member!", ephemeral=True)
 
@@ -285,58 +224,7 @@ async def member_count(self, ctx: "TeXBotApplicationContext") -> None:  # type:
         await ctx.defer(ephemeral=False)
         async with ctx.typing():
-            http_session: aiohttp.ClientSession = aiohttp.ClientSession(
-                headers=REQUEST_HEADERS,
-                cookies=REQUEST_COOKIES,
-            )
-            async with http_session, http_session.get(BASE_MEMBERS_URL) as http_response:
-                response_html: str = await http_response.text()
-
-            member_list_div: bs4.Tag | bs4.NavigableString | None = BeautifulSoup(
-                response_html,
-                "html.parser",
-            ).find(
-                "div",
-                {"class": "memberlistcol"},
-            )
-
-            if member_list_div is None or isinstance(member_list_div, bs4.NavigableString):
-                await self.command_send_error(
-                    ctx=ctx,
-                    error_code="E1041",
-                    logging_message=OSError(
-                        "The member count could not be retrieved from the MEMBERS_LIST_URL.",
-                    ),
-                )
-                return
-
-            if "showing 100 of" in member_list_div.text.lower():
-                member_count: str = member_list_div.text.split(" ")[3]
-                await ctx.followup.send(
-                    content=f"{self.bot.group_full_name} has {member_count} members! :tada:",
:tada:", - ) - return - - member_table: bs4.Tag | bs4.NavigableString | None = BeautifulSoup( - response_html, - "html.parser", - ).find( - "table", - {"id": "ctl00_ctl00_Main_AdminPageContent_gvMembers"}, - ) - - if member_table is None or isinstance(member_table, bs4.NavigableString): - await self.command_send_error( - ctx=ctx, - error_code="E1041", - logging_message=OSError( - "The member count could not be retrieved from the MEMBERS_LIST_URL." - ), - ) - return - await ctx.followup.send( - content=f"{self.bot.group_full_name} has { - len(member_table.find_all('tr', {'class': ['msl_row', 'msl_altrow']})) - } members! :tada:" + content=f"{settings['GROUP_NAME']} has {await get_membership_count()} " + "members! :tada:", ) diff --git a/cogs/society_events.py b/cogs/society_events.py new file mode 100644 index 00000000..f411f0c9 --- /dev/null +++ b/cogs/society_events.py @@ -0,0 +1,49 @@ +"""Module for handling society events in a Discord bot.""" + +import logging +from datetime import datetime +from typing import TYPE_CHECKING + +import discord + +from utils import TeXBotBaseCog +from utils.msl import fetch_guild_activities + +if TYPE_CHECKING: + from collections.abc import Sequence + from logging import Logger + from typing import Final + + from utils import TeXBotApplicationContext + + +__all__: "Sequence[str]" = ("SocietyEventsSlashCommandsCog",) + + +logger: "Final[Logger]" = logging.getLogger("TeX-Bot") + + +class SocietyEventsSlashCommandsCog(TeXBotBaseCog): + """Cog Class for handling society event commands.""" + + society_events: discord.SlashCommandGroup = discord.SlashCommandGroup( + name="society-events", + description="Commands for managing society events.", + ) + + @society_events.command( + name="list-all", + desciption="List all society events.", + ) + async def list_all_events(self, ctx: "TeXBotApplicationContext") -> None: + """List all society events.""" + await ctx.defer(ephemeral=True) + activities: dict[str, str] = await fetch_guild_activities( + from_date=datetime.strptime("2023-01-01", "%Y-%m-%d"), # noqa: DTZ007 + to_date=datetime.strptime("2026-01-01", "%Y-%m-%d"), # noqa: DTZ007 + ) + + await ctx.followup.send( + content=str(activities), + ephemeral=True, + ) diff --git a/pyproject.toml b/pyproject.toml index 9e3122ec..cf54b04d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,8 +25,9 @@ test = ["pytest-cov>=6.1", "pytest>=8.3"] type-check = ["django-stubs[compatible-mypy]>=5.1", "mypy>=1.13", "types-beautifulsoup4>=4.12"] [project] # TODO: Remove [project] table once https://github.com/astral-sh/uv/issues/8582 is completed +dependencies = ["aiohttp>=3.11.14", "anyio>=4.9.0"] name = "TeX-Bot-Py-V2" -requires-python = ">=3.12,<3.13" # TODO: Allow Python 3.13 once py-cord makes a new release with support for it +requires-python = ">=3.12,<3.13" # TODO: Allow Python 3.13 once py-cord makes a new release with support for it version = "0.1.0" diff --git a/utils/msl/__init__.py b/utils/msl/__init__.py new file mode 100644 index 00000000..9de0f348 --- /dev/null +++ b/utils/msl/__init__.py @@ -0,0 +1,35 @@ +"""MSL utility classes & functions provided for use across the whole of the project.""" + +from typing import TYPE_CHECKING + +from .activities import fetch_guild_activities +from .events import create_event, get_all_guild_events +from .finances import ( + fetch_financial_transactions, + fetch_transaction_from_id, + get_account_balance, +) +from .memberships import get_full_membership_list, get_membership_count, is_student_id_member +from .reports import ( + 
diff --git a/pyproject.toml b/pyproject.toml
index 9e3122ec..cf54b04d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -25,8 +25,9 @@
 test = ["pytest-cov>=6.1", "pytest>=8.3"]
 type-check = ["django-stubs[compatible-mypy]>=5.1", "mypy>=1.13", "types-beautifulsoup4>=4.12"]
 
 [project]  # TODO: Remove [project] table once https://github.com/astral-sh/uv/issues/8582 is completed
+dependencies = ["aiohttp>=3.11.14", "anyio>=4.9.0"]
 name = "TeX-Bot-Py-V2"
-requires-python = ">=3.12,<3.13" # TODO: Allow Python 3.13 once py-cord makes a new release with support for it
+requires-python = ">=3.12,<3.13"  # TODO: Allow Python 3.13 once py-cord makes a new release with support for it
 version = "0.1.0"
diff --git a/utils/msl/__init__.py b/utils/msl/__init__.py
new file mode 100644
index 00000000..9de0f348
--- /dev/null
+++ b/utils/msl/__init__.py
@@ -0,0 +1,35 @@
+"""MSL utility classes & functions provided for use across the whole of the project."""
+
+from typing import TYPE_CHECKING
+
+from .activities import fetch_guild_activities
+from .events import create_event, get_all_guild_events
+from .finances import (
+    fetch_financial_transactions,
+    fetch_transaction_from_id,
+    get_account_balance,
+)
+from .memberships import get_full_membership_list, get_membership_count, is_student_id_member
+from .reports import (
+    get_product_customisations,
+    get_product_sales,
+    update_current_year_sales_report,
+)
+
+if TYPE_CHECKING:
+    from collections.abc import Sequence
+
+__all__: "Sequence[str]" = (
+    "create_event",
+    "fetch_financial_transactions",
+    "fetch_guild_activities",
+    "fetch_transaction_from_id",
+    "get_account_balance",
+    "get_all_guild_events",
+    "get_full_membership_list",
+    "get_membership_count",
+    "get_product_customisations",
+    "get_product_sales",
+    "is_student_id_member",
+    "update_current_year_sales_report",
+)
diff --git a/utils/msl/activities.py b/utils/msl/activities.py
new file mode 100644
index 00000000..96f3d7d0
--- /dev/null
+++ b/utils/msl/activities.py
@@ -0,0 +1,209 @@
+"""Module for fetching activities from the guild website."""
+
+import logging
+from enum import Enum
+from typing import TYPE_CHECKING
+
+import aiohttp
+import bs4
+from bs4 import BeautifulSoup
+
+from .core import BASE_HEADERS, ORGANISATION_ID, get_msl_context
+
+if TYPE_CHECKING:
+    from collections.abc import Sequence
+    from datetime import datetime
+    from logging import Logger
+    from typing import Final
+
+__all__: "Sequence[str]" = ("fetch_guild_activities",)
+
+logger: "Final[Logger]" = logging.getLogger("TeX-Bot")
+
+
+ALL_ACTIVITIES_URL: "Final[str]" = (
+    f"https://www.guildofstudents.com/organisation/admin/activities/all/{ORGANISATION_ID}/"
+)
+
+INDIVIDUAL_ACTIVITIES_URL: "Final[str]" = f"https://www.guildofstudents.com/organisation/admin/activities/activity/status/{ORGANISATION_ID}/"
+
+ACTIVITIES_BUTTON_KEY: "Final[str]" = "ctl00$ctl00$Main$AdminPageContent$fsFilter$btnSubmit"
+ACTIVITIES_TABLE_ID: "Final[str]" = "ctl00_ctl00_Main_AdminPageContent_gvResults"
+ACTIVITIES_START_DATE_KEY: "Final[str]" = (
+    "ctl00$ctl00$Main$AdminPageContent$drDates$txtFromDate"
+)
+ACTIVITIES_END_DATE_KEY: "Final[str]" = "ctl00$ctl00$Main$AdminPageContent$drDates$txtToDate"
+
+
+class ActivityStatus(Enum):
+    """
+    Enum to define the possible activity status values.
+
+    Submitted - The activity has been submitted and is pending approval.
+    Approved - The activity has been approved and is scheduled.
+    Draft - The activity is a draft and is not yet submitted.
+    Cancelled - The activity has been cancelled.
+    Queried - The activity has been queried and is pending a response.
+    """
+
+    SUBMITTED = "Submitted"
+    APPROVED = "Approved"
+    DRAFT = "Draft"
+    CANCELLED = "Cancelled"
+    QUERIED = "Queried"
+
+
+class Activity:
+    """
+    Class to represent an activity on the guild website.
+
+    Attributes:
+        activity_id (int): The ID of the activity.
+        name (str): The name of the activity.
+        status (ActivityStatus): The status of the activity.
+        start_date (datetime): The start date of the activity.
+        end_date (datetime): The end date of the activity.
+        location (str): The location of the activity.
+        description (str): The description of the activity.
+ """ + + __slots__ = ( + "activity_id", + "description", + "end_date", + "location", + "name", + "start_date", + "status", + ) + + def __init__( + self, + activity_id: int, + name: str, + status: ActivityStatus, + start_date: "datetime", + end_date: "datetime", + location: str, + description: str, + ) -> None: + self.activity_id = activity_id + self.name = name + self.status = status + self.start_date = start_date + self.end_date = end_date + self.location = location + self.description = description + + +async def fetch_guild_activities(from_date: "datetime", to_date: "datetime") -> dict[str, str]: + """Fetch all activities on the guild website.""" + data_fields, cookies = await get_msl_context(url=ALL_ACTIVITIES_URL) + + form_data: dict[str, str] = { + ACTIVITIES_START_DATE_KEY: from_date.strftime("%d/%m/%Y"), + ACTIVITIES_END_DATE_KEY: to_date.strftime("%d/%m/%Y"), + ACTIVITIES_BUTTON_KEY: "Apply", + "__EVENTTARGET": "", + "__EVENTARGUMENT": "", + "__VIEWSTATEENCRYPTED": "", + } + + data_fields.update(form_data) + + data_fields.pop("ctl00$ctl00$Main$AdminPageContent$fsFilter$btnCancel") + + session_v2: aiohttp.ClientSession = aiohttp.ClientSession( + headers=BASE_HEADERS, + cookies=cookies, + ) + async with ( + session_v2, + session_v2.post(url=ALL_ACTIVITIES_URL, data=data_fields) as http_response, + ): + if http_response.status != 200: + logger.debug("Returned a non 200 status code!!") + logger.debug(http_response) + return {} + + response_html: str = await http_response.text() + + activities_table_html: bs4.Tag | bs4.NavigableString | None = BeautifulSoup( + markup=response_html, + features="html.parser", + ).find( + name="table", + attrs={"id": ACTIVITIES_TABLE_ID}, + ) + + if activities_table_html is None or isinstance(activities_table_html, bs4.NavigableString): + logger.warning("Failed to find the activities table.") + logger.debug(response_html) + return {} + + if "There are no activities" in str(activities_table_html): + logger.debug("No activities were found matching the date range.") + return {} + + activities_list: list[bs4.Tag] = activities_table_html.find_all(name="tr") + + activities_list.pop(0) + + return_list: list[bs4.Tag] = [] + + # NOTE: The below will only get the first page of activities, more work is needed. 
+
+
+async def create_activity() -> int:
+    """Create an activity on the guild website."""
+    raise NotImplementedError
+
+
+async def fetch_activity(activity_id: int) -> Activity | None:
+    """Fetch a specific activity from the guild website."""
+    data_fields, cookies = await get_msl_context(url=ALL_ACTIVITIES_URL)
+
+    form_data: dict[str, str] = {
+        "__EVENTTARGET": "",
+        "__EVENTARGUMENT": "",
+        "__VIEWSTATEENCRYPTED": "",
+    }
+
+    data_fields.update(form_data)
+
+    data_fields.pop("ctl00$ctl00$Main$AdminPageContent$fsFilter$btnCancel")
+
+    session_v2: aiohttp.ClientSession = aiohttp.ClientSession(
+        headers=BASE_HEADERS,
+        cookies=cookies,
+    )
+    async with (
+        session_v2,
+        session_v2.post(
+            url=f"{INDIVIDUAL_ACTIVITIES_URL}{activity_id}", data=data_fields
+        ) as http_response,
+    ):
+        if http_response.status != 200:
+            logger.debug("Returned a non-200 status code!!")
+            logger.debug(http_response)
+            return None
+
+        response_html: str = await http_response.text()
+
+        logger.debug(response_html)
+
+    raise NotImplementedError
diff --git a/utils/msl/core.py b/utils/msl/core.py
new file mode 100644
index 00000000..464f5d32
--- /dev/null
+++ b/utils/msl/core.py
@@ -0,0 +1,83 @@
+"""Functions to enable interaction with MSL based SU websites."""
+
+import datetime as dt
+import logging
+from datetime import datetime
+from typing import TYPE_CHECKING
+
+import aiohttp
+from bs4 import BeautifulSoup
+
+from config import settings
+
+if TYPE_CHECKING:
+    from collections.abc import Mapping, Sequence
+    from datetime import timezone
+    from http.cookies import Morsel
+    from logging import Logger
+    from typing import Final
+
+__all__: "Sequence[str]" = ()
+
+logger: "Final[Logger]" = logging.getLogger("TeX-Bot")
+
+
+DEFAULT_TIMEZONE: "Final[timezone]" = dt.UTC
+TODAYS_DATE: "Final[datetime]" = datetime.now(tz=DEFAULT_TIMEZONE)
+
+CURRENT_YEAR_START_DATE: "Final[datetime]" = datetime(
+    year=TODAYS_DATE.year if TODAYS_DATE.month >= 7 else TODAYS_DATE.year - 1,
+    month=7,
+    day=1,
+    tzinfo=DEFAULT_TIMEZONE,
+)
+
+CURRENT_YEAR_END_DATE: "Final[datetime]" = datetime(
+    year=TODAYS_DATE.year + 1 if TODAYS_DATE.month >= 7 else TODAYS_DATE.year,
+    month=6,
+    day=30,
+    tzinfo=DEFAULT_TIMEZONE,
+)
+
+BASE_HEADERS: "Final[Mapping[str, str]]" = {
+    "Cache-Control": "no-cache",
+    "Pragma": "no-cache",
+    "Expires": "0",
+}
+
+BASE_COOKIES: "Final[Mapping[str, str]]" = {
+    ".ASPXAUTH": settings["MEMBERS_LIST_AUTH_SESSION_COOKIE"],
+}
+
+ORGANISATION_ID: "Final[str]" = settings["ORGANISATION_ID"]
+
+ORGANISATION_ADMIN_URL: "Final[str]" = (
+    f"https://www.guildofstudents.com/organisation/admin/{ORGANISATION_ID}/"
+)
+
+
+async def get_msl_context(url: str) -> tuple[dict[str, str], dict[str, str]]:
+    """Get the required context headers, data and cookies to make a request to MSL."""
+    http_session: aiohttp.ClientSession = aiohttp.ClientSession(
+        headers=BASE_HEADERS,
+        cookies=BASE_COOKIES,
+    )
+    data_fields: dict[str, str] = {}
+    cookies: dict[str, str] = {}
+    async with http_session, http_session.get(url=url) as field_data:
+        data_response = BeautifulSoup(
+            markup=await field_data.text(),
+            features="html.parser",
+        )
+
+        for field in data_response.find_all(name="input"):
+            if field.get("name") and field.get("value"):
+                data_fields[field.get("name")] = field.get("value")
+
+        for cookie in field_data.cookies:
+            cookie_morsel: Morsel[str] | None = field_data.cookies.get(cookie)
+            if cookie_morsel is not None:
+                cookies[cookie] = cookie_morsel.value
+        cookies[".ASPXAUTH"] = settings["MEMBERS_LIST_AUTH_SESSION_COOKIE"]
+
+    return data_fields, cookies
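+
+# NOTE: Illustrative usage sketch: MSL admin pages are ASP.NET WebForms, so every POST
+# must echo back the hidden form fields (__VIEWSTATE etc.) that get_msl_context scrapes.
+# A typical caller therefore looks like the following (some_admin_url is a placeholder
+# for any MSL admin page URL):
+#
+#     data_fields, cookies = await get_msl_context(url=some_admin_url)
+#     data_fields.update({"__EVENTTARGET": "", "__EVENTARGUMENT": ""})
+#     session = aiohttp.ClientSession(headers=BASE_HEADERS, cookies=cookies)
+#     async with session, session.post(url=some_admin_url, data=data_fields) as response:
+#         html = await response.text()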
diff --git a/utils/msl/events.py b/utils/msl/events.py
new file mode 100644
index 00000000..2b5fe8e4
--- /dev/null
+++ b/utils/msl/events.py
@@ -0,0 +1,99 @@
+"""Module for fetching events from the guild website."""
+
+import logging
+from typing import TYPE_CHECKING
+
+import aiohttp
+import bs4
+from bs4 import BeautifulSoup
+
+from .core import BASE_HEADERS, ORGANISATION_ID, get_msl_context
+
+if TYPE_CHECKING:
+    from collections.abc import Sequence
+    from logging import Logger
+    from typing import Final
+
+__all__: "Sequence[str]" = ("create_event", "get_all_guild_events")
+
+EVENTS_FROM_DATE_KEY: "Final[str]" = (
+    "ctl00$ctl00$Main$AdminPageContent$datesFilter$txtFromDate"
+)
+EVENTS_TO_DATE_KEY: "Final[str]" = "ctl00$ctl00$Main$AdminPageContent$datesFilter$txtToDate"
+EVENTS_BUTTON_KEY: "Final[str]" = "ctl00$ctl00$Main$AdminPageContent$fsSetDates$btnSubmit"
+EVENTS_TABLE_ID: "Final[str]" = "ctl00_ctl00_Main_AdminPageContent_gvEvents"
+CREATE_EVENT_URL: "Final[str]" = (
+    f"https://www.guildofstudents.com/events/edit/event/{ORGANISATION_ID}/"
+)
+EVENT_LIST_URL: "Final[str]" = (
+    f"https://www.guildofstudents.com/events/edit/{ORGANISATION_ID}/"
+)
+
+
+logger: "Final[Logger]" = logging.getLogger("TeX-Bot")
+
+
+async def get_all_guild_events(from_date: str, to_date: str) -> dict[str, str]:
+    """Fetch all events on the guild website."""
+    data_fields, cookies = await get_msl_context(url=EVENT_LIST_URL)
+
+    form_data: dict[str, str] = {
+        EVENTS_FROM_DATE_KEY: from_date,
+        EVENTS_TO_DATE_KEY: to_date,
+        EVENTS_BUTTON_KEY: "Find Events",
+        "__EVENTTARGET": "",
+        "__EVENTARGUMENT": "",
+        "__VIEWSTATEENCRYPTED": "",
+    }
+
+    data_fields.update(form_data)
+
+    session_v2: aiohttp.ClientSession = aiohttp.ClientSession(
+        headers=BASE_HEADERS,
+        cookies=cookies,
+    )
+    async with (
+        session_v2,
+        session_v2.post(url=EVENT_LIST_URL, data=data_fields) as http_response,
+    ):
+        if http_response.status != 200:
+            logger.debug("Returned a non-200 status code!!")
+            logger.debug(http_response)
+            return {}
+
+        response_html: str = await http_response.text()
+
+        event_table_html: bs4.Tag | bs4.NavigableString | None = BeautifulSoup(
+            markup=response_html,
+            features="html.parser",
+        ).find(
+            name="table",
+            attrs={"id": EVENTS_TABLE_ID},
+        )
+
+        if event_table_html is None or isinstance(event_table_html, bs4.NavigableString):
+            logger.debug("Failed to find the events table.")
+            return {}
+
+        if "There are no events" in str(event_table_html):
+            logger.debug("No events were found!")
+            return {}
+
+        event_list: list[bs4.Tag] = event_table_html.find_all(name="tr")
+
+        event_list.pop(0)
+
+        return {
+            event.find(name="a").get("href").split("/")[5]: event.find(name="a").text  # type: ignore[union-attr]
+            for event in event_list
+        }
+
+
+async def create_event() -> int | None:
+    """Create an event on the guild website."""
+    raise NotImplementedError
+
+
+async def fetch_event(event_id: int) -> dict[str, str]:
+    """Fetch a specific event from the guild website."""
+    raise NotImplementedError
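+
+# NOTE: Illustrative usage sketch: the MSL date filter appears to expect "dd/MM/yyyy"
+# strings (assumed by analogy with the sales-report filter, which posts dates in that
+# format). The event names and ID below are made-up placeholder values:
+#
+#     events = await get_all_guild_events(from_date="01/07/2024", to_date="30/06/2025")
+#     # events maps event ID -> event name, e.g. {"1234": "Welcome Social"}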
finances.""" + +import logging +from enum import Enum +from typing import TYPE_CHECKING + +import aiohttp +import bs4 +from bs4 import BeautifulSoup + +from .core import ( + BASE_COOKIES, + BASE_HEADERS, + ORGANISATION_ADMIN_URL, + ORGANISATION_ID, +) + +if TYPE_CHECKING: + from collections.abc import Sequence + from logging import Logger + from typing import Final + +__all__: "Sequence[str]" = () + +FINANCE_REDIRECT_URL: "Final[str]" = ( + f"https://www.guildofstudents.com/sgf/{ORGANISATION_ID}/Landing/Member" +) +FINANCES_URL: "Final[str]" = ( + f"https://guildofstudents.com/sgf/{ORGANISATION_ID}/Home/Dashboard/" +) +BASE_EXPENSE_URL: "Final[str]" = ( + f"https://guildofstudents.com/sgf/{ORGANISATION_ID}/Request/Edit?RequestId=" +) + + +logger: "Final[Logger]" = logging.getLogger("TeX-Bot") + + +class TransactionType(Enum): + """ + Enum for the different possible types of transactions. + + Attributes: + ---------- + - Personal Expense: A personal expense + - External Payment: A payment to an external entity + - Purchase Order: A purchase order + - Invoice: An invoice + + """ + + PERSONAL_EXPENSE = "Personal Expense" + EXTERNAL_PAYMENT = "External Payment" + PURCHASE_ORDER = "Purchase Order" + INVOICE = "Invoice" + + +async def get_account_balance() -> float | None: + """Return the current account balance.""" + raise NotImplementedError # NOTE: Not implemented because SGF does not currently support this but is due to be added imminently. + + +async def get_available_balance() -> float | None: + """ + Return the current available balance. + + This is different from the account balance as it takes into account pending transactions. + """ + cookie_session: aiohttp.ClientSession = aiohttp.ClientSession( + headers=BASE_HEADERS, + cookies=BASE_COOKIES, + ) + async with ( + cookie_session, + cookie_session.get(url=ORGANISATION_ADMIN_URL) as (cookie_response), + ): + if cookie_response.status != 200: + logger.debug("Returned a non 200 status code!!") + logger.debug(cookie_response) + return None + + cookies = cookie_response.cookies + + logger.debug(cookies) + http_session: aiohttp.ClientSession = aiohttp.ClientSession( + headers=BASE_HEADERS, + cookies=cookies, + ) + async with http_session, http_session.get(url=FINANCE_REDIRECT_URL) as http_response: + if http_response.status != 200: + logger.debug("Returned a non 200 status code!!") + logger.debug(http_response) + return None + + response_html: str = await http_response.text() + + # check page title + if "Login" in response_html: + logger.debug("Not logged in!") + return None + + available_balance_html: bs4.Tag | bs4.NavigableString | None = BeautifulSoup( + markup=response_html, + features="html.parser", + ).find( + name="div", + attrs={"id": "accounts-summary"}, + ) + + if available_balance_html is None or ( + isinstance(available_balance_html, bs4.NavigableString) + ): + logger.debug("Something went wrong!") + logger.debug(response_html) + return None + + logger.debug("Available balance HTML: %s", available_balance_html) + + return None + + +async def fetch_financial_transactions( + limit: int | None = None, transaction_type: TransactionType | None = None +) -> dict[str, str]: + """ + Return the most recent `limit` transactions. + + If no limit is supplied, all transactions will be returned. + Optional filter for type, if no type is supplied, all transactions will be returned. 
+ """ + raise NotImplementedError + + +async def fetch_transaction_from_id(transaction_id: int) -> dict[str, str]: + """Return the transaction with the given ID.""" + """ + Transaction structure: { + id: int, + created by: str, + linked_event_id: int | None, + payee: str + lines: { + line 1 description: str, + line 1 amount: float, + line 2 description: str, + line 2 amount: float, + etc... + } + total_amount: float, + status: str, + } + + """ + EXPENSE_URL: Final[str] = BASE_EXPENSE_URL + str(transaction_id) + + http_session: aiohttp.ClientSession = aiohttp.ClientSession( + headers=BASE_HEADERS, + cookies=BASE_COOKIES, + ) + async with http_session, http_session.get(url=EXPENSE_URL) as http_response: + if http_response.status != 200: + logger.debug("Returned a non 200 status code!!") + logger.debug(http_response) + return {} + + response_html: str = await http_response.text() + + expense_html: bs4.Tag | bs4.NavigableString | None = BeautifulSoup( + markup=response_html, + features="html.parser", + ).find( + name="div", + attrs={"class": "row container mx-auto"}, + ) + + if expense_html is None or isinstance(expense_html, bs4.NavigableString): + logger.debug("Something went wrong!") + return {} + + raise NotImplementedError diff --git a/utils/msl/memberships.py b/utils/msl/memberships.py new file mode 100644 index 00000000..3949a660 --- /dev/null +++ b/utils/msl/memberships.py @@ -0,0 +1,108 @@ +"""Module for checking membership status.""" + +import logging +from typing import TYPE_CHECKING + +import aiohttp +import bs4 +from bs4 import BeautifulSoup + +from .core import BASE_COOKIES, BASE_HEADERS, ORGANISATION_ID + +if TYPE_CHECKING: + from collections.abc import Sequence + from logging import Logger + from typing import Final + +__all__: "Sequence[str]" = ( + "get_full_membership_list", + "get_membership_count", + "is_student_id_member", +) + +MEMBERS_LIST_URL: "Final[str]" = ( + f"https://guildofstudents.com/organisation/memberlist/{ORGANISATION_ID}/?sort=groups" +) + +persistent_membership_list: set[tuple[str, int]] = set() + +logger: "Final[Logger]" = logging.getLogger("TeX-Bot") + + +async def get_full_membership_list() -> set[tuple[str, int]]: + """Get a list of tuples of student ID to names.""" + http_session: aiohttp.ClientSession = aiohttp.ClientSession( + headers=BASE_HEADERS, + cookies=BASE_COOKIES, + ) + async with http_session, http_session.get(url=MEMBERS_LIST_URL) as http_response: + response_html: str = await http_response.text() + + standard_members_table: bs4.Tag | bs4.NavigableString | None = BeautifulSoup( + markup=response_html, + features="html.parser", + ).find( + name="table", + attrs={"id": "ctl00_Main_rptGroups_ctl03_gvMemberships"}, + ) + + all_members_table: bs4.Tag | bs4.NavigableString | None = BeautifulSoup( + markup=response_html, + features="html.parser", + ).find( + name="table", + attrs={"id": "ctl00_Main_rptGroups_ctl05_gvMemberships"}, + ) + + if standard_members_table is None or all_members_table is None: + logger.warning("One or both of the membership tables could not be found!") + logger.debug(response_html) + return set() + + if isinstance(standard_members_table, bs4.NavigableString) or isinstance( + all_members_table, bs4.NavigableString + ): + logger.warning( + "Both membership tables were found but one or both are the wrong format!", + ) + logger.debug(standard_members_table) + logger.debug(all_members_table) + return set() + + standard_members: list[bs4.Tag] = standard_members_table.find_all(name="tr") + all_members: list[bs4.Tag] = 
diff --git a/utils/msl/reports.py b/utils/msl/reports.py
new file mode 100644
index 00000000..86432e91
--- /dev/null
+++ b/utils/msl/reports.py
@@ -0,0 +1,256 @@
+"""Module for fetching reports from the guild website."""
+
+import logging
+import re
+from datetime import datetime, timedelta, timezone
+from enum import Enum
+from typing import TYPE_CHECKING
+
+import aiohttp
+import anyio
+
+from .core import (
+    BASE_HEADERS,
+    CURRENT_YEAR_END_DATE,
+    CURRENT_YEAR_START_DATE,
+    ORGANISATION_ID,
+    get_msl_context,
+)
+
+if TYPE_CHECKING:
+    from collections.abc import Sequence
+    from logging import Logger
+    from typing import Final
+
+__all__: "Sequence[str]" = (
+    "get_product_customisations",
+    "get_product_sales",
+    "update_current_year_sales_report",
+)
+
+logger: "Final[Logger]" = logging.getLogger("TeX-Bot")
+
+
+SALES_REPORTS_URL: "Final[str]" = (
+    f"https://www.guildofstudents.com/organisation/salesreports/{ORGANISATION_ID}/"
+)
+SALES_FROM_DATE_KEY: "Final[str]" = "ctl00$ctl00$Main$AdminPageContent$drDateRange$txtFromDate"
+SALES_FROM_TIME_KEY: "Final[str]" = "ctl00$ctl00$Main$AdminPageContent$drDateRange$txtFromTime"
+SALES_TO_DATE_KEY: "Final[str]" = "ctl00$ctl00$Main$AdminPageContent$drDateRange$txtToDate"
+SALES_TO_TIME_KEY: "Final[str]" = "ctl00$ctl00$Main$AdminPageContent$drDateRange$txtToTime"
+
+
+class ReportType(Enum):
+    """
+    Enum to define the different types of reports available.
+
+    SALES - Provides a report of sales by product, date and quantity.
+    CUSTOMISATION - Provides a report of customisations by product, date and quantity.
+
+    MSL also supports "Purchasers" reports; these are largely unused here but could
+    be implemented in the future.
+    """
+
+    SALES = "Sales"
+    CUSTOMISATION = "Customisations"
+
+
+async def fetch_report_url_and_cookies(
+    report_type: ReportType, *, from_date: datetime, to_date: datetime
+) -> tuple[str | None, dict[str, str]]:
+    """Fetch the specified report from the guild website."""
+    data_fields, cookies = await get_msl_context(url=SALES_REPORTS_URL)
+
+    form_data: dict[str, str] = {
+        SALES_FROM_DATE_KEY: from_date.strftime("%d/%m/%Y"),
+        SALES_FROM_TIME_KEY: from_date.strftime("%H:%M"),
+        SALES_TO_DATE_KEY: to_date.strftime("%d/%m/%Y"),
+        SALES_TO_TIME_KEY: to_date.strftime("%H:%M"),
+        "__EVENTTARGET": f"ctl00$ctl00$Main$AdminPageContent$lb{report_type.value}",
+        "__EVENTARGUMENT": "",
+        "__VIEWSTATEENCRYPTED": "",
+    }
+
+    data_fields.pop("ctl00$ctl00$search$btnSubmit")
+
+    data_fields.update(form_data)
+
+    session_v2: aiohttp.ClientSession = aiohttp.ClientSession(
+        headers=BASE_HEADERS,
+        cookies=cookies,
+    )
+    async with (
+        session_v2,
+        session_v2.post(url=SALES_REPORTS_URL, data=data_fields) as http_response,
+    ):
+        if http_response.status != 200:
+            logger.debug("Returned a non-200 status code!!")
+            logger.debug(http_response)
+            return None, {}
+
+        response_html: str = await http_response.text()
+
+        if "no transactions" in response_html:
+            logger.debug("No transactions were found!")
+            return None, {}
+
+        match = re.search(r'ExportUrlBase":"(.*?)"', response_html)
+        if not match:
+            logger.warning("Failed to find the report export url from the http response.")
+            logger.debug(response_html)
+            return None, {}
+
+        urlbase: str = match.group(1).replace(r"\u0026", "&").replace("\\/", "/")
+        if not urlbase:
+            logger.warning("Failed to construct report url!")
+            logger.debug(match)
+            return None, {}
+
+        return f"https://guildofstudents.com/{urlbase}CSV", cookies
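+
+# NOTE: Illustrative sketch of the fragment the regex above matches. The report page
+# embeds a JSON blob along the lines of (made-up path):
+#
+#     "ExportUrlBase":"\/sbs\/reports\/12345\/export\u0026format="
+#
+# After unescaping, appending "CSV" yields the downloadable report URL.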
+ """ + + SALES = "Sales" + CUSTOMISATION = "Customisations" + + +async def fetch_report_url_and_cookies( + report_type: ReportType, *, from_date: datetime, to_date: datetime +) -> tuple[str | None, dict[str, str]]: + """Fetch the specified report from the guild website.""" + data_fields, cookies = await get_msl_context(url=SALES_REPORTS_URL) + + form_data: dict[str, str] = { + SALES_FROM_DATE_KEY: from_date.strftime("%d/%m/%Y"), + SALES_FROM_TIME_KEY: from_date.strftime("%H:%M"), + SALES_TO_DATE_KEY: to_date.strftime("%d/%m/%Y"), + SALES_TO_TIME_KEY: to_date.strftime("%H:%M"), + "__EVENTTARGET": f"ctl00$ctl00$Main$AdminPageContent$lb{report_type.value}", + "__EVENTARGUMENT": "", + "__VIEWSTATEENCRYPTED": "", + } + + data_fields.pop("ctl00$ctl00$search$btnSubmit") + + data_fields.update(form_data) + + session_v2: aiohttp.ClientSession = aiohttp.ClientSession( + headers=BASE_HEADERS, + cookies=cookies, + ) + async with ( + session_v2, + session_v2.post(url=SALES_REPORTS_URL, data=data_fields) as http_response, + ): + if http_response.status != 200: + logger.debug("Returned a non 200 status code!!") + logger.debug(http_response) + return None, {} + + response_html: str = await http_response.text() + + if "no transactions" in response_html: + logger.debug("No transactions were found!") + return None, {} + + match = re.search(r'ExportUrlBase":"(.*?)"', response_html) + if not match: + logger.warning("Failed to find the report export url from the http response.") + logger.debug(response_html) + return None, {} + + urlbase: str = match.group(1).replace(r"\u0026", "&").replace("\\/", "/") + if not urlbase: + logger.warning("Failed to construct report url!") + logger.debug(match) + return None, {} + + return f"https://guildofstudents.com/{urlbase}CSV", cookies + + +async def update_current_year_sales_report() -> None: + """Get all sales reports from the guild website.""" + report_url, cookies = await fetch_report_url_and_cookies( + report_type=ReportType.SALES, + to_date=CURRENT_YEAR_END_DATE, + from_date=CURRENT_YEAR_START_DATE, + ) + + if report_url is None: + logger.debug("No report URL was found!") + return + + file_session: aiohttp.ClientSession = aiohttp.ClientSession( + headers=BASE_HEADERS, + cookies=cookies, + ) + async with file_session, file_session.get(url=report_url) as file_response: + if file_response.status != 200: + logger.warning("Returned a non 200 status code!!") + logger.debug(file_response) + return + + async with await anyio.open_file("CurrentYearSalesReport.csv", "wb") as report_file: + await report_file.write( + b"product_id,product_name,date,quantity,unit_price,total\n", + ) + + for line in (await file_response.read()).split(b"\n")[7:]: + if line == b"\r" or not line: + break + + values: list[bytes] = line.split(b",") + + product_name_and_id: bytes = values[0] + product_id: bytes = ( + product_name_and_id.split(b" ")[0].removeprefix(b"[") + ).removesuffix(b"]") + product_name: bytes = b" ".join( + product_name_and_id.split(b" ")[1:], + ) + date: bytes = values[5] + quantity: bytes = values[6] + unit_price: bytes = values[7] + total: bytes = values[8] + + await report_file.write( + product_id + + b"," + + product_name + + b"," + + date + + b"," + + quantity + + b"," + + unit_price + + b"," + + total + + b"\n", + ) + + logger.debug("Sales report updated successfully!!") + return + + +async def get_product_sales(product_id: str) -> dict[str, int]: + """Get the dates and quantities of sales for a given product ID.""" + product_sales_data: dict[str, int] = {} + async with 
+
+
+async def get_product_sales(product_id: str) -> dict[str, int]:
+    """Get the dates and quantities of sales for a given product ID."""
+    product_sales_data: dict[str, int] = {}
+    async with await anyio.open_file("CurrentYearSalesReport.csv", "r") as report_file:
+        for line in (await report_file.readlines())[1:]:
+            values: list[str] = line.split(",")
+
+            if values[0] == product_id:
+                product_sales_data[values[2]] = int(values[3])
+
+    return product_sales_data
+
+
+async def get_product_customisations(product_id: str) -> list[dict[str, str]]:
+    """Get the product customisations for a given product ID, checking the past year."""
+    report_url, cookies = await fetch_report_url_and_cookies(
+        report_type=ReportType.CUSTOMISATION,
+        to_date=datetime.now(tz=timezone.utc),  # noqa: UP017
+        from_date=datetime.now(tz=timezone.utc) - timedelta(weeks=52),  # noqa: UP017
+    )
+
+    if report_url is None:
+        logger.warning("Failed to retrieve customisations report URL.")
+        return []
+
+    customisation_records: list[dict[str, str]] = []
+    file_session: aiohttp.ClientSession = aiohttp.ClientSession(
+        headers=BASE_HEADERS,
+        cookies=cookies,
+    )
+    async with file_session, file_session.get(url=report_url) as file_response:
+        if file_response.status != 200:
+            logger.warning("Customisation report file session returned a non-200 status code.")
+            logger.debug(file_response)
+            return []
+
+        for line in (await file_response.content.read()).split(b"\n")[7:]:
+            if line == b"\r" or not line:
+                break
+
+            values: list[str] = line.decode("utf-8").split(",")
+
+            if len(values) < 6:
+                logger.debug("Invalid line in customisations report!")
+                logger.debug(values)
+                continue
+
+            product_name_and_id: str = values[0]
+            file_product_id: str = (
+                product_name_and_id.split(" ")[0].removeprefix("[").removesuffix("]")
+            )
+            file_product_name: str = " ".join(product_name_and_id.split(" ")[1:])
+
+            if file_product_id != product_id:
+                continue
+
+            purchase_id: str = values[1]
+            purchase_date: str = values[2]
+            student_id: str = values[3]
+            customisation_name: str = values[4]
+            customisation_value: str = values[5]
+
+            # Merge additional customisations into an existing record for this purchase;
+            # only append a new record if no matching purchase ID was found.
+            for item in customisation_records:
+                if item["purchase_id"] == purchase_id:
+                    item[customisation_name] = customisation_value
+                    logger.debug(item)
+                    break
+            else:
+                customisation_records.append(
+                    {
+                        "product_id": product_id,
+                        "product_name": file_product_name,
+                        "purchase_id": purchase_id,
+                        "purchase_date": purchase_date,
+                        "student_id": student_id,
+                        customisation_name: customisation_value,
+                    },
+                )
+
+    return customisation_records
diff --git a/uv.lock b/uv.lock
index 92b7a6c1..b41e8fd5 100644
--- a/uv.lock
+++ b/uv.lock
@@ -57,6 +57,20 @@
 wheels = [
     { url = "https://files.pythonhosted.org/packages/ec/6a/bc7e17a3e87a2985d3e8f4da4cd0f481060eb78fb08596c42be62c90a4d9/aiosignal-1.3.2-py2.py3-none-any.whl", hash = "sha256:45cde58e409a301715980c2b01d0c28bdde3770d8290b5eb2173759d9acb31a5", size = 7597, upload-time = "2024-12-13T17:10:38.469Z" },
 ]
 
+[[package]]
+name = "anyio"
+version = "4.9.0"
+source = { registry = "https://pypi.org/simple" }
+dependencies = [
+    { name = "idna" },
+    { name = "sniffio" },
+    { name = "typing-extensions" },
+]
+sdist = { url = "https://files.pythonhosted.org/packages/95/7d/4c1bd541d4dffa1b52bd83fb8527089e097a106fc90b467a7313b105f840/anyio-4.9.0.tar.gz", hash = "sha256:673c0c244e15788651a4ff38710fea9675823028a6f08a5eda409e0c9840a028", size = 190949 }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/a1/ee/48ca1a7c89ffec8b6a0c5d02b89c305671d5ffd8d3c94acf8b8c408575bb/anyio-4.9.0-py3-none-any.whl", hash = "sha256:9f76d541cad6e36af7beb62e978876f3b41e3e04f2c1fbf0884604c0a9c4d93c", size = 100916 },
+]
+
 [[package]]
 name = "application-properties"
 version = "0.8.3"
@@ -850,6 +864,15 @@
 wheels = [
     { url = "https://files.pythonhosted.org/packages/b7/ce/149a00dd41f10bc29e5921b496af8b574d8413afcd5e30dfa0ed46c2cc5e/six-1.17.0-py2.py3-none-any.whl", hash = "sha256:4721f391ed90541fddacab5acf947aa0d3dc7d27b2e1e8eda2be8970586c3274", size = 11050, upload-time = "2024-12-04T17:35:26.475Z" },
 ]
 
+[[package]]
+name = "sniffio"
+version = "1.3.1"
+source = { registry = "https://pypi.org/simple" }
+sdist = { url = "https://files.pythonhosted.org/packages/a2/87/a6771e1546d97e7e041b6ae58d80074f81b7d5121207425c964ddf5cfdbd/sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc", size = 20372 }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/e9/44/75a9c9421471a6c4805dbf2356f7c181a29c1879239abab1ea2cc8f38b40/sniffio-1.3.1-py3-none-any.whl", hash = "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2", size = 10235 },
+]
+
 [[package]]
 name = "soupsieve"
 version = "2.7"
@@ -872,6 +895,10 @@ wheels = [
 name = "tex-bot-py-v2"
 version = "0.1.0"
 source = { virtual = "." }
+dependencies = [
+    { name = "aiohttp" },
+    { name = "anyio" },
+]
 
 [package.dev-dependencies]
 dev = [
@@ -916,6 +943,10 @@
 ]
 
 [package.metadata]
+requires-dist = [
+    { name = "aiohttp", specifier = ">=3.11.14" },
+    { name = "anyio", specifier = ">=4.9.0" },
+]
 
 [package.metadata.requires-dev]
 dev = [