|
1 | 1 | import json |
| 2 | +import os |
2 | 3 | from typing import Any, Mapping, cast, overload |
3 | 4 |
|
4 | 5 | import tornado |
|
7 | 8 | from tornado.httputil import HTTPHeaders |
8 | 9 |
|
9 | 10 | from ..utils.notebook_generator import NotebookGenerator |
| 11 | +from ..utils.utils import first |
10 | 12 | from .base_handler import BaseHandler |
11 | 13 |
|
12 | | -LEETCODE_GRAPHQL_URL = "https://leetcode.com/graphql" |
| 14 | +LEETCODE_URL = "https://leetcode.com" |
| 15 | +LEETCODE_GRAPHQL_URL = f"{LEETCODE_URL}/graphql" |
13 | 16 |
|
14 | 17 | type QueryType = dict[str, str | Mapping[str, Any]] |
15 | 18 |
|
@@ -81,6 +84,25 @@ async def graphql_multi( |
81 | 84 | else: |
82 | 85 | return cast("dict[str, HTTPResponse]", responses) |
83 | 86 |
|
| 87 | + async def request_api(self, url: str, method: str, body: Mapping[str, Any]): |
| 88 | + self.log.debug(f"Requesting LeetCode API: {url} with method {method}") |
| 89 | + client = AsyncHTTPClient() |
| 90 | + req = HTTPRequest( |
| 91 | + url=f"{LEETCODE_URL}{url}", |
| 92 | + method=method, |
| 93 | + headers=HTTPHeaders(self.settings.get("leetcode_headers", {})), |
| 94 | + body=json.dumps(body), |
| 95 | + ) |
| 96 | + try: |
| 97 | + resp = await client.fetch(req) |
| 98 | + except Exception as e: |
| 99 | + self.log.error(f"Error requesting LeetCode API: {e}") |
| 100 | + self.set_status(500) |
| 101 | + self.finish(json.dumps({"message": "Failed to request LeetCode API"})) |
| 102 | + return None |
| 103 | + else: |
| 104 | + return json.loads(resp.body) if resp.body else {} |
| 105 | + |
84 | 106 | async def get_question_detail(self, title_slug: str) -> dict[str, Any]: |
85 | 107 | resp = await self.graphql( |
86 | 108 | name="question_detail", |
@@ -358,3 +380,85 @@ async def post(self): |
358 | 380 |
|
359 | 381 | file_path = notebook_generator.generate(question) |
360 | 382 | self.finish({"filePath": file_path}) |
| 383 | + |
| 384 | + |
| 385 | +class SubmitNotebookHandler(LeetCodeHandler): |
| 386 | + route = r"notebook/submit" |
| 387 | + |
| 388 | + def get_solution(self, notebook): |
| 389 | + solution_cell = first( |
| 390 | + notebook["cells"], |
| 391 | + lambda c: c["cell_type"] == "code" and c["metadata"].get("isSolutionCode"), |
| 392 | + ) |
| 393 | + if not solution_cell: |
| 394 | + return |
| 395 | + |
| 396 | + code = "".join(solution_cell["source"]).strip() |
| 397 | + return code if not code.endswith("pass") else None |
| 398 | + |
| 399 | + async def submit(self, file_path: str): |
| 400 | + if not os.path.exists(file_path): |
| 401 | + self.set_status(404) |
| 402 | + self.finish({"message": "Notebook file not found"}) |
| 403 | + return |
| 404 | + |
| 405 | + with open(file_path, "r", encoding="utf-8") as f: |
| 406 | + notebook = json.load(f) |
| 407 | + |
| 408 | + question_info = notebook["metadata"]["leetcode_question_info"] |
| 409 | + if not question_info: |
| 410 | + self.set_status(400) |
| 411 | + self.finish({"message": "Notebook does not contain LeetCode question info"}) |
| 412 | + return |
| 413 | + |
| 414 | + question_frontend_id = question_info["questionFrontendId"] |
| 415 | + question_submit_id = question_info["questionId"] |
| 416 | + submit_url = question_info["submitUrl"] |
| 417 | + sample_testcase = question_info["sampleTestCase"] |
| 418 | + if ( |
| 419 | + not question_frontend_id |
| 420 | + or not question_submit_id |
| 421 | + or not submit_url |
| 422 | + or not sample_testcase |
| 423 | + ): |
| 424 | + self.set_status(400) |
| 425 | + self.finish({"message": "Invalid question info in notebook"}) |
| 426 | + return |
| 427 | + |
| 428 | + solution_code = self.get_solution(notebook) |
| 429 | + if not solution_code: |
| 430 | + self.set_status(400) |
| 431 | + self.finish({"message": "No solution code found in notebook"}) |
| 432 | + return |
| 433 | + |
| 434 | + resp = await self.request_api( |
| 435 | + submit_url, |
| 436 | + "POST", |
| 437 | + { |
| 438 | + "question_id": str(question_submit_id), |
| 439 | + "data_input": sample_testcase, |
| 440 | + "lang": "python3", |
| 441 | + "typed_code": solution_code, |
| 442 | + "test_mode": False, |
| 443 | + "judge_type": "large", |
| 444 | + }, |
| 445 | + ) |
| 446 | + |
| 447 | + self.finish(resp) |
| 448 | + |
| 449 | + @tornado.web.authenticated |
| 450 | + async def post(self): |
| 451 | + body = self.get_json_body() |
| 452 | + if not body: |
| 453 | + self.set_status(400) |
| 454 | + self.finish({"message": "Request body is required"}) |
| 455 | + return |
| 456 | + |
| 457 | + body = cast("dict[str, str]", body) |
| 458 | + file_path = cast(str, body.get("filePath", "")) |
| 459 | + if not file_path: |
| 460 | + self.set_status(400) |
| 461 | + self.finish({"message": "filePath is required"}) |
| 462 | + return |
| 463 | + |
| 464 | + await self.submit(file_path) |
0 commit comments