diff --git a/scratchattach/site/classroom.py b/scratchattach/site/classroom.py index 4eacac89..0c69a0a8 100644 --- a/scratchattach/site/classroom.py +++ b/scratchattach/site/classroom.py @@ -12,6 +12,7 @@ if TYPE_CHECKING: from scratchattach.site.session import Session +from scratchattach.utils import commons from scratchattach.utils.commons import requests from . import user, activity, typed_dicts from ._base import BaseSiteComponent @@ -32,7 +33,6 @@ class Classroom(BaseSiteComponent): is_closed: bool = False datetime: datetime = datetime.fromtimestamp(0.0) - update_function: Callable = field(repr=False, default=requests.get) _session: Optional[Session] = field(repr=False, default=None) @@ -42,7 +42,9 @@ def __post_init__(self): if self.id: self.update_api = f"https://api.scratch.mit.edu/classrooms/{self.id}" elif self.classtoken: - self.update_api = f"https://api.scratch.mit.edu/classtoken/{self.classtoken}" + self.update_api = ( + f"https://api.scratch.mit.edu/classtoken/{self.classtoken}" + ) else: raise KeyError(f"No class id or token provided! {self.__dict__ = }") @@ -55,9 +57,11 @@ def __post_init__(self): self._cookies = self._session._cookies # Headers for operations that require accept and Content-Type fields: - self._json_headers = {**self._headers, - "accept": "application/json", - "Content-Type": "application/json"} + self._json_headers = { + **self._headers, + "accept": "application/json", + "Content-Type": "application/json", + } def __str__(self) -> str: return f"" @@ -69,17 +73,20 @@ def update(self): success = False if not success: - response = requests.get(f"https://scratch.mit.edu/classes/{self.id}/") + with requests.no_error_handling(): + response = requests.get(f"https://scratch.mit.edu/classes/{self.id}/") soup = BeautifulSoup(response.text, "html.parser") headings = soup.find_all("h1") for heading in headings: if heading.text == "Whoops! 
Our server is Scratch'ing its head": - raise exceptions.ClassroomNotFound(f"Classroom id {self.id} is not closed and cannot be found.") + raise exceptions.ClassroomNotFound( + f"Classroom id {self.id} is not closed and cannot be found." + ) # id, title, description, status, date_start (iso format), educator/username - title = soup.find("title").contents[0][:-len(" on Scratch")] + title = soup.find("title").contents[0][: -len(" on Scratch")] overviews = soup.find_all("p", {"class": "overview"}) description, status = overviews[0].text, overviews[1].text @@ -89,7 +96,9 @@ def update(self): sfx = "',\n userId: " for script in soup.find_all("script"): if pfx in script.text: - educator_username = commons.webscrape_count(script.text, pfx, sfx, str) + educator_username = commons.webscrape_count( + script.text, pfx, sfx, str + ) ret: typed_dicts.ClassroomDict = { "id": self.id, @@ -97,8 +106,8 @@ def update(self): "description": description, "educator": {}, "status": status, - "is_closed": True - } + "is_closed": True, + } if educator_username: ret["educator"]["username"] = educator_username @@ -111,77 +120,98 @@ def _update_from_dict(self, data: typed_dicts.ClassroomDict): self.title = data["title"] self.about_class = data["description"] self.working_on = data["status"] - self.datetime = datetime.fromisoformat(data["date_start"]) - self.author = user.User(username=data["educator"]["username"], _session=self._session) - self.author.supply_data_dict(data["educator"]) - self.is_closed = bool(data["date_end"]) + if "date_start" in data: + self.datetime = datetime.fromisoformat(data["date_start"]) + if "username" in data["educator"]: + self.author = user.User( + username=data["educator"]["username"], _session=self._session + ) + self.author.supply_data_dict(data["educator"]) + if "date_end" in data: + self.is_closed = bool(data["date_end"]) return True def student_count(self) -> int: # student count - text = requests.get( - f"https://scratch.mit.edu/classes/{self.id}/", - 
headers=self._headers - ).text + with requests.no_error_handling(): + text = requests.get( + f"https://scratch.mit.edu/classes/{self.id}/", headers=self._headers + ).text return commons.webscrape_count(text, "Students (", ")") - def student_names(self, *, page=1) -> list[str]: + def student_names(self, *, offset=0, limit=60) -> list[str]: """ Returns the student on the class. - + Keyword Arguments: page: The page of the students that should be returned. - + Returns: list: The usernames of the class students """ - if self.is_closed: - ret = [] - response = requests.get(f"https://scratch.mit.edu/classes/{self.id}/") - soup = BeautifulSoup(response.text, "html.parser") - found = set("") - - for result in soup.css.select("ul.scroll-content .user a"): - result_text = result.text.strip() - if result_text in found: - continue - found.add(result_text) - ret.append(result_text) - # for scrollable in soup.find_all("ul", {"class": "scroll-content"}): - # if not isinstance(scrollable, Tag): - # continue - # for item in scrollable.contents: - # if not isinstance(item, bs4.NavigableString): - # if "user" in item.attrs["class"]: - # anchors = item.find_all("a") - # if len(anchors) == 2: - # ret.append(anchors[1].text.strip()) - - return ret - - text = requests.get( - f"https://scratch.mit.edu/classes/{self.id}/students/?page={page}", - headers=self._headers - ).text - textlist = [i.split('/">')[0] for i in text.split(' ')[0] for i in text.split(' list[int]: """ Returns the class studio on the class. - + Keyword Arguments: page: The page of the students that should be returned. 
- + Returns: list: The id of the class studios """ @@ -190,7 +220,9 @@ def class_studio_ids(self, *, page: int = 1) -> list[int]: response = requests.get(f"https://scratch.mit.edu/classes/{self.id}/") soup = BeautifulSoup(response.text, "html.parser") - for result in soup.css.select("ul.scroll-content .gallery a[href]:not([class])"): + for result in soup.css.select( + "ul.scroll-content .gallery a[href]:not([class])" + ): value = result["href"] if not isinstance(value, str): value = value[0] @@ -207,27 +239,37 @@ def class_studio_ids(self, *, page: int = 1) -> list[int]: text = requests.get( f"https://scratch.mit.edu/classes/{self.id}/studios/?page={page}", - headers=self._headers + headers=self._headers, ).text - textlist = [int(i.split('/">')[0]) for i in text.split('\n ')[0]) + for i in text.split('\n None: self._check_session() - requests.post(f"https://scratch.mit.edu/site-api/classrooms/all/{self.id}/", - headers=self._headers, cookies=self._cookies, - files={"file": thumbnail}) + requests.post( + f"https://scratch.mit.edu/site-api/classrooms/all/{self.id}/", + headers=self._headers, + cookies=self._cookies, + files={"file": thumbnail}, + ) def set_description(self, desc: str) -> None: self._check_session() - response = requests.put(f"https://scratch.mit.edu/site-api/classrooms/all/{self.id}/", - headers=self._headers, cookies=self._cookies, - json={"description": desc}) + response = requests.put( + f"https://scratch.mit.edu/site-api/classrooms/all/{self.id}/", + headers=self._headers, + cookies=self._cookies, + json={"description": desc}, + ) try: data = response.json() @@ -235,7 +277,9 @@ def set_description(self, desc: str) -> None: # Success! 
return else: - warnings.warn(f"{self._session} may not be authenticated to edit {self}") + warnings.warn( + f"{self._session} may not be authenticated to edit {self}" + ) except Exception as e: warnings.warn(f"{self._session} may not be authenticated to edit {self}") @@ -243,9 +287,12 @@ def set_description(self, desc: str) -> None: def set_working_on(self, status: str) -> None: self._check_session() - response = requests.put(f"https://scratch.mit.edu/site-api/classrooms/all/{self.id}/", - headers=self._headers, cookies=self._cookies, - json={"status": status}) + response = requests.put( + f"https://scratch.mit.edu/site-api/classrooms/all/{self.id}/", + headers=self._headers, + cookies=self._cookies, + json={"status": status}, + ) try: data = response.json() @@ -253,7 +300,9 @@ def set_working_on(self, status: str) -> None: # Success! return else: - warnings.warn(f"{self._session} may not be authenticated to edit {self}") + warnings.warn( + f"{self._session} may not be authenticated to edit {self}" + ) except Exception as e: warnings.warn(f"{self._session} may not be authenticated to edit {self}") @@ -261,9 +310,12 @@ def set_working_on(self, status: str) -> None: def set_title(self, title: str) -> None: self._check_session() - response = requests.put(f"https://scratch.mit.edu/site-api/classrooms/all/{self.id}/", - headers=self._headers, cookies=self._cookies, - json={"title": title}) + response = requests.put( + f"https://scratch.mit.edu/site-api/classrooms/all/{self.id}/", + headers=self._headers, + cookies=self._cookies, + json={"title": title}, + ) try: data = response.json() @@ -271,27 +323,36 @@ def set_title(self, title: str) -> None: # Success! 
return else: - warnings.warn(f"{self._session} may not be authenticated to edit {self}") + warnings.warn( + f"{self._session} may not be authenticated to edit {self}" + ) except Exception as e: warnings.warn(f"{self._session} may not be authenticated to edit {self}") raise e - def add_studio(self, name: str, description: str = '') -> None: + def add_studio(self, name: str, description: str = "") -> None: self._check_session() - requests.post("https://scratch.mit.edu/classes/create_classroom_gallery/", - json={ - "classroom_id": str(self.id), - "classroom_token": self.classtoken, - "title": name, - "description": description}, - headers=self._headers, cookies=self._cookies) + requests.post( + "https://scratch.mit.edu/classes/create_classroom_gallery/", + json={ + "classroom_id": str(self.id), + "classroom_token": self.classtoken, + "title": name, + "description": description, + }, + headers=self._headers, + cookies=self._cookies, + ) def reopen(self) -> None: self._check_session() - response = requests.put(f"https://scratch.mit.edu/site-api/classrooms/all/{self.id}/", - headers=self._headers, cookies=self._cookies, - json={"visibility": "visible"}) + response = requests.put( + f"https://scratch.mit.edu/site-api/classrooms/all/{self.id}/", + headers=self._headers, + cookies=self._cookies, + json={"visibility": "visible"}, + ) try: response.json() @@ -302,8 +363,11 @@ def reopen(self) -> None: def close(self) -> None: self._check_session() - response = requests.post(f"https://scratch.mit.edu/site-api/classrooms/close_classroom/{self.id}/", - headers=self._headers, cookies=self._cookies) + response = requests.post( + f"https://scratch.mit.edu/site-api/classrooms/close_classroom/{self.id}/", + headers=self._headers, + cookies=self._cookies, + ) try: response.json() @@ -312,11 +376,27 @@ def close(self) -> None: warnings.warn(f"{self._session} may not be authenticated to edit {self}") raise e - def register_student(self, username: str, password: str = '', birth_month: 
Optional[int] = None, - birth_year: Optional[int] = None, - gender: Optional[str] = None, country: Optional[str] = None, is_robot: bool = False) -> None: - return register_by_token(self.id, self.classtoken, username, password, birth_month or 1, birth_year or 2000, gender or "(Prefer not to say)", country or "United+States", - is_robot) + def register_student( + self, + username: str, + password: str = "", + birth_month: Optional[int] = None, + birth_year: Optional[int] = None, + gender: Optional[str] = None, + country: Optional[str] = None, + is_robot: bool = False, + ) -> None: + return register_by_token( + self.id, + self.classtoken, + username, + password, + birth_month or 1, + birth_year or 2000, + gender or "(Prefer not to say)", + country or "United+States", + is_robot, + ) def generate_signup_link(self): if self.classtoken is not None: @@ -324,14 +404,19 @@ def generate_signup_link(self): self._check_session() - response = requests.get(f"https://scratch.mit.edu/site-api/classrooms/generate_registration_link/{self.id}/", - headers=self._headers, cookies=self._cookies) + response = requests.get( + f"https://scratch.mit.edu/site-api/classrooms/generate_registration_link/{self.id}/", + headers=self._headers, + cookies=self._cookies, + ) # Should really check for '404' page data = response.json() if "reg_link" in data: return data["reg_link"] else: - raise exceptions.Unauthorized(f"{self._session} is not authorised to generate a signup link of {self}") + raise exceptions.Unauthorized( + f"{self._session} is not authorised to generate a signup link of {self}" + ) def public_activity(self, *, limit=20): """ @@ -341,8 +426,11 @@ def public_activity(self, *, limit=20): if limit > 20: warnings.warn("The limit is set to more than 20. 
There may be an error") soup = BeautifulSoup( - requests.get(f"https://scratch.mit.edu/site-api/classrooms/activity/public/{self.id}/?limit={limit}").text, - 'html.parser') + requests.get( + f"https://scratch.mit.edu/site-api/classrooms/activity/public/{self.id}/?limit={limit}" + ).text, + "html.parser", + ) activities = [] source = soup.find_all("li") @@ -354,7 +442,12 @@ def public_activity(self, *, limit=20): return activities - def activity(self, student: str = "all", mode: str = "Last created", page: Optional[int] = None) -> list[activity.Activity]: + def activity( + self, + student: str = "all", + mode: str = "Last created", + page: Optional[int] = None, + ) -> list[activity.Activity]: """ Get a list of private activity, only available to the class owner. Returns: @@ -367,16 +460,21 @@ def activity(self, student: str = "all", mode: str = "Last created", page: Optio with requests.no_error_handling(): try: - data = requests.get(f"https://scratch.mit.edu/site-api/classrooms/activity/{self.id}/{student}/", - params={"page": page, "ascsort": ascsort, "descsort": descsort}, - headers=self._headers, cookies=self._cookies).json() + data = requests.get( + f"https://scratch.mit.edu/site-api/classrooms/activity/{self.id}/{student}/", + params={"page": page, "ascsort": ascsort, "descsort": descsort}, + headers=self._headers, + cookies=self._cookies, + ).json() except json.JSONDecodeError: return [] _activity: list[activity.Activity] = [] for activity_json in data: _activity.append(activity.Activity(_session=self._session)) - _activity[-1]._update_from_json(activity_json) # NOT the same as _update_from_dict + _activity[-1]._update_from_json( + activity_json + ) # NOT the same as _update_from_dict return _activity @@ -401,7 +499,7 @@ def get_classroom(class_id: str) -> Classroom: "If you want to remove this warning, use warnings.filterwarnings('ignore', category=scratchattach.ClassroomAuthenticationWarning)\n" "To ignore all warnings of the type GetAuthenticationWarning, 
which includes this warning, use " "`warnings.filterwarnings('ignore', category=scratchattach.GetAuthenticationWarning)`.", - exceptions.ClassroomAuthenticationWarning + exceptions.ClassroomAuthenticationWarning, ) return commons._get_object("id", class_id, Classroom, exceptions.ClassroomNotFound) @@ -426,26 +524,42 @@ def get_classroom_from_token(class_token) -> Classroom: "If you want to remove this warning, use warnings.filterwarnings('ignore', category=ClassroomAuthenticationWarning). " "To ignore all warnings of the type GetAuthenticationWarning, which includes this warning, use " "warnings.filterwarnings('ignore', category=GetAuthenticationWarning).", - exceptions.ClassroomAuthenticationWarning + exceptions.ClassroomAuthenticationWarning, + ) + return commons._get_object( + "classtoken", class_token, Classroom, exceptions.ClassroomNotFound ) - return commons._get_object("classtoken", class_token, Classroom, exceptions.ClassroomNotFound) - - -def register_by_token(class_id: int, class_token: str, username: str, password: str, birth_month: int, birth_year: int, - gender: str, country: str, is_robot: bool = False) -> None: - data = {"classroom_id": class_id, - "classroom_token": class_token, - "username": username, - "password": password, - "birth_month": birth_month, - "birth_year": birth_year, - "gender": gender, - "country": country, - "is_robot": is_robot} - response = requests.post("https://scratch.mit.edu/classes/register_new_student/", - data=data, headers=commons.headers, cookies={"scratchcsrftoken": 'a'}) +def register_by_token( + class_id: int, + class_token: str, + username: str, + password: str, + birth_month: int, + birth_year: int, + gender: str, + country: str, + is_robot: bool = False, +) -> None: + data = { + "classroom_id": class_id, + "classroom_token": class_token, + "username": username, + "password": password, + "birth_month": birth_month, + "birth_year": birth_year, + "gender": gender, + "country": country, + "is_robot": is_robot, + } + + 
response = requests.post( + "https://scratch.mit.edu/classes/register_new_student/", + data=data, + headers=commons.headers, + cookies={"scratchcsrftoken": "a"}, + ) ret = response.json()[0] if "username" in ret: diff --git a/scratchattach/site/user.py b/scratchattach/site/user.py index a36506f7..1543ebe4 100644 --- a/scratchattach/site/user.py +++ b/scratchattach/site/user.py @@ -553,28 +553,11 @@ def loves( Returns: list: The user's loved projects """ - # We need to use beautifulsoup webscraping so we cant use the api_iterative function - if offset < 0: - raise exceptions.BadRequest("offset parameter must be >= 0") - if limit < 0: - raise exceptions.BadRequest("limit parameter must be >= 0") - - # There are 40 projects on display per page - # So the first page you need to view is 1 + offset // 40 - # (You have to add one because the first page is idx 1 instead of 0) - - # The final project to view is at idx offset + limit - 1 - # (You have to -1 because the index starts at 0) - # So the page number for this is 1 + (offset + limit - 1) // 40 - - # But this is a range so we have to add another 1 for the second argument - pages = range(1 + offset // 40, 2 + (offset + limit - 1) // 40) _projects = [] - for page in pages: - # The index of the first project on page #n is just (n-1) * 40 - first_idx = (page - 1) * 40 - + for page, page_slice in commons.enumerate_pages( + offset=offset, limit=limit, items_per_page=40, start_page_index=1 + ): with requests.no_error_handling(): page_content = requests.get( f"https://scratch.mit.edu/projects/all/{self.username}/loves/" @@ -584,21 +567,16 @@ def loves( soup = BeautifulSoup(page_content, "html.parser") - # We need to check if we are out of bounds - # If we are, we can jump out early - # This is detectable if Scratch gives you a '404' - - # We can't just detect if the 404 text is within the whole of the page content - # because it would break if someone made a project with that name + # Out of bounds is checkable with 404. 
Can let you exit early. + # Check for specific <h1> so projects called `404` don't break the implementation + # TODO: make a test case which checks this edge-case - # This page only uses <h1> tags for the 404 text, so we can just use a soup for those h1_tag = soup.find("h1") if h1_tag is not None: - # Just to confirm that it's a 404, in case I am wrong. It can't hurt if "Whoops! Our server is Scratch'ing its head" in h1_tag.text: break - # Each project element is a list item with the class name 'project thumb item' so we can just use that + # Each project element is a <li> with the class name 'project thumb item' for i, project_element in enumerate( soup.find_all("li", {"class": "project thumb item"}) ):
Safari/537.36", + "(KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36", "x-csrftoken": "a", "x-requested-with": "XMLHttpRequest", "referer": "https://scratch.mit.edu", } empty_project_json: Final = { - 'targets': [ + "targets": [ { - 'isStage': True, - 'name': 'Stage', - 'variables': { - '`jEk@4|i[#Fk?(8x)AV.-my variable': [ - 'my variable', + "isStage": True, + "name": "Stage", + "variables": { + "`jEk@4|i[#Fk?(8x)AV.-my variable": [ + "my variable", 0, ], }, - 'lists': {}, - 'broadcasts': {}, - 'blocks': {}, - 'comments': {}, - 'currentCostume': 0, - 'costumes': [ + "lists": {}, + "broadcasts": {}, + "blocks": {}, + "comments": {}, + "currentCostume": 0, + "costumes": [ { - 'name': '', - 'bitmapResolution': 1, - 'dataFormat': 'svg', - 'assetId': '14e46ec3e2ba471c2adfe8f119052307', - 'md5ext': '14e46ec3e2ba471c2adfe8f119052307.svg', - 'rotationCenterX': 0, - 'rotationCenterY': 0, + "name": "", + "bitmapResolution": 1, + "dataFormat": "svg", + "assetId": "14e46ec3e2ba471c2adfe8f119052307", + "md5ext": "14e46ec3e2ba471c2adfe8f119052307.svg", + "rotationCenterX": 0, + "rotationCenterY": 0, }, ], - 'sounds': [], - 'volume': 100, - 'layerOrder': 0, - 'tempo': 60, - 'videoTransparency': 50, - 'videoState': 'on', - 'textToSpeechLanguage': None, + "sounds": [], + "volume": 100, + "layerOrder": 0, + "tempo": 60, + "videoTransparency": 50, + "videoState": "on", + "textToSpeechLanguage": None, }, ], - 'monitors': [], - 'extensions': [], - 'meta': { - 'semver': '3.0.0', - 'vm': '2.3.0', - 'agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) ' - 'Chrome/124.0.0.0 Safari/537.36', + "monitors": [], + "extensions": [], + "meta": { + "semver": "3.0.0", + "vm": "2.3.0", + "agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/124.0.0.0 Safari/537.36", }, } -def api_iterative_data(fetch_func: Callable[[int, int], list], limit: int, offset: int, max_req_limit: int = 40, - unpack: bool = True) -> list: +def 
api_iterative_data( + fetch_func: Callable[[int, int], list], + limit: int, + offset: int, + max_req_limit: int = 40, + unpack: bool = True, +) -> list: """ Iteratively gets data by calling fetch_func with a moving offset and a limit. Once fetch_func returns None, the retrieval is completed. @@ -96,8 +113,16 @@ def api_iterative_data(fetch_func: Callable[[int, int], list], limit: int, offse return api_data -def api_iterative(url: str, *, limit: int, offset: int, max_req_limit: int = 40, add_params: str = "", - _headers: Optional[dict] = None, cookies: Optional[dict] = None): +def api_iterative( + url: str, + *, + limit: int, + offset: int, + max_req_limit: int = 40, + add_params: str = "", + _headers: Optional[dict] = None, + cookies: Optional[dict] = None, +): """ Function for getting data from one of Scratch's iterative JSON API endpoints (like /users//followers, or /users//projects) """ @@ -116,7 +141,10 @@ def fetch(off: int, lim: int): Performs a single API request """ resp = requests.get( - f"{url}?limit={lim}&offset={off}{add_params}", headers=_headers, cookies=cookies, timeout=10 + f"{url}?limit={lim}&offset={off}{add_params}", + headers=_headers, + cookies=cookies, + timeout=10, ).json() if not resp: @@ -130,11 +158,15 @@ def fetch(off: int, lim: int): ) return api_data -def _get_object(identificator_name, identificator, __class: type[C], NotFoundException, session=None) -> C: + +def _get_object( + identificator_name, identificator, __class: type[C], NotFoundException, session=None +) -> C: # Internal function: Generalization of the process ran by get_user, get_studio etc. 
# Builds an object of class that is inheriting from BaseSiteComponent # # Class must inherit from BaseSiteComponent from scratchattach.site import project + try: use_class: type = __class if __class is project.PartialProject: @@ -145,12 +177,20 @@ def _get_object(identificator_name, identificator, __class: type[C], NotFoundExc if r == "429": raise exceptions.Response429( "Your network is blocked or rate-limited by Scratch.\n" - "If you're using an online IDE like replit.com, try running the code on your computer.") + "If you're using an online IDE like replit.com, try running the code on your computer." + ) if not r: # Target is unshared. The cases that this can happen in are hardcoded: - if __class is project.PartialProject: # Case: Target is an unshared project. - _object = project.PartialProject(**{identificator_name: identificator, - "shared": False, "_session": session}) + if ( + __class is project.PartialProject + ): # Case: Target is an unshared project. + _object = project.PartialProject( + **{ + identificator_name: identificator, + "shared": False, + "_session": session, + } + ) assert isinstance(_object, __class) return _object else: @@ -162,23 +202,31 @@ def _get_object(identificator_name, identificator, __class: type[C], NotFoundExc except Exception as e: raise e + I = TypeVar("I") + + @overload def webscrape_count(raw: str, text_before: str, text_after: str, cls: type[I]) -> I: pass + @overload def webscrape_count(raw: str, text_before: str, text_after: str) -> int: pass -def webscrape_count(raw, text_before, text_after, cls = int): + +def webscrape_count(raw, text_before, text_after, cls=int): return cls(raw.split(text_before)[1].split(text_after)[0]) if TYPE_CHECKING: C = TypeVar("C", bound=_base.BaseSiteComponent) -def parse_object_list(raw, /, __class: type[C], session=None, primary_key="id") -> list[C]: + +def parse_object_list( + raw, /, __class: type[C], session=None, primary_key="id" +) -> list[C]: results = [] for raw_dict in raw: try: @@ 
-187,7 +235,12 @@ def parse_object_list(raw, /, __class: type[C], session=None, primary_key="id") _obj._update_from_dict(raw_dict) results.append(_obj) except Exception as e: - print("Warning raised by scratchattach: failed to parse ", raw_dict, "error", e) + print( + "Warning raised by scratchattach: failed to parse ", + raw_dict, + "error", + e, + ) return results @@ -195,15 +248,19 @@ class LockEvent: """ Can be waited on and triggered. Not to be confused with threading.Event, which has to be reset. """ + _event: ManualResetEvent _locks: list[Lock] _access_locks: Lock + def __init__(self): self._event = ManualResetEvent() self._locks = [] self._access_locks = Lock() - def wait(self, blocking: bool = True, timeout: Optional[Union[int, float]] = None) -> bool: + def wait( + self, blocking: bool = True, timeout: Optional[Union[int, float]] = None + ) -> bool: """ Wait for the event. """ @@ -233,12 +290,13 @@ def on(self) -> Lock: lock.acquire(timeout=0) return lock + def get_class_sort_mode(mode: str) -> tuple[str, str]: """ Returns the sort mode for the given mode for classes only """ - ascsort = '' - descsort = '' + ascsort = "" + descsort = "" mode = mode.lower() if mode == "last created": @@ -261,3 +319,48 @@ def b62_decode(s: str): ret = ret * 62 + chars.index(char) return ret + + +def enumerate_pages( + *, offset: int, limit: int, items_per_page: int, start_page_index: int +) -> Iterable[tuple[int, slice]]: + """ + Converts an offset+limit into an iterable of page indexes and starting index of each item on each page + + Keyword Arguments: + offset: the starting item index (usually 0, but not defaulted) + limit: the number of items to look for + items_per_page: the number of items on a singular page + start_page_index: the index associated with the first page. Usually either 0 or 1. 
Depends on the page you are viewing + """ + if offset < 0: + raise ValueError(f"{offset=}, expected offset > 0") + if limit < 0: + raise ValueError(f"{limit=}, expected limit > 0") + + # There are n items on display per page + # So the first page you need to view is start_page_index + offset // items_per_page + # (You may have to add one because the first page is idx 1 instead of 0) + + # The final item to view is at idx offset + limit - 1 + # (You have to -1 because the index starts at 0) + # So the page number for this is start_page_index + (offset + limit - 1) // items_per_page + + # But this is a range so we have to add another 1 for the second argument + pages = range( + start_page_index + offset // items_per_page, + 1 + start_page_index + (offset + limit - 1) // items_per_page, + ) + + # The index of the first item on page #i is just (i-1) * items_per_page + for i in pages: + start_i = (i - start_page_index) * items_per_page + + # we can generate a slice object to index our lists so we can make the offset+limit as accurate as possible + page_slice = slice(items_per_page) + if start_i == 0: + page_slice = slice(offset, items_per_page) + if start_i + items_per_page > offset + limit: + page_slice = slice(0, offset + limit - start_i) + + yield i, page_slice diff --git a/tests/test_classroom.py b/tests/test_classroom.py new file mode 100644 index 00000000..3568df88 --- /dev/null +++ b/tests/test_classroom.py @@ -0,0 +1,20 @@ +import scratchattach as sa +import util + + +def test_classroom(): + sess = util.teacher_session() + if not sess: + return + + classes = sess.mystuff_classes() + assert len(classes) > 0 + room = classes[0] + room = sa.get_classroom("23448") + names = room.student_names(offset=2, limit=62) + print(len(names)) + print(names) + + +if __name__ == "__main__": + test_classroom()