
Conversation

Contributor

@ghukill ghukill commented Dec 9, 2025

Purpose and background context

Two nice-to-have functionalities were missing from the first pass of embeddings reading:

  1. filter by record metadata columns not in the embeddings schema
  2. retrieve record metadata columns in embeddings read methods

There was a deliberate choice to keep embeddings read methods simple in the first pass. This builds on that work.

For TIMDEXEmbeddings.read_batches_iter(), the base read method for all embeddings read methods, perform a join to record metadata via the composite key (timdex_record_id, run_id, run_record_offset). Given that reading record metadata is very fast and memory safe, this join is too. By performing this join, we can expose record metadata columns that intentionally don't exist in the embeddings schema -- e.g. source or run_timestamp -- for filtering and selecting.
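
To make the shape of that join concrete, here is a roughly equivalent hand-written DuckDB query (illustrative only -- the read methods assemble this via SQLAlchemy, and it assumes a TIMDEXDataset instance td as created in the walkthrough below):

td.conn.query(
    """
    select e.*, m.source, m.run_timestamp
    from data.embeddings e
    join metadata.records m
      on e.timdex_record_id = m.timdex_record_id
     and e.run_id = m.run_id
     and e.run_record_offset = m.run_record_offset
    limit 5;
    """
)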

Some additional tests were also added to check the results of the new views data.embeddings, data.current_embeddings, and data.current_run_embeddings.

How can a reviewer manually see the effects of these changes?

1- Open python shell

pipenv run ipython

2- Import dependencies and define a function that will generate a bunch of sample data (copy + paste into python shell):

from timdex_dataset_api import TIMDEXDataset
from timdex_dataset_api.config import configure_dev_logger
from tests.utils import generate_sample_records, generate_sample_embeddings_for_run

configure_dev_logger()

DATASET_PATH = "/tmp/use143"

td = TIMDEXDataset(DATASET_PATH)


def simulate_etl_runs_per_source():
    import datetime as dt
    import random

    # Config
    sources = ["apple", "orange", "lemon", "lime", "pear", "grape"]
    runs_per_source = 25
    full_run_size = 2_000
    min_daily_size = 1
    max_daily_size = 1_000

    # Deterministic-ish randomness if you want repeatability
    random.seed(143)

    td = TIMDEXDataset(DATASET_PATH)

    # Keep track of all ETL runs so we can later write embeddings for each
    etl_runs: list[dict] = []

    #################################################################
    # 1. RECORD WRITES: many ETL runs across several sources
    #################################################################

    for s_index, source in enumerate(sources):
        # Stagger base dates per source a bit, mostly for realism
        base_date = dt.date(2025, 1, 1) + dt.timedelta(days=s_index * 200)
        mid_index = runs_per_source // 2  # "somewhere in the middle"

        for i in range(runs_per_source):
            # Decide run_type and record count
            if i == 0 or i == mid_index:
                run_type = "full"
                num_records = full_run_size
            else:
                run_type = "daily"
                num_records = random.randint(min_daily_size, max_daily_size)

            run_date = (base_date + dt.timedelta(days=i)).isoformat()
            run_id = f"{source}-{i + 1:03d}"

            # Write records for this ETL run
            td.write(
                generate_sample_records(
                    num_records,
                    source=source,
                    run_type=run_type,
                    run_date=run_date,
                    run_id=run_id,
                )
            )

            etl_runs.append(
                {
                    "source": source,
                    "run_id": run_id,
                    "run_type": run_type,
                    "run_date": run_date,
                    "num_records": num_records,
                }
            )

    #################################################################
    # 2. REBUILD METADATA + RELOAD TD
    #################################################################

    td.metadata.rebuild_dataset_metadata()
    td = TIMDEXDataset(DATASET_PATH)

    #################################################################
    # 3. EMBEDDINGS WRITES: one embedding run per ETL run
    #################################################################

    for etl_run in etl_runs:
        td.embeddings.write(
            generate_sample_embeddings_for_run(
                td,
                run_id=etl_run["run_id"],
                timestamp=f'{etl_run["run_date"]}T12:00:00',
            )
        )
        # simulate a secondary timdex-embeddings run ~20% of the time
        if random.randint(0, 100) > 80:
            td.embeddings.write(
                generate_sample_embeddings_for_run(
                    td,
                    run_id=etl_run["run_id"],
                    timestamp=f'{etl_run["run_date"]}T18:00:00',
                )
            )

    print("Done: wrote records and embeddings for:")
    for source in sources:
        count_for_source = sum(1 for r in etl_runs if r["source"] == source)
        print(f"  - {source}: {count_for_source} ETL runs")

    ################################################################
    # 4. SUMMARY
    #################################################################

    td = TIMDEXDataset(DATASET_PATH)

    etl_runs_count = td.conn.query(
        """select count(distinct run_id) from metadata.records;"""
    ).fetchone()[0]
    embeddings_runs = td.conn.query(
        """select count(*)
    from (
        select distinct run_id, timestamp
        from data.embeddings
    );"""
    ).fetchone()[0]
    print(f"Total ETL runs: {etl_runs_count}")
    print(
        f"Total embeddings runs: {embeddings_runs}, "
        f"{round(((embeddings_runs - etl_runs_count) / etl_runs_count) * 100, 2)}% additional"
    )
  • simulates multiple ETL runs across several sources
  • simulates occasionally running timdex-embeddings more than once for the same ETL run

3- Run the function and reload the TIMDEXDataset instance:

simulate_etl_runs_per_source()
td = TIMDEXDataset(DATASET_PATH)

You should see output like this near the end:

...
Done: wrote records and embeddings for:
  - apple: 25 ETL runs
  - orange: 25 ETL runs
  - lemon: 25 ETL runs
  - lime: 25 ETL runs
  - pear: 25 ETL runs
  - grape: 25 ETL runs
...
Total ETL runs: 150
Total embeddings runs: 181, 20.67% additional
...

4- Show breakdown of embeddings, current_embeddings, current_run_embeddings:

td.conn.query(
    """
    select
        e.run_id,
        count(*) as embeddings,
        (select count(*)
           from data.current_embeddings ce
           where ce.run_id = e.run_id) as current_embeddings,
        (select count(*)
           from data.current_run_embeddings cre
           where cre.run_id = e.run_id) as current_run_embeddings,
        count(distinct e.timestamp) as embedding_timestamps
    from data.embeddings e
    group by e.run_id
    order by e.run_id;
    """
)
  • embeddings: all embeddings for a given run_id
  • current_embeddings: how many records from that run are "current" for their timdex_record_id
  • current_run_embeddings: how many, within a single run_id, are current (if timdex-embeddings ran twice for that run, this will be half); a quick sanity check of these relationships follows below
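
An illustrative sanity check of those relationships -- the view definitions themselves aren't shown in this PR, so this only compares totals:

td.conn.query(
    """
    select
        (select count(*) from data.embeddings) as embeddings,
        (select count(*) from data.current_embeddings) as current_embeddings,
        (select count(*) from data.current_run_embeddings) as current_run_embeddings;
    """
)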

5- Analyze a specific run:

td.conn.query(
    """
    select
        'lime-016' as run_id,
        (select count(*) from data.embeddings where run_id='lime-016') as embeddings,
        (select count(*) from data.current_embeddings where run_id='lime-016') as current_embeddings,
        (select count(*) from data.current_run_embeddings where run_id='lime-016') as current_run_embeddings
    ;
    """
)
  • demonstrates that for run lime-016 we have 1966 total embeddings
  • but only 983 are "current" for that run (exactly half, so timdex-embeddings must have run twice)
  • lastly, only 67 are current overall for those timdex_record_id / source combinations; the "ran twice" inference can be verified as shown below
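
The "ran twice" inference can be verified by counting distinct embedding timestamps for the run (illustrative):

td.conn.query(
    """select count(distinct timestamp) from data.embeddings where run_id = 'lime-016';"""
)

If this returns 2, that explains the 1966 total vs. 983 current split for the run.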

6- Finally, invoke a TIMDEXEmbeddings.read_* method that utilizes record metadata columns:

td.embeddings.read_dataframe(
    table="current_embeddings",
    run_id="lime-016",
    source="lime",  # FILTER BY record metadata column
    columns=[
        "timdex_record_id",
        "source",  # SELECT record metadata column
        "run_timestamp",  # SELECT record metadata column
        "embedding_object",
    ],
)
  • formerly we couldn't filter by, or return, the source column
  • this is now possible given the record metadata join; a hand-written SQL equivalent is sketched below
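
For reference, a hand-written SQL equivalent of the read above might look roughly like this (illustrative; the actual statement is assembled via SQLAlchemy and may differ):

td.conn.query(
    """
    select
        e.timdex_record_id,
        m.source,
        m.run_timestamp,
        e.embedding_object
    from data.current_embeddings e
    join metadata.records m
      on e.timdex_record_id = m.timdex_record_id
     and e.run_id = m.run_id
     and e.run_record_offset = m.run_record_offset
    where e.run_id = 'lime-016'
      and m.source = 'lime';
    """
)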

Includes new or updated dependencies?

YES

Changes expectations for external applications?

NO

What are the relevant tickets?

  • https://mitlibraries.atlassian.net/browse/USE-143

Code review

  • Code review best practices are documented here and you are encouraged to have a constructive dialogue with your reviewers about their preferences and expectations.

Comment on lines 414 to 419
# create SQL statement with join to metadata.records
join_condition = and_(
    embeddings_table.c.timdex_record_id == metadata_table.c.timdex_record_id,
    embeddings_table.c.run_id == metadata_table.c.run_id,
    embeddings_table.c.run_record_offset == metadata_table.c.run_record_offset,
)
Contributor Author

This addition is central to this update. Here we always join against the metadata.records view with our composite key of (timdex_record_id, run_id, run_record_offset), thereby giving us access to record metadata columns like source for embeddings queries.

Contributor

Not to be nitpicky but I don't think line 414 is accurate, rather it's:

combine SQL expressions for join to metadata.records

Technically the join SQL statement is added at line 445


coveralls commented Dec 9, 2025

Pull Request Test Coverage Report for Build 20270987662

Details

  • 39 of 39 (100.0%) changed or added relevant lines in 2 files are covered.
  • No unchanged relevant lines lost coverage.
  • Overall coverage increased (+0.4%) to 93.861%

Totals Coverage Status:
  • Change from base Build 20139181994: 0.4%
  • Covered Lines: 688
  • Relevant Lines: 733

💛 - Coveralls

Comment on lines +423 to +432
embeddings_cols = []
metadata_cols = []

for col_name in columns:
    if col_name in TIMDEX_DATASET_EMBEDDINGS_SCHEMA.names:
        embeddings_cols.append(embeddings_table.c[col_name])
    elif col_name in METADATA_SELECT_FILTER_COLUMNS:
        metadata_cols.append(metadata_table.c[col_name])
    else:
        raise ValueError(f"Invalid column: {col_name}")
Contributor Author

Because columns for filtering and selecting can come from either the record metadata schema or the embeddings schema, we build lists of those columns here as SQLAlchemy objects. Then we can use these lists for filtering and selection below.
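
For example, selecting a column found in neither schema raises (hypothetical column name):

td.embeddings.read_dataframe(
    table="current_embeddings",
    columns=["not_a_real_column"],  # hypothetical: in neither schema
)
# raises ValueError: Invalid column: not_a_real_column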

)


def generate_sample_embeddings_for_run(
Contributor Author

This new sample data utility will generate embeddings specifically for records/runs that already exist in the dataset. This creates more "real" embeddings rows that support joining in read methods.
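
For context, a single row yielded by this utility might look roughly like the following (illustrative values only; the real utility in tests/utils.py derives the composite keys from records already written to the dataset, so joins line up):

sample_embedding_row = {
    "timdex_record_id": "apple:0",  # assumed id format, illustrative
    "run_id": "apple-001",
    "run_record_offset": 0,
    "timestamp": "2025-01-01T12:00:00",
    "embedding_model": "sample-model",
    "embedding_strategy": "sample-strategy",
    "embedding_vector": [0.1, 0.2, 0.3],
    "embedding_object": b"sample-bytes",
}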

@ghukill ghukill marked this pull request as ready for review December 9, 2025 16:12
@ghukill ghukill requested a review from a team as a code owner December 9, 2025 16:12
@jonavellecuerdo jonavellecuerdo self-assigned this Dec 9, 2025
Why these changes are being introduced:

Two nice-to-have functionalities were missing from the first pass of embeddings
reading:

1. filter by record metadata columns not in the embeddings schema
2. retrieve record metadata columns in embeddings read methods

There was a deliberate choice to keep embeddings read methods simple in the first
pass.  This builds on that work.

How this addresses that need:

For TIMDEXEmbeddings.read_batches_iter(), the base read method for all embeddings
read methods, perform a join to record metadata via the composite key (timdex_record_id,
run_id, run_record_offset).  Given that reading record metadata is very fast and memory
safe, this join is too.  By performing this join, we can expose record metadata columns
that intentionally don't exist in the embeddings schema -- e.g. 'source' or
'run_timestamp' -- for filtering and selecting.

Side effects of this change:
* Read methods for TIMDEXEmbeddings can filter and return columns only found in
record metadata tables/views.

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/USE-143
@ghukill ghukill force-pushed the USE-143-record-metadata-join branch from 2f82fdc to 04c9c3e Compare December 11, 2025 16:48
@jonavellecuerdo jonavellecuerdo left a comment

This is looking good! Just have a few questions / suggestions re: naming conventions and comments. I'll run the suggested code once the change requests are addressed. Thanks!

timdex_record_id: str
run_id: str
run_record_offset: int
timestamp: str | datetime
Contributor

Looking at the full list of columns we can filter embeddings by, it would be nice to rename timestamp -> embedding_timestamp. 🤔

Contributor Author

I'm glad this came up; curious about your thoughts on a couple of options.

This EmbeddingsFilters class is really driven by TIMDEX_DATASET_EMBEDDINGS_SCHEMA, the schema of the embeddings data:

TIMDEX_DATASET_EMBEDDINGS_SCHEMA = pa.schema(
    (
        pa.field("timdex_record_id", pa.string()),
        pa.field("run_id", pa.string()),
        pa.field("run_record_offset", pa.int32()),
        pa.field("timestamp", pa.timestamp("us", tz="UTC")),  #<-----------
        pa.field("embedding_model", pa.string()),
        pa.field("embedding_strategy", pa.string()),
        pa.field("embedding_vector", pa.list_(pa.float32())),
        pa.field("embedding_object", pa.binary()),
        pa.field("year", pa.string()),
        pa.field("month", pa.string()),
        pa.field("day", pa.string()),
    )
)

To change the filter, I think we'd need to change the schema, unless we want to juggle conversions during reading/writing and I'd vote that is definitely more trouble than it's worth.

Option 1: Rename timestamp to embedding_timestamp

This would require a schema change, which really isn't too bad given embeddings aren't live anywhere. This would also match embedding_model, embedding_strategy, etc.

This would start to establish a pattern where each new data source in the TIMDEX dataset -- broadly speaking -- would have sort of a "namespace" prefix. So if we introduce a full-text data source, with its own schema, perhaps we'd have columns like fulltext_text or fulltext_strategy, etc.

The downside, it's kind of long and verbose. And, the namespace is kind of implied by the schema it belongs to.

The upside, when we're joining across data sources -- kind of like we're doing here for embeddings -- at the python level the kwargs translate directly to DuckDB table/view columns and we don't need any translation.

Option 2: Remove embedding_ as a prefix for all columns in the schema

In this approach, any field in the embeddings schema is assumed to be "embedding" related. So we might change embedding_model --> model and embedding_strategy --> strategy.

The plus side, terse, and leans into the namespace of the schema.

The downside, thinking about the full-text situation where we might have a "strategy" column as well in this new imagined schema, it'd get tricky at the python level where we might want to join and filter by embedding strategy and/or the full-text strategy. Even though our DuckDB views/tables could be disambiguated at the SQL level, it wouldn't be obvious how to handle with python kwargs in our read methods.

Proposal

I'd propose we do what you're saying and make it embedding_timestamp, leaning into each data source schema -- e.g. embeddings, in the future full-text, etc. -- having pretty verbose column names, assuming that we might be filtering at the python level during reads, where we can't easily use the DuckDB views/tables as namespacing.

I wrote all this out as I think it's important to weigh the pros / cons, but I think verbose column names at the schema level are a good option for right now.
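
Concretely, the renamed field in TIMDEX_DATASET_EMBEDDINGS_SCHEMA would look something like this (sketch of the proposed change):

pa.field("embedding_timestamp", pa.timestamp("us", tz="UTC")),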

Contributor

Verbosity in favor of clarity sounds good to me!

This does make me wonder: what would be the difference between embedding_strategy and fulltext_strategy? 🤔

Contributor Author

To my thinking, a fulltext_strategy would be -- and this is really projecting into the future -- the "strategy" we used to get the full-text for that particular TIMDEX record. Maybe it's got values like "dspace_fulltext" or "dspace_default_pdf", who knows.

It's anticipating that given a new full-text schema in the TIMDEX dataset, like for embeddings, it's a bit of context on how that full-text came to be for that particular record.

Contributor Author

But sounds good! I'll make this change to update the schema + filter/select columns. I think we'll need a change in timdex-embeddings, but that's fine, it's due for some action pretty soon.

Contributor Author

Ticket created for update to timdex-embeddings: https://mitlibraries.atlassian.net/browse/USE-288.

Comment on lines 388 to +442
if columns:
-    stmt = select(*sa_table.c[*columns]).select_from(sa_table)
+    embeddings_cols = []
+    metadata_cols = []
+
+    for col_name in columns:
+        if col_name in TIMDEX_DATASET_EMBEDDINGS_SCHEMA.names:
+            embeddings_cols.append(embeddings_table.c[col_name])
+        elif col_name in METADATA_SELECT_FILTER_COLUMNS:
+            metadata_cols.append(metadata_table.c[col_name])
+        else:
+            raise ValueError(f"Invalid column: {col_name}")
+
+    stmt = select(*embeddings_cols, *metadata_cols)
else:
-    stmt = select(sa_table).select_from(sa_table)
+    embeddings_cols = [
+        embeddings_table.c[col] for col in TIMDEX_DATASET_EMBEDDINGS_SCHEMA.names
+    ]
+    metadata_cols = [
+        metadata_table.c[col] for col in METADATA_SELECT_FILTER_COLUMNS
+    ]
+    stmt = select(*embeddings_cols, *metadata_cols)
Contributor

Do you think it's worth changing to

if col_name in EMBEDDINGS_FILTER_COLUMNS

for consistency?

Also, could EMBEDDINGS_FILTER_COLUMNS be renamed to EMBEDDINGS_SELECT_FILTER_COLUMNS? 🤔

Contributor Author

@ghukill ghukill Dec 15, 2025

I was going to respond with an enthusiastic "good suggestion", and it seemed like it was... but I think there is some nuance here.

We want to allow users to select all columns from the embeddings schema, but prevent filtering by columns like embedding_object or embedding_vector. They are technically still filterable via the custom, manual where clause option, but it feels unusual to filter by them at the python level, which is a bit higher level.

Admittedly, this is a bit driven by the records + record metadata reading precedent, where the record metadata is a standalone DuckDB view / table that does not contain the columns source_record or transformed_record. We can't allow filtering by those, because they aren't "metadata", which is how the records filtering works.

That said.... the embeddings are a bit different in that they are "simple" and just casting over the parquet files. We technically could filter by embedding_object or embedding_vector just like any other column.

My proposal would be to keep things as-is, which kind of makes explicit that:

  • EMBEDDINGS_FILTER_COLUMNS is a list of embeddings columns you can/should filter by
  • but a user can select anything from the schema TIMDEX_DATASET_EMBEDDINGS_SCHEMA

Does that make sense? Is that okay by you @jonavellecuerdo?

Great question though. If I thought filtering by the object or vector was a normal or helpful thing to do, this would be a no-brainer, but for now it feels like we want to keep those data-heavy columns out of the normal filtering ergonomics. A rough sketch of the split follows below.
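
To make that split explicit, a rough sketch -- the actual contents of EMBEDDINGS_FILTER_COLUMNS in the codebase may differ:

# assumed contents, for illustration: columns exposed to python-level
# kwarg filtering (note: timestamp pre-dates the embedding_timestamp rename)
EMBEDDINGS_FILTER_COLUMNS = [
    "timdex_record_id",
    "run_id",
    "run_record_offset",
    "timestamp",
    "embedding_model",
    "embedding_strategy",
]

# ...while selection allows any column in TIMDEX_DATASET_EMBEDDINGS_SCHEMA.names,
# including the data-heavy embedding_vector and embedding_object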

Contributor

Ah, good catch! I forgot about embedding_vector and embedding_object. Yes, this makes sense to me.


Why these changes are being introduced:

A PR discussion pointed out that `timestamp` was the only column unique to the new
embeddings schema that doesn't carry the `embedding_` prefix.  There was some discussion
that for any of those columns it's implied they are part of the embeddings schema, so
the `embedding_` prefix is not necessary.  This would hold true in a pure SQL world with
tables and views acting like namespaces.

But at the level of this python library, where we support quite a bit of kwargs filtering
in read methods, that kind of namespacing is awkward.  We would like to be able to filter
by `run_timestamp` or `embedding_timestamp`, or in the future maybe `fulltext_timestamp`.
Being verbose with our column names simplifies this a bit.

How this addresses that need:

Renames the column `timestamp` to `embedding_timestamp` in the embeddings schema.

Side effects of this change:
* The CLI timdex-embeddings will need an update to the columns it writes

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/USE-143
* https://mitlibraries.atlassian.net/browse/USE-288
Contributor Author

ghukill commented Dec 16, 2025

@jonavellecuerdo - re-requesting a review after the renaming of timestamp to embedding_timestamp: b125772.

@ghukill ghukill merged commit a1d8ad7 into main Dec 16, 2025
2 checks passed