Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.10", "3.12"]
python-version: ["3.10", "3.13"]
steps:
- name: Checkout code
uses: actions/checkout@v6
Expand Down
2 changes: 1 addition & 1 deletion .python-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.12
3.13
7 changes: 4 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ classifiers = [
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python :: 3.14",
"Topic :: Text Processing",
"Topic :: Text Processing :: Linguistic",
]
Expand All @@ -38,8 +40,7 @@ dependencies = [
"transformers>=4.46.3",
"sentencepiece>=0.2.0",
"sacremoses>=0.1.1",
"fast-langdetect>=1.0.0",
"fasttext-predict==0.9.2.4",
"lingua-language-detector>=2.0.0",
"tomli>=2.0.0;python_version<'3.11'",
]

Expand Down Expand Up @@ -105,7 +106,7 @@ disallow_untyped_defs = true
warn_unused_ignores = true

[[tool.mypy.overrides]]
module = ["frontmatter", "yaml", "tomli"]
module = ["frontmatter", "lingua", "yaml", "tomli"]
ignore_missing_imports = true

[tool.pytest.ini_options]
Expand Down
57 changes: 7 additions & 50 deletions src/scribae/language.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,36 +136,21 @@ def _detect_language(text: str, language_detector: Callable[[str], str] | None)


def _default_language_detector() -> Callable[[str], str]:
copy_error = "Unable to avoid copy while creating an array as requested"
try:
import fast_langdetect # type: ignore[import-untyped]
except Exception as exc: # pragma: no cover - defensive fallback
from lingua import LanguageDetectorBuilder
except ImportError as exc: # pragma: no cover - defensive fallback
return _naive_detector(exc)

try:
detector = fast_langdetect.LangDetector()
detector = LanguageDetectorBuilder.from_all_languages().build()
except Exception as exc: # pragma: no cover - defensive fallback
return _naive_detector(exc)

naive = _naive_detector(None)

def _detect(sample: str) -> str:
try:
results = detector.detect(sample, model="auto", k=1, threshold=0.0)
except ValueError as exc:
if copy_error in str(exc):
results = _detect_with_fasttext_copy_fix(detector, sample)
else:
return naive(sample)
except Exception:
return naive(sample)
if not results:
return naive(sample)
first = results[0]
lang = first.get("lang") if isinstance(first, Mapping) else None
if not isinstance(lang, str) or not lang:
return naive(sample)
return normalize_language(lang)
result = detector.detect_language_of(sample)
if result is None:
raise LanguageResolutionError("lingua could not identify the language.")
return result.iso_code_639_1.name.lower()

return _detect

Expand Down Expand Up @@ -194,34 +179,6 @@ def _report(reporter: Callable[[str], None] | None, message: str) -> None:
reporter(message)


def _detect_with_fasttext_copy_fix(detector: Any, text: str) -> list[dict[str, object]]:
try:
ft_model = detector._get_model(low_memory=False, fallback_on_memory_error=True)
processed = detector._preprocess_text(text)
normalized = detector._normalize_text(processed, detector.config.normalize_input)
except Exception:
return []

if "\n" in normalized:
return []

raw_predictor = getattr(ft_model, "f", None)
if raw_predictor is None or not hasattr(raw_predictor, "predict"):
return []

try:
predictions = raw_predictor.predict(f"{normalized}\n", 1, 0.0, "strict")
except Exception:
return []

if not predictions:
return []

scored = [(str(label).replace("__label__", ""), min(float(score), 1.0)) for score, label in predictions]
scored.sort(key=lambda item: item[1], reverse=True)
return [{"lang": label, "score": score} for label, score in scored]


__all__ = [
"LanguageResolution",
"LanguageResolutionError",
Expand Down
78 changes: 7 additions & 71 deletions src/scribae/translate/postedit.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from __future__ import annotations

import asyncio
import importlib
import os
from collections.abc import Callable, Iterable
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Literal, cast
from typing import TYPE_CHECKING

from pydantic_ai import Agent, NativeOutput, UnexpectedModelBehavior
from pydantic_ai.settings import ModelSettings
Expand Down Expand Up @@ -202,80 +200,18 @@ def _get_language_detector(self) -> Callable[[str], str]:
return self._language_detector

def _create_language_detector(self) -> Callable[[str], str]:
fast_langdetect = importlib.import_module("fast_langdetect")
config_kwargs: dict[str, Any] = {}
env_model_path = os.getenv("FASTTEXT_LID_MODEL")
if env_model_path:
config_kwargs["custom_model_path"] = env_model_path
from lingua import LanguageDetectorBuilder

config = fast_langdetect.LangDetectConfig(**config_kwargs) if config_kwargs else None
detector = fast_langdetect.LangDetector(config) if config else fast_langdetect.LangDetector()
detector = LanguageDetectorBuilder.from_all_languages().build()

def _detect(text: str) -> str:
results = self._detect_labels(detector, text, model="auto", k=1, threshold=0.0)
if not results:
raise UnexpectedModelBehavior("language detector returned no labels")
lang = results[0].get("lang")
if not isinstance(lang, str) or not lang:
raise UnexpectedModelBehavior("language detector returned invalid language label")
return self._normalize_lang(lang)
result = detector.detect_language_of(text)
if result is None:
raise UnexpectedModelBehavior("lingua could not identify the language")
return self._normalize_lang(result.iso_code_639_1.name.lower())

return _detect

def _detect_labels(
self,
detector: Any,
text: str,
*,
model: Literal["lite", "full", "auto"],
k: int,
threshold: float,
) -> list[dict[str, object]]:
try:
return cast(list[dict[str, object]], detector.detect(text, model=model, k=k, threshold=threshold))
except ValueError as exc:
message = str(exc)
copy_error = "Unable to avoid copy while creating an array as requested"
if copy_error not in message:
raise
return self._detect_with_fasttext_copy_fix(detector, text, model=model, k=k, threshold=threshold)

def _detect_with_fasttext_copy_fix(
self,
detector: Any,
text: str,
*,
model: Literal["lite", "full", "auto"],
k: int,
threshold: float,
) -> list[dict[str, object]]:
if model not in {"lite", "full", "auto"}:
raise UnexpectedModelBehavior(f"Invalid language detection model '{model}'")

if model == "lite":
ft_model = detector._get_model(low_memory=True, fallback_on_memory_error=False)
elif model == "full":
ft_model = detector._get_model(low_memory=False, fallback_on_memory_error=False)
else:
ft_model = detector._get_model(low_memory=False, fallback_on_memory_error=True)

processed = detector._preprocess_text(text)
normalized = detector._normalize_text(processed, detector.config.normalize_input)
if "\n" in normalized:
raise UnexpectedModelBehavior("language detection input contains newline characters")

raw_predictor = getattr(ft_model, "f", None)
if raw_predictor is None or not hasattr(raw_predictor, "predict"):
raise UnexpectedModelBehavior("fasttext model missing raw predictor for copy-safe language detection")

predictions = cast(list[tuple[float, str]], raw_predictor.predict(f"{normalized}\n", k, threshold, "strict"))
if not predictions:
return []

scored = [(str(label).replace("__label__", ""), min(float(score), 1.0)) for score, label in predictions]
scored.sort(key=lambda item: item[1], reverse=True)
return [{"lang": label, "score": score} for label, score in scored]

def _leading_markdown_marker(self, line: str) -> str:
"""Return the Markdown prefix (blockquote, list, heading) if present."""
import re
Expand Down
18 changes: 1 addition & 17 deletions src/scribae/translate_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

translate_app = typer.Typer()

_LIBRARY_LOGGERS = ("transformers", "huggingface_hub", "sentencepiece", "fasttext", "fast_langdetect")
_LIBRARY_LOGGERS = ("transformers", "huggingface_hub", "sentencepiece")
_LANGUAGE_CODE_RE = re.compile(r"^[A-Za-z]{2,3}$|^[A-Za-z]{3}[-_][A-Za-z]{4}$")


Expand Down Expand Up @@ -58,22 +58,6 @@ def _configure_library_logging() -> None:
except Exception:
pass

try:
import fast_langdetect.infer as fast_langdetect_infer # type: ignore[import-untyped]
import robust_downloader # type: ignore[import-untyped]

original_download = robust_downloader.download
if getattr(original_download, "__name__", "") != "quiet_download":

def quiet_download(*args: Any, **kwargs: Any) -> None:
kwargs.setdefault("show_progress", False)
kwargs.setdefault("logging_level", logging.ERROR)
original_download(*args, **kwargs)

robust_downloader.download = quiet_download
fast_langdetect_infer.download = quiet_download
except Exception:
pass


def _load_glossary(path: Path | None) -> dict[str, str]:
Expand Down
59 changes: 19 additions & 40 deletions tests/unit/language_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,56 +95,35 @@ def invoke(prompt: str) -> str:
)


def test_default_language_detector_handles_numpy_copy_error(monkeypatch: pytest.MonkeyPatch) -> None:
class Capture:
predict_args: tuple[str, int, float, str] | None = None
model_args: tuple[bool, bool] | None = None
def test_default_language_detector_uses_lingua(monkeypatch: pytest.MonkeyPatch) -> None:
captured: dict[str, object] = {}

capture = Capture()
class FakeIsoCode:
name = "FR"

class FakeConfig:
normalize_input = True

class FakePredictor:
def predict(self, text: str, k: int, threshold: float, mode: str) -> list[tuple[float, str]]:
capture.predict_args = (text, k, threshold, mode)
return [(0.8, "__label__FR")]

class FakeModel:
def __init__(self) -> None:
self.f = FakePredictor()
class FakeLanguageResult:
iso_code_639_1 = FakeIsoCode()

class FakeDetector:
config = FakeConfig()

def detect(
self,
text: str,
model: str = "auto",
k: int = 1,
threshold: float = 0.0,
) -> list[dict[str, object]]:
raise ValueError("Unable to avoid copy while creating an array as requested.")

def _get_model(self, low_memory: bool, fallback_on_memory_error: bool) -> FakeModel:
capture.model_args = (low_memory, fallback_on_memory_error)
return FakeModel()
def detect_language_of(self, text: str) -> FakeLanguageResult:
captured["text"] = text
return FakeLanguageResult()

def _preprocess_text(self, text: str) -> str:
return text
class FakeBuilder:
@staticmethod
def from_all_languages() -> "FakeBuilder":
return FakeBuilder()

def _normalize_text(self, text: str, normalize_input: bool) -> str:
return text
def build(self) -> FakeDetector:
return FakeDetector()

class FakeModule:
LangDetector = FakeDetector
class FakeLinguaModule:
LanguageDetectorBuilder = FakeBuilder

monkeypatch.setitem(sys.modules, "fast_langdetect", FakeModule())
monkeypatch.setitem(sys.modules, "lingua", FakeLinguaModule())

detector = language._default_language_detector()
detected = detector("Bonjour le monde")

assert detected == "fr"
assert capture.predict_args is not None
assert capture.predict_args[1:] == (1, 0.0, "strict")
assert capture.model_args == (False, True)
assert captured["text"] == "Bonjour le monde"
Loading