|
19 | 19 |
|
20 | 20 | import argparse |
21 | 21 | import time |
| 22 | +from http import HTTPStatus |
22 | 23 | from typing import Any, Dict, List, Optional |
23 | 24 |
|
24 | 25 | import mlx.core as mx |
25 | 26 | import torch |
26 | 27 | import zmq |
| 28 | +from jinja2 import TemplateError |
27 | 29 | from mlx_lm.server import convert_chat, process_message_content |
28 | 30 |
|
29 | 31 | from parallax.p2p.message_util import ( |
@@ -328,29 +330,31 @@ def _join_requests(self, left_reqs: List[Request], right_reqs: List[Request]): |
328 | 330 |
|
329 | 331 | def recv_requests_from_http(self) -> List[Request]: |
330 | 332 | """Receives requests from http frontend""" |
331 | | - if self.tp_rank == 0: |
332 | | - recv_reqs = [] |
333 | | - while True: |
334 | | - try: |
335 | | - raw_request = self.recv_from_ipc_socket.recv_pyobj(zmq.NOBLOCK) |
| 333 | + if self.tp_rank != 0: |
| 334 | + return None |
336 | 335 |
|
337 | | - # Check if this is an abort request |
338 | | - if isinstance(raw_request, dict) and raw_request.get("type") == "abort": |
339 | | - logger.debug( |
340 | | - f"Received abort request from HTTP for request ID: {raw_request.get('rid')}" |
341 | | - ) |
342 | | - self.scheduler.cancel_request(raw_request.get("rid")) |
343 | | - else: |
344 | | - # Normal request processing - do tokenization and form InitialRequest |
345 | | - req = self._handle_raw_request(raw_request) |
346 | | - recv_reqs.append(req) |
347 | | - except zmq.ZMQError: |
348 | | - break |
349 | | - except Exception as e: |
350 | | - logger.exception(f"Error receiving http request: {e}") |
351 | | - else: |
352 | | - recv_reqs = None |
| 336 | + recv_reqs = [] |
| 337 | + while True: |
| 338 | + try: |
| 339 | + raw_request = self.recv_from_ipc_socket.recv_pyobj(zmq.NOBLOCK) |
353 | 340 |
|
| 341 | + # Check if this is an abort request |
| 342 | + if isinstance(raw_request, dict) and raw_request.get("type") == "abort": |
| 343 | + logger.debug( |
| 344 | + f"Received abort request from HTTP for request ID: {raw_request.get('rid')}" |
| 345 | + ) |
| 346 | + self.scheduler.cancel_request(raw_request.get("rid")) |
| 347 | + else: |
| 348 | + # Normal request processing - do tokenization and form InitialRequest |
| 349 | + req = self._handle_raw_request(raw_request) |
| 350 | + recv_reqs.append(req) |
| 351 | + except zmq.ZMQError: |
| 352 | + break |
| 353 | + except Exception as e: |
| 354 | + logger.exception(f"Error receiving http request: {e}") |
| 355 | + self._notify_http_request_error(raw_request, e) |
| 356 | + if recv_reqs: |
| 357 | + logger.debug(f"Received {len(recv_reqs)} HTTP requests") |
354 | 358 | return recv_reqs |
355 | 359 |
|
356 | 360 | def recv_requests_from_peer(self) -> List[Request]: |
@@ -775,6 +779,34 @@ def _handle_raw_request(self, raw_request: Dict): |
775 | 779 | req.routing_table = raw_request["routing_table"] |
776 | 780 | return req |
777 | 781 |
|
| 782 | + def _notify_http_request_error(self, raw_request: Optional[Dict], error: Exception): |
| 783 | + """Best-effort notification to HTTP server when request parsing fails.""" |
| 784 | + if not hasattr(self, "send_to_ipc_socket") or self.send_to_ipc_socket is None: |
| 785 | + return |
| 786 | + if not isinstance(raw_request, dict): |
| 787 | + return |
| 788 | + rid = raw_request.get("rid") |
| 789 | + if rid is None: |
| 790 | + return |
| 791 | + |
| 792 | + is_template_error = isinstance(error, TemplateError) |
| 793 | + status = ( |
| 794 | + HTTPStatus.BAD_REQUEST |
| 795 | + if isinstance(error, ValueError) or is_template_error |
| 796 | + else HTTPStatus.INTERNAL_SERVER_ERROR |
| 797 | + ) |
| 798 | + payload = { |
| 799 | + "type": "error", |
| 800 | + "rid": rid, |
| 801 | + "error": str(error), |
| 802 | + "error_type": error.__class__.__name__, |
| 803 | + "status_code": status.value, |
| 804 | + } |
| 805 | + try: |
| 806 | + self.send_to_ipc_socket.send_pyobj(payload) |
| 807 | + except Exception: # pragma: no cover - best effort notification |
| 808 | + logger.debug("Failed to send error notification to HTTP handler", exc_info=True) |
| 809 | + |
778 | 810 | def _handle_cuda_input_requests(self, requests: List[Request]): |
779 | 811 | """ |
780 | 812 | Cuda specialized handle function. |
|
0 commit comments