ExpediaGroup/kamae

Kamae

Kamae bridges the gap between offline data processing and online model serving. Build preprocessing pipelines in Spark for big data workloads, then export them as Keras models for low-latency inference.

Why Kamae?

Training and serving often happen on different platforms: Spark for batch processing at scale, TensorFlow for low-latency inference. Manually reimplementing preprocessing logic in both places creates:

  • Training/serving skew: Subtle bugs from inconsistent implementations
  • Development overhead: Writing and maintaining duplicate code
  • Deployment friction: Changes require updates in multiple systems

Kamae solves this by generating the inference model directly from your Spark pipeline, guaranteeing consistency between training and serving.

Installation

pip install kamae

Platform notes: Kamae supports tensorflow>=2.9.1,<2.19.0. For Mac ARM with tensorflow<2.13.0, install tensorflow-macos manually. TensorFlow no longer supports Mac x86_64 from version 2.18.0 onwards.
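
To check whether an already installed TensorFlow falls inside the supported range, a small helper like the one below can be used. This is a minimal sketch, not part of Kamae: the function names are hypothetical, and pre-release suffixes such as rc0 are simply ignored rather than ordered as pip would order them.

```python
def parse_version(version):
    """Turn a string such as '2.15.0' into the tuple (2, 15, 0).

    Assumption: only the first three numeric components matter;
    any non-digit suffix (e.g. 'rc0') is dropped.
    """
    parts = []
    for piece in version.split(".")[:3]:
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        parts.append(int(digits) if digits else 0)
    while len(parts) < 3:
        parts.append(0)
    return tuple(parts)


def is_supported_tensorflow(version):
    """Return True if version satisfies tensorflow>=2.9.1,<2.19.0."""
    return (2, 9, 1) <= parse_version(version) < (2, 19, 0)


print(is_supported_tensorflow("2.15.0"))
print(is_supported_tensorflow("2.19.0"))
```

With TensorFlow installed, the same check can be run as is_supported_tensorflow(tf.__version__).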

Quick Start

from pyspark.sql import SparkSession
from kamae.spark.estimators import StandardScaleEstimator, StringIndexEstimator
from kamae.spark.pipeline import KamaeSparkPipeline
from kamae.spark.transformers import LogTransformer, ArrayConcatenateTransformer

# Define preprocessing in Spark
spark = SparkSession.builder.getOrCreate()
data = spark.createDataFrame(
    [(1, 2, "a"), (4, 5, "b"), (7, 8, "c")],
    ["col1", "col2", "category"]
)

pipeline = KamaeSparkPipeline(stages=[
    LogTransformer(inputCol="col1", outputCol="log_col1", alpha=1, inputDtype="float"),
    ArrayConcatenateTransformer(inputCols=["log_col1", "col2"], outputCol="features", inputDtype="float"),
    StandardScaleEstimator(inputCol="features", outputCol="scaled_features"),
    StringIndexEstimator(inputCol="category", outputCol="category_indexed"),
])

fitted_pipeline = pipeline.fit(data)
fitted_pipeline.transform(data).show()  # Use in Spark

# Export for TensorFlow Serving
tf_input_schema = [
    {"name": "col1", "dtype": "int32", "shape": (None, 1)},
    {"name": "col2", "dtype": "int32", "shape": (None, 1)},
    {"name": "category", "dtype": "string", "shape": (None, 1)},
]
keras_model = fitted_pipeline.build_keras_model(tf_input_schema=tf_input_schema)
keras_model.save("./preprocessing_model.keras")
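
Once the exported model is being served, a client has to send inputs that match tf_input_schema: three named inputs, each of shape (None, 1). The sketch below builds a request body in TensorFlow Serving's columnar REST format ("inputs"). It assumes the model has been exported in a format TensorFlow Serving can load and is reachable under a hypothetical model name; the helper build_predict_request is illustrative and not part of Kamae.

```python
import json

# Input names copied from tf_input_schema in the quick start.
INPUT_NAMES = ["col1", "col2", "category"]


def build_predict_request(rows):
    """Convert a list of row dicts into a columnar TF Serving request body.

    Each input has shape (None, 1), so every value is wrapped in a
    one-element list and the rows are stacked along the batch axis.
    """
    inputs = {name: [[row[name]] for row in rows] for name in INPUT_NAMES}
    return json.dumps({"inputs": inputs})


rows = [
    {"col1": 1, "col2": 2, "category": "a"},
    {"col1": 4, "col2": 5, "category": "b"},
]
body = build_predict_request(rows)
print(body)
```

The resulting string would be POSTed to the server's predict endpoint, for example http://localhost:8501/v1/models/preprocessing_model:predict, where both the host and the model name are assumptions.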

Usage

Spark Pipeline (Recommended): Build preprocessing pipelines using Spark transformers and estimators, fit on DataFrames, then export as Keras models. See examples for common patterns.

Direct Keras Layers: Import and compose Keras layers directly for non-tabular data or custom workflows. Browse available layers in the transformation table below.

For Scikit-learn support (experimental, unmaintained), see sklearn examples.

Documentation

Supported Preprocessing Layers

Transformation | Description | Keras Layer | Spark Transformer | Scikit-learn Transformer
AbsoluteValue | Applies the abs(x) transform. | Link | Link | Not yet implemented
ArrayConcatenate | Assembles multiple features into a single array. | Link | Link | Link
ArrayCrop | Crops or pads a feature array to a consistent size. | Link | Link | Not yet implemented
ArraySplit | Splits a feature array into multiple features. | Link | Link | Link
ArraySubtractMinimum | Subtracts the minimum element in an array from the rest to compute a timestamp difference. Ignores padded values. | Link | Link | Not yet implemented
BearingAngle | Computes the bearing angle (https://en.wikipedia.org/wiki/Bearing_(navigation)) between two pairs of lat/long. | Link | Link | Not yet implemented
Bin | Bins a numerical column into string categorical bins. Users can specify the bin values, labels and a default label. | Link | Link | Not yet implemented
BloomEncode | Hash encodes a string feature multiple times to create an array of indices. Useful for compressing input dimensions for embeddings. Paper: https://arxiv.org/pdf/1706.03993.pdf | Link | Link | Not yet implemented
Bucketize | Buckets a numerical column into integer bins. | Link | Link | Not yet implemented
ConditionalStandardScale | Normalises by the mean and standard deviation, with the ability to apply a mask on another column, leave zeros unscaled, and apply a non-standard scaling function. | Link | Link | Not yet implemented
CosineSimilarity | Computes the cosine similarity between two array features. | Link | Link | Not yet implemented
CurrentDate | Returns the current date for use in other transformers. | Link | Link | Not yet implemented
CurrentDateTime | Returns the current date time in the format yyyy-MM-dd HH:mm:ss.SSS for use in other transformers. | Link | Link | Not yet implemented
CurrentUnixTimestamp | Returns the current unix timestamp in either seconds or milliseconds for use in other transformers. | Link | Link | Not yet implemented
DateAdd | Adds a static or dynamic number of days to a date feature. NOTE: Destroys any time component of the datetime if present. | Link | Link | Not yet implemented
DateDiff | Computes the number of days between two date features. | Link | Link | Not yet implemented
DateParse | Parses a string date of format YYYY-MM-DD to extract a given date part. E.g. day of year. | Link | Link | Not yet implemented
DateTimeToUnixTimestamp | Converts a UTC datetime string to unix timestamp. | Link | Link | Not yet implemented
Divide | Divides a single feature by a constant or divides multiple features against each other. | Link | Link | Not yet implemented
Exp | Applies the exp(x) operation to the feature. | Link | Link | Not yet implemented
Exponent | Applies x^exponent to a single feature or x^y for multiple features. | Link | Link | Not yet implemented
HashIndex | Transforms strings to indices via a hash table of predetermined size. | Link | Link | Not yet implemented
HaversineDistance | Computes the haversine distance between latitude and longitude pairs. | Link | Link | Not yet implemented
Identity | Applies the identity operation, leaving the input the same. | Link | Link | Link
IfStatement | Computes a simple if statement on a set of columns/tensors and/or constants. | Link | Link | Not yet implemented
Impute | Performs imputation of either mean or median value of the data over a specified mask. | Link | Link | Not yet implemented
LambdaFunction | Transforms an input (or multiple inputs) to an output (or multiple outputs) with a user provided tensorflow function. | Link | Link | Not yet implemented
ListMax | Computes the listwise max of a feature, optionally calculated only on the top items based on another given feature. | Link | Link | Not yet implemented
ListMean | Computes the listwise mean of a feature, optionally calculated only on the top items based on another given feature. | Link | Link | Not yet implemented
ListMedian | Computes the listwise median of a feature, optionally calculated only on the top items based on another given feature. | Link | Link | Not yet implemented
ListMin | Computes the listwise min of a feature, optionally calculated only on the top items based on another given feature. | Link | Link | Not yet implemented
ListRank | Computes the listwise rank (ordering) of a feature. | Link | Link | Not yet implemented
ListStdDev | Computes the listwise standard deviation of a feature, optionally calculated only on the top items based on another given feature. | Link | Link | Not yet implemented
Log | Applies the natural logarithm log(alpha + x) transform. | Link | Link | Link
LogicalAnd | Performs an and(x, y) operation on multiple boolean features. | Link | Link | Not yet implemented
LogicalNot | Performs a not(x) operation on a single boolean feature. | Link | Link | Not yet implemented
LogicalOr | Performs an or(x, y) operation on multiple boolean features. | Link | Link | Not yet implemented
Max | Computes the maximum of a feature with a constant or multiple other features. | Link | Link | Not yet implemented
Mean | Computes the mean of a feature with a constant or multiple other features. | Link | Link | Not yet implemented
Min | Computes the minimum of a feature with a constant or multiple other features. | Link | Link | Not yet implemented
MinHashIndex | Creates an integer bit array from a set of strings using the MinHash algorithm. | Link | Link | Not yet implemented
MinMaxScale | Scales the input feature by the min/max resulting in a feature in [0, 1]. | Link | Link | Not yet implemented
Modulo | Computes the modulo of a feature with the mod divisor being a constant or another feature. | Link | Link | Not yet implemented
Multiply | Multiplies a single feature by a constant or multiplies multiple features together. | Link | Link | Not yet implemented
NumericalIfStatement | Performs a simple if else statement with a given operator. Value to check, result if true or false can be constants or features. | Link | Link | Not yet implemented
OneHotEncode | Transforms a string to a one-hot array. | Link | Link | Not yet implemented
OrdinalArrayEncode | Encodes strings in an array according to the order in which they appear. Only for 2D tensors. | Link | Link | Not yet implemented
Round | Rounds a floating feature to the nearest integer using ceil, floor or a standard round op. | Link | Link | Not yet implemented
RoundToDecimal | Rounds a floating feature to the nearest decimal precision. | Link | Link | Not yet implemented
SharedOneHotEncode | Transforms a string to a one-hot array, using labels across multiple inputs to determine the one-hot size. | Link | Link | Not yet implemented
SharedStringIndex | Transforms strings to indices via a vocabulary lookup, sharing the vocabulary across multiple inputs. | Link | Link | Not yet implemented
SingleFeatureArrayStandardScale | Normalises by the mean and standard deviation calculated over all elements of all inputs, with ability to mask a specified value. | Link | Link | Not yet implemented
StandardScale | Normalises by the mean and standard deviation, with ability to mask a specified value. | Link | Link | Link
StringAffix | Prefixes and suffixes a string with provided constants. | Link | Link | Not yet implemented
StringArrayConstant | Inserts provided string array constant into a column. | Link | Link | Not yet implemented
StringCase | Applies an upper or lower casing operation to the feature. | Link | Link | Not yet implemented
StringConcatenate | Joins string columns using the provided separator. | Link | Link | Not yet implemented
StringContains | Checks for the existence of a constant or tensor-element substring within a feature. | Link | Link | Not yet implemented
StringContainsList | Checks for the existence of any string from a list of string constants within a feature. | Link | Link | Not yet implemented
StringEqualsIfStatement | Performs a simple if else statement on string equality. Value to check, result if true or false can be constants or features. | Link | Link | Not yet implemented
StringIndex | Transforms strings to indices via a vocabulary lookup. | Link | Link | Not yet implemented
StringListToString | Concatenates a list of strings to a single string with a given delimiter. | Link | Link | Not yet implemented
StringMap | Maps a list of string values to a list of other string values with a standard CASE WHEN statement. Can provide a default value for ELSE. | Link | Link | Not yet implemented
StringIsInList | Checks if the feature is equal to at least one of the strings provided. | Link | Link | Not yet implemented
StringReplace | Performs a regex replace operation on a feature with constant params or between multiple features. | Link | Link | Not yet implemented
StringToStringList | Splits a string by a separator, returning a list of parametrised length (with a default value for missing inputs). | Link | Link | Not yet implemented
SubStringDelimAtIndex | Splits a string column using the provided delimiter, and returns the value at the index given. If the index is out of bounds, returns a given default value. | Link | Link | Not yet implemented
Subtract | Subtracts a constant from a single feature or subtracts multiple features from each other. | Link | Link | Not yet implemented
Sum | Adds a constant to a single feature or sums multiple features together. | Link | Link | Not yet implemented
UnixTimestampToDateTime | Converts a unix timestamp to a UTC datetime string. | Link | Link | Not yet implemented
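
For orientation, the arithmetic behind a few of the numerical transformations listed above can be written in plain Python. This is a reference sketch of the textbook formulas only, not Kamae's implementation: the function names and signatures are hypothetical, masking and padding options are left out, and the haversine result is assumed to be in kilometres using a mean Earth radius of 6371 km.

```python
import math


def log_transform(x, alpha=1.0):
    """Log: natural logarithm of (alpha + x)."""
    return math.log(alpha + x)


def min_max_scale(x, minimum, maximum):
    """MinMaxScale: map x from [minimum, maximum] onto [0, 1]."""
    return (x - minimum) / (maximum - minimum)


def standard_scale(x, mean, stddev):
    """StandardScale: subtract the mean and divide by the standard deviation."""
    return (x - mean) / stddev


def cosine_similarity(a, b):
    """CosineSimilarity: dot product divided by the product of the norms."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def haversine_distance(lat1, lon1, lat2, lon2, radius_km=6371.0):
    """HaversineDistance: great-circle distance between two lat/long pairs.

    Inputs are in degrees; the radius (and so the output unit) is an assumption.
    """
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlambda = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2
    # Clamp to guard against rounding slightly above 1 before asin.
    return 2 * radius_km * math.asin(math.sqrt(min(1.0, a)))


print(log_transform(0.0))
print(min_max_scale(5.0, 0.0, 10.0))
print(standard_scale(3.0, 1.0, 2.0))
```

In a fitted pipeline the statistics passed in here by hand (minimum, maximum, mean, standard deviation) would come from the estimator's fit step.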

Development

Setup

Requirements: Python 3.10 (for development), pipx (installation instructions)

make setup      # Install dependencies and pre-commit hooks
make all        # Run tests, formatting, and linting
make help       # See all available commands

The package supports Python 3.8-3.12 in production.

Common Commands

make run-example        # Run example pipeline
make test-tf-serving    # Test TensorFlow Serving inference
make test-end-to-end    # Run example + test serving

Contributing

Create a branch from main and open a pull request. Follow the adding transformers guide for new transformers.

Code quality: Pre-commit hooks enforce formatting and linting. Install with uv run pre-commit install. PRs must pass all tests in tests/.

Versioning: Automated via semantic-release. Use conventional commit prefixes in PR titles: fix: (patch), feat: (minor), BREAKING CHANGE: (major).
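
The prefix-to-release mapping above can be sketched as a small lookup. This is only an illustration of the three prefixes named here; the function release_type is hypothetical, and the project's actual semantic-release configuration may recognise further commit types or scoped prefixes.

```python
# Prefixes and release types as listed in the versioning note.
PREFIX_TO_BUMP = {
    "BREAKING CHANGE:": "major",
    "feat:": "minor",
    "fix:": "patch",
}


def release_type(pr_title):
    """Return the release type implied by a PR title, or None if no prefix matches."""
    for prefix, bump in PREFIX_TO_BUMP.items():
        if pr_title.startswith(prefix):
            return bump
    return None


print(release_type("feat: add a new transformer"))
print(release_type("docs: update README"))
```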

Contact: Questions? Reach out to the Kamae team.
