38 changes: 14 additions & 24 deletions sei-db/db_engine/litt/README.md
@@ -233,18 +233,13 @@ the [value](#value) associated with a [key](#key) can be retrieved from disk.
An address is encoded in a 64-bit integer. It contains four pieces of information:

- the [segment](#segment) [index](#segment-index) where the [value](#value) is stored
- the [shard](#shard) within that segment that holds the [value](#value)
- the offset within the [value file](#segment-value-files) where the first byte of
the [value](#value) is stored
- the length of the [value](#value) in bytes

This information is not enough by itself to retrieve the [value](#value) from disk if there is more than one
[shard](#shard) in the [table](#table). When there is more than one [shard](#shard), the following information
must also be known in order to retrieve the [value](#value) (i.e. to figure out which [shard](#shard) to look in):

- the [sharding factor](#sharding-factor) for the [segment](#segment) where the [value](#value) is stored
(stored in the [segment metadata file](#segment-metadata-file))
- the [sharding salt](#sharding-salt) for the [table](#table) where the [value](#value) is stored
(stored in the [table metadata file](#table-metadata-file))
- the [key](#key) that the [value](#value) is associated with
All four pieces are packed into the address itself, so retrieving a [value](#value) is a self-contained
operation that does not need to consult any segment-level metadata or recompute anything from the [key](#key).
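
As a rough illustration only (the actual `types.Address` layout is not shown in this change; the
field names, widths, and ordering below are assumptions made for the example), an address can be
thought of as a small record carrying those four pieces:

```go
// Illustrative sketch only: the real types.Address may pack these fields
// into a single 64-bit word with different widths and ordering.
type Address struct {
	SegmentIndex uint32 // segment that holds the value
	Shard        uint8  // shard within that segment
	Offset       uint32 // byte offset of the value within the shard's value file
	Length       uint32 // length of the value in bytes
}

// ShardID returns the shard the value lives in, read directly from the
// address, so no metadata lookup or key hashing is needed at read time.
func (a Address) ShardID() uint8 {
	return a.Shard
}
```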

## Atomicity

@@ -432,7 +427,6 @@ Each metadata contains the following information:
- the [segment index](#segment-index)
- serialization version (in case the format changes in the future)
- the [sharding factor](#sharding-factor) for the segment
- the [salt](#sharding-salt) used for the segment
- the [timestamp](#segment-timestamp) of the last element written in the segment, which is used together with
  the [TTL](#ttl) of any data contained within it to determine when the segment can be deleted
- whether or not the segment is [immutable](#segment-mutability)
@@ -462,26 +456,22 @@ The file name of a value file is `X-Y.values`, where `X` is the [segment index](
LittDB supports sharding. That is to say, it can break the data into smaller pieces and spread those pieces across
multiple locations.

In order to determine the shard that a particular [key](#key) is in, a hash function is used. The data that goes
into the hash function is the [key](#key) itself, as well as a [sharding salt](#sharding-salt) that is unique to
each [segment](#segment).
Within a [segment](#segment), [values](#value) are assigned to shards in round-robin order at write time: the first
write goes to shard 0, the second to shard 1, and so on, wrapping around once every shard has been used. Each
[value's](#value) shard is recorded in its [address](#address), so reads do not need to recompute the assignment.

The [sharding salt](#sharding-salt) is chosen randomly. Its purpose is to make the mapping between [keys](#key) and
shards unpredictable to an outside attacker. Without this sort of randomness, an attacker could intentionally craft
keys that all map to the same shard, causing a hot spot in the database and potentially degrading performance.
This scheme produces a perfectly even distribution of [values](#value) across shards regardless of the
[keys](#key) being written. As a side benefit, an outside attacker cannot craft a sequence of [keys](#key) that
all land in the same shard, since the shard chosen for a given [value](#value) depends only on the order in which
it was written, not on the contents of its [key](#key).
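
A minimal sketch of this round-robin assignment (the names here are hypothetical, not the actual
LittDB implementation):

```go
// shardPicker assigns values to shards in round-robin order within a segment.
// The sharding factor is always at least 1, so the modulo below is safe.
type shardPicker struct {
	shardingFactor uint32 // number of shards in the segment
	next           uint32 // shard that will receive the next value written
}

// pick returns the shard for the next write and advances the counter,
// wrapping around once every shard has been used.
func (p *shardPicker) pick() uint32 {
	shard := p.next
	p.next = (p.next + 1) % p.shardingFactor
	return shard
}
```

The shard chosen for a write is then recorded in the value's [address](#address), so reads never
need to repeat this computation.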

### Sharding Factor

The number of [shards](#shard) in a [segment](#segment) is called the "sharding factor". The sharding factor must be
a positive, non-zero integer. The sharding factor can be changed at runtime without restarting the database or
The number of [shards](#shard) in a [segment](#segment) is called the "sharding factor". The sharding factor must
be a positive, non-zero integer no larger than 256 (the limit imposed by encoding the shard ID as a single byte
inside the [address](#address)). The sharding factor can be changed at runtime without restarting the database or
performing a data migration.
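
For example, the bounds described above amount to the following check (a sketch that mirrors the
validation this change adds to `DiskTable.SetShardingFactor`; the helper name is hypothetical):

```go
// MaxShardingFactor is the largest legal sharding factor: the shard ID is
// stored in a single byte of the address, so at most 256 shards (IDs 0-255) fit.
const MaxShardingFactor = 256

// validShardingFactor reports whether f is a legal sharding factor:
// strictly positive and no larger than MaxShardingFactor.
func validShardingFactor(f uint32) bool {
	return f > 0 && f <= MaxShardingFactor
}
```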

### Sharding Salt

A random number chosen to make the [shard](#shard) hash function unpredictable to an outside attacker. This number
does not need to be chosen via a cryptographically secure random number generator, as long as it is not publicly
known.

## Table

A table in LittDB is a unique namespace. Two [keys](#key) with identical values do not conflict with each other as
8 changes: 4 additions & 4 deletions sei-db/db_engine/litt/cli/prune_test.go
@@ -114,10 +114,10 @@ func TestPrune(t *testing.T) {
seg := segments[i]
metadataPath := seg.GetMetadataFilePath()

// Overwrite the old metadata file. The timestamp is encoded at [24:32] in nanoseconds since the epoch.
// Overwrite the old metadata file. The timestamp is encoded at [8:16] in nanoseconds since the epoch.
data, err := os.ReadFile(metadataPath)
require.NoError(t, err)
binary.BigEndian.PutUint64(data[24:32], sixHoursAgo)
binary.BigEndian.PutUint64(data[8:16], sixHoursAgo)

// write the modified metadata file back to disk.
err = os.WriteFile(metadataPath, data, 0644)
@@ -278,10 +278,10 @@ func TestPruneSubset(t *testing.T) {
seg := segments[i]
metadataPath := seg.GetMetadataFilePath()

// Overwrite the old metadata file. The timestamp is encoded at [24:32] in nanoseconds since the epoch.
// Overwrite the old metadata file. The timestamp is encoded at [8:16] in nanoseconds since the epoch.
data, err := os.ReadFile(metadataPath)
require.NoError(t, err)
binary.BigEndian.PutUint64(data[24:32], sixHoursAgo)
binary.BigEndian.PutUint64(data[8:16], sixHoursAgo)

// write the modified metadata file back to disk.
err = os.WriteFile(metadataPath, data, 0644)
10 changes: 0 additions & 10 deletions sei-db/db_engine/litt/disktable/control_loop.go
@@ -5,7 +5,6 @@ package disktable
import (
"fmt"
"log/slog"
"math/rand"
"sync"
"sync/atomic"
"time"
@@ -84,9 +83,6 @@ type controlLoop struct {
// The table's metadata.
metadata *tableMetadata

// A source of randomness used for generating sharding salt.
saltShaker *rand.Rand

// whether fsync mode is enabled.
fsync bool

@@ -320,19 +316,13 @@ func (c *controlLoop) expandSegments() error {
c.immutableSegmentSize += c.segments[c.highestSegmentIndex].Size()

// Create a new segment.
salt := [16]byte{}
_, err = c.saltShaker.Read(salt[:])
if err != nil {
return fmt.Errorf("failed to read salt: %w", err)
}
newSegment, err := segment.CreateSegment(
c.logger,
c.errorMonitor,
c.highestSegmentIndex+1,
c.segmentPaths,
c.snapshottingEnabled,
c.metadata.GetShardingFactor(),
salt,
c.fsync)
if err != nil {
return err
15 changes: 4 additions & 11 deletions sei-db/db_engine/litt/disktable/disk_table.go
@@ -7,7 +7,6 @@ import (
"fmt"
"log/slog"
"math"
"math/rand"
"os"
"path"
"sync"
@@ -229,20 +228,13 @@ func NewDiskTable(
} else {
nextSegmentIndex = highestSegmentIndex + 1
}
salt := [16]byte{}
_, err = config.SaltShaker.Read(salt[:])
if err != nil {
return nil, fmt.Errorf("failed to read salt: %w", err)
}

mutableSegment, err := segment.CreateSegment(
config.Logger,
errorMonitor,
nextSegmentIndex,
segmentPaths,
snapshottingEnabled,
metadata.GetShardingFactor(),
salt,
config.Fsync)
if err != nil {
return nil, fmt.Errorf("failed to create mutable segment: %w", err)
@@ -261,8 +253,6 @@
}
}

tableSaltShaker := rand.New(rand.NewSource(config.SaltShaker.Int63()))

var upperBoundSnapshotFile *BoundaryFile
if config.SnapshotDirectory != "" {
// Initialize snapshot files if snapshotting is enabled.
@@ -307,7 +297,6 @@
clock: config.Clock,
segmentPaths: segmentPaths,
snapshottingEnabled: snapshottingEnabled,
saltShaker: tableSaltShaker,
metadata: metadata,
fsync: config.Fsync,
metrics: metrics,
@@ -635,6 +624,10 @@ func (d *DiskTable) SetShardingFactor(shardingFactor uint32) error {
if shardingFactor == 0 {
return fmt.Errorf("sharding factor must be greater than 0")
}
if shardingFactor > litt.MaxShardingFactor {
return fmt.Errorf("sharding factor must be at most %d, got %d",
litt.MaxShardingFactor, shardingFactor)
}

request := &controlLoopSetShardingFactorRequest{
shardingFactor: shardingFactor,
9 changes: 2 additions & 7 deletions sei-db/db_engine/litt/disktable/disk_table_test.go
@@ -106,7 +106,6 @@ func buildMemKeyDiskTableSingleShard(
config.TargetSegmentFileSize = 100 // intentionally use a very small segment size
config.GCPeriod = time.Millisecond
config.Fsync = false
config.SaltShaker = util.NewTestRandom().Rand
config.Logger = logger

table, err := NewDiskTable(
@@ -153,7 +152,6 @@ func buildMemKeyDiskTableMultiShard(
config.TargetSegmentFileSize = 100 // intentionally use a very small segment size
config.GCPeriod = time.Millisecond
config.Fsync = false
config.SaltShaker = util.NewTestRandom().Rand
config.ShardingFactor = 4
config.Logger = logger

@@ -200,7 +198,6 @@ func buildPebbleDBKeyDiskTableSingleShard(
config.TargetSegmentFileSize = 100 // intentionally use a very small segment size
config.GCPeriod = time.Millisecond
config.Fsync = false
config.SaltShaker = util.NewTestRandom().Rand
config.Logger = logger

table, err := NewDiskTable(
@@ -246,7 +243,6 @@ func buildPebbleDBKeyDiskTableMultiShard(
config.TargetSegmentFileSize = 100 // intentionally use a very small segment size
config.GCPeriod = time.Millisecond
config.Fsync = false
config.SaltShaker = util.NewTestRandom().Rand
config.ShardingFactor = 4
config.Logger = logger

@@ -1214,10 +1210,9 @@ func truncatedValueFileTest(t *testing.T, tableBuilder *tableBuilder) {
// Find a shard that has at least one key in the last segment (truncating an empty file is boring)
keysInLastFile, err := segments[highestSegmentIndex].GetKeys()
require.NoError(t, err)
diskTable := table.(*DiskTable)
nonEmptyShards := make(map[uint32]struct{})
for key := range keysInLastFile {
keyShard := diskTable.controlLoop.segments[highestSegmentIndex].GetShard(keysInLastFile[key].Key)
keyShard := uint32(keysInLastFile[key].Address.ShardID())
nonEmptyShards[keyShard] = struct{}{}
}
var shard uint32
@@ -1243,7 +1238,7 @@
// Figure out which keys are expected to be missing
missingKeys := make(map[string]struct{})
for _, key := range keysInLastFile {
keyShard := diskTable.controlLoop.segments[diskTable.controlLoop.highestSegmentIndex].GetShard(key.Key)
keyShard := uint32(key.Address.ShardID())
if keyShard != shard {
// key does not belong to the shard that was truncated
continue
21 changes: 15 additions & 6 deletions sei-db/db_engine/litt/disktable/keymap/keymap_test.go
@@ -38,6 +38,15 @@ func buildPebbleDBKeymap(logger *slog.Logger, path string) (Keymap, error) {
return kmap, nil
}

func randomAddress(rand *util.TestRandom) types.Address {
return types.NewAddress(
rand.Uint32(),
rand.Uint32(),
uint8(rand.Uint32Range(0, 256)),
rand.Uint32(),
)
}

func testBasicBehavior(t *testing.T, keymap Keymap) {
rand := util.NewTestRandom()

@@ -49,7 +58,7 @@
if choice < 0.5 {
// Write a random value
key := []byte(rand.String(32))
address := types.Address(rand.Uint64())
address := randomAddress(rand)

err := keymap.Put([]*types.ScopedKey{{Key: key, Address: address}})
require.NoError(t, err)
@@ -78,7 +87,7 @@
pairs := make([]*types.ScopedKey, numberToWrite)
for i := 0; i < int(numberToWrite); i++ {
key := []byte(rand.String(32))
address := types.Address(rand.Uint64())
address := randomAddress(rand)
pairs[i] = &types.ScopedKey{Key: key, Address: address}
expected[string(key)] = address
}
@@ -150,7 +159,7 @@
if choice < 0.5 {
// Write a random value
key := []byte(rand.String(32))
address := types.Address(rand.Uint64())
address := randomAddress(rand)

err := keymap.Put([]*types.ScopedKey{{Key: key, Address: address}})
require.NoError(t, err)
@@ -179,7 +188,7 @@
pairs := make([]*types.ScopedKey, numberToWrite)
for i := 0; i < int(numberToWrite); i++ {
key := []byte(rand.String(32))
address := types.Address(rand.Uint64())
address := randomAddress(rand)
pairs[i] = &types.ScopedKey{Key: key, Address: address}
expected[string(key)] = address
}
@@ -225,7 +234,7 @@
if choice < 0.5 {
// Write a random value
key := []byte(rand.String(32))
address := types.Address(rand.Uint64())
address := randomAddress(rand)

err := keymap.Put([]*types.ScopedKey{{Key: key, Address: address}})
require.NoError(t, err)
@@ -254,7 +263,7 @@
pairs := make([]*types.ScopedKey, numberToWrite)
for i := 0; i < int(numberToWrite); i++ {
key := []byte(rand.String(32))
address := types.Address(rand.Uint64())
address := randomAddress(rand)
pairs[i] = &types.ScopedKey{Key: key, Address: address}
expected[string(key)] = address
}
8 changes: 4 additions & 4 deletions sei-db/db_engine/litt/disktable/keymap/pebble_db_keymap.go
@@ -133,19 +133,19 @@ func (p *PebbleDBKeymap) Get(key []byte) (types.Address, bool, error) {
val, closer, err := p.db.Get(key)
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
return 0, false, nil
return types.Address{}, false, nil
}
return 0, false, fmt.Errorf("failed to get key from PebbleDB: %w", err)
return types.Address{}, false, fmt.Errorf("failed to get key from PebbleDB: %w", err)
}
// Clone the bytes before closing, since the slice is only valid until closer.Close().
cloned := bytes.Clone(val)
if cerr := closer.Close(); cerr != nil {
return 0, false, fmt.Errorf("failed to close PebbleDB get closer: %w", cerr)
return types.Address{}, false, fmt.Errorf("failed to close PebbleDB get closer: %w", cerr)
}

address, err := types.DeserializeAddress(cloned)
if err != nil {
return 0, false, fmt.Errorf("failed to deserialize address: %w", err)
return types.Address{}, false, fmt.Errorf("failed to deserialize address: %w", err)
}

return address, true, nil