Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions blind_watermark/mlwm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,24 @@

from .codec import (
ENCODED_BYTES,
KEYED_PROTOCOL,
PAYLOAD_BITS,
MAX_TEXT_BYTES,
decode_payload_bits,
decode_payload_logits,
encode_text_payload,
key_payload_bits,
unkey_payload_bits,
)

__all__ = [
'ENCODED_BYTES',
'KEYED_PROTOCOL',
'PAYLOAD_BITS',
'MAX_TEXT_BYTES',
'decode_payload_bits',
'decode_payload_logits',
'encode_text_payload',
'key_payload_bits',
'unkey_payload_bits',
]
67 changes: 60 additions & 7 deletions blind_watermark/mlwm/codec.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

import hashlib
import struct
import zlib
from dataclasses import dataclass
from typing import Any
Expand All @@ -15,6 +17,7 @@
ENCODED_BYTES = FRAME_BYTES + RS_NSYM
PAYLOAD_BITS = ENCODED_BYTES * 8
WHITENING_SEED = 0x4d4c574d
KEYED_PROTOCOL = 'keyed-v2'


@dataclass
Expand All @@ -25,6 +28,8 @@ class PayloadEnvelope:
encoded: bytes
bits: np.ndarray
flags: int = 0
protocol: str = KEYED_PROTOCOL
password_protected: bool = False


def _whitening_mask() -> np.ndarray:
Expand All @@ -35,6 +40,25 @@ def _whitening_mask() -> np.ndarray:
_PAYLOAD_WHITENING_MASK = _whitening_mask()


def _normalize_password(password: int | None) -> int | None:
if password is None:
return None
return int(password) & 0xffffffff


def _password_mask(password: int | None) -> np.ndarray:
normalized = _normalize_password(password)
if normalized is None:
return np.zeros(PAYLOAD_BITS, dtype=np.float32)
seed = b'LuminCrypt-MLWM-keyed-v2' + struct.pack('>I', normalized)
stream = bytearray()
counter = 0
while len(stream) < ENCODED_BYTES:
stream.extend(hashlib.sha256(seed + struct.pack('>I', counter)).digest())
counter += 1
return bytes_to_bits(bytes(stream[:ENCODED_BYTES]))


def _rs_codec() -> reedsolo.RSCodec:
return reedsolo.RSCodec(RS_NSYM)

Expand Down Expand Up @@ -72,6 +96,27 @@ def unwhiten_payload_bits(bits: np.ndarray | list[float] | list[int]) -> np.ndar
return whiten_payload_bits(bits)


def key_payload_bits(bits: np.ndarray | list[float] | list[int], password: int | None = None) -> np.ndarray:
flat = np.asarray(bits, dtype=np.float32).reshape(-1)
if flat.size != PAYLOAD_BITS:
raise ValueError(f'expected {PAYLOAD_BITS} payload bits, got {flat.size}')
hard = (flat >= 0.5).astype(np.float32)
return np.abs(hard - _password_mask(password)).astype(np.float32)


def unkey_payload_bits(bits: np.ndarray | list[float] | list[int], password: int | None = None) -> np.ndarray:
return key_payload_bits(bits, password)


def unkey_payload_logits(logits: np.ndarray, password: int | None = None) -> np.ndarray:
arr = np.asarray(logits, dtype=np.float32).reshape(-1)
if arr.size != PAYLOAD_BITS:
raise ValueError(f'expected {PAYLOAD_BITS} logits, got {arr.size}')
mask = _password_mask(password)
signs = np.where(mask >= 0.5, -1.0, 1.0).astype(np.float32)
return arr * signs


def build_frame(payload_bytes: bytes, *, flags: int = 0) -> bytes:
if len(payload_bytes) > MAX_TEXT_BYTES:
raise ValueError(f'neural payload supports up to {MAX_TEXT_BYTES} UTF-8 bytes')
Expand All @@ -96,17 +141,19 @@ def encode_frame(frame: bytes) -> bytes:
return bytes(_rs_codec().encode(frame))


def encode_text_payload(text: str, *, flags: int = 0) -> PayloadEnvelope:
def encode_text_payload(text: str, password: int | None = None, *, flags: int = 0) -> PayloadEnvelope:
payload_bytes = text.encode('utf-8')
frame = build_frame(payload_bytes, flags=flags)
encoded = encode_frame(frame)
whitened = whiten_payload_bits(bytes_to_bits(encoded))
return PayloadEnvelope(
text=text,
text_bytes=payload_bytes,
frame=frame,
encoded=encoded,
bits=whiten_payload_bits(bytes_to_bits(encoded)),
bits=key_payload_bits(whitened, password),
flags=flags,
password_protected=password is not None,
)


Expand Down Expand Up @@ -135,22 +182,28 @@ def decode_frame(encoded_bytes: bytes) -> dict[str, Any]:
}


def decode_payload_bits(bits: np.ndarray | list[float] | list[int]) -> dict[str, Any]:
raw_bits = unwhiten_payload_bits(bits)
def decode_payload_bits(bits: np.ndarray | list[float] | list[int], password: int | None = None) -> dict[str, Any]:
unkeyed = unkey_payload_bits(bits, password)
raw_bits = unwhiten_payload_bits(unkeyed)
encoded = bits_to_bytes(raw_bits)
result = decode_frame(encoded)
result['encoded'] = encoded
result['rawBits'] = raw_bits
result['protocol'] = KEYED_PROTOCOL if password is not None else 'unkeyed-v1'
result['passwordProtected'] = password is not None
return result


def decode_payload_logits(logits: np.ndarray) -> dict[str, Any]:
def decode_payload_logits(logits: np.ndarray, password: int | None = None) -> dict[str, Any]:
logits = np.asarray(logits, dtype=np.float32).reshape(-1)
if logits.size != PAYLOAD_BITS:
raise ValueError(f'expected {PAYLOAD_BITS} logits, got {logits.size}')
probs = 1.0 / (1.0 + np.exp(-logits))
decoded_logits = unkey_payload_logits(logits, password)
probs = 1.0 / (1.0 + np.exp(-decoded_logits))
bits = (probs >= 0.5).astype(np.float32)
decoded = decode_payload_bits(bits)
decoded = decode_payload_bits(bits, None)
decoded['probabilities'] = probs
decoded['bitConfidence'] = float(np.mean(np.maximum(probs, 1.0 - probs)))
decoded['protocol'] = KEYED_PROTOCOL if password is not None else 'unkeyed-v1'
decoded['passwordProtected'] = password is not None
return decoded
5 changes: 3 additions & 2 deletions blind_watermark/mlwm/infer.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ def neural_decode_views(
*,
models_dir: str | None = None,
use_cuda: bool = False,
password: int | None = None,
) -> dict[str, Any]:
status = probe_runtime(models_dir)
if not status.ready:
Expand All @@ -167,7 +168,7 @@ def neural_decode_views(
weighted_logits += logits * confidence
total_weight += confidence
try:
decoded = decode_payload_logits(logits)
decoded = decode_payload_logits(logits, password=password)
decoded['confidence'] = confidence
decoded['attemptIndex'] = index
decoded['strategy'] = 'single-view'
Expand All @@ -180,7 +181,7 @@ def neural_decode_views(
raise NeuralRuntimeUnavailable('decoder produced no usable confidence scores')

aggregated_logits = weighted_logits / total_weight
decoded = decode_payload_logits(aggregated_logits)
decoded = decode_payload_logits(aggregated_logits, password=password)
decoded['confidence'] = float(total_weight / max(len(views_rgb), 1))
decoded['attempts'] = [{'index': a['index'], 'confidence': a['confidence']} for a in attempts]
decoded['strategy'] = 'weighted-aggregate'
Expand Down
28 changes: 20 additions & 8 deletions blind_watermark/rwm_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,23 @@
# v2 compat constants
_V2_RS_NSYM = 20
_V2_REDUNDANCY = 3
RWM_VERSION = '3.1.0'
RWM_VERSION = '3.2.0'
NEURAL_MAX_TEXT_BYTES = 16
NEURAL_PROFILES = {
'invisible': {
'residual_strength': 0.35,
'template_strength': 0.0,
'template_peaks': 0,
'sync_enabled': False,
},
'balanced': {
'residual_strength': 1.0,
'residual_strength': 0.55,
'template_strength': 0.0,
'template_peaks': 0,
'sync_enabled': False,
},
'aggressive': {
'residual_strength': 1.35,
'robust': {
'residual_strength': 1.0,
'template_strength': 0.0,
'template_peaks': 0,
'sync_enabled': False,
Expand Down Expand Up @@ -517,8 +523,8 @@ def _try_all_presets(gray_ch, preset_order=None):


def _resolve_neural_profile(quality):
if quality == 'robust':
return 'aggressive'
if quality in NEURAL_PROFILES:
return quality
return 'balanced'


Expand Down Expand Up @@ -600,7 +606,7 @@ def _neural_embed_impl(img, text, password=1, quality='balanced', models_dir=Non
profile = NEURAL_PROFILES[profile_name]
alpha = img[:, :, 3].copy() if img.ndim == 3 and img.shape[2] == 4 else None
base = img[:, :, :3] if alpha is not None else img.copy()
payload = encode_text_payload(text)
payload = encode_text_payload(text, password=password)
rgb = cv2.cvtColor(base, cv2.COLOR_BGR2RGB)
try:
encoded = neural_encode_residual(rgb, payload.bits, models_dir=models_dir, use_cuda=False)
Expand All @@ -622,6 +628,9 @@ def _neural_embed_impl(img, text, password=1, quality='balanced', models_dir=Non

diagnostics = {
'profile': profile_name,
'protocol': payload.protocol,
'passwordProtected': payload.password_protected,
'visualStrength': profile['residual_strength'],
'payloadBytes': len(payload.text_bytes),
'modelVersion': encoded.get('modelVersion'),
'modelsDir': models_dir,
Expand Down Expand Up @@ -658,7 +667,7 @@ def _neural_extract_impl(img, password=1, quality='balanced', models_dir=None):
geo = {'angle': 0.0, 'scale': 1.0, 'confidence': 0.0, 'peaks': 0, 'syncEnabled': False}
views = _build_neural_views(corrected)
try:
decoded = neural_decode_views(views, models_dir=models_dir, use_cuda=False)
decoded = neural_decode_views(views, models_dir=models_dir, use_cuda=False, password=password)
except NeuralRuntimeUnavailable:
raise

Expand All @@ -671,6 +680,9 @@ def _neural_extract_impl(img, password=1, quality='balanced', models_dir=None):
'confidence': confidence,
'diagnostics': {
'profile': profile_name,
'protocol': decoded.get('protocol', 'keyed-v2'),
'passwordProtected': bool(decoded.get('passwordProtected', True)),
'visualStrength': profile['residual_strength'],
'bitConfidence': float(decoded.get('bitConfidence', 0.0)),
'decodeStrategy': decoded.get('strategy'),
'geometricCorrection': geo,
Expand Down
26 changes: 26 additions & 0 deletions blind_watermark/tests/test_mlwm_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
bits_to_bytes,
bytes_to_bits,
decode_payload_bits,
decode_payload_logits,
encode_frame,
encode_text_payload,
unwhiten_payload_bits,
Expand All @@ -22,6 +23,31 @@ def test_roundtrip_short_text(self):
self.assertEqual(decoded['text'], 'TRACE-42')
self.assertEqual(len(envelope.encoded), ENCODED_BYTES)

def test_password_protected_payload_roundtrip(self):
envelope = encode_text_payload('LOCKED-42', password=2468)
decoded = decode_payload_bits(envelope.bits, password=2468)
self.assertEqual(decoded['text'], 'LOCKED-42')
self.assertTrue(envelope.password_protected)
self.assertTrue(decoded['passwordProtected'])

def test_wrong_password_rejects_payload(self):
envelope = encode_text_payload('LOCKED-42', password=2468)
with self.assertRaises(Exception):
decode_payload_bits(envelope.bits, password=2469)

def test_different_passwords_produce_different_payload_bits(self):
a = encode_text_payload('LOCKED-42', password=2468)
b = encode_text_payload('LOCKED-42', password=2469)
self.assertFalse(np.array_equal(a.bits, b.bits))

def test_password_protected_logits_roundtrip(self):
envelope = encode_text_payload('LOGITS-42', password=2468)
logits = (envelope.bits * 2.0 - 1.0) * 8.0
decoded = decode_payload_logits(logits, password=2468)
self.assertEqual(decoded['text'], 'LOGITS-42')
with self.assertRaises(Exception):
decode_payload_logits(logits, password=2469)

def test_reject_long_text(self):
with self.assertRaises(ValueError):
encode_text_payload('X' * (MAX_TEXT_BYTES + 1))
Expand Down
5 changes: 5 additions & 0 deletions blind_watermark/tests/test_mlwm_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ def test_alpha1_neural_profiles_do_not_inject_untrained_sync_template(self):
self.assertFalse(profile.get('sync_enabled', False))
self.assertEqual(profile.get('template_strength'), 0.0)

def test_neural_profiles_expose_three_visual_strengths(self):
self.assertEqual(set(rwm_engine.NEURAL_PROFILES.keys()), {'invisible', 'balanced', 'robust'})
self.assertLess(rwm_engine.NEURAL_PROFILES['invisible']['residual_strength'], rwm_engine.NEURAL_PROFILES['balanced']['residual_strength'])
self.assertLess(rwm_engine.NEURAL_PROFILES['balanced']['residual_strength'], rwm_engine.NEURAL_PROFILES['robust']['residual_strength'])


if __name__ == '__main__':
unittest.main()
Loading