Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions mkdocs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ nav:
- Overview: how-to/object-storage-overview.md
- Choose Storage Type: how-to/choose-storage-type.md
- Use Object Storage: how-to/use-object-storage.md
- Staged Insert: how-to/staged-insert.md
- Use NPY Codec: how-to/use-npy-codec.md
- Use Plugin Codecs: how-to/use-plugin-codecs.md
- Create Custom Codecs: how-to/create-custom-codec.md
Expand Down
3 changes: 3 additions & 0 deletions src/how-to/insert-data.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ ImageData.insert1({
})
```

For multi-GB arrays, Zarr stores, HDF5 files, or any object too large to hold in memory before insert, use [Staged Insert](staged-insert.md) instead — it writes directly to object storage and commits the row atomically.

## Insert Options Summary

| Option | Default | Description |
Expand Down Expand Up @@ -146,6 +148,7 @@ if Subject.validate(rows):

## See Also

- [Staged Insert](staged-insert.md) — Atomic insert for large objects written directly to object storage
- [Master-Part Tables](master-part.ipynb) — Atomic insertion of master and parts
- [Define Tables](define-tables.md) — Table definition syntax
- [Delete Data](delete-data.md) — Removing data from tables
191 changes: 191 additions & 0 deletions src/how-to/staged-insert.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
# Staged Insert

Write large objects directly to object storage as part of an atomic insert.

## Overview

`staged_insert1` is a context manager for inserting rows whose object-typed fields are too large to copy through local storage first. It writes directly to the destination object store while the row is being built, then finalizes the database insert when the block exits cleanly. If an exception is raised inside the block, the staged objects are cleaned up and no row is inserted.

This pattern is the right choice when:

- Objects are large (multi-GB arrays, long recordings, image stacks)
- You want to stream or write in chunks rather than buffer in memory
- You want all-or-nothing semantics across object storage and the database

It is only available for `<object@>` fields — the schema-addressed codec used for Zarr arrays, HDF5 files, and other multi-file objects. Attempting `staged.store()` or `staged.open()` on a field of any other type raises `DataJointError`. For ordinary inserts of small or in-memory objects, use [`insert` / `insert1`](insert-data.md).

## Quick Start

```python
import zarr
import datajoint as dj

schema = dj.Schema('imaging')

@schema
class ImagingSession(dj.Manual):
definition = """
subject_id : int32
session_id : int32
---
n_frames : int32
frame_rate : float32
frames : <object@>
"""

with ImagingSession.staged_insert1 as staged:
# 1. Set primary key values first
staged.rec['subject_id'] = 1
staged.rec['session_id'] = 1

# 2. Get a storage handle for the object field
store = staged.store('frames', '.zarr')

# 3. Write directly to object storage (no local copy)
z = zarr.open(store, mode='w', shape=(1000, 512, 512),
chunks=(10, 512, 512), dtype='int32')
for i in range(1000):
z[i] = acquire_frame()

# 4. Set remaining record attributes
staged.rec['n_frames'] = 1000
staged.rec['frame_rate'] = 30.0

# On clean exit: metadata is computed, row is inserted
# On exception: staged objects are removed, no row is inserted
```

## How It Works

Inside the `with` block, the row is a draft — `staged.rec` collects attribute values, and `staged.store(field, ext)` / `staged.open(field, ext)` return handles that write directly to the destination object store.

When the block exits without an exception, DataJoint:

1. Computes object metadata (size, manifest) from the staged objects.
2. Inserts the row into the database with the populated metadata.

When the block raises, DataJoint:

1. Removes any objects that were written inside the block.
2. Skips the database insert.

This gives the same atomicity guarantee as an ordinary `insert1` — readers never see a row whose object data is partial.

## API Reference

### `Table.staged_insert1`

```python
with Table.staged_insert1 as staged:
...
```

Context manager property on every `dj.Table` subclass. Yields a `StagedInsert` object scoped to one row. Writes go to the store referenced by the field's type spec — `<object@>` uses `stores.default`, and `<object@name>` uses the named store.

### `staged.rec`

A dict for the row's attribute values. Set primary key fields **before** calling `staged.store()` or `staged.open()` — the storage path is derived from the primary key.

```python
staged.rec['subject_id'] = 1
staged.rec['session_id'] = 1
```

### `staged.store(field, ext='')`

Returns an `fsspec.FSMap` for an object field. Suitable for Zarr, xarray, or any library that takes a mapping-style store.

```python
store = staged.store('frames', '.zarr')
z = zarr.open(store, mode='w', shape=..., dtype=...)
```

### `staged.open(field, ext='', mode='wb')`

Returns a file-like object for an object field. Suitable for HDF5, raw binary, or any library that takes a file handle.

```python
with staged.open('recording', '.h5') as f:
h5py.File(f, mode='w').create_dataset('data', data=arr)
```

### `staged.fs`

The underlying `fsspec.AbstractFileSystem` for advanced operations (listing, deleting, custom paths). Most users won't need this.

## Patterns

### Zarr arrays

```python
with Recording.staged_insert1 as staged:
staged.rec['recording_id'] = recording_id
z = zarr.open(staged.store('frames', '.zarr'), mode='w',
shape=(n_frames, h, w), chunks=(1, h, w), dtype='uint16')
for i, frame in enumerate(stream):
z[i] = frame
```

### HDF5 files

```python
import h5py

with Recording.staged_insert1 as staged:
staged.rec['recording_id'] = recording_id
with staged.open('raw', '.h5') as f:
with h5py.File(f, 'w') as h5:
h5.create_dataset('signal', data=signal, chunks=True)
h5.attrs['fs'] = sampling_rate
```

### Streaming from an instrument

Set the primary key, get the handle, then write as data arrives. The block exits — and the row commits — only after the stream is fully captured:

```python
with ImagingSession.staged_insert1 as staged:
staged.rec['subject_id'] = subject_id
staged.rec['session_id'] = session_id

z = zarr.open(staged.store('frames', '.zarr'), mode='w', ...)
for i in range(n_frames):
z[i] = camera.grab()

staged.rec['n_frames'] = n_frames
```

If the camera errors out mid-stream, the partial Zarr is removed and the row is not inserted.

## Error Handling and Atomicity

A `staged_insert1` block is atomic across object storage and the database:

- **Object storage**: anything written via `staged.store()` / `staged.open()` is staged under a path derived from the primary key. On exception inside the block, those staged objects are removed.
- **Database**: the row is only inserted on clean exit.

If the database insert itself fails on exit (e.g., duplicate primary key), the staged objects are also removed.

## Limitations

- Only one row per block — use a loop of `with` blocks for many rows, or use the standard `insert` for batches that fit in memory.
- The block must set all primary key fields before calling `store()` or `open()`.
- Requires `stores.default` configured, or a named store referenced by the field's type spec.
- Cleanup only runs for ordinary exceptions. `KeyboardInterrupt` (Ctrl+C) and other `BaseException` subclasses bypass the cleanup path, so a process killed mid-write may leave staged objects behind. Run the garbage collector to reclaim them — see [Clean Up Storage](garbage-collection.md).

## Troubleshooting

### `Storage is not configured`

Set `stores.default` and `stores.<name>` in `datajoint.json` or via `dj.config`. See [Configure Object Storage](configure-storage.md).

### `Primary key not set` when calling `staged.store()`

Set primary key attributes on `staged.rec` before calling `staged.store()` or `staged.open()`. The object path depends on the primary key.

## See Also

- [Insert Data](insert-data.md) — Standard insert for ordinary rows
- [Use Object Storage](use-object-storage.md) — Object-augmented schemas and storage types
- [Configure Object Storage](configure-storage.md) — Store configuration
- [Use the `<npy>` Codec](use-npy-codec.md) — NumPy array storage with lazy fetch
43 changes: 4 additions & 39 deletions src/how-to/use-object-storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,52 +157,17 @@ Use schema-addressed storage for:

## Write Directly to Object Storage

For large datasets like multi-GB imaging recordings, avoid intermediate copies by writing directly to object storage with `staged_insert1`:
For multi-GB imaging recordings, Zarr arrays, HDF5 files, or any object too large to round-trip through local storage, use [Staged Insert](staged-insert.md). It writes directly to the destination object store inside a context manager and commits the database row atomically on clean exit:

```python
import zarr

@schema
class ImagingSession(dj.Manual):
definition = """
subject_id : int32
session_id : int32
---
n_frames : int32
frame_rate : float32
frames : <object@>
"""

# Write Zarr directly to object storage
with ImagingSession.staged_insert1 as staged:
# 1. Set primary key values first
staged.rec['subject_id'] = 1
staged.rec['session_id'] = 1

# 2. Get storage handle
store = staged.store('frames', '.zarr')

# 3. Write directly (no local copy)
z = zarr.open(store, mode='w', shape=(1000, 512, 512),
chunks=(10, 512, 512), dtype='int32')
for i in range(1000):
z[i] = acquire_frame() # Write frame-by-frame

# 4. Set remaining attributes
staged.rec['n_frames'] = 1000
staged.rec['frame_rate'] = 30.0

# Record inserted with computed metadata on successful exit
z = zarr.open(staged.store('frames', '.zarr'), mode='w', ...)
...
```

The `staged_insert1` context manager:

- Writes directly to the object store (no intermediate files)
- Computes metadata (size, manifest) automatically on exit
- Cleans up storage if an error occurs (atomic)
- Requires primary key values before calling `store()` or `open()`

Use `staged.store(field, ext)` for FSMap access (Zarr), or `staged.open(field, ext)` for file-like access.
See [Staged Insert](staged-insert.md) for the full API, atomicity guarantees, and patterns for Zarr, HDF5, and streaming sources.

## Attachments

Expand Down
Loading