Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions pipeline/_internal.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,29 @@
from dataclasses import dataclass

import polars as pl
import dataframely as dy
from .schema.report import (
AverageCarVolumeSchema,
PopularModelsSchema,
SafestModelsSchema,
)


@dataclass
class Report:
popularity: pl.DataFrame | pl.LazyFrame
safety: pl.DataFrame | pl.LazyFrame
volume: pl.DataFrame | pl.LazyFrame
popularity: dy.LazyFrame[PopularModelsSchema]
safety: dy.LazyFrame[SafestModelsSchema]
volume: dy.LazyFrame[AverageCarVolumeSchema]

def to_string(self) -> str:
"""
Create a pretty-printable representation of this report.
"""
# Enforce laziness here and collect all three frames together in a
# single `pl.collect_all` call.
df_popularity, df_volume, df_safety = pl.collect_all(
[
self.popularity.lazy(),
self.volume.lazy().sort("age_of_car"),
self.safety.lazy(),
self.popularity,
self.volume,
self.safety,
]
)
header = [
Expand Down
16 changes: 9 additions & 7 deletions pipeline/data.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
from dataclasses import dataclass
import dataframely as dy

import polars as pl
from .schema.preprocessed import PrepPoliciesSchema, PrepModelsSchema
from .schema.raw import RawModelsSchema, RawPoliciesSchema


@dataclass
class RawData:
    """Container for the raw input tables of the pipeline."""

    # Lazy frame typed against `RawModelsSchema`.
    models: dy.LazyFrame[RawModelsSchema]
    # Lazy frame typed against `RawPoliciesSchema`.
    policies: dy.LazyFrame[RawPoliciesSchema]


@dataclass
class PreprocessedData:
    """Container for the preprocessed tables of the pipeline."""

    # Lazy frame typed against `PrepModelsSchema`.
    models: dy.LazyFrame[PrepModelsSchema]
    # Lazy frame typed against `PrepPoliciesSchema`.
    policies: dy.LazyFrame[PrepPoliciesSchema]
66 changes: 27 additions & 39 deletions pipeline/preprocess.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import polars as pl

import dataframely as dy
from .data import PreprocessedData, RawData
from .schema.preprocessed import PrepModelsSchema, PrepPoliciesSchema
from .schema.raw import RawModelsSchema, RawPoliciesSchema


def preprocess(raw: RawData) -> PreprocessedData:
Expand All @@ -10,62 +12,48 @@ def preprocess(raw: RawData) -> PreprocessedData:
)


def preprocess_policies(
    policies: dy.LazyFrame[RawPoliciesSchema],
) -> dy.LazyFrame[PrepPoliciesSchema]:
    """Transform the raw policies for optimal representation.

    Args:
        policies: Lazy frame of raw policies.

    Returns:
        The result of validating the transformed frame against
        `PrepPoliciesSchema`.
    """
    normalized = policies.with_columns(
        # Normalize ID: drop the literal "policy" prefix from the identifier.
        pl.col("policy_id").str.strip_prefix("policy"),
    )
    # No per-column casts here: validation is invoked with `cast=True`, so
    # dtype conversion is left to the schema definition.
    return normalized.pipe(PrepPoliciesSchema.validate, cast=True, eager=False)


def preprocess_models(
    models: dy.LazyFrame[RawModelsSchema],
) -> dy.LazyFrame[PrepModelsSchema]:
    """Transform the raw models for optimal representation.

    Args:
        models: Lazy frame of raw models.

    Returns:
        The result of validating the transformed frame against
        `PrepModelsSchema`.
    """

    # Unique to drop duplicate rows we found while investigating primary key failures
    df = models.unique()

    # 1. Convert semantically boolean columns from pl.String to pl.Boolean
    df = df.with_columns(pl.col("^is_.*$") == "Yes")

    # 2. Split max torque and power into components ("<value><unit>@<value>rpm")
    torque_parts = pl.col("max_torque").str.split("@")
    df = df.with_columns(
        max_torque_nm=torque_parts.list[0].str.strip_suffix("Nm"),
        max_torque_rpm=torque_parts.list[1].str.strip_suffix("rpm"),
    )

    power_parts = pl.col("max_power").str.split("@")
    df = df.with_columns(
        max_power_bhp=power_parts.list[0].str.strip_suffix("bhp"),
        max_power_rpm=power_parts.list[1].str.strip_suffix("rpm"),
    )

    # 3. Ensure that length / width / height are in millimeters, not centimeters
    def _ensure_mm(col: pl.Expr) -> pl.Expr:
        # Values below 1_000 are treated as centimeters and scaled by 10.
        return pl.when(col < 1_000).then(col * 10).otherwise(col)

    df = df.with_columns(
        _ensure_mm(pl.col("length")),
        _ensure_mm(pl.col("width")),
        _ensure_mm(pl.col("height")),
    )

    # The split components above are still strings; validation is invoked
    # with `cast=True`, so dtype conversion is left to the schema definition.
    return df.pipe(PrepModelsSchema.validate, cast=True, eager=False)
27 changes: 20 additions & 7 deletions pipeline/report.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import polars as pl
import dataframely as dy

from ._internal import Report
from .data import PreprocessedData
from .schema.preprocessed import PrepModelsSchema, PrepPoliciesSchema
from .schema.report import (
AverageCarVolumeSchema,
PopularModelsSchema,
SafestModelsSchema,
)


def build_report(prep: PreprocessedData) -> Report:
Expand All @@ -12,9 +19,9 @@ def build_report(prep: PreprocessedData) -> Report:
)


def find_three_most_popular_make_and_models[T: (pl.DataFrame, pl.LazyFrame)](
models: T, policies: T
) -> T:
def find_three_most_popular_make_and_models(
models: dy.LazyFrame[PrepModelsSchema], policies: dy.LazyFrame[PrepPoliciesSchema]
) -> dy.LazyFrame[PopularModelsSchema]:
"""Among all policies, compute the three make/model combinations that appear most often.

Returns:
Expand All @@ -26,10 +33,13 @@ def find_three_most_popular_make_and_models[T: (pl.DataFrame, pl.LazyFrame)](
.agg(count=pl.len())
.sort("count", descending=True)
.head(3)
.pipe(PopularModelsSchema.validate, cast=True, eager=False)
)


def find_safest_models[T: (pl.DataFrame, pl.LazyFrame)](models: T) -> T:
def find_safest_models(
models: dy.LazyFrame[PrepModelsSchema],
) -> dy.LazyFrame[SafestModelsSchema]:
"""Among all models, find the safest ones as measured by the number of safety features.

Returns:
Expand All @@ -41,12 +51,13 @@ def find_safest_models[T: (pl.DataFrame, pl.LazyFrame)](models: T) -> T:
)
.sort("safety_score", descending=True)
.head(5)
.pipe(SafestModelsSchema.validate, cast=True, eager=False)
)


def find_average_car_volume_by_age[T: (pl.DataFrame, pl.LazyFrame)](
models: T, policies: T
) -> T:
def find_average_car_volume_by_age(
models: dy.LazyFrame[PrepModelsSchema], policies: dy.LazyFrame[PrepPoliciesSchema]
) -> dy.LazyFrame[AverageCarVolumeSchema]:
"""Among all policies, find the mean physical car volume in 10-year blocks of car age.

This method should compute the volume of a car if interpreted as cuboid (i.e. box-shaped).
Expand All @@ -68,4 +79,6 @@ def find_average_car_volume_by_age[T: (pl.DataFrame, pl.LazyFrame)](
- 1
)
)
.sort("age_of_car")
.pipe(AverageCarVolumeSchema.validate, cast=True, eager=False)
)
10 changes: 10 additions & 0 deletions pipeline/schema/preprocessed.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import dataframely as dy
import polars as pl


class PrepPoliciesSchema(dy.Schema):
Expand Down Expand Up @@ -51,3 +52,12 @@ class PrepModelsSchema(dy.Schema):
max_torque_rpm = dy.UInt16()
max_power_bhp = dy.Float32()
max_power_rpm = dy.UInt16()

@dy.rule()
def volume_is_realistic(cls) -> pl.Expr:
    """Only allow reasonably sized cars.

    The cuboid volume (length * width * height) must lie between 1 and 20
    cubic meters.
    """
    # Lengths are in millimeters and 1e9 mm^3 is 1 cubic meter
    mm3_per_m3 = 1e9
    lower_bound = 1 * mm3_per_m3
    upper_bound = 20 * mm3_per_m3

    # `length` is widened to 64 bits before multiplying -- presumably so the
    # product does not overflow a narrower integer dtype.
    length = cls.length.col.cast(pl.UInt64)
    return (length * cls.width.col * cls.height.col).is_between(
        lower_bound, upper_bound
    )
2 changes: 1 addition & 1 deletion pipeline/schema/raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class RawPoliciesSchema(dy.Schema):
class RawModelsSchema(dy.Schema):
"""Schema for the raw models table as provided by our data source"""

model = dy.String(primary_key=True)
model = dy.String()
segment = dy.String()
fuel_type = dy.String()
airbags = dy.Int64()
Expand Down
15 changes: 9 additions & 6 deletions pipeline/schema/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@


class PopularModelsSchema(dy.Schema):
    """Schema of the report table listing the most popular make/model combinations."""

    make = dy.String()
    model = dy.String(primary_key=True)
    # Number of occurrences of the make/model combination.
    count = dy.UInt32()


class SafestModelsSchema(dy.Schema):
    """Schema of the report table listing the safest models."""

    model = dy.Categorical(primary_key=True)
    segment = dy.Categorical()
    # Safety is measured by the number of safety features of the model.
    safety_score = dy.UInt16()


class AverageCarVolumeSchema(dy.Schema):
    """Schema of the report table with the mean car volume per block of car age."""

    # Label of the age block, e.g. "(-inf, 10]".
    age_of_car = dy.String(primary_key=True)
    volume = dy.Float32()
    # Nullable: the first age block has no change value.
    change = dy.Float32(nullable=True)
16 changes: 10 additions & 6 deletions tests/test_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,22 @@ def test_find_average_car_volume_by_age():
{"model": "M2", "height": 2_000, "width": 2_000, "length": 2_000},
]
)
# TODO: Use `.sample` to create a policies dataframe with two policies:
# One with model "M1" and car age 4.5,
# One with model "M2" and car age 14.5
policies = PrepPoliciesSchema.sample(...)
policies = PrepPoliciesSchema.sample(
overrides=[
{"model": "M1", "age_of_car": 4.5},
{"model": "M2", "age_of_car": 14.5},
]
)

volume_m1 = 1e-9 * 1_500 * 2_000 * 2_500
volume_m2 = 1e-9 * 2_000 * 2_000 * 2_000
change = 100 * (volume_m2 / volume_m1 - 1)
expected = AverageCarVolumeSchema.validate(
# TODO: Add the second, missing row for the expected dataframe
pl.DataFrame(
[{"age_of_car": "(-inf, 10]", "volume": volume_m1, "change": None}, ...]
[
{"age_of_car": "(-inf, 10]", "volume": volume_m1, "change": None},
{"age_of_car": "(10, 20]", "volume": volume_m2, "change": change},
]
),
cast=True,
).lazy()
Expand Down
Loading