diff --git a/src/claude_code_transcripts/__init__.py b/src/claude_code_transcripts/__init__.py index e4854a3..b01f532 100644 --- a/src/claude_code_transcripts/__init__.py +++ b/src/claude_code_transcripts/__init__.py @@ -1295,24 +1295,23 @@ def generate_index_pagination_html(total_pages): return _macros.index_pagination(total_pages) -def generate_html(json_path, output_dir, github_repo=None): - output_dir = Path(output_dir) - output_dir.mkdir(exist_ok=True) +def render_session_pages(data, github_repo=None): + """Render session data to HTML pages in memory. - # Load session file (supports both JSON and JSONL) - data = parse_session_file(json_path) + Args: + data: Parsed session data dict with "loglines" key + github_repo: Optional "owner/repo" string for commit links + Returns dict with: + "pages": {"index.html": str, "page-001.html": str, ...} + "meta": {"total_pages": int, "total_prompts": int, "total_messages": int, + "total_tool_calls": int, "total_commits": int, "github_repo": str|None} + """ loglines = data.get("loglines", []) # Auto-detect GitHub repo if not provided if github_repo is None: github_repo = detect_github_repo(loglines) - if github_repo: - print(f"Auto-detected GitHub repo: {github_repo}") - else: - print( - "Warning: Could not auto-detect GitHub repo. Commit links will be disabled." 
- ) # Set module-level variable for render functions global _github_repo @@ -1354,6 +1353,8 @@ def generate_html(json_path, output_dir, github_repo=None): total_convs = len(conversations) total_pages = (total_convs + PROMPTS_PER_PAGE - 1) // PROMPTS_PER_PAGE + pages = {} + for page_num in range(1, total_pages + 1): start_idx = (page_num - 1) * PROMPTS_PER_PAGE end_idx = min(start_idx + PROMPTS_PER_PAGE, total_convs) @@ -1379,10 +1380,7 @@ def generate_html(json_path, output_dir, github_repo=None): pagination_html=pagination_html, messages_html="".join(messages_html), ) - (output_dir / f"page-{page_num:03d}.html").write_text( - page_content, encoding="utf-8" - ) - print(f"Generated page-{page_num:03d}.html") + pages[f"page-{page_num:03d}.html"] = page_content # Calculate overall stats and collect all commits for timeline total_tool_counts = {} @@ -1463,10 +1461,168 @@ def generate_html(json_path, output_dir, github_repo=None): total_pages=total_pages, index_items_html="".join(index_items), ) - index_path = output_dir / "index.html" - index_path.write_text(index_content, encoding="utf-8") + pages["index.html"] = index_content + + return { + "pages": pages, + "meta": { + "total_pages": total_pages, + "total_prompts": prompt_num, + "total_messages": total_messages, + "total_tool_calls": total_tool_calls, + "total_commits": total_commits, + "github_repo": github_repo, + }, + } + + +def write_pages_to_dir(pages, output_dir): + """Write rendered pages dict to an output directory.""" + output_dir = Path(output_dir) + output_dir.mkdir(exist_ok=True, parents=True) + for filename, content in pages.items(): + (output_dir / filename).write_text(content, encoding="utf-8") + + +def import_session_file_to_db( + session_path, project_name=None, project_key=None, db_path=None, github_repo=None +): + """Import a single session file into the SQLite database. + + Returns (session_id, project_name, result) where result is the render_session_pages output. + Skips if already present. 
Returns None for result if skipped. + """ + from claude_code_transcripts.db import ( + get_db, + session_exists, + import_session, + DEFAULT_DB_PATH, + ) + + session_path = Path(session_path) + session_id = session_path.stem + + if project_key is None: + project_key = session_path.parent.name + if project_name is None: + project_name = get_project_display_name(project_key) + + conn = get_db(db_path) + try: + if session_exists(conn, session_id): + return session_id, project_name, None + + session_json = session_path.read_text(encoding="utf-8") + data = parse_session_file(session_path) + result = render_session_pages(data, github_repo=github_repo) + + loglines = data.get("loglines", []) + created_at = loglines[0].get("timestamp") if loglines else None + summary = get_session_summary(session_path) + stat = session_path.stat() + + import_session( + conn, + session_id=session_id, + project_name=project_name, + project_key=project_key, + summary=summary, + session_json=session_json, + pages=result["pages"], + meta=result["meta"], + file_mtime=stat.st_mtime, + file_size=stat.st_size, + created_at=created_at, + ) + return session_id, project_name, result + finally: + conn.close() + + +def import_session_data_to_db( + session_data, + session_id, + project_name="web", + project_key="web", + db_path=None, + github_repo=None, +): + """Import session data dict into the SQLite database. + + Returns (session_id, project_name, result) where result is the render_session_pages output. + Skips if already present. Returns None for result if skipped. 
+ """ + from claude_code_transcripts.db import ( + get_db, + session_exists, + import_session, + DEFAULT_DB_PATH, + ) + + conn = get_db(db_path) + try: + if session_exists(conn, session_id): + return session_id, project_name, None + + result = render_session_pages(session_data, github_repo=github_repo) + + loglines = session_data.get("loglines", []) + created_at = loglines[0].get("timestamp") if loglines else None + + # Extract summary from first user message + summary = None + for entry in loglines: + if entry.get("type") == "user": + content = entry.get("message", {}).get("content", "") + text = extract_text_from_content(content) + if text: + summary = text[:200] + break + + import_session( + conn, + session_id=session_id, + project_name=project_name, + project_key=project_key, + summary=summary, + session_json=json.dumps(session_data), + pages=result["pages"], + meta=result["meta"], + created_at=created_at, + ) + return session_id, project_name, result + finally: + conn.close() + + +def generate_html(json_path, output_dir, github_repo=None): + output_dir = Path(output_dir) + output_dir.mkdir(exist_ok=True) + + # Load session file (supports both JSON and JSONL) + data = parse_session_file(json_path) + + # Auto-detect GitHub repo if not provided + if github_repo is None: + detected = detect_github_repo(data.get("loglines", [])) + if detected: + github_repo = detected + print(f"Auto-detected GitHub repo: {github_repo}") + else: + print( + "Warning: Could not auto-detect GitHub repo. Commit links will be disabled." 
+ ) + + result = render_session_pages(data, github_repo=github_repo) + write_pages_to_dir(result["pages"], output_dir) + + total_convs = result["meta"]["total_prompts"] + total_pages = result["meta"]["total_pages"] + for page_name in sorted(result["pages"]): + if page_name != "index.html": + print(f"Generated {page_name}") print( - f"Generated {index_path.resolve()} ({total_convs} prompts, {total_pages} pages)" + f"Generated {(output_dir / 'index.html').resolve()} ({total_convs} prompts, {total_pages} pages)" ) @@ -1556,42 +1712,52 @@ def local_cmd(output, output_auto, repo, gist, include_json, open_browser, limit session_file = selected - # Determine output directory and whether to open browser - # If no -o specified, use temp dir and open browser by default - auto_open = output is None and not gist and not output_auto - if output_auto: - # Use -o as parent dir (or current dir), with auto-named subdirectory - parent_dir = Path(output) if output else Path(".") - output = parent_dir / session_file.stem - elif output is None: - output = Path(tempfile.gettempdir()) / f"claude-session-{session_file.stem}" + # If -o is specified, use legacy file-based output + use_file_output = output is not None or output_auto or gist - output = Path(output) - generate_html(session_file, output, github_repo=repo) - - # Show output directory - click.echo(f"Output: {output.resolve()}") - - # Copy JSONL file to output directory if requested - if include_json: - output.mkdir(exist_ok=True) - json_dest = output / session_file.name - shutil.copy(session_file, json_dest) - json_size_kb = json_dest.stat().st_size / 1024 - click.echo(f"JSONL: {json_dest} ({json_size_kb:.1f} KB)") - - if gist: - # Inject gist preview JS and create gist - inject_gist_preview_js(output) - click.echo("Creating GitHub gist...") - gist_id, gist_url = create_gist(output) - preview_url = f"https://gisthost.github.io/?{gist_id}/index.html" - click.echo(f"Gist: {gist_url}") - click.echo(f"Preview: {preview_url}") - - if 
open_browser or auto_open: - index_url = (output / "index.html").resolve().as_uri() - webbrowser.open(index_url) + if use_file_output: + auto_open = output is None and not gist and not output_auto + if output_auto: + parent_dir = Path(output) if output else Path(".") + output = parent_dir / session_file.stem + elif output is None: + output = Path(tempfile.gettempdir()) / f"claude-session-{session_file.stem}" + + output = Path(output) + generate_html(session_file, output, github_repo=repo) + click.echo(f"Output: {output.resolve()}") + + if include_json: + output.mkdir(exist_ok=True) + json_dest = output / session_file.name + shutil.copy(session_file, json_dest) + json_size_kb = json_dest.stat().st_size / 1024 + click.echo(f"JSONL: {json_dest} ({json_size_kb:.1f} KB)") + + if gist: + inject_gist_preview_js(output) + click.echo("Creating GitHub gist...") + gist_id, gist_url = create_gist(output) + preview_url = f"https://gisthost.github.io/?{gist_id}/index.html" + click.echo(f"Gist: {gist_url}") + click.echo(f"Preview: {preview_url}") + + if open_browser or auto_open: + index_url = (output / "index.html").resolve().as_uri() + webbrowser.open(index_url) + else: + # Default: import to SQLite + session_id, project_name, result = import_session_file_to_db( + session_file, github_repo=repo + ) + if result is None: + click.echo(f"Session {session_id} already in database.") + else: + click.echo(f"Imported {session_id} into database.") + click.echo(f"View: http://127.0.0.1:8080/{project_name}/{session_id}/") + + if open_browser: + webbrowser.open(f"http://127.0.0.1:8080/{project_name}/{session_id}/") def is_url(path): @@ -1675,54 +1841,62 @@ def json_cmd(json_file, output, output_auto, repo, gist, include_json, open_brow click.echo(f"Fetching {json_file}...") temp_file = fetch_url_to_tempfile(json_file) json_file_path = temp_file - # Use URL path for naming url_name = Path(json_file.split("?")[0]).stem or "session" else: - # Validate that local file exists json_file_path = 
Path(json_file) if not json_file_path.exists(): raise click.ClickException(f"File not found: {json_file}") url_name = None - # Determine output directory and whether to open browser - # If no -o specified, use temp dir and open browser by default - auto_open = output is None and not gist and not output_auto - if output_auto: - # Use -o as parent dir (or current dir), with auto-named subdirectory - parent_dir = Path(output) if output else Path(".") - output = parent_dir / (url_name or json_file_path.stem) - elif output is None: - output = ( - Path(tempfile.gettempdir()) - / f"claude-session-{url_name or json_file_path.stem}" + # If -o is specified, use legacy file-based output + use_file_output = output is not None or output_auto or gist + + if use_file_output: + auto_open = output is None and not gist and not output_auto + if output_auto: + parent_dir = Path(output) if output else Path(".") + output = parent_dir / (url_name or json_file_path.stem) + elif output is None: + output = ( + Path(tempfile.gettempdir()) + / f"claude-session-{url_name or json_file_path.stem}" + ) + + output = Path(output) + generate_html(json_file_path, output, github_repo=repo) + click.echo(f"Output: {output.resolve()}") + + if include_json: + output.mkdir(exist_ok=True) + json_dest = output / json_file_path.name + shutil.copy(json_file_path, json_dest) + json_size_kb = json_dest.stat().st_size / 1024 + click.echo(f"JSON: {json_dest} ({json_size_kb:.1f} KB)") + + if gist: + inject_gist_preview_js(output) + click.echo("Creating GitHub gist...") + gist_id, gist_url = create_gist(output) + preview_url = f"https://gisthost.github.io/?{gist_id}/index.html" + click.echo(f"Gist: {gist_url}") + click.echo(f"Preview: {preview_url}") + + if open_browser or auto_open: + index_url = (output / "index.html").resolve().as_uri() + webbrowser.open(index_url) + else: + # Default: import to SQLite + session_id, project_name, result = import_session_file_to_db( + json_file_path, github_repo=repo ) + if result 
is None: + click.echo(f"Session {session_id} already in database.") + else: + click.echo(f"Imported {session_id} into database.") + click.echo(f"View: http://127.0.0.1:8080/{project_name}/{session_id}/") - output = Path(output) - generate_html(json_file_path, output, github_repo=repo) - - # Show output directory - click.echo(f"Output: {output.resolve()}") - - # Copy JSON file to output directory if requested - if include_json: - output.mkdir(exist_ok=True) - json_dest = output / json_file_path.name - shutil.copy(json_file_path, json_dest) - json_size_kb = json_dest.stat().st_size / 1024 - click.echo(f"JSON: {json_dest} ({json_size_kb:.1f} KB)") - - if gist: - # Inject gist preview JS and create gist - inject_gist_preview_js(output) - click.echo("Creating GitHub gist...") - gist_id, gist_url = create_gist(output) - preview_url = f"https://gisthost.github.io/?{gist_id}/index.html" - click.echo(f"Gist: {gist_url}") - click.echo(f"Preview: {preview_url}") - - if open_browser or auto_open: - index_url = (output / "index.html").resolve().as_uri() - webbrowser.open(index_url) + if open_browser: + webbrowser.open(f"http://127.0.0.1:8080/{project_name}/{session_id}/") def resolve_credentials(token, org_uuid): @@ -1777,168 +1951,21 @@ def format_session_for_display(session_data): def generate_html_from_session_data(session_data, output_dir, github_repo=None): """Generate HTML from session data dict (instead of file path).""" - output_dir = Path(output_dir) - output_dir.mkdir(exist_ok=True, parents=True) - - loglines = session_data.get("loglines", []) - - # Auto-detect GitHub repo if not provided if github_repo is None: - github_repo = detect_github_repo(loglines) - if github_repo: + detected = detect_github_repo(session_data.get("loglines", [])) + if detected: + github_repo = detected click.echo(f"Auto-detected GitHub repo: {github_repo}") - # Set module-level variable for render functions - global _github_repo - _github_repo = github_repo - - conversations = [] - 
current_conv = None - for entry in loglines: - log_type = entry.get("type") - timestamp = entry.get("timestamp", "") - is_compact_summary = entry.get("isCompactSummary", False) - message_data = entry.get("message", {}) - if not message_data: - continue - # Convert message dict to JSON string for compatibility with existing render functions - message_json = json.dumps(message_data) - is_user_prompt = False - user_text = None - if log_type == "user": - content = message_data.get("content", "") - text = extract_text_from_content(content) - if text: - is_user_prompt = True - user_text = text - if is_user_prompt: - if current_conv: - conversations.append(current_conv) - current_conv = { - "user_text": user_text, - "timestamp": timestamp, - "messages": [(log_type, message_json, timestamp)], - "is_continuation": bool(is_compact_summary), - } - elif current_conv: - current_conv["messages"].append((log_type, message_json, timestamp)) - if current_conv: - conversations.append(current_conv) - - total_convs = len(conversations) - total_pages = (total_convs + PROMPTS_PER_PAGE - 1) // PROMPTS_PER_PAGE - - for page_num in range(1, total_pages + 1): - start_idx = (page_num - 1) * PROMPTS_PER_PAGE - end_idx = min(start_idx + PROMPTS_PER_PAGE, total_convs) - page_convs = conversations[start_idx:end_idx] - messages_html = [] - for conv in page_convs: - is_first = True - for log_type, message_json, timestamp in conv["messages"]: - msg_html = render_message(log_type, message_json, timestamp) - if msg_html: - # Wrap continuation summaries in collapsed details - if is_first and conv.get("is_continuation"): - msg_html = f'
Session continuation summary{msg_html}
' - messages_html.append(msg_html) - is_first = False - pagination_html = generate_pagination_html(page_num, total_pages) - page_template = get_template("page.html") - page_content = page_template.render( - css=CSS, - js=JS, - page_num=page_num, - total_pages=total_pages, - pagination_html=pagination_html, - messages_html="".join(messages_html), - ) - (output_dir / f"page-{page_num:03d}.html").write_text( - page_content, encoding="utf-8" - ) - click.echo(f"Generated page-{page_num:03d}.html") - - # Calculate overall stats and collect all commits for timeline - total_tool_counts = {} - total_messages = 0 - all_commits = [] # (timestamp, hash, message, page_num, conv_index) - for i, conv in enumerate(conversations): - total_messages += len(conv["messages"]) - stats = analyze_conversation(conv["messages"]) - for tool, count in stats["tool_counts"].items(): - total_tool_counts[tool] = total_tool_counts.get(tool, 0) + count - page_num = (i // PROMPTS_PER_PAGE) + 1 - for commit_hash, commit_msg, commit_ts in stats["commits"]: - all_commits.append((commit_ts, commit_hash, commit_msg, page_num, i)) - total_tool_calls = sum(total_tool_counts.values()) - total_commits = len(all_commits) - - # Build timeline items: prompts and commits merged by timestamp - timeline_items = [] - - # Add prompts - prompt_num = 0 - for i, conv in enumerate(conversations): - if conv.get("is_continuation"): - continue - if conv["user_text"].startswith("Stop hook feedback:"): - continue - prompt_num += 1 - page_num = (i // PROMPTS_PER_PAGE) + 1 - msg_id = make_msg_id(conv["timestamp"]) - link = f"page-{page_num:03d}.html#{msg_id}" - rendered_content = render_markdown_text(conv["user_text"]) - - # Collect all messages including from subsequent continuation conversations - # This ensures long_texts from continuations appear with the original prompt - all_messages = list(conv["messages"]) - for j in range(i + 1, len(conversations)): - if not conversations[j].get("is_continuation"): - break - 
all_messages.extend(conversations[j]["messages"]) + result = render_session_pages(session_data, github_repo=github_repo) + write_pages_to_dir(result["pages"], output_dir) - # Analyze conversation for stats (excluding commits from inline display now) - stats = analyze_conversation(all_messages) - tool_stats_str = format_tool_stats(stats["tool_counts"]) - - long_texts_html = "" - for lt in stats["long_texts"]: - rendered_lt = render_markdown_text(lt) - long_texts_html += _macros.index_long_text(rendered_lt) - - stats_html = _macros.index_stats(tool_stats_str, long_texts_html) - - item_html = _macros.index_item( - prompt_num, link, conv["timestamp"], rendered_content, stats_html - ) - timeline_items.append((conv["timestamp"], "prompt", item_html)) - - # Add commits as separate timeline items - for commit_ts, commit_hash, commit_msg, page_num, conv_idx in all_commits: - item_html = _macros.index_commit( - commit_hash, commit_msg, commit_ts, _github_repo - ) - timeline_items.append((commit_ts, "commit", item_html)) - - # Sort by timestamp - timeline_items.sort(key=lambda x: x[0]) - index_items = [item[2] for item in timeline_items] - - index_pagination = generate_index_pagination_html(total_pages) - index_template = get_template("index.html") - index_content = index_template.render( - css=CSS, - js=JS, - pagination_html=index_pagination, - prompt_num=prompt_num, - total_messages=total_messages, - total_tool_calls=total_tool_calls, - total_commits=total_commits, - total_pages=total_pages, - index_items_html="".join(index_items), - ) - index_path = output_dir / "index.html" - index_path.write_text(index_content, encoding="utf-8") + total_convs = result["meta"]["total_prompts"] + total_pages = result["meta"]["total_pages"] + for page_name in sorted(result["pages"]): + if page_name != "index.html": + click.echo(f"Generated {page_name}") + index_path = Path(output_dir) / "index.html" click.echo( f"Generated {index_path.resolve()} ({total_convs} prompts, {total_pages} pages)" 
) @@ -2056,44 +2083,55 @@ def web_cmd( except httpx.RequestError as e: raise click.ClickException(f"Network error: {e}") - # Determine output directory and whether to open browser - # If no -o specified, use temp dir and open browser by default - auto_open = output is None and not gist and not output_auto - if output_auto: - # Use -o as parent dir (or current dir), with auto-named subdirectory - parent_dir = Path(output) if output else Path(".") - output = parent_dir / session_id - elif output is None: - output = Path(tempfile.gettempdir()) / f"claude-session-{session_id}" + # If -o is specified, use legacy file-based output + use_file_output = output is not None or output_auto or gist - output = Path(output) - click.echo(f"Generating HTML in {output}/...") - generate_html_from_session_data(session_data, output, github_repo=repo) - - # Show output directory - click.echo(f"Output: {output.resolve()}") - - # Save JSON session data if requested - if include_json: - output.mkdir(exist_ok=True) - json_dest = output / f"{session_id}.json" - with open(json_dest, "w") as f: - json.dump(session_data, f, indent=2) - json_size_kb = json_dest.stat().st_size / 1024 - click.echo(f"JSON: {json_dest} ({json_size_kb:.1f} KB)") - - if gist: - # Inject gist preview JS and create gist - inject_gist_preview_js(output) - click.echo("Creating GitHub gist...") - gist_id, gist_url = create_gist(output) - preview_url = f"https://gisthost.github.io/?{gist_id}/index.html" - click.echo(f"Gist: {gist_url}") - click.echo(f"Preview: {preview_url}") - - if open_browser or auto_open: - index_url = (output / "index.html").resolve().as_uri() - webbrowser.open(index_url) + if use_file_output: + auto_open = output is None and not gist and not output_auto + if output_auto: + parent_dir = Path(output) if output else Path(".") + output = parent_dir / session_id + elif output is None: + output = Path(tempfile.gettempdir()) / f"claude-session-{session_id}" + + output = Path(output) + click.echo(f"Generating 
HTML in {output}/...") + generate_html_from_session_data(session_data, output, github_repo=repo) + click.echo(f"Output: {output.resolve()}") + + if include_json: + output.mkdir(exist_ok=True) + json_dest = output / f"{session_id}.json" + with open(json_dest, "w") as f: + json.dump(session_data, f, indent=2) + json_size_kb = json_dest.stat().st_size / 1024 + click.echo(f"JSON: {json_dest} ({json_size_kb:.1f} KB)") + + if gist: + inject_gist_preview_js(output) + click.echo("Creating GitHub gist...") + gist_id, gist_url = create_gist(output) + preview_url = f"https://gisthost.github.io/?{gist_id}/index.html" + click.echo(f"Gist: {gist_url}") + click.echo(f"Preview: {preview_url}") + + if open_browser or auto_open: + index_url = (output / "index.html").resolve().as_uri() + webbrowser.open(index_url) + else: + # Default: import to SQLite + project_name = "web" + sid, project_name, result = import_session_data_to_db( + session_data, session_id, project_name=project_name, github_repo=repo + ) + if result is None: + click.echo(f"Session {session_id} already in database.") + else: + click.echo(f"Imported {session_id} into database.") + click.echo(f"View: http://127.0.0.1:8080/{project_name}/{session_id}/") + + if open_browser: + webbrowser.open(f"http://127.0.0.1:8080/{project_name}/{session_id}/") @cli.command("all") @@ -2220,5 +2258,170 @@ def on_progress(project_name, session_name, current, total): webbrowser.open(index_url) +@cli.command("serve") +@click.option( + "--db", + "db_path", + type=click.Path(), + help="SQLite database path (default: ~/.claude/transcripts.db).", +) +@click.option( + "-p", + "--port", + default=8080, + help="Port number (default: 8080).", +) +@click.option( + "--host", + default="127.0.0.1", + help="Bind address (default: 127.0.0.1).", +) +def serve_cmd(db_path, port, host): + """Serve transcripts from the SQLite database via HTTP. + + Starts a local web server that serves transcript HTML pages + directly from the SQLite database. 
+ """ + from http.server import HTTPServer + from claude_code_transcripts.db import get_db, DEFAULT_DB_PATH + from claude_code_transcripts.serve import make_handler + + if db_path is None: + db_path = DEFAULT_DB_PATH + db_path = Path(db_path) + + if not db_path.exists(): + raise click.ClickException( + f"Database not found: {db_path}\nRun 'claude-code-transcripts sync' first." + ) + + click.echo(f"Serving transcripts from {db_path}") + click.echo(f"http://{host}:{port}/") + + handler = make_handler(db_path) + server = HTTPServer((host, port), handler) + try: + server.serve_forever() + except KeyboardInterrupt: + click.echo("\nShutting down.") + server.shutdown() + + +@cli.command("sync") +@click.option( + "-s", + "--source", + type=click.Path(), + help="Source directory containing Claude projects (default: ~/.claude/projects).", +) +@click.option( + "--db", + "db_path", + type=click.Path(), + help="SQLite database path (default: ~/.claude/transcripts.db).", +) +@click.option( + "--include-agents", + is_flag=True, + help="Include agent-* session files (excluded by default).", +) +@click.option( + "-q", + "--quiet", + is_flag=True, + help="Suppress progress output (only show final summary).", +) +def sync_cmd(source, db_path, include_agents, quiet): + """Sync local Claude Code sessions into the SQLite database. + + Scans ~/.claude/projects for JSONL session files, renders HTML, + and stores everything in a SQLite database. Sessions already in the + database are skipped (dedup by session ID). 
+ """ + from claude_code_transcripts.db import get_db, session_exists, import_session + + # Default source folder + if source is None: + source = Path.home() / ".claude" / "projects" + else: + source = Path(source) + + if not source.exists(): + click.echo("No sessions found.") + return + + if not quiet: + click.echo(f"Scanning {source}...") + + projects = find_all_sessions(source, include_agents=include_agents) + + if not projects: + click.echo("No sessions found.") + return + + # Open/create database + conn = get_db(db_path) + + new_count = 0 + skip_count = 0 + fail_count = 0 + total_sessions = sum(len(p["sessions"]) for p in projects) + + for project in projects: + for session in project["sessions"]: + session_id = session["path"].stem + + if session_exists(conn, session_id): + skip_count += 1 + continue + + try: + # Read raw JSONL content + session_json = session["path"].read_text(encoding="utf-8") + + # Parse and render HTML + data = parse_session_file(session["path"]) + result = render_session_pages(data) + + # Extract first timestamp as created_at + loglines = data.get("loglines", []) + created_at = None + if loglines: + created_at = loglines[0].get("timestamp") + + stat = session["path"].stat() + + import_session( + conn, + session_id=session_id, + project_name=project["name"], + project_key=session["path"].parent.name, + summary=session["summary"], + session_json=session_json, + pages=result["pages"], + meta=result["meta"], + file_mtime=stat.st_mtime, + file_size=stat.st_size, + created_at=created_at, + ) + new_count += 1 + + if not quiet: + click.echo(f" Imported {project['name']}/{session_id}") + except Exception as e: + fail_count += 1 + if not quiet: + click.echo(f" Failed {project['name']}/{session_id}: {e}") + + conn.close() + + parts = [f"Synced {new_count} new sessions"] + if skip_count: + parts.append(f"{skip_count} already present") + if fail_count: + parts.append(f"{fail_count} failed") + click.echo(", ".join(parts) + ".") + + def main(): cli() 
diff --git a/src/claude_code_transcripts/db.py b/src/claude_code_transcripts/db.py new file mode 100644 index 0000000..d6b0cec --- /dev/null +++ b/src/claude_code_transcripts/db.py @@ -0,0 +1,195 @@ +"""SQLite database operations for storing Claude Code transcripts.""" + +import sqlite3 +from pathlib import Path + +DEFAULT_DB_PATH = Path.home() / ".claude" / "transcripts.db" + +SCHEMA_SQL = """ +CREATE TABLE IF NOT EXISTS sessions ( + session_id TEXT PRIMARY KEY, + project_name TEXT NOT NULL, + project_key TEXT NOT NULL, + summary TEXT, + session_json TEXT NOT NULL, + github_repo TEXT, + total_pages INTEGER NOT NULL DEFAULT 0, + total_prompts INTEGER NOT NULL DEFAULT 0, + total_messages INTEGER NOT NULL DEFAULT 0, + total_tool_calls INTEGER NOT NULL DEFAULT 0, + total_commits INTEGER NOT NULL DEFAULT 0, + file_mtime REAL, + file_size INTEGER, + created_at TEXT, + imported_at TEXT NOT NULL DEFAULT (datetime('now')) +); + +CREATE TABLE IF NOT EXISTS session_pages ( + session_id TEXT NOT NULL, + page_name TEXT NOT NULL, + html_content TEXT NOT NULL, + PRIMARY KEY (session_id, page_name), + FOREIGN KEY (session_id) REFERENCES sessions(session_id) +); +""" + + +def get_db(db_path=None): + """Open or create the SQLite database and ensure schema exists. + + Returns a sqlite3.Connection with WAL mode enabled. 
+ """ + if db_path is None: + db_path = DEFAULT_DB_PATH + db_path = Path(db_path) + db_path.parent.mkdir(parents=True, exist_ok=True) + + conn = sqlite3.connect(str(db_path)) + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA foreign_keys=ON") + conn.executescript(SCHEMA_SQL) + conn.commit() + return conn + + +def session_exists(conn, session_id): + """Check if a session already exists in the database.""" + cursor = conn.execute("SELECT 1 FROM sessions WHERE session_id = ?", (session_id,)) + return cursor.fetchone() is not None + + +def import_session( + conn, + session_id, + project_name, + project_key, + summary, + session_json, + pages, + meta, + file_mtime=None, + file_size=None, + created_at=None, +): + """Import a session and its HTML pages into the database. + + Skips if session_id already exists (dedup). + + Args: + conn: sqlite3.Connection + session_id: Unique session identifier (filename stem) + project_name: Human-readable project name + project_key: Raw folder name for the project + summary: First user message preview + session_json: Raw JSONL content + pages: dict of {page_name: html_content} + meta: dict with total_pages, total_prompts, total_messages, + total_tool_calls, total_commits, github_repo + file_mtime: Original file modification time (unix timestamp) + file_size: Original file size in bytes + created_at: Earliest timestamp from session + """ + if session_exists(conn, session_id): + return + + conn.execute( + """INSERT INTO sessions + (session_id, project_name, project_key, summary, session_json, + github_repo, total_pages, total_prompts, total_messages, + total_tool_calls, total_commits, file_mtime, file_size, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", + ( + session_id, + project_name, + project_key, + summary, + session_json, + meta.get("github_repo"), + meta.get("total_pages", 0), + meta.get("total_prompts", 0), + meta.get("total_messages", 0), + meta.get("total_tool_calls", 0), + 
meta.get("total_commits", 0), + file_mtime, + file_size, + created_at, + ), + ) + + for page_name, html_content in pages.items(): + conn.execute( + "INSERT INTO session_pages (session_id, page_name, html_content) VALUES (?, ?, ?)", + (session_id, page_name, html_content), + ) + + conn.commit() + + +def get_session_page(conn, session_id, page_name): + """Fetch a single HTML page for a session. + + Returns the HTML content string, or None if not found. + """ + cursor = conn.execute( + "SELECT html_content FROM session_pages WHERE session_id = ? AND page_name = ?", + (session_id, page_name), + ) + row = cursor.fetchone() + return row[0] if row else None + + +def list_projects(conn): + """List all projects with session counts. + + Returns a list of dicts with keys: name, project_key, session_count, latest_import. + Sorted by most recent import. + """ + cursor = conn.execute( + """SELECT project_name, project_key, COUNT(*) as session_count, + MAX(imported_at) as latest_import + FROM sessions + GROUP BY project_name + ORDER BY latest_import DESC""" + ) + return [ + { + "name": row[0], + "project_key": row[1], + "session_count": row[2], + "latest_import": row[3], + } + for row in cursor.fetchall() + ] + + +def list_sessions(conn, project_name): + """List all sessions for a given project. + + Returns a list of dicts sorted by imported_at descending. + """ + cursor = conn.execute( + """SELECT session_id, summary, total_pages, total_prompts, + total_messages, total_tool_calls, total_commits, + github_repo, file_mtime, file_size, created_at, imported_at + FROM sessions + WHERE project_name = ? 
+ ORDER BY imported_at DESC""", + (project_name,), + ) + return [ + { + "session_id": row[0], + "summary": row[1], + "total_pages": row[2], + "total_prompts": row[3], + "total_messages": row[4], + "total_tool_calls": row[5], + "total_commits": row[6], + "github_repo": row[7], + "file_mtime": row[8], + "file_size": row[9], + "created_at": row[10], + "imported_at": row[11], + } + for row in cursor.fetchall() + ] diff --git a/src/claude_code_transcripts/serve.py b/src/claude_code_transcripts/serve.py new file mode 100644 index 0000000..be0c3cb --- /dev/null +++ b/src/claude_code_transcripts/serve.py @@ -0,0 +1,157 @@ +"""HTTP server for serving transcripts from SQLite.""" + +import sqlite3 +from http.server import BaseHTTPRequestHandler, HTTPServer +from pathlib import Path +from urllib.parse import unquote + +from claude_code_transcripts.db import ( + get_db, + get_session_page, + list_projects, + list_sessions, +) + + +def _render_master_index(conn): + """Render the master index page listing all projects.""" + from claude_code_transcripts import get_template, CSS + + projects = list_projects(conn) + if not projects: + return None + + template = get_template("master_index.html") + + projects_data = [] + for project in projects: + sessions = list_sessions(conn, project["name"]) + projects_data.append( + { + "name": project["name"], + "sessions": [ + { + "name": s["session_id"], + "summary": s["summary"] or "", + "date": s["imported_at"] or "", + "size_kb": (s["file_size"] or 0) / 1024, + } + for s in sessions + ], + } + ) + + return template.render( + projects=projects_data, + total_projects=len(projects_data), + total_sessions=sum(p["session_count"] for p in projects), + css=CSS, + ) + + +def _render_project_index(conn, project_name): + """Render the project index page listing sessions.""" + from claude_code_transcripts import get_template, CSS + + sessions = list_sessions(conn, project_name) + if not sessions: + return None + + template = 
get_template("project_index.html") + + sessions_data = [] + for s in sessions: + sessions_data.append( + { + "name": s["session_id"], + "summary": s["summary"] or "", + "date": s["imported_at"] or "", + "size_kb": (s["file_size"] or 0) / 1024, + } + ) + + return template.render( + project_name=project_name, + sessions=sessions_data, + session_count=len(sessions_data), + css=CSS, + ) + + +def make_handler(db_path): + """Create a request handler class bound to a specific database.""" + + class TranscriptHandler(BaseHTTPRequestHandler): + def do_GET(self): + path = unquote(self.path) + + # Strip query string + if "?" in path: + path = path.split("?")[0] + + # Route: / -> master index + if path == "/" or path == "": + conn = get_db(db_path) + try: + html = _render_master_index(conn) + finally: + conn.close() + if html: + self._send_html(200, html) + else: + self._send_html(404, "

<h1>No transcripts found</h1>

") + return + + # Parse path segments + parts = [p for p in path.strip("/").split("/") if p] + + # Route: // -> project index + if len(parts) == 1: + project_name = parts[0] + conn = get_db(db_path) + try: + html = _render_project_index(conn, project_name) + finally: + conn.close() + if html: + self._send_html(200, html) + else: + self._send_html(404, "

<h1>Project not found</h1>

") + return + + # Route: /// -> redirect to index.html + if len(parts) == 2: + self.send_response(302) + self.send_header("Location", f"/{parts[0]}/{parts[1]}/index.html") + self.end_headers() + return + + # Route: ///.html -> session page + if len(parts) == 3: + project_name, session_id, page_name = parts + conn = get_db(db_path) + try: + html = get_session_page(conn, session_id, page_name) + finally: + conn.close() + if html: + self._send_html(200, html) + else: + self._send_html(404, "

<h1>Page not found</h1>

") + return + + self._send_html(404, "

<h1>Not found</h1>

") + + def _send_html(self, status, content): + self.send_response(status) + self.send_header("Content-Type", "text/html; charset=utf-8") + encoded = content.encode("utf-8") + self.send_header("Content-Length", str(len(encoded))) + self.end_headers() + self.wfile.write(encoded) + + def log_message(self, format, *args): + # Suppress default stderr logging during tests + pass + + return TranscriptHandler diff --git a/tests/test_db.py b/tests/test_db.py new file mode 100644 index 0000000..77922b2 --- /dev/null +++ b/tests/test_db.py @@ -0,0 +1,364 @@ +"""Tests for SQLite database operations.""" + +import sqlite3 +import tempfile +from pathlib import Path + +import pytest + +from claude_code_transcripts.db import ( + get_db, + session_exists, + import_session, + get_session_page, + list_projects, + list_sessions, +) + + +@pytest.fixture +def db_path(): + """Create a temporary database file.""" + with tempfile.TemporaryDirectory() as tmpdir: + yield Path(tmpdir) / "test.db" + + +@pytest.fixture +def db(db_path): + """Create and return a database connection.""" + conn = get_db(db_path) + yield conn + conn.close() + + +class TestGetDb: + def test_creates_database_file(self, db_path): + conn = get_db(db_path) + assert db_path.exists() + conn.close() + + def test_creates_sessions_table(self, db): + cursor = db.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='sessions'" + ) + assert cursor.fetchone() is not None + + def test_creates_session_pages_table(self, db): + cursor = db.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='session_pages'" + ) + assert cursor.fetchone() is not None + + def test_enables_wal_mode(self, db): + cursor = db.execute("PRAGMA journal_mode") + mode = cursor.fetchone()[0] + assert mode == "wal" + + def test_idempotent(self, db_path): + """Calling get_db twice on the same path doesn't error.""" + conn1 = get_db(db_path) + conn1.close() + conn2 = get_db(db_path) + conn2.close() + + def 
test_creates_parent_directories(self): + with tempfile.TemporaryDirectory() as tmpdir: + deep_path = Path(tmpdir) / "a" / "b" / "test.db" + conn = get_db(deep_path) + assert deep_path.exists() + conn.close() + + +class TestSessionExists: + def test_returns_false_for_missing_session(self, db): + assert session_exists(db, "nonexistent") is False + + def test_returns_true_for_existing_session(self, db): + import_session( + db, + session_id="abc123", + project_name="myproject", + project_key="-home-user-projects-myproject", + summary="Hello world", + session_json='{"type": "user"}', + pages={"index.html": "index"}, + meta={ + "total_pages": 1, + "total_prompts": 1, + "total_messages": 2, + "total_tool_calls": 0, + "total_commits": 0, + "github_repo": None, + }, + ) + assert session_exists(db, "abc123") is True + + +class TestImportSession: + def test_inserts_session_row(self, db): + import_session( + db, + session_id="abc123", + project_name="myproject", + project_key="-home-user-projects-myproject", + summary="Hello world", + session_json='{"type": "user"}', + pages={"index.html": "index"}, + meta={ + "total_pages": 1, + "total_prompts": 1, + "total_messages": 2, + "total_tool_calls": 5, + "total_commits": 1, + "github_repo": "owner/repo", + }, + ) + row = db.execute( + "SELECT session_id, project_name, project_key, summary, session_json, " + "github_repo, total_pages, total_prompts, total_messages, total_tool_calls, total_commits " + "FROM sessions WHERE session_id = ?", + ("abc123",), + ).fetchone() + assert row is not None + assert row[0] == "abc123" + assert row[1] == "myproject" + assert row[2] == "-home-user-projects-myproject" + assert row[3] == "Hello world" + assert row[4] == '{"type": "user"}' + assert row[5] == "owner/repo" + assert row[6] == 1 # total_pages + assert row[7] == 1 # total_prompts + assert row[8] == 2 # total_messages + assert row[9] == 5 # total_tool_calls + assert row[10] == 1 # total_commits + + def test_inserts_page_rows(self, db): + pages 
= { + "index.html": "index", + "page-001.html": "page 1", + "page-002.html": "page 2", + } + import_session( + db, + session_id="abc123", + project_name="myproject", + project_key="-home-user-projects-myproject", + summary="Hello", + session_json="{}", + pages=pages, + meta={ + "total_pages": 2, + "total_prompts": 1, + "total_messages": 2, + "total_tool_calls": 0, + "total_commits": 0, + "github_repo": None, + }, + ) + rows = db.execute( + "SELECT page_name, html_content FROM session_pages WHERE session_id = ? ORDER BY page_name", + ("abc123",), + ).fetchall() + assert len(rows) == 3 + assert rows[0] == ("index.html", "index") + assert rows[1] == ("page-001.html", "page 1") + assert rows[2] == ("page-002.html", "page 2") + + def test_skips_duplicate_session(self, db): + """Importing the same session_id twice should skip the second import.""" + kwargs = dict( + session_id="abc123", + project_name="myproject", + project_key="-home-user-projects-myproject", + summary="Hello", + session_json="{}", + pages={"index.html": "v1"}, + meta={ + "total_pages": 1, + "total_prompts": 1, + "total_messages": 2, + "total_tool_calls": 0, + "total_commits": 0, + "github_repo": None, + }, + ) + import_session(db, **kwargs) + # Second import with different content should be skipped + kwargs["pages"] = {"index.html": "v2"} + import_session(db, **kwargs) + + # Should still have original content + html = get_session_page(db, "abc123", "index.html") + assert html == "v1" + + def test_stores_optional_metadata(self, db): + import_session( + db, + session_id="abc123", + project_name="myproject", + project_key="key", + summary="Hello", + session_json="{}", + pages={"index.html": ""}, + meta={ + "total_pages": 1, + "total_prompts": 1, + "total_messages": 2, + "total_tool_calls": 0, + "total_commits": 0, + "github_repo": None, + }, + file_mtime=1704067200.0, + file_size=1024, + created_at="2025-01-01T10:00:00.000Z", + ) + row = db.execute( + "SELECT file_mtime, file_size, created_at FROM 
sessions WHERE session_id = ?", + ("abc123",), + ).fetchone() + assert row[0] == 1704067200.0 + assert row[1] == 1024 + assert row[2] == "2025-01-01T10:00:00.000Z" + + +class TestGetSessionPage: + def test_returns_html_content(self, db): + import_session( + db, + session_id="abc123", + project_name="myproject", + project_key="key", + summary="Hello", + session_json="{}", + pages={ + "index.html": "index", + "page-001.html": "page 1", + }, + meta={ + "total_pages": 1, + "total_prompts": 1, + "total_messages": 2, + "total_tool_calls": 0, + "total_commits": 0, + "github_repo": None, + }, + ) + assert get_session_page(db, "abc123", "index.html") == "index" + assert get_session_page(db, "abc123", "page-001.html") == "page 1" + + def test_returns_none_for_missing_page(self, db): + assert get_session_page(db, "abc123", "index.html") is None + + def test_returns_none_for_missing_session(self, db): + assert get_session_page(db, "nonexistent", "index.html") is None + + +class TestListProjects: + def test_returns_empty_for_empty_db(self, db): + assert list_projects(db) == [] + + def test_returns_projects_with_session_counts(self, db): + for sid, pname, pkey in [ + ("s1", "project-a", "key-a"), + ("s2", "project-a", "key-a"), + ("s3", "project-b", "key-b"), + ]: + import_session( + db, + session_id=sid, + project_name=pname, + project_key=pkey, + summary="Hello", + session_json="{}", + pages={"index.html": ""}, + meta={ + "total_pages": 1, + "total_prompts": 1, + "total_messages": 2, + "total_tool_calls": 0, + "total_commits": 0, + "github_repo": None, + }, + ) + + projects = list_projects(db) + assert len(projects) == 2 + # Should be sorted by most recent import + names = [p["name"] for p in projects] + assert "project-a" in names + assert "project-b" in names + # Check session counts + proj_a = next(p for p in projects if p["name"] == "project-a") + proj_b = next(p for p in projects if p["name"] == "project-b") + assert proj_a["session_count"] == 2 + assert 
proj_b["session_count"] == 1 + + +class TestListSessions: + def test_returns_empty_for_no_sessions(self, db): + assert list_sessions(db, "nonexistent") == [] + + def test_returns_sessions_for_project(self, db): + import_session( + db, + session_id="s1", + project_name="myproject", + project_key="key", + summary="First session", + session_json="{}", + pages={"index.html": ""}, + meta={ + "total_pages": 1, + "total_prompts": 3, + "total_messages": 10, + "total_tool_calls": 5, + "total_commits": 2, + "github_repo": "owner/repo", + }, + file_size=2048, + ) + import_session( + db, + session_id="s2", + project_name="myproject", + project_key="key", + summary="Second session", + session_json="{}", + pages={"index.html": ""}, + meta={ + "total_pages": 1, + "total_prompts": 1, + "total_messages": 4, + "total_tool_calls": 0, + "total_commits": 0, + "github_repo": None, + }, + file_size=512, + ) + + sessions = list_sessions(db, "myproject") + assert len(sessions) == 2 + s1 = next(s for s in sessions if s["session_id"] == "s1") + assert s1["summary"] == "First session" + assert s1["total_prompts"] == 3 + assert s1["file_size"] == 2048 + + def test_does_not_return_sessions_from_other_projects(self, db): + import_session( + db, + session_id="s1", + project_name="project-a", + project_key="key-a", + summary="A session", + session_json="{}", + pages={"index.html": ""}, + meta={ + "total_pages": 1, + "total_prompts": 1, + "total_messages": 2, + "total_tool_calls": 0, + "total_commits": 0, + "github_repo": None, + }, + ) + assert list_sessions(db, "project-b") == [] diff --git a/tests/test_generate_html.py b/tests/test_generate_html.py index 25c2822..b21e18d 100644 --- a/tests/test_generate_html.py +++ b/tests/test_generate_html.py @@ -1317,7 +1317,7 @@ def ask(self): assert result.exit_code == 0 assert "Loading local sessions" in result.output - assert "Generated" in result.output + assert "Imported" in result.output or "already in database" in result.output def 
test_no_args_runs_local_command(self, tmp_path, monkeypatch): """Test that running with no arguments runs local command.""" diff --git a/tests/test_serve.py b/tests/test_serve.py new file mode 100644 index 0000000..ff99825 --- /dev/null +++ b/tests/test_serve.py @@ -0,0 +1,166 @@ +"""Tests for the serve CLI command.""" + +import sqlite3 +import tempfile +import threading +import time +import urllib.request +from pathlib import Path + +import pytest +from click.testing import CliRunner + +from claude_code_transcripts import cli +from claude_code_transcripts.db import get_db, import_session +from claude_code_transcripts.serve import make_handler + + +@pytest.fixture +def db_path(): + with tempfile.TemporaryDirectory() as tmpdir: + yield Path(tmpdir) / "test.db" + + +@pytest.fixture +def populated_db(db_path): + """Create a DB with some test sessions.""" + conn = get_db(db_path) + import_session( + conn, + session_id="abc123", + project_name="project-a", + project_key="-home-user-projects-project-a", + summary="Hello from project A", + session_json='{"loglines": []}', + pages={ + "index.html": "Session index", + "page-001.html": "Page 1 content", + }, + meta={ + "total_pages": 1, + "total_prompts": 1, + "total_messages": 2, + "total_tool_calls": 0, + "total_commits": 0, + "github_repo": None, + }, + file_size=1024, + ) + import_session( + conn, + session_id="def456", + project_name="project-a", + project_key="-home-user-projects-project-a", + summary="Second session", + session_json='{"loglines": []}', + pages={ + "index.html": "Session 2 index", + "page-001.html": "Session 2 page 1", + }, + meta={ + "total_pages": 1, + "total_prompts": 1, + "total_messages": 2, + "total_tool_calls": 0, + "total_commits": 0, + "github_repo": None, + }, + file_size=512, + ) + import_session( + conn, + session_id="ghi789", + project_name="project-b", + project_key="-home-user-projects-project-b", + summary="Hello from project B", + session_json='{"loglines": []}', + pages={ + 
"index.html": "Project B session", + }, + meta={ + "total_pages": 1, + "total_prompts": 1, + "total_messages": 2, + "total_tool_calls": 0, + "total_commits": 0, + "github_repo": None, + }, + ) + conn.close() + return db_path + + +class TestTranscriptHandler: + """Test the HTTP handler using a real HTTP server on a random port.""" + + @pytest.fixture(autouse=True) + def setup_server(self, populated_db): + """Start a test server on a random port.""" + from http.server import HTTPServer + + handler = make_handler(populated_db) + self.server = HTTPServer(("127.0.0.1", 0), handler) + self.port = self.server.server_address[1] + self.base_url = f"http://127.0.0.1:{self.port}" + self.thread = threading.Thread(target=self.server.serve_forever) + self.thread.daemon = True + self.thread.start() + yield + self.server.shutdown() + + def _get(self, path): + """Make a GET request and return (status_code, body).""" + url = f"{self.base_url}{path}" + try: + resp = urllib.request.urlopen(url) + return resp.status, resp.read().decode("utf-8") + except urllib.error.HTTPError as e: + return e.code, e.read().decode("utf-8") + + def test_root_returns_master_index(self): + status, body = self._get("/") + assert status == 200 + assert "project-a" in body + assert "project-b" in body + + def test_project_page(self): + status, body = self._get("/project-a/") + assert status == 200 + assert "abc123" in body + assert "def456" in body + + def test_session_index_page(self): + status, body = self._get("/project-a/abc123/index.html") + assert status == 200 + assert "Session index" in body + + def test_session_page(self): + status, body = self._get("/project-a/abc123/page-001.html") + assert status == 200 + assert "Page 1 content" in body + + def test_session_dir_redirects_to_index(self): + # Accessing /project/session/ should redirect to index.html + url = f"{self.base_url}/project-a/abc123/" + req = urllib.request.Request(url) + # Don't follow redirects + import http.client + + conn = 
http.client.HTTPConnection("127.0.0.1", self.port) + conn.request("GET", "/project-a/abc123/") + resp = conn.getresponse() + assert resp.status in (301, 302) + assert "index.html" in resp.getheader("Location") + conn.close() + + def test_nonexistent_session_returns_404(self): + status, body = self._get("/project-a/nonexistent/index.html") + assert status == 404 + + def test_nonexistent_project_returns_404(self): + status, body = self._get("/nonexistent/") + assert status == 404 + + def test_nonexistent_page_returns_404(self): + status, body = self._get("/project-a/abc123/page-999.html") + assert status == 404 diff --git a/tests/test_sync.py b/tests/test_sync.py new file mode 100644 index 0000000..dbbf7b9 --- /dev/null +++ b/tests/test_sync.py @@ -0,0 +1,169 @@ +"""Tests for the sync CLI command.""" + +import sqlite3 +import tempfile +from pathlib import Path + +import pytest +from click.testing import CliRunner + +from claude_code_transcripts import cli +from claude_code_transcripts.db import ( + get_db, + list_projects, + list_sessions, + get_session_page, +) + + +@pytest.fixture +def mock_projects_dir(): + """Create a mock ~/.claude/projects structure with test sessions.""" + with tempfile.TemporaryDirectory() as tmpdir: + projects_dir = Path(tmpdir) + + # Create project-a with 2 sessions + project_a = projects_dir / "-home-user-projects-project-a" + project_a.mkdir(parents=True) + + session_a1 = project_a / "abc123.jsonl" + session_a1.write_text( + '{"type": "user", "timestamp": "2025-01-01T10:00:00.000Z", "message": {"role": "user", "content": "Hello from project A"}}\n' + '{"type": "assistant", "timestamp": "2025-01-01T10:00:05.000Z", "message": {"role": "assistant", "content": [{"type": "text", "text": "Hi there!"}]}}\n' + ) + + session_a2 = project_a / "def456.jsonl" + session_a2.write_text( + '{"type": "user", "timestamp": "2025-01-02T10:00:00.000Z", "message": {"role": "user", "content": "Second session in project A"}}\n' + '{"type": "assistant", 
"timestamp": "2025-01-02T10:00:05.000Z", "message": {"role": "assistant", "content": [{"type": "text", "text": "Got it!"}]}}\n' + ) + + # Create project-b with 1 session + project_b = projects_dir / "-home-user-projects-project-b" + project_b.mkdir(parents=True) + + session_b1 = project_b / "ghi789.jsonl" + session_b1.write_text( + '{"type": "user", "timestamp": "2025-01-04T10:00:00.000Z", "message": {"role": "user", "content": "Hello from project B"}}\n' + '{"type": "assistant", "timestamp": "2025-01-04T10:00:05.000Z", "message": {"role": "assistant", "content": [{"type": "text", "text": "Welcome!"}]}}\n' + ) + + # Create warmup session (should be skipped) + warmup = project_b / "warmup123.jsonl" + warmup.write_text( + '{"type": "user", "timestamp": "2025-01-05T10:00:00.000Z", "message": {"role": "user", "content": "warmup"}}\n' + ) + + yield projects_dir + + +@pytest.fixture +def db_path(): + with tempfile.TemporaryDirectory() as tmpdir: + yield Path(tmpdir) / "test.db" + + +class TestSyncCommand: + def test_imports_sessions_into_sqlite(self, mock_projects_dir, db_path): + runner = CliRunner() + result = runner.invoke( + cli, + ["sync", "--source", str(mock_projects_dir), "--db", str(db_path)], + ) + assert result.exit_code == 0 + assert "Synced 3 new sessions" in result.output + + # Verify database contents + conn = get_db(db_path) + projects = list_projects(conn) + assert len(projects) == 2 + conn.close() + + def test_dedup_on_second_run(self, mock_projects_dir, db_path): + runner = CliRunner() + # First run + runner.invoke( + cli, + ["sync", "--source", str(mock_projects_dir), "--db", str(db_path)], + ) + # Second run should find 0 new + result = runner.invoke( + cli, + ["sync", "--source", str(mock_projects_dir), "--db", str(db_path)], + ) + assert result.exit_code == 0 + assert "Synced 0 new sessions" in result.output + assert "3 already present" in result.output + + def test_stores_html_pages(self, mock_projects_dir, db_path): + runner = CliRunner() + 
runner.invoke( + cli, + ["sync", "--source", str(mock_projects_dir), "--db", str(db_path)], + ) + conn = get_db(db_path) + # Each session should have at least an index.html and a page-001.html + html = get_session_page(conn, "abc123", "index.html") + assert html is not None + assert "