'
+ )
+
+ body = (c.get("body", "") or "").strip()
+ if c.get("deleted"):
+ body = "_[comment deleted]_"
+ body_html = md_to_html_static(body)
+
+ children = c.get("children", []) or []
+ children_html = ""
+ if children:
+ inner = "".join(f"
{render_one(child)}
" for child in children)
+ children_html = f'
{inner}
'
+
+ return f'{header}
{body_html}
{children_html}'
+
+ total = count_all_comments(comments)
+ items = "".join(f"
{render_one(c)}
" for c in comments)
+ return (
+ f''
+ f'
Comments ({total})
'
+ f'
{items}
'
+ f''
+ )
+
+
def md_to_html_static(md_content: str) -> str:
    """Convert Markdown text to HTML (module-level twin of ``BaseSubstackScraper.md_to_html``).

    Falsy input (``None`` or an empty string) yields ``""`` without invoking
    the Markdown converter; anything else is rendered with the ``extra``
    extension enabled.
    """
    return markdown.markdown(md_content, extensions=['extra']) if md_content else ""
+
+
+# =============================================================================
+# STRUCTURED HEADER RENDERING (classic Substack article look)
+# =============================================================================
+
def _format_header_date(date_str: str) -> str:
    """Turn an ISO date string (``YYYY-MM-DD``) into byline text such as ``Jan 05, 2024``.

    An empty value produces ``""``. A value that cannot be parsed as an ISO
    date (for example the sentinel ``"Date not found"``) is handed back
    untouched so the caller can still display it.
    """
    if date_str:
        try:
            formatted = datetime.fromisoformat(date_str).strftime("%b %d, %Y")
        except ValueError:
            # Not an ISO date: show whatever text we were given.
            formatted = date_str
        return formatted
    return ""
+
+
+def build_post_header(meta: dict) -> str:
+ """Render a Substack-style centered post header from structured metadata.
+
+ ``meta`` is a dict that may contain: ``title``, ``subtitle``, ``author``,
+ ``date`` (ISO ``YYYY-MM-DD``), ``cover_image``. Any missing/empty field is
+ simply omitted. Returns an HTML string for a ````
+ block, or ``""`` if there is nothing to render (no title, subtitle, author
+ or date). The cover image is shown above the title when present.
+ """
+ if not isinstance(meta, dict) or not meta:
+ return ""
+
+ cover_image = (meta.get("cover_image") or "").strip()
+ title = (meta.get("title") or "").strip()
+ subtitle = (meta.get("subtitle") or "").strip()
+ author = (meta.get("author") or "").strip()
+ date_str = (meta.get("date") or "").strip()
+
+ if not (title or subtitle or author or date_str):
+ return ""
+
+ parts = []
+ if cover_image:
+ parts.append(
+ f''
+ )
+ if title:
+ parts.append(f'
{_html_escape(title)}
')
+ if subtitle:
+ parts.append(f'
{_html_escape(subtitle)}
')
+
+ byline_bits = []
+ if author:
+ byline_bits.append(_html_escape(author))
+ display_date = _format_header_date(date_str)
+ if display_date:
+ byline_bits.append(_html_escape(display_date))
+ if byline_bits:
+ parts.append(
+ f'
{" ยท ".join(byline_bits)}
'
+ )
+
+ return f'{"".join(parts)}'
+
+
def split_metadata_and_body(md_content: str, frontmatter_format: str = "legacy") -> Tuple[dict, str]:
    """Inverse of ``combine_metadata_and_content``: recover metadata + body.

    Used to re-render on-disk markdown into the structured post layout without
    re-scraping.

    Args:
        md_content: Full markdown text of a saved post (header + body).
        frontmatter_format: ``"mdx"`` to expect leading YAML frontmatter
            (``---\\n...\\n---``); any other value selects the legacy layout.
            If ``"mdx"`` is requested but no frontmatter is found, the legacy
            layout is tried instead.

    Returns:
        ``(meta_dict, body_md)``.

        - ``mdx``: every ``key: value`` line of the frontmatter becomes an entry
          (surrounding quotes removed, empty values skipped); the ``image`` key
          is stored as ``cover_image``.
        - ``legacy``: the leading ``# title`` line, an optional ``## subtitle``,
          the ``**date**`` line and the ``**Likes:** N`` line are stripped and
          returned as ``title``, ``subtitle``, ``date`` and ``like_count``
          (keys that are not found are absent).

        If the content does not start with the expected header, it is returned
        unchanged as the body with an empty metadata dict, so rendering is
        never destructive. Empty input yields ``({}, "")``.
    """
    if not md_content:
        return {}, ""

    if frontmatter_format == "mdx":
        frontmatter = re.match(r'^---\s*\n(.*?)\n---\s*\n?(.*)$', md_content, re.DOTALL)
        if frontmatter:
            meta: dict = {}
            for line in frontmatter.group(1).splitlines():
                key, sep, raw = line.partition(":")
                if not sep:
                    continue
                key = key.strip()
                val = raw.strip()
                # Strip surrounding YAML quotes.
                if (val.startswith('"') and val.endswith('"')) \
                        or (val.startswith("'") and val.endswith("'")):
                    val = val[1:-1]
                if key and val:
                    meta["cover_image" if key == "image" else key] = val
            return meta, frontmatter.group(2).lstrip("\n")
        # No frontmatter block: fall through and try the legacy layout.

    lines = md_content.split("\n")

    # The documented legacy header opens with the "# title" line. Without it
    # this is not a recognised header, so leave the text alone: otherwise a body
    # that merely starts with a bold line would be misread as the date and
    # silently removed from the rendered post.
    if not lines[0].startswith("# "):
        return {}, md_content

    def _after_blank(i: int) -> int:
        """Step past a single blank separator line, if one sits at index ``i``."""
        return i + 1 if i < len(lines) and lines[i].strip() == "" else i

    meta = {"title": lines[0][2:].strip()}
    idx = _after_blank(1)

    # Subtitle: "## ..."
    if idx < len(lines) and lines[idx].startswith("## "):
        meta["subtitle"] = lines[idx][3:].strip()
        idx = _after_blank(idx + 1)

    # Date: "**...**"
    date_match = re.match(r'^\*\*(.+?)\*\*$', lines[idx]) if idx < len(lines) else None
    if date_match:
        meta["date"] = date_match.group(1).strip()
        idx = _after_blank(idx + 1)

    # Likes: "**Likes:** N"
    likes_match = re.match(r'^\*\*Likes:\*\*\s*(\d+)\s*$', lines[idx]) if idx < len(lines) else None
    if likes_match:
        meta["like_count"] = likes_match.group(1)
        idx = _after_blank(idx + 1)

    return meta, "\n".join(lines[idx:]).lstrip("\n")
+
+
+def build_post_document(
+ html_dir: str,
+ body_html: str,
+ comments_html: str = "",
+ header_html: str = "",
+ title: Optional[str] = None,
+) -> str:
+ """Assemble the full HTML document for a post page (classic Substack shell).
+
+ Shared by the scraper's ``save_to_html_file`` and the standalone ``render_posts.py``
+ so both produce identical markup: Spectral webfont, the essay stylesheet, an optional
+ structured header above the body, and optional comments below it.
+ """
+ css_path = os.path.relpath("./assets/css/essay-styles.css", html_dir)
+ css_path = css_path.replace("\\", "/")
+
+ doc_title = _html_escape(title) if title else "Markdown Content"
+ header_block = f"\n {header_html}" if header_html else ""
+ comments_block = f"\n {comments_html}" if comments_html else ""
+
+ return f"""
+
+
+
+
+
+ {doc_title}
+
+
+
+
+
+
+ {header_block}
+ {body_html}{comments_block}
+
+
+
+ """
+
+
def render_post_to_html_file(
    html_filepath: str,
    body_md: str,
    meta: Optional[dict] = None,
    comments_list: Optional[list] = None,
    frontmatter_format: str = "legacy",
) -> None:
    """Write a post's HTML page from local markdown, metadata and cached comments.

    Args:
        html_filepath: Destination file; it is opened for writing as UTF-8.
        body_md: Markdown to render. When ``meta`` is supplied this is used as
            the post body as-is. When ``meta`` is empty or ``None`` it is
            passed through ``split_metadata_and_body`` to separate header
            metadata from the body.
        meta: Header metadata handed to ``build_post_header``; its ``title``
            entry (if any) is also used as the document title.
        comments_list: Comment thread to render below the body; a falsy value
            produces a page without a comments section.
        frontmatter_format: Forwarded to ``split_metadata_and_body`` when the
            metadata has to be recovered from ``body_md``.
    """
    if meta:
        header_meta, body = meta, body_md
    else:
        # No structured metadata supplied: recover it from the markdown itself.
        header_meta, body = split_metadata_and_body(body_md, frontmatter_format)

    rendered_body = md_to_html_static(body)
    rendered_comments = render_comments_html(comments_list) if comments_list else ""
    rendered_header = build_post_header(header_meta)

    page = build_post_document(
        os.path.dirname(html_filepath),
        rendered_body,
        comments_html=rendered_comments,
        header_html=rendered_header,
        title=header_meta.get("title"),
    )
    with open(html_filepath, "w", encoding="utf-8") as out:
        out.write(page)
+
+
def extract_main_part(url: str) -> str:
    """Return the first label of the URL's host, skipping a leading ``www``."""
    labels = urlparse(url).netloc.split('.')
    if labels[0] == 'www':
        return labels[1]
    return labels[0]
@@ -224,17 +683,26 @@ def get_browser_version(browser: str) -> Optional[str]:
except Exception:
pass
else: # macOS/Linux
- try:
- result = subprocess.run(
- ['google-chrome', '--version'],
- capture_output=True, text=True, timeout=10
- )
- if result.returncode == 0:
- match = re.search(r'(\d+\.\d+\.\d+\.\d+)', result.stdout)
- if match:
- version = match.group(1)
- except Exception:
- pass
+ candidates = [
+ "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
+ os.path.expanduser("~/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"),
+ "google-chrome", # Linux PATH fallback
+ "chromium",
+ "chromium-browser",
+ ]
+ for candidate in candidates:
+ try:
+ result = subprocess.run(
+ [candidate, "--version"],
+ capture_output=True, text=True, timeout=10
+ )
+ if result.returncode == 0:
+ match = re.search(r'(\d+\.\d+\.\d+\.\d+)', result.stdout)
+ if match:
+ version = match.group(1)
+ break
+ except Exception:
+ continue
elif browser == 'edge':
if os.name == 'nt': # Windows
@@ -255,17 +723,24 @@ def get_browser_version(browser: str) -> Optional[str]:
except Exception:
pass
else: # macOS/Linux
- try:
- result = subprocess.run(
- ['microsoft-edge', '--version'],
- capture_output=True, text=True, timeout=10
- )
- if result.returncode == 0:
- match = re.search(r'(\d+\.\d+\.\d+\.\d+)', result.stdout)
- if match:
- version = match.group(1)
- except Exception:
- pass
+ candidates = [
+ "/Applications/Microsoft Edge.app/Contents/MacOS/Microsoft Edge",
+ os.path.expanduser("~/Applications/Microsoft Edge.app/Contents/MacOS/Microsoft Edge"),
+ "microsoft-edge", # Linux PATH fallback
+ ]
+ for candidate in candidates:
+ try:
+ result = subprocess.run(
+ [candidate, "--version"],
+ capture_output=True, text=True, timeout=10
+ )
+ if result.returncode == 0:
+ match = re.search(r'(\d+\.\d+\.\d+\.\d+)', result.stdout)
+ if match:
+ version = match.group(1)
+ break
+ except Exception:
+ continue
return version
@@ -321,9 +796,25 @@ def get_user_data_dir(browser: str) -> str:
if not os.path.exists(base_dir):
os.makedirs(base_dir)
return os.path.join(base_dir, f'{browser}_profile')
-
+
+ @staticmethod
+ def _driver_platform(browser: str) -> str:
+ """Return the Chrome-for-Testing / Edge driver platform string for the current OS/arch.
+
+ Detects ARM vs Intel on macOS so Apple Silicon gets mac-arm64 instead of mac-x64.
+ """
+ if os.name == 'nt':
+ return 'win64'
+ if sys.platform == 'darwin':
+ is_arm = os.uname().machine == 'arm64'
+ if browser == 'edge':
+ # Edge driver uses a different naming convention: mac64 vs mac64_m1
+ return 'mac64_m1' if is_arm else 'mac64'
+ return 'mac-arm64' if is_arm else 'mac-x64'
+ return 'linux64'
+
@classmethod
- def download_driver_with_requests(cls, browser: str, browser_version: str) -> Optional[str]:
+ def download_driver_with_requests(cls, browser: str, browser_version: str, quiet: bool = False) -> Optional[str]:
"""
Download the correct driver directly using requests.
This bypasses webdriver_manager issues and gives us full control.
@@ -344,12 +835,14 @@ def download_driver_with_requests(cls, browser: str, browser_version: str) -> Op
if os.path.exists(driver_path):
cached_version = cls.get_driver_version(driver_path)
if cached_version and cls.versions_compatible(browser_version, cached_version):
- print(f"Using cached chromedriver {cached_version}")
+ if not quiet:
+ print(f"Using cached chromedriver {cached_version}")
return driver_path
try:
# Get the latest driver version for this Chrome version
- print(f"Fetching Chrome driver info for version {major_version}...")
+ if not quiet:
+ print(f"Fetching Chrome driver info for version {major_version}...")
# Try the Chrome for Testing endpoints
endpoints = [
@@ -366,7 +859,7 @@ def download_driver_with_requests(cls, browser: str, browser_version: str) -> Op
if resp.ok:
driver_version = resp.text.strip()
# Construct download URL
- platform = 'win64' if os.name == 'nt' else ('mac-x64' if sys.platform == 'darwin' else 'linux64')
+ platform = cls._driver_platform(browser)
download_url = f"https://storage.googleapis.com/chrome-for-testing-public/{driver_version}/{platform}/chromedriver-{platform}.zip"
except Exception:
pass
@@ -382,7 +875,7 @@ def download_driver_with_requests(cls, browser: str, browser_version: str) -> Op
if driver_version.startswith(major_version):
downloads = stable.get('downloads', {}).get('chromedriver', [])
- platform = 'win64' if os.name == 'nt' else ('mac-x64' if sys.platform == 'darwin' else 'linux64')
+ platform = cls._driver_platform(browser)
for d in downloads:
if d.get('platform') == platform:
download_url = d.get('url')
@@ -392,7 +885,8 @@ def download_driver_with_requests(cls, browser: str, browser_version: str) -> Op
print(f"Could not find chromedriver download URL for Chrome {major_version}")
return None
- print(f"Downloading chromedriver {driver_version}...")
+ if not quiet:
+ print(f"Downloading chromedriver {driver_version}...")
resp = requests.get(download_url, timeout=120)
if not resp.ok:
print(f"Download failed: HTTP {resp.status_code}")
@@ -416,7 +910,8 @@ def download_driver_with_requests(cls, browser: str, browser_version: str) -> Op
# Make executable on Unix
if os.name != 'nt':
os.chmod(target_path, 0o755)
- print(f"[OK] Chromedriver downloaded to: {target_path}")
+ if not quiet:
+ print(f"[OK] Chromedriver downloaded to: {target_path}")
return target_path
print("Could not find chromedriver in downloaded archive")
@@ -434,15 +929,17 @@ def download_driver_with_requests(cls, browser: str, browser_version: str) -> Op
if os.path.exists(driver_path):
cached_version = cls.get_driver_version(driver_path)
if cached_version and cls.versions_compatible(browser_version, cached_version):
- print(f"Using cached msedgedriver {cached_version}")
+ if not quiet:
+ print(f"Using cached msedgedriver {cached_version}")
return driver_path
try:
# Get latest Edge driver version
- print(f"Fetching Edge driver info for version {major_version}...")
+ if not quiet:
+ print(f"Fetching Edge driver info for version {major_version}...")
# Edge driver download URL pattern
- platform = 'win64' if os.name == 'nt' else ('mac64' if sys.platform == 'darwin' else 'linux64')
+ platform = cls._driver_platform(browser)
# Try to get the exact version
version_url = f"https://msedgedriver.azureedge.net/LATEST_RELEASE_{major_version}"
@@ -457,7 +954,8 @@ def download_driver_with_requests(cls, browser: str, browser_version: str) -> Op
download_url = f"https://msedgedriver.azureedge.net/{driver_version}/edgedriver_{platform}.zip"
- print(f"Downloading msedgedriver {driver_version}...")
+ if not quiet:
+ print(f"Downloading msedgedriver {driver_version}...")
resp = requests.get(download_url, timeout=120)
if not resp.ok:
print(f"Download failed: HTTP {resp.status_code}")
@@ -478,7 +976,8 @@ def download_driver_with_requests(cls, browser: str, browser_version: str) -> Op
target.write(source.read())
if os.name != 'nt':
os.chmod(target_path, 0o755)
- print(f"[OK] msedgedriver downloaded to: {target_path}")
+ if not quiet:
+ print(f"[OK] msedgedriver downloaded to: {target_path}")
return target_path
print("Could not find msedgedriver in downloaded archive")
@@ -499,6 +998,7 @@ def create_driver(
browser_path: Optional[str] = None,
user_agent: Optional[str] = None,
use_persistent_profile: bool = False,
+ quiet: bool = False,
) -> webdriver.Remote:
"""
Creates a WebDriver instance with smart fallback logic.
@@ -509,6 +1009,10 @@ def create_driver(
3. Download driver directly to our cache (bypasses PATH issues)
4. Fall back to webdriver_manager
5. Fall back to Selenium Manager
+
+ Args:
+ quiet: Suppress informational prints (used during periodic driver restarts).
+ Warnings/errors are still printed.
"""
browser = browser.lower()
if browser not in cls.SUPPORTED_BROWSERS:
@@ -525,7 +1029,8 @@ def create_driver(
# Detect browser version
browser_version = cls.get_browser_version(browser)
- print(f"Detected {browser.title()} version: {browser_version or 'unknown'}")
+ if not quiet:
+ print(f"Detected {browser.title()} version: {browser_version or 'unknown'}")
if not browser_version:
print(f"WARNING: Could not detect {browser.title()} version. Make sure it's installed.")
@@ -548,7 +1053,8 @@ def create_driver(
if use_persistent_profile:
profile_dir = cls.get_user_data_dir(browser)
options.add_argument(f"user-data-dir={profile_dir}")
- print(f"Using persistent profile at: {profile_dir}")
+ if not quiet:
+ print(f"Using persistent profile at: {profile_dir}")
# Common options for stability
options.add_argument("--no-sandbox")
@@ -561,7 +1067,8 @@ def create_driver(
# Strategy 1: Explicit driver path
if driver_path and os.path.exists(driver_path):
try:
- print(f"Using explicit driver path: {driver_path}")
+ if not quiet:
+ print(f"Using explicit driver path: {driver_path}")
driver_version = cls.get_driver_version(driver_path)
if driver_version:
print(f"Driver version: {driver_version}")
@@ -580,11 +1087,13 @@ def create_driver(
# Strategy 2: Download to our cache (primary method - bypasses PATH issues)
if browser_version:
- print(f"\nDownloading driver to local cache (bypasses system PATH)...")
+ if not quiet:
+ print(f"\nDownloading driver to local cache (bypasses system PATH)...")
try:
- downloaded_path = cls.download_driver_with_requests(browser, browser_version)
+ downloaded_path = cls.download_driver_with_requests(browser, browser_version, quiet=quiet)
if downloaded_path and os.path.exists(downloaded_path):
- print(f"Using downloaded driver: {downloaded_path}")
+ if not quiet:
+ print(f"Using downloaded driver: {downloaded_path}")
if browser == 'chrome':
service = ChromeService(executable_path=downloaded_path)
return webdriver.Chrome(service=service, options=options)
@@ -596,29 +1105,43 @@ def create_driver(
print(f"[FAIL] Direct download failed: {e}")
# Strategy 3: webdriver_manager with explicit path
- print("\nTrying webdriver_manager...")
+ if not quiet:
+ print("\nTrying webdriver_manager...")
try:
if browser == 'chrome':
from webdriver_manager.chrome import ChromeDriverManager
from webdriver_manager.core.os_manager import ChromeType
mgr = ChromeDriverManager()
driver_path_wdm = mgr.install()
- print(f"webdriver_manager installed driver to: {driver_path_wdm}")
- service = ChromeService(executable_path=driver_path_wdm)
- return webdriver.Chrome(service=service, options=options)
+ if not quiet:
+ print(f"webdriver_manager installed driver to: {driver_path_wdm}")
+ # Reject known non-executable artifacts (THIRD_PARTY_NOTICES / LICENSE) returned
+ # by webdriver_manager on some platforms before trying to exec them.
+ if driver_path_wdm and os.path.isfile(driver_path_wdm) \
+ and not driver_path_wdm.endswith("THIRD_PARTY_NOTICES.chromedriver") \
+ and not driver_path_wdm.endswith("LICENSE.chromedriver"):
+ service = ChromeService(executable_path=driver_path_wdm)
+ return webdriver.Chrome(service=service, options=options)
+ print(f"[SKIP] webdriver_manager returned a non-driver file, falling through.")
else:
from webdriver_manager.microsoft import EdgeChromiumDriverManager
mgr = EdgeChromiumDriverManager()
driver_path_wdm = mgr.install()
- print(f"webdriver_manager installed driver to: {driver_path_wdm}")
- service = EdgeService(executable_path=driver_path_wdm)
- return webdriver.Edge(service=service, options=options)
+ if not quiet:
+ print(f"webdriver_manager installed driver to: {driver_path_wdm}")
+ if driver_path_wdm and os.path.isfile(driver_path_wdm) \
+ and not driver_path_wdm.endswith("THIRD_PARTY_NOTICES.msedgedriver") \
+ and not driver_path_wdm.endswith("LICENSE.msedgedriver"):
+ service = EdgeService(executable_path=driver_path_wdm)
+ return webdriver.Edge(service=service, options=options)
+ print(f"[SKIP] webdriver_manager returned a non-driver file, falling through.")
except Exception as e:
errors.append(f"webdriver_manager failed: {e}")
print(f"[FAIL] webdriver_manager failed: {e}")
# Strategy 4: Let Selenium Manager try (last resort)
- print("\nTrying Selenium Manager (last resort)...")
+ if not quiet:
+ print("\nTrying Selenium Manager (last resort)...")
try:
if browser == 'chrome':
return webdriver.Chrome(options=options)
@@ -735,6 +1258,8 @@ def __init__(
html_save_dir: str,
download_images: bool = False,
frontmatter_format: str = "legacy",
+ fetch_comments_flag: bool = False,
+ comments_sort: str = COMMENTS_SORT,
):
if frontmatter_format not in ("legacy", "mdx"):
raise ValueError("frontmatter_format must be 'legacy' or 'mdx'")
@@ -766,6 +1291,13 @@ def __init__(
self.download_images: bool = download_images
self.image_dir = Path(BASE_IMAGE_DIR) / self.writer_name
+ self.fetch_comments: bool = fetch_comments_flag
+ self.comments_sort: str = comments_sort
+ self.comments_save_dir: str = os.path.join(COMMENTS_DATA_DIR, self.writer_name)
+ if self.fetch_comments and not os.path.exists(self.comments_save_dir):
+ os.makedirs(self.comments_save_dir)
+ print(f"Created comments directory {self.comments_save_dir}")
+
if self.is_single_post:
self.post_urls: List[str] = [original_url]
else:
@@ -842,35 +1374,41 @@ def save_to_file(filepath: str, content: str) -> None:
@staticmethod
def md_to_html(md_content: str) -> str:
"""Converts Markdown to HTML."""
- return markdown.markdown(md_content, extensions=['extra'])
+ return md_to_html_static(md_content)
+
+ def save_to_html_file(
+ self,
+ filepath: str,
+ content: str,
+ comments_html: str = "",
+ header_html: str = "",
+ title: Optional[str] = None,
+ ) -> None:
+ """Saves HTML content to a file with a link to the external CSS file.
+
+ Renders the classic Substack article shell: Spectral webfont, the essay
+ stylesheet, and the body inside ````.
- def save_to_html_file(self, filepath: str, content: str) -> None:
- """Saves HTML content to a file with a link to an external CSS file."""
+ - ``header_html`` (optional): a structured post header block (see
+ ``build_post_header``) rendered above the body. When omitted the body is
+ rendered as-is (legacy flat-markdown behaviour).
+ - ``title`` (optional): used for ````/document title.
+ - ``comments_html`` (optional): appended inside ```` after the body
+ (renders the fetched comment thread on the individual post page).
+ """
if not isinstance(filepath, str):
raise ValueError("filepath must be a string")
if not isinstance(content, str):
raise ValueError("content must be a string")
html_dir = os.path.dirname(filepath)
- css_path = os.path.relpath("./assets/css/essay-styles.css", html_dir)
- css_path = css_path.replace("\\", "/")
-
- html_content = f"""
-
-
-
-
-
- Markdown Content
-
-
-
-
- {content}
-
-
-
- """
+ html_content = build_post_document(
+ html_dir,
+ content,
+ comments_html=comments_html,
+ header_html=header_html,
+ title=title,
+ )
with open(filepath, 'w', encoding='utf-8') as file:
file.write(html_content)
@@ -940,11 +1478,14 @@ def combine_metadata_and_content(
metadata += f"**Likes:** {like_count}\n\n"
return metadata + content
- def extract_post_data(self, soup: BeautifulSoup, url: str = "") -> Tuple[str, str, str, str, str, str, str]:
+ def extract_post_data(self, soup: BeautifulSoup, url: str = "") -> Tuple[str, str, str, str, str, str, str, str, str]:
"""Converts a Substack post soup to markdown.
Returns:
- ``(title, subtitle, author, date, cover_image, like_count, md_content)``.
+ ``(title, subtitle, author, date, cover_image, like_count, comment_count,
+ md_content, body_md)``. ``md_content`` is the body merged with the selected
+ frontmatter header (what gets saved to disk); ``body_md`` is the post body
+ alone, used by the structured HTML renderer.
"""
# Title
title_element = soup.select_one("h1.post-title, h2")
@@ -959,6 +1500,7 @@ def extract_post_data(self, soup: BeautifulSoup, url: str = "") -> Tuple[str, st
date = ""
author = ""
cover_image = ""
+ comment_count = "0"
script_tag = soup.find("script", {"type": "application/ld+json"})
if script_tag and script_tag.string:
try:
@@ -980,6 +1522,18 @@ def extract_post_data(self, soup: BeautifulSoup, url: str = "") -> Tuple[str, st
cover_image = img.get("url", "") if isinstance(img, dict) else str(img)
elif isinstance(images, dict):
cover_image = images.get("url", "")
+ # Comment count: prefer the top-level field, then the CommentAction
+ # statistic in interactionStatistic (matches the public API value).
+ raw_cc = ld_json.get("comment_count")
+ if raw_cc is None:
+ for stat in ld_json.get("interactionStatistic") or []:
+ if not isinstance(stat, dict):
+ continue
+ if "CommentAction" in str(stat.get("interactionType", "")):
+ raw_cc = stat.get("userInteractionCount")
+ break
+ if raw_cc is not None and str(raw_cc).strip().lstrip("-").isdigit():
+ comment_count = str(int(raw_cc))
except (json.JSONDecodeError, ValueError, KeyError):
pass
@@ -1024,7 +1578,7 @@ def extract_post_data(self, soup: BeautifulSoup, url: str = "") -> Tuple[str, st
title, subtitle, date, author, cover_image, like_count, md, self.frontmatter_format
)
- return title, subtitle, author, date, cover_image, like_count, md_content
+ return title, subtitle, author, date, cover_image, like_count, comment_count, md_content, md
@abstractmethod
def get_url_soup(self, url: str) -> str:
@@ -1044,6 +1598,119 @@ def save_essays_data_to_json(self, essays_data: list) -> None:
with open(json_path, 'w', encoding='utf-8') as f:
json.dump(essays_data, f, ensure_ascii=False, indent=4)
def _get_session(self) -> Optional[requests.Session]:
    """Provide the HTTP session used for JSON API calls (comments).

    This base implementation always answers ``None``, meaning requests are
    made without authentication. Subclasses may override it to return a
    session carrying a logged-in browser's cookies so that paid-only comment
    threads can be fetched.
    """
    return None
+
def scrape_comments_for_post(self, url: str) -> Optional[dict]:
    """Fetch (and cache) a post's comment thread.

    Behavior:
    - If ``{slug}.comments.json`` already exists in ``self.comments_save_dir``
      and holds a JSON list, it is loaded and returned without any network
      call, which keeps re-runs on already-scraped publications cheap.
    - An unreadable cache file (I/O error, invalid JSON or invalid UTF-8) is
      reported with a warning and the thread is fetched again.
    - Otherwise the post id is resolved via ``get_post_id_from_slug``, the
      thread is fetched with ``fetch_comments`` and the raw payload is written
      to ``{slug}.comments.json``.

    Args:
        url: URL of the post whose comments should be fetched.

    Returns:
        ``{"total_comments": int, "comments": list, "json_path": str}`` on
        success, or ``None`` when the post metadata cannot be resolved, the
        post reports zero comments, or the fetch returns nothing (a skip
        message is printed when the thread is marked paid-only).
    """
    slug = get_post_slug(url) if is_post_url(url) else (url.rstrip('/').split('/')[-1] or "unknown_post")
    json_path = os.path.join(self.comments_save_dir, f"{slug}.comments.json")

    # Cache hit: load from disk without hitting the network.
    if os.path.exists(json_path):
        try:
            with open(json_path, "r", encoding="utf-8") as f:
                cached = json.load(f)
        except (ValueError, OSError) as e:
            # ValueError covers both json.JSONDecodeError and UnicodeDecodeError,
            # so a cache file with broken encoding is refetched instead of
            # raising out of this method on every run.
            print(f"[WARN] Corrupt comments cache {json_path}: {e}. Refetching.")
        else:
            if isinstance(cached, list):
                return {
                    "total_comments": count_all_comments(cached),
                    "comments": cached,
                    "json_path": json_path,
                }

    session = self._get_session()

    meta = get_post_id_from_slug(self.base_substack_url, slug, session=session)
    if meta is None:
        print(f"[SKIP] Could not resolve post metadata for comments: {url}")
        return None
    post_id, comment_count, permissions = meta

    if comment_count == 0:
        return None

    # Small delay between the post-lookup and the comments call to avoid 429s.
    sleep(random.uniform(1.0, 2.0))

    comments = fetch_comments(self.base_substack_url, post_id, sort=self.comments_sort, session=session)

    if not comments:
        # comment_count > 0 but empty payload: likely a paid-only thread without auth.
        if permissions and "only_paid" in permissions:
            print(
                f"[SKIP] {comment_count} comments on {url} are paid-only "
                f"— rerun with --premium to fetch them."
            )
        return None

    os.makedirs(self.comments_save_dir, exist_ok=True)
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(comments, f, ensure_ascii=False, indent=4)

    return {
        "total_comments": count_all_comments(comments),
        "comments": comments,
        "json_path": json_path,
    }
+
def _write_post_html(
    self,
    html_filepath: str,
    md_content: str,
    comments_list: Optional[list] = None,
    meta: Optional[dict] = None,
) -> None:
    """Render markdown to HTML and save the post page, optionally with header and comments.

    Two rendering modes, selected by ``meta``:

    - **Structured** (``meta`` is a non-empty dict): ``md_content`` is the post
      body only. ``meta`` is turned into a header block by
      ``build_post_header`` and its ``title`` entry is passed on as the
      document title.
    - **Flat** (``meta`` is ``None`` or empty): ``md_content`` is rendered
      wholesale with no header block and no title.

    A non-empty ``comments_list`` is rendered with ``render_comments_html`` and
    passed along as ``comments_html``; otherwise an empty string is passed.
    The page itself is written by ``self.save_to_html_file``.
    """
    structured = bool(meta)
    self.save_to_html_file(
        html_filepath,
        self.md_to_html(md_content),
        comments_html=render_comments_html(comments_list) if comments_list else "",
        header_html=build_post_header(meta) if structured else "",
        title=meta.get("title") if structured else None,
    )
+
def scrape_posts(self, num_posts_to_scrape: int = 0) -> None:
"""Iterates over all posts and saves them as markdown and html files."""
essays_data = []
@@ -1060,12 +1727,20 @@ def scrape_posts(self, num_posts_to_scrape: int = 0) -> None:
if not os.path.exists(md_filepath):
soup = self.get_url_soup(url)
if soup is None:
+ # Body is paywalled/unavailable. Still attempt comments so the
+ # free scraper can report paid-only threads (the post metadata
+ # API is public even when the body is not).
+ if self.fetch_comments:
+ try:
+ self.scrape_comments_for_post(url)
+ except Exception as ce:
+ pbar.write(f"[WARN] Comments failed for {url}: {ce}")
total += 1
pbar.total = total
pbar.refresh()
continue
- title, subtitle, author, date, cover_image, like_count, md = self.extract_post_data(soup, url)
+ title, subtitle, author, date, cover_image, like_count, comment_count, md, body_md = self.extract_post_data(soup, url)
 # Skip writing if extraction clearly failed — leaves no stale file so reruns retry.
content_element = soup.select_one("div.available-content")
@@ -1086,23 +1761,81 @@ def scrape_posts(self, num_posts_to_scrape: int = 0) -> None:
leave=False,
) as img_pbar:
md = process_markdown_images(md, self.writer_name, slug, img_pbar)
+ # Re-apply to the raw body so the rendered HTML body uses the
+ # same local image paths. Downloads are skipped (files exist).
+ body_md = process_markdown_images(body_md, self.writer_name, slug)
self.save_to_file(md_filepath, md)
- html_content = self.md_to_html(md)
- self.save_to_html_file(html_filepath, html_content)
- essays_data.append({
+ # Fetch comments BEFORE rendering the HTML so they can be baked into
+ # the individual post page. The .md source stays clean.
+ comments_result = None
+ if self.fetch_comments:
+ try:
+ comments_result = self.scrape_comments_for_post(url)
+ except Exception as ce:
+ pbar.write(f"[WARN] Comments failed for {url}: {ce}")
+ comments_list = comments_result["comments"] if comments_result else []
+
+ # Structured render: metadata becomes a Substack-style header, the
+ # body is rendered separately (no inlined # title / **date** block).
+ post_meta = {
+ "title": title,
+ "subtitle": subtitle,
+ "author": author,
+ "date": date,
+ "cover_image": cover_image,
+ }
+ self._write_post_html(html_filepath, body_md, comments_list, meta=post_meta)
+
+ essay_entry = {
"title": title,
"subtitle": subtitle,
"author": author,
"date": date,
"cover_image": cover_image,
"like_count": like_count,
+ # Top-level comment count from the page's ld+json; always
+ # available (no extra request). When --comments scrapes the
+ # full thread below, total_comments overrides this with the
+ # recursive count (includes nested replies).
+ "comment_count": comment_count,
"file_link": md_filepath,
"html_link": html_filepath
- })
+ }
+ if comments_result:
+ essay_entry["comment_count"] = comments_result.get("total_comments")
+ essay_entry["total_comments"] = comments_result.get("total_comments")
+ essay_entry["comments_json_link"] = comments_result.get("json_path")
+ essays_data.append(essay_entry)
+
+ # Periodic driver restart to shed accumulated state/leaks before they
+ # destabilize the renderer. Only applies to scrapers with a driver.
+ if hasattr(self, "_recreate_driver"):
+ self._scrape_counter += 1
+ if self._scrape_counter % 40 == 0:
+ pbar.write("[MAINT] Periodic driver restart to shed state...")
+ self._recreate_driver()
+ sleep(random.uniform(4, 8))
else:
pbar.write(f"File already exists: {md_filepath}")
+ # Fetch comments independently of the post body so --comments can be
+ # added to an already-scraped publication without re-scraping posts.
+ # Re-render the HTML (from the on-disk md) with comments baked in.
+ if self.fetch_comments:
+ try:
+ comments_result = self.scrape_comments_for_post(url)
+ comments_list = comments_result["comments"] if comments_result else []
+ with open(md_filepath, "r", encoding="utf-8") as f:
+ md_text = f.read()
+ on_disk_meta, on_disk_body = split_metadata_and_body(
+ md_text, self.frontmatter_format
+ )
+ self._write_post_html(
+ html_filepath, on_disk_body, comments_list, meta=on_disk_meta
+ )
+ except Exception as ce:
+ pbar.write(f"[WARN] Comments failed for {url}: {ce}")
except Exception as e:
pbar.write(f"Error scraping post: {e}")
@@ -1126,9 +1859,17 @@ def __init__(
html_save_dir: str,
download_images: bool = False,
frontmatter_format: str = "legacy",
+ fetch_comments_flag: bool = False,
+ comments_sort: str = COMMENTS_SORT,
):
super().__init__(
- base_substack_url, md_save_dir, html_save_dir, download_images, frontmatter_format
+ base_substack_url,
+ md_save_dir,
+ html_save_dir,
+ download_images,
+ frontmatter_format,
+ fetch_comments_flag=fetch_comments_flag,
+ comments_sort=comments_sort,
)
def get_url_soup(self, url: str, max_attempts: int = 5) -> Optional[BeautifulSoup]:
@@ -1180,6 +1921,8 @@ def __init__(
use_persistent_profile: bool = False,
skip_login: bool = False,
frontmatter_format: str = "legacy",
+ fetch_comments_flag: bool = False,
+ comments_sort: str = COMMENTS_SORT,
) -> None:
"""
Initialize the premium scraper with browser automation.
@@ -1196,6 +1939,15 @@ def __init__(
use_persistent_profile: Reuse browser profile across runs (saves login)
skip_login: Skip login if using a pre-authenticated profile
"""
+ # Store settings so the driver can be recreated with identical options after a crash.
+ self._browser = browser
+ self._headless = headless
+ self._driver_path = driver_path
+ self._browser_path = browser_path
+ self._user_agent = user_agent
+ self._base_substack_url = base_substack_url
+ self._scrape_counter = 0
+
# Initialize driver before calling super().__init__ since that fetches URLs
self.driver = BrowserManager.create_driver(
browser=browser,
@@ -1218,7 +1970,13 @@ def __init__(
sleep(3)
super().__init__(
- base_substack_url, md_save_dir, html_save_dir, download_images, frontmatter_format
+ base_substack_url,
+ md_save_dir,
+ html_save_dir,
+ download_images,
+ frontmatter_format,
+ fetch_comments_flag=fetch_comments_flag,
+ comments_sort=comments_sort,
)
def login(self) -> None:
@@ -1263,10 +2021,77 @@ def is_login_failed(self) -> bool:
error_container = self.driver.find_elements(By.ID, 'error-container')
return len(error_container) > 0 and error_container[0].is_displayed()
+ def _get_session(self) -> Optional[requests.Session]:
+ """Build a requests.Session seeded with the logged-in browser's cookies.
+
+ Lets the JSON comment endpoints authenticate as the current user, so paid-only
+ comment threads can be fetched. Cookies are read fresh each call so they stay valid
+ after a ``_recreate_driver()`` (persistent profile carries the session).
+ """
+ session = requests.Session()
+ try:
+ cookies = self.driver.get_cookies()
+ except Exception as e:
+ print(f"[WARN] Could not read browser cookies for comments: {e}")
+ return session
+ for c in cookies:
+ try:
+ session.cookies.set(
+ c.get("name", ""),
+ c.get("value", ""),
+ domain=c.get("domain"),
+ path=c.get("path", "/"),
+ )
+ except Exception:
+ continue
+ try:
+ session.headers.update({
+ "User-Agent": self.driver.execute_script("return navigator.userAgent;"),
+ })
+ except Exception:
+ pass
+ return session
+
+ def _driver_is_dead(self) -> bool:
+ """Cheap liveness probe. Returns True if the driver/session is unusable."""
+ try:
+ _ = self.driver.current_url
+ return False
+ except (WebDriverException, InvalidSessionIdException):
+ return True
+
+ def _recreate_driver(self) -> None:
+ """Tear down the (possibly dead) driver and build a fresh one with the same settings.
+
+ With a persistent profile, saved cookies mean NO re-login is required after recreation.
+ """
+ try:
+ self.driver.quit()
+ except Exception:
+ pass
+ self.driver = BrowserManager.create_driver(
+ browser=self._browser,
+ headless=self._headless,
+ driver_path=self._driver_path,
+ browser_path=self._browser_path,
+ user_agent=self._user_agent,
+ use_persistent_profile=self.use_persistent_profile,
+ quiet=True,
+ )
+ if not self.use_persistent_profile:
+ self.login()
+ else:
+ try:
+ self.driver.get(self._base_substack_url)
+ sleep(3)
+ except Exception:
+ pass
+
def get_url_soup(self, url: str, max_attempts: int = 5) -> Optional[BeautifulSoup]:
"""Gets soup from URL using logged-in Selenium driver, with retry on rate limiting."""
for attempt in range(1, max_attempts + 1):
try:
+ sleep(random.uniform(4.0, 9.0))
self.driver.get(url)
# Wait up to 20s for the post body (or a paywall marker) to appear, instead of a fixed sleep.
@@ -1300,6 +2125,23 @@ def get_url_soup(self, url: str, max_attempts: int = 5) -> Optional[BeautifulSou
except RuntimeError:
raise
except Exception as e:
+ msg = str(e).lower()
+ crashed = (
+ isinstance(e, InvalidSessionIdException)
+ or any(s in msg for s in (
+ "tab crashed",
+ "chrome not reachable",
+ "no such session",
+ "session not created",
+ "unable to connect to renderer",
+ "target window already closed",
+ ))
+ )
+ if crashed:
+ print(f"[{attempt}/{max_attempts}] Tab/session crashed โ recreating driver: {e}")
+ self._recreate_driver()
+ sleep(random.uniform(5, 10)) # cool-down after recovery
+ continue # retry the SAME url on a fresh driver
raise ValueError(f"Error fetching page: {url}. Error: {e}") from e
raise RuntimeError(f"Failed to fetch page after {max_attempts} attempts: {url}")
@@ -1335,6 +2177,9 @@ def parse_args() -> argparse.Namespace:
# Subsequent runs (skip login, use saved session)
python substack_scraper.py --url https://example.substack.com --premium --persistent-profile --skip-login
+ # Fetch comments too (public threads; add --premium for paid-only comments)
+ python substack_scraper.py --url https://example.substack.com --comments
+
# Use manually downloaded driver
python substack_scraper.py --url https://example.substack.com --premium --chrome-driver-path /path/to/chromedriver
"""
@@ -1344,6 +2189,16 @@ def parse_args() -> argparse.Namespace:
"-u", "--url", type=str,
help="The base URL of the Substack site to scrape."
)
+ parser.add_argument(
+ "--render-only", action="store_true",
+ help="Skip scraping. Re-render existing on-disk Markdown into the Substack-styled "
+ "HTML (no network). Give authors as positional args or use --all. Equivalent to "
+ "running render_posts.py."
+ )
+ parser.add_argument(
+ "--render-all", action="store_true",
+ help="With --render-only, re-render every author under data/."
+ )
parser.add_argument(
"-d", "--directory", type=str,
help="The directory to save scraped markdown posts."
@@ -1361,6 +2216,17 @@ def parse_args() -> argparse.Namespace:
action="store_true",
help="Download images and update markdown to use local paths."
)
+ parser.add_argument(
+ "--comments",
+ action="store_true",
+ help="Fetch each post's comment thread as separate .comments.md/.comments.json files. "
+ "Public threads need no auth; paid-only threads require --premium."
+ )
+ parser.add_argument(
+ "--comments-sort", type=str, default=COMMENTS_SORT,
+ choices=["best", "most_recent_first"],
+ help="Comment sort order (default: best)."
+ )
parser.add_argument(
"--frontmatter", type=str, default="legacy", choices=["legacy", "mdx"],
help="Header format for scraped markdown. 'legacy' (default) uses the original "
@@ -1414,12 +2280,39 @@ def parse_args() -> argparse.Namespace:
help="Custom user agent string."
)
+ parser.add_argument(
+ "authors", nargs="*", default=[],
+ help="Author name(s) for --render-only (= data/.json stem).",
+ )
+
return parser.parse_args()
+def _run_render_only(args: argparse.Namespace) -> None:
+ """Delegate the --render-only path to the standalone renderer (network-free)."""
+ import render_posts
+
+ if args.render_all:
+ authors = render_posts.discover_authors()
+ if not authors:
+ print("[SKIP] No authors found under data/.")
+ return
+ for author in authors:
+ render_posts.render_author(author, force=True)
+ elif args.authors:
+ for author in args.authors:
+ render_posts.render_author(author, force=True)
+ else:
+ print("Provide one or more authors, or use --render-only --render-all.")
+
+
def main():
args = parse_args()
+ if args.render_only:
+ _run_render_only(args)
+ return
+
if args.directory is None:
args.directory = BASE_MD_DIR
@@ -1449,6 +2342,8 @@ def main():
use_persistent_profile=args.persistent_profile,
skip_login=args.skip_login,
frontmatter_format=args.frontmatter,
+ fetch_comments_flag=args.comments,
+ comments_sort=args.comments_sort,
)
else:
scraper = SubstackScraper(
@@ -1457,6 +2352,8 @@ def main():
html_save_dir=args.html_directory,
download_images=args.images,
frontmatter_format=args.frontmatter,
+ fetch_comments_flag=args.comments,
+ comments_sort=args.comments_sort,
)
scraper.scrape_posts(args.number)
@@ -1476,6 +2373,8 @@ def main():
use_persistent_profile=args.persistent_profile,
skip_login=args.skip_login,
frontmatter_format=args.frontmatter,
+ fetch_comments_flag=args.comments,
+ comments_sort=args.comments_sort,
)
else:
scraper = SubstackScraper(
@@ -1484,6 +2383,8 @@ def main():
html_save_dir=args.html_directory,
download_images=args.images,
frontmatter_format=args.frontmatter,
+ fetch_comments_flag=args.comments,
+ comments_sort=args.comments_sort,
)
scraper.scrape_posts(num_posts_to_scrape=NUM_POSTS_TO_SCRAPE)
diff --git a/tests/test_substack_scraper.py b/tests/test_substack_scraper.py
index ab8a8bbb..c102de5d 100644
--- a/tests/test_substack_scraper.py
+++ b/tests/test_substack_scraper.py
@@ -1,5 +1,6 @@
import os
import sys
+import json
import shutil
import pytest
@@ -227,4 +228,562 @@ def test_scraper_initialization(tmp_path):
assert scraper.writer_name == "example"
assert os.path.isdir(os.path.join(md_dir, "example"))
- assert os.path.isdir(os.path.join(html_dir, "example"))
\ No newline at end of file
+ assert os.path.isdir(os.path.join(html_dir, "example"))
+
+
+# ---------------------------------------------------------------------------
+# Comment helpers
+# ---------------------------------------------------------------------------
+
+
+# 9. test_render_comments_markdown_flat
+def test_render_comments_markdown_flat():
+ comments = [{
+ "name": "Alice",
+ "body": "Great post!",
+ "date": "2026-06-01T22:54:34.749Z",
+ "reactions": {"โค": 0},
+ "metadata": {"is_author": False},
+ "children": [],
+ }]
+
+ rendered = ss.render_comments_markdown(comments)
+
+ assert "**Alice**" in rendered
+ assert "Great post!" in rendered
+ assert "Jun 01, 2026" in rendered
+ # No reactions shown when count is 0
+ assert "โค" not in rendered
+ # No nested blockquotes at top level
+ assert ">" not in rendered
+
+
+# 10. test_render_comments_markdown_nested
+def test_render_comments_markdown_nested():
+ comments = [{
+ "name": "Alice",
+ "body": "Parent comment",
+ "date": "2026-06-01T22:54:34.749Z",
+ "reactions": {},
+ "metadata": {"is_author": False},
+ "children": [{
+ "name": "Bob",
+ "body": "Reply one\n\nReply two",
+ "date": "2026-06-02T15:36:32.599Z",
+ "reactions": {},
+ "metadata": {"is_author": False},
+ "children": [],
+ }],
+ }]
+
+ rendered = ss.render_comments_markdown(comments)
+
+ assert "Parent comment" in rendered
+ assert "**Bob**" in rendered
+ # Child is rendered as a blockquote under the parent
+ assert "> **Bob**" in rendered
+ # Multi-paragraph child body: every line prefixed with ">"
+ assert "> Reply one" in rendered
+ assert "> Reply two" in rendered
+ # Blank line inside the blockquote is rendered as ">"
+ assert "\n>\n>" in rendered
+
+
+# 11. test_render_comments_markdown_author_flag_and_reactions
+def test_render_comments_markdown_author_flag_and_reactions():
+ comments = [{
+ "name": "Janey Park",
+ "body": "Author reply",
+ "date": "2026-06-02T15:36:32.599Z",
+ "reactions": {"โค": 5},
+ "metadata": {"is_author": True},
+ "children": [],
+ }]
+
+ rendered = ss.render_comments_markdown(comments)
+
+ assert "**Janey Park** (author)" in rendered
+ assert "โค 5" in rendered
+
+
+# 12. test_count_all_comments_recursive
+def test_count_all_comments_recursive():
+ comments = [
+ {"children": [{"children": [{"children": []}]}]},
+ {"children": [{"children": []}]},
+ {"children": []},
+ ]
+
+ assert ss.count_all_comments(comments) == 6
+
+ assert ss.count_all_comments([]) == 0
+
+
+# 13. test_get_post_id_from_slug
+@patch("substack_scraper._request_json_with_rate_limit_retry")
+def test_get_post_id_from_slug(mock_request):
+ mock_request.return_value = {
+ "id": 201818433,
+ "comment_count": 2,
+ "write_comment_permissions": "everyone",
+ }
+
+ result = ss.get_post_id_from_slug("https://example.substack.com/", "my-post")
+
+ assert result == (201818433, 2, "everyone")
+ mock_request.assert_called_once_with(
+ "https://example.substack.com/api/v1/posts/my-post", session=None
+ )
+
+
+@patch("substack_scraper._request_json_with_rate_limit_retry")
+def test_get_post_id_from_slug_returns_none_on_failure(mock_request):
+ mock_request.return_value = None
+
+ assert ss.get_post_id_from_slug("https://example.substack.com/", "missing") is None
+
+
+# 14. test_fetch_comments_returns_list
+@patch("substack_scraper._request_json_with_rate_limit_retry")
+def test_fetch_comments_returns_list(mock_request):
+ mock_request.return_value = {"comments": [{"id": 1}, {"id": 2}], "automod_hidden_comments": []}
+
+ result = ss.fetch_comments("https://example.substack.com/", 201818433)
+
+ assert result == [{"id": 1}, {"id": 2}]
+ mock_request.assert_called_once_with(
+ "https://example.substack.com/api/v1/post/201818433/comments?all_comments=true&sort=best",
+ session=None,
+ )
+
+
+@patch("substack_scraper._request_json_with_rate_limit_retry")
+def test_fetch_comments_empty_payload(mock_request):
+ mock_request.return_value = {"comments": [], "automod_hidden_comments": []}
+
+ assert ss.fetch_comments("https://example.substack.com/", 123) == []
+
+
+# 15. test_parse_args_supports_comments_flag
+def test_parse_args_supports_comments_flag(monkeypatch):
+ monkeypatch.setattr(
+ sys,
+ "argv",
+ ["substack_scraper.py", "--url", "https://example.substack.com/p/post", "--comments"],
+ )
+
+ args = ss.parse_args()
+
+ assert args.comments is True
+ assert args.comments_sort == "best"
+
+
+# 16. test_scrape_comments_loads_from_cache
+def test_scrape_comments_loads_from_cache(tmp_path):
+ """Cached comments JSON is loaded from disk WITHOUT any network calls."""
+ scraper = DummyScraper(
+ "https://example.substack.com/p/my-post",
+ str(tmp_path / "md"),
+ str(tmp_path / "html"),
+ fetch_comments_flag=True,
+ )
+
+ # Pre-create the comments JSON cache with a real comment list.
+ os.makedirs(scraper.comments_save_dir, exist_ok=True)
+ slug = "my-post"
+ json_path = os.path.join(scraper.comments_save_dir, f"{slug}.comments.json")
+ cached = [{"id": 1, "name": "Alice", "body": "Hi", "children": []}]
+ with open(json_path, "w", encoding="utf-8") as f:
+ json.dump(cached, f)
+
+ # Cache hit → no network (get_post_id_from_slug must NOT be called).
+ with patch("substack_scraper.get_post_id_from_slug") as mock_meta:
+ result = scraper.scrape_comments_for_post("https://example.substack.com/p/my-post")
+
+ assert result is not None
+ assert result["comments"] == cached
+ assert result["total_comments"] == 1
+ assert result["json_path"] == json_path
+ mock_meta.assert_not_called()
+
+
+# ---------------------------------------------------------------------------
+# Comment HTML rendering (for the individual post page)
+# ---------------------------------------------------------------------------
+
+
+# 17. test_render_comments_html_flat
+def test_render_comments_html_flat():
+ comments = [{
+ "name": "Alice",
+ "body": "Great post!",
+ "date": "2026-06-01T22:54:34.749Z",
+ "reactions": {"โค": 0},
+ "metadata": {"is_author": False},
+ "children": [],
+ }]
+
+ html = ss.render_comments_html(comments)
+
+ assert '' in html
+ assert "
Comments (1)
" in html
+ assert 'class="comment-author">Alice' in html
+ assert "Great post!" in html
+ assert "Jun 01, 2026" in html
+ # No avatar when photo_url missing; no reactions when count 0; not an author.
+ assert "comment-avatar" not in html
+ assert "comment-author-flag" not in html
+ assert "comment-reactions" not in html
+
+
+# 18. test_render_comments_html_nested
+def test_render_comments_html_nested():
+ comments = [{
+ "name": "Alice",
+ "body": "Parent",
+ "date": "2026-06-01T22:54:34.749Z",
+ "reactions": {},
+ "metadata": {"is_author": False},
+ "children": [{
+ "name": "Bob",
+ "body": "Reply",
+ "date": "2026-06-02T15:36:32.599Z",
+ "reactions": {},
+ "metadata": {"is_author": False},
+ "children": [],
+ }],
+ }]
+
+ html = ss.render_comments_html(comments)
+
+ assert "Parent" in html
+ assert "Bob" in html
+ assert "Reply" in html
+ # Total includes the nested child.
+ assert "
Comments (2)
" in html
+ # Child is rendered inside a .comment-children list.
+ assert '
' in html
+
+
+# 19. test_render_comments_html_author_flag_and_reactions
+def test_render_comments_html_author_flag_and_reactions():
+ comments = [{
+ "name": "Janey Park",
+ "body": "Author reply",
+ "date": "2026-06-02T15:36:32.599Z",
+ "photo_url": "https://example.com/avatar.png",
+ "reactions": {"โค": 5},
+ "metadata": {"is_author": True},
+ "children": [],
+ }]
+
+ html = ss.render_comments_html(comments)
+
+ assert 'Author' in html
+ assert 'โค 5' in html
+ assert 'body
" in content
+ assert "comments" not in content
+
+
+# 22. test_write_post_html_includes_comments_section
+def test_write_post_html_includes_comments_section(tmp_path):
+ scraper = DummyScraper(
+ "https://example.substack.com/p/my-post",
+ str(tmp_path / "md"),
+ str(tmp_path / "html"),
+ )
+ html_path = str(tmp_path / "out.html")
+ comments = [{"name": "Alice", "body": "Hi", "children": []}]
+
+ scraper._write_post_html(html_path, "# Title\n\nBody", comments)
+
+ content = Path(html_path).read_text(encoding="utf-8")
+ assert '' in content
+ assert "Alice" in content
+ # The markdown body was converted to HTML.
+ assert "
Title
" in content or "
" in content
+
+
+# ---------------------------------------------------------------------------
+# Structured Substack header / render helpers
+# ---------------------------------------------------------------------------
+
+
+# 23. test_build_post_header_renders_full_header
+def test_build_post_header_renders_full_header():
+ meta = {
+ "title": "My Essay",
+ "subtitle": "A subtitle",
+ "author": "Jane Doe",
+ "date": "2026-06-19",
+ "cover_image": "https://example.com/cover.png",
+ }
+
+ html = ss.build_post_header(meta)
+
+ assert '' in html
+ assert '
My Essay
' in html
+ assert '
A subtitle
' in html
+ assert 'Only a title
' in html
+ assert "post-subtitle" not in html
+ assert "post-byline" not in html
+ assert "post-cover" not in html
+
+
+# 25. test_build_post_header_empty_returns_empty
+@pytest.mark.parametrize("meta", [{}, None])
+def test_build_post_header_empty_returns_empty(meta):
+ assert ss.build_post_header(meta) == ""
+
+
+# 26. test_build_post_header_escapes_html
+def test_build_post_header_escapes_html():
+ html = ss.build_post_header({"title": ''})
+ assert "
{inner}
' + + return f'{header}Comments ({total})
' + f'{items}
' + f'{_html_escape(title)}
') + if subtitle: + parts.append(f'{_html_escape(subtitle)}
') + + byline_bits = [] + if author: + byline_bits.append(_html_escape(author)) + display_date = _format_header_date(date_str) + if display_date: + byline_bits.append(_html_escape(display_date)) + if byline_bits: + parts.append( + f'{" ยท ".join(byline_bits)}
' + ) + + return f'Comments (1)
" in html + assert 'class="comment-author">Alice' in html + assert "Great post!" in html + assert "Jun 01, 2026" in html + # No avatar when photo_url missing; no reactions when count 0; not an author. + assert "comment-avatar" not in html + assert "comment-author-flag" not in html + assert "comment-reactions" not in html + + +# 18. test_render_comments_html_nested +def test_render_comments_html_nested(): + comments = [{ + "name": "Alice", + "body": "Parent", + "date": "2026-06-01T22:54:34.749Z", + "reactions": {}, + "metadata": {"is_author": False}, + "children": [{ + "name": "Bob", + "body": "Reply", + "date": "2026-06-02T15:36:32.599Z", + "reactions": {}, + "metadata": {"is_author": False}, + "children": [], + }], + }] + + html = ss.render_comments_html(comments) + + assert "Parent" in html + assert "Bob" in html + assert "Reply" in html + # Total includes the nested child. + assert "Comments (2)
" in html + # Child is rendered inside a .comment-children list. + assert '' in html + + +# 19. test_render_comments_html_author_flag_and_reactions +def test_render_comments_html_author_flag_and_reactions(): + comments = [{ + "name": "Janey Park", + "body": "Author reply", + "date": "2026-06-02T15:36:32.599Z", + "photo_url": "https://example.com/avatar.png", + "reactions": {"โค": 5}, + "metadata": {"is_author": True}, + "children": [], + }] + + html = ss.render_comments_html(comments) + + assert 'Author' in html + assert 'โค 5' in html + assert '
body