Skip to content

Commit cad6a57

Browse files
codeSamuraii and claude committed
Implement resumable file transfers
- Switched from Redis lists to Redis Streams for chunk tracking
- Added upload/download progress persistence and resumption
- Implemented WebSocket /resume/{uid} endpoint for upload resumption
- Added HTTP Range header support for partial downloads (206 responses)
- Created resumption handler for managing transfer state
- Updated JavaScript client to save/restore upload progress via localStorage
- Added connection management for handling disconnections gracefully
- Transfers now wait for peer reconnection within timeout window
- All transfers are resumable by default (no configuration needed)

Key components:
- lib/store.py: Refactored to use Redis Streams with progress tracking
- lib/stream_store.py: New module for stream operations (deprecated)
- lib/transfer.py: Enhanced with resume_from and start_byte parameters
- lib/resume.py: Handles resumption logic and state validation
- lib/range_utils.py: Parses and validates HTTP Range headers
- lib/connection.py: Manages peer states and reconnection windows
- views/websockets.py: Added /resume endpoint for upload resumption
- views/http.py: Enhanced with Range/partial content support
- static/js/file-transfer.js: Added localStorage progress persistence

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
1 parent adb2bd4 commit cad6a57

File tree

12 files changed

+1132
-102
lines changed

12 files changed

+1132
-102
lines changed

lib/connection.py

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
import anyio
2+
from typing import Optional, Dict, Any
3+
from datetime import datetime
4+
from lib.logging import HasLogging, get_logger
5+
from lib.store import Store
6+
7+
logger = get_logger('connection')
8+
9+
10+
class ConnectionManager(metaclass=HasLogging, name_from='transfer_id'):
    """Manages connection states and reconnection logic for resumable transfers.

    Tracks the two peers of a transfer ('sender' and 'receiver') through the
    shared Store, decides whether to wait out a disconnection, and coordinates
    the resume handshake once both peers are available again.
    """

    RECONNECT_WINDOW = 60.0  # seconds to wait for a disconnected peer to return
    KEEPALIVE_INTERVAL = 10.0  # seconds between keepalive state refreshes

    # Peer states that mean the peer is actively participating in the transfer.
    _ACTIVE_STATES = frozenset({'connected', 'uploading', 'downloading'})
    # Active states plus 'resuming': any of these counts as a live connection.
    _LIVE_STATES = _ACTIVE_STATES | {'resuming'}
    # States from which a peer may take part in a coordinated resume.
    _RESUME_READY_STATES = frozenset({'resuming', 'connected'})

    def __init__(self, transfer_id: str, store: Store):
        self.transfer_id = transfer_id
        self.store = store
        # Local cache of the last registration data per peer type.
        self.peer_states: Dict[str, Any] = {}

    @staticmethod
    def _other_peer(peer_type: str) -> str:
        """Return the opposite peer type ('sender' <-> 'receiver')."""
        return 'receiver' if peer_type == 'sender' else 'sender'

    async def register_peer(self, peer_type: str, connection_info: Dict[str, Any]) -> None:
        """Register a peer connection.

        Only the 'connected' state is persisted to the store; the full state
        data (timestamp plus caller-supplied connection_info) is cached
        locally in self.peer_states.
        """
        state_data = {
            'status': 'connected',
            'timestamp': datetime.now().isoformat(),
            **connection_info
        }
        await self.store.set_peer_state(peer_type, 'connected')
        self.peer_states[peer_type] = state_data
        self.info(f"Registered {peer_type} connection")

    async def handle_disconnect(self, peer_type: str) -> bool:
        """Handle peer disconnection; return True if we should wait for reconnection.

        Waiting is worthwhile only while the other peer is still active.
        """
        await self.store.set_peer_state(peer_type, 'disconnected')

        other_peer = self._other_peer(peer_type)
        other_state = await self.store.get_peer_state(other_peer)

        if other_state in self._ACTIVE_STATES:
            self.info(f"{peer_type} disconnected, keeping {other_peer} connected")
            return True  # Wait for reconnection

        self.warning("Both peers disconnected, may abandon transfer")
        return False

    async def wait_for_reconnection(self, peer_type: str, timeout: Optional[float] = None) -> bool:
        """Wait for a peer to reconnect within `timeout` seconds.

        Polls the store once per second and returns True as soon as the peer
        reaches a live state, or False if the window elapses. Defaults to
        RECONNECT_WINDOW.
        """
        timeout = timeout or self.RECONNECT_WINDOW
        self.info(f"Waiting up to {timeout}s for {peer_type} to reconnect...")

        try:
            with anyio.fail_after(timeout):  # raises TimeoutError on expiry
                while True:
                    state = await self.store.get_peer_state(peer_type)
                    if state in self._LIVE_STATES:
                        self.info(f"{peer_type} reconnected successfully")
                        return True
                    await anyio.sleep(1.0)  # poll interval
        except TimeoutError:
            self.warning(f"{peer_type} did not reconnect within {timeout}s")
            return False

    async def handle_reconnection(self, peer_type: str) -> Dict[str, Any]:
        """Handle peer reconnection and return resume info for that peer."""
        await self.store.set_peer_state(peer_type, 'resuming')

        # The other peer's state is reported back so the reconnecting side
        # knows whether its counterpart is still around.
        other_peer = self._other_peer(peer_type)
        other_state = await self.store.get_peer_state(other_peer)

        resume_info: Dict[str, Any] = {}
        if peer_type == 'sender':
            progress = await self.store.get_upload_progress()
            if progress:
                # NOTE(review): assumes a non-empty progress dict always
                # carries these keys — confirm against Store.get_upload_progress.
                resume_info['bytes_uploaded'] = progress['bytes_uploaded']
                resume_info['last_chunk_id'] = progress['last_chunk_id']
        else:
            progress = await self.store.get_download_progress()
            if progress:
                resume_info['bytes_downloaded'] = progress['bytes_downloaded']
                resume_info['last_read_id'] = progress['last_read_id']

        resume_info['other_peer_state'] = other_state
        resume_info['can_resume'] = True

        self.info(f"{peer_type} reconnection handled, resume info: {resume_info}")
        return resume_info

    async def keepalive_loop(self, peer_type: str) -> None:
        """Periodically refresh the peer's state to show it is still alive.

        Exits once the peer leaves an active state, or on any store error
        (best-effort: log and stop rather than crash the caller).
        """
        while True:
            try:
                await anyio.sleep(self.KEEPALIVE_INTERVAL)
                state = await self.store.get_peer_state(peer_type)
                if state not in self._ACTIVE_STATES:
                    break
                # Re-writing the same state refreshes its timestamp in the store.
                await self.store.set_peer_state(peer_type, state)
            except Exception as e:
                self.error(f"Keepalive error for {peer_type}: {e}")
                break

    async def check_peer_health(self, peer_type: str) -> bool:
        """Check if a peer connection is healthy (in any live state)."""
        state = await self.store.get_peer_state(peer_type)
        return state in self._LIVE_STATES

    async def coordinate_resume(self) -> bool:
        """Coordinate resume between both peers.

        Fires the 'resume_transfer' event and returns True only when both
        peers are in a resume-ready state.
        """
        sender_state = await self.store.get_peer_state('sender')
        receiver_state = await self.store.get_peer_state('receiver')

        if sender_state in self._RESUME_READY_STATES and receiver_state in self._RESUME_READY_STATES:
            self.info("Both peers ready to resume transfer")
            await self.store.set_event('resume_transfer')
            return True

        self.debug(f"Cannot resume yet - sender: {sender_state}, receiver: {receiver_state}")
        return False

    async def cleanup_on_error(self) -> None:
        """Mark both peers as errored so stale state is not mistaken for live peers."""
        await self.store.set_peer_state('sender', 'error')
        await self.store.set_peer_state('receiver', 'error')
        self.warning("Connection states cleaned up due to error")

lib/range_utils.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
from typing import Optional, Tuple, List
2+
from dataclasses import dataclass
3+
4+
5+
@dataclass
class RangeRequest:
    """A parsed HTTP Range request covering a single byte range."""
    start: int
    end: Optional[int]
    total_size: Optional[int] = None

    @property
    def length(self) -> Optional[int]:
        """Number of bytes in the range, or None if it cannot be determined."""
        if self.end is not None:
            # Byte ranges are inclusive on both ends.
            return self.end - self.start + 1
        if self.total_size is not None:
            # Open-ended range: runs to the end of the resource.
            return self.total_size - self.start
        return None

    def to_content_range(self, total_size: int) -> str:
        """Build the Content-Range header value for this range."""
        last_byte = total_size - 1 if self.end is None else self.end
        return f"bytes {self.start}-{last_byte}/{total_size}"
25+
26+
27+
class RangeParser:
28+
"""Parses and validates HTTP Range headers."""
29+
30+
@staticmethod
31+
def parse_range_header(range_header: Optional[str], file_size: int) -> Optional[RangeRequest]:
32+
"""Parse Range header and return RangeRequest object."""
33+
if not range_header or not range_header.startswith('bytes='):
34+
return None
35+
36+
try:
37+
range_spec = range_header[6:] # Remove 'bytes='
38+
39+
if ',' in range_spec:
40+
# Multiple ranges not supported for now
41+
return None
42+
43+
if '-' not in range_spec:
44+
return None
45+
46+
parts = range_spec.split('-', 1)
47+
start_str, end_str = parts[0], parts[1]
48+
49+
# Handle suffix-length syntax (e.g., "-500" for last 500 bytes)
50+
if not start_str and end_str:
51+
suffix_length = int(end_str)
52+
start = max(0, file_size - suffix_length)
53+
end = file_size - 1
54+
return RangeRequest(start=start, end=end, total_size=file_size)
55+
56+
# Handle normal range
57+
start = int(start_str) if start_str else 0
58+
end = int(end_str) if end_str else file_size - 1
59+
60+
# Validate range
61+
if start < 0 or start >= file_size:
62+
return None
63+
if end >= file_size:
64+
end = file_size - 1
65+
if start > end:
66+
return None
67+
68+
return RangeRequest(start=start, end=end, total_size=file_size)
69+
70+
except (ValueError, IndexError):
71+
return None
72+
73+
@staticmethod
74+
def validate_if_range(if_range_header: Optional[str], etag: Optional[str]) -> bool:
75+
"""Validate If-Range header against ETag."""
76+
if not if_range_header or not etag:
77+
return True # No validation needed
78+
return if_range_header == etag
79+
80+
@staticmethod
81+
def calculate_chunk_range(chunk_index: int, chunk_size: int, byte_offset: int) -> Tuple[int, int]:
82+
"""Calculate byte range for a specific chunk."""
83+
chunk_start = chunk_index * chunk_size
84+
chunk_end = chunk_start + chunk_size - 1
85+
86+
if chunk_start < byte_offset:
87+
# Partial chunk at the beginning
88+
return byte_offset, chunk_end
89+
return chunk_start, chunk_end
90+
91+
@staticmethod
92+
def is_partial_content(range_request: Optional[RangeRequest]) -> bool:
93+
"""Check if this is a partial content request."""
94+
return range_request is not None and (
95+
range_request.start > 0 or
96+
(range_request.end is not None and range_request.total_size is not None and
97+
range_request.end < range_request.total_size - 1)
98+
)
99+
100+
@staticmethod
101+
def create_content_headers(range_request: RangeRequest, file_type: str) -> dict:
102+
"""Create response headers for partial content."""
103+
headers = {
104+
'Content-Type': file_type,
105+
'Accept-Ranges': 'bytes',
106+
'Content-Range': range_request.to_content_range(range_request.total_size)
107+
}
108+
109+
if range_request.length:
110+
headers['Content-Length'] = str(range_request.length)
111+
112+
return headers

lib/resume.py

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
from typing import Optional, Tuple
2+
from lib.store import Store
3+
from lib.metadata import FileMetadata
4+
from lib.logging import HasLogging, get_logger
5+
6+
logger = get_logger('resume')
7+
8+
9+
class ResumptionHandler(metaclass=HasLogging, name_from='transfer_id'):
    """Handles transfer resumption logic for a single transfer.

    Answers whether an interrupted upload/download may be resumed, computes
    resume positions from stored progress, and validates that a resume
    request matches the original transfer's metadata.
    """

    def __init__(self, transfer_id: str, store: Store):
        self.transfer_id = transfer_id
        self.store = store

    async def can_resume_upload(self) -> bool:
        """Return True when upload progress exists and the sender was interrupted."""
        progress = await self.store.get_upload_progress()
        if not progress:
            return False

        sender_state = await self.store.get_peer_state('sender')
        return sender_state in ('paused', 'disconnected', 'incomplete')

    async def can_resume_download(self) -> bool:
        """Return True when download progress exists and the receiver was interrupted."""
        progress = await self.store.get_download_progress()
        if not progress:
            return False

        receiver_state = await self.store.get_peer_state('receiver')
        return receiver_state in ('paused', 'disconnected', 'sender_disconnected')

    async def get_upload_resume_info(self) -> Tuple[int, str]:
        """Return (bytes_uploaded, last_chunk_id), or (0, '0') with no progress.

        NOTE(review): assumes a non-empty progress dict always carries these
        keys — confirm against Store.get_upload_progress.
        """
        progress = await self.store.get_upload_progress()
        if progress:
            return progress['bytes_uploaded'], progress['last_chunk_id']
        return 0, '0'

    async def get_download_resume_info(self) -> Tuple[int, str]:
        """Return (bytes_downloaded, last_read_id), or (0, '0') with no progress."""
        progress = await self.store.get_download_progress()
        if progress:
            return progress['bytes_downloaded'], progress['last_read_id']
        return 0, '0'

    async def prepare_upload_resume(self) -> dict:
        """Mark the sender as resuming and return its resume info."""
        bytes_uploaded, last_chunk_id = await self.get_upload_resume_info()

        await self.store.set_peer_state('sender', 'resuming')

        return {
            'resume_from': bytes_uploaded,
            'last_chunk_id': last_chunk_id,
            'can_resume': True
        }

    async def prepare_download_resume(self, range_header: Optional[str] = None) -> dict:
        """Mark the receiver as resuming and return its resume info.

        An explicit Range header takes precedence over stored progress when
        choosing the start byte.
        """
        if range_header:
            start_byte = self._parse_range_header(range_header)
        else:
            bytes_downloaded, _ = await self.get_download_resume_info()
            start_byte = bytes_downloaded

        await self.store.set_peer_state('receiver', 'resuming')

        return {
            'start_byte': start_byte,
            'can_resume': True,
            'total_size': None  # Will be filled from metadata
        }

    def _parse_range_header(self, range_header: str) -> int:
        """Parse a Range header and return the requested start byte (0 on failure).

        Only the start position is needed here; the end of the range is ignored.
        """
        if not range_header or not range_header.startswith('bytes='):
            return 0

        try:
            range_spec = range_header[6:]  # Remove 'bytes='
            if '-' in range_spec:
                # Fix: only the start is used, so don't bind an unused `end`.
                start = range_spec.split('-', 1)[0]
                return int(start) if start else 0
        except (ValueError, IndexError):
            pass

        return 0

    async def validate_resume_request(self, file: FileMetadata) -> bool:
        """Validate that a resume request matches the originally stored metadata.

        Compares name, size and type; any parse failure counts as a mismatch.
        """
        stored_metadata = await self.store.get_metadata()
        if not stored_metadata:
            return False

        try:
            stored_file = FileMetadata.from_json(stored_metadata)
            return (stored_file.name == file.name and
                    stored_file.size == file.size and
                    stored_file.type == file.type)
        except Exception:
            # Corrupt/incompatible stored metadata: treat as non-resumable.
            return False

    async def handle_peer_reconnection(self, peer_type: str) -> None:
        """Fire the resume event when the other peer is already waiting."""
        other_peer = 'receiver' if peer_type == 'sender' else 'sender'
        other_state = await self.store.get_peer_state(other_peer)

        if other_state == 'waiting':
            self.info("Both peers reconnected, resuming transfer")
            await self.store.set_event('resume_transfer')

    async def cleanup_stale_transfers(self, max_age_seconds: int = 3600) -> None:
        """Clean up stale transfer data older than max_age_seconds (not yet implemented)."""
        # This would be called periodically to clean up abandoned transfers
        # Implementation depends on Redis TTL or timestamp tracking
        pass

0 commit comments

Comments
 (0)