Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/infrastructure/docker-compose-etcd.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: '2'
services:
etcd:
image: gcr.io/etcd-development/etcd:v3.4.20
image: gcr.io/etcd-development/etcd:v3.5.21
ports:
- "12379:2379"
command: etcd --listen-client-urls http://0.0.0.0:2379 --advertise-client-urls http://0.0.0.0:2379
command: etcd --listen-client-urls http://0.0.0.0:2379 --advertise-client-urls http://0.0.0.0:2379
87 changes: 87 additions & 0 deletions common/component/postgresql/v1/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"reflect"
"strconv"
"strings"
"time"

"github.com/jackc/pgx/v5"
Expand Down Expand Up @@ -182,6 +183,7 @@ func (p *PostgreSQL) Features() []state.Feature {
state.FeatureETag,
state.FeatureTransactional,
state.FeatureTTL,
state.FeatureKeysLike,
}
}

Expand Down Expand Up @@ -531,3 +533,88 @@ func (p *PostgreSQL) GetComponentMetadata() (metadataInfo metadata.MetadataMap)
metadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, metadata.StateStoreType)
return
}

func (p *PostgreSQL) KeysLike(ctx context.Context, req *state.KeysLikeRequest) (*state.KeysLikeResponse, error) {
if len(req.Pattern) == 0 {
return nil, state.ErrKeysLikeEmptyPattern
}

// Match with backslash-escaping for % and _
where := []string{
`key LIKE $1 ESCAPE '\'`,
`(expiredate IS NULL OR expiredate > CURRENT_TIMESTAMP)`,
}
args := []any{req.Pattern}

// Pagination: resume strictly AFTER the last returned row_id
if req.ContinuationToken != nil && *req.ContinuationToken != "" {
rid, err := strconv.ParseInt(*req.ContinuationToken, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid continue token: %w", err)
}
where = append(where, fmt.Sprintf("row_id > $%d", len(args)+1))
args = append(args, rid)
}

// Optional LIMIT: fetch one extra row to detect "has next"
limitClause := ""
var pageSize uint32
if req.PageSize != nil && *req.PageSize > 0 {
pageSize = *req.PageSize
limitClause = fmt.Sprintf(" LIMIT $%d", len(args)+1)
args = append(args, pageSize+1)
}

query := fmt.Sprintf(`
SELECT key, row_id
FROM %s
WHERE %s
ORDER BY row_id ASC%s`,
p.metadata.TableName,
strings.Join(where, " AND "),
limitClause,
)

rows, err := p.db.Query(ctx, query, args...)
if err != nil {
return nil, err
}
defer rows.Close()

type rec struct {
key string
rowID uint64
}
list := make([]rec, 0, 256)

for rows.Next() {
var k string
var rid uint64
if err := rows.Scan(&k, &rid); err != nil {
return nil, err
}
list = append(list, rec{key: k, rowID: rid})
}
if err := rows.Err(); err != nil {
return nil, err
}

resp := &state.KeysLikeResponse{
Keys: make([]string, 0, len(list)),
}

// If we fetched more than a page, set the token to the last returned row's row_id
//nolint:gosec
if pageSize > 0 && uint32(len(list)) > pageSize {
lastReturned := list[pageSize-1].rowID
tok := strconv.FormatUint(lastReturned, 10)
resp.ContinuationToken = &tok
list = list[:pageSize]
}

for _, r := range list {
resp.Keys = append(resp.Keys, r.key)
}

return resp, nil
}
7 changes: 1 addition & 6 deletions common/component/postgresql/v1/postgresql_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,7 @@ func NewPostgreSQLQueryStateStore(logger logger.Logger, opts Options) state.Stor

// Features returns the features available in this component.
func (p *PostgreSQLQuery) Features() []state.Feature {
return []state.Feature{
state.FeatureETag,
state.FeatureTransactional,
state.FeatureQueryAPI,
state.FeatureTTL,
}
return append(p.PostgreSQL.Features(), state.FeatureQueryAPI)
}

// Query executes a query against store.
Expand Down
95 changes: 71 additions & 24 deletions state/cockroachdb/cockroachdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ WHERE
}

func ensureTables(ctx context.Context, db pginterfaces.PGXPoolConn, opts postgresql.MigrateOptions) error {
// Create state table if missing, with row_id ready for pagination
exists, err := tableExists(ctx, db, opts.StateTableName)
if err != nil {
return err
Expand All @@ -78,42 +79,88 @@ func ensureTables(ctx context.Context, db pginterfaces.PGXPoolConn, opts postgre
if !exists {
opts.Logger.Info("Creating CockroachDB state table")
_, err = db.Exec(ctx, fmt.Sprintf(`CREATE TABLE %s (
key text NOT NULL PRIMARY KEY,
value jsonb NOT NULL,
isbinary boolean NOT NULL,
etag INT,
insertdate TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
updatedate TIMESTAMP WITH TIME ZONE NULL,
expiredate TIMESTAMP WITH TIME ZONE NULL,
INDEX expiredate_idx (expiredate)
key text NOT NULL PRIMARY KEY,
value jsonb NOT NULL,
isbinary boolean NOT NULL,
etag INT,
insertdate TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
updatedate TIMESTAMP WITH TIME ZONE NULL,
expiredate TIMESTAMP WITH TIME ZONE NULL,
row_id INT8 NOT NULL DEFAULT unique_rowid(),
UNIQUE (row_id)
);`, opts.StateTableName))
if err != nil {
return err
}
}
// Indexes created after table create for idempotency
if _, err = db.Exec(ctx, fmt.Sprintf(
`CREATE INDEX IF NOT EXISTS %s_expiredate_idx ON %s (expiredate);`,
opts.StateTableName, opts.StateTableName)); err != nil {
return err
}
} else {
// Existing table: make sure columns + indexes exist
// 1) expiredate (idempotent)
if _, err = db.Exec(ctx, fmt.Sprintf(
`ALTER TABLE %s ADD COLUMN IF NOT EXISTS expiredate TIMESTAMPTZ NULL;`,
opts.StateTableName)); err != nil {
return err
}
if _, err = db.Exec(ctx, fmt.Sprintf(
`CREATE INDEX IF NOT EXISTS %s_expiredate_idx ON %s (expiredate);`,
opts.StateTableName, opts.StateTableName)); err != nil {
return err
}

// If table was created before v1.11.
_, err = db.Exec(ctx, fmt.Sprintf(
`ALTER TABLE %s ADD COLUMN IF NOT EXISTS expiredate TIMESTAMP WITH TIME ZONE NULL;`, opts.StateTableName))
if err != nil {
return err
}
_, err = db.Exec(ctx, fmt.Sprintf(
`CREATE INDEX IF NOT EXISTS expiredate_idx ON %s (expiredate);`, opts.StateTableName))
if err != nil {
return err
// 2) row_id for keyset pagination
opts.Logger.Infof("Ensuring row_id exists on '%s'", opts.StateTableName)

// Add column if missing (nullable initially)
if _, err = db.Exec(ctx, fmt.Sprintf(
`ALTER TABLE %s ADD COLUMN IF NOT EXISTS row_id INT8;`,
opts.StateTableName)); err != nil {
return err
}

// Ensure it has a default generator
if _, err = db.Exec(ctx, fmt.Sprintf(
`ALTER TABLE %s ALTER COLUMN row_id SET DEFAULT unique_rowid();`,
opts.StateTableName)); err != nil {
return err
}

// Backfill NULLs (older rows) with generated values
if _, err = db.Exec(ctx, fmt.Sprintf(
`UPDATE %s SET row_id = unique_rowid() WHERE row_id IS NULL;`,
opts.StateTableName)); err != nil {
return err
}

// Enforce NOT NULL
if _, err = db.Exec(ctx, fmt.Sprintf(
`ALTER TABLE %s ALTER COLUMN row_id SET NOT NULL;`,
opts.StateTableName)); err != nil {
return err
}

// Unique index to guarantee ordering without changing PK
if _, err = db.Exec(ctx, fmt.Sprintf(
`CREATE UNIQUE INDEX IF NOT EXISTS %s_row_id_uidx ON %s (row_id);`,
opts.StateTableName, opts.StateTableName)); err != nil {
return err
}
}

// Metadata table
exists, err = tableExists(ctx, db, opts.MetadataTableName)
if err != nil {
return err
}

if !exists {
opts.Logger.Info("Creating CockroachDB metadata table")
_, err = db.Exec(ctx, fmt.Sprintf(`CREATE TABLE %s (
key text NOT NULL PRIMARY KEY,
value text NOT NULL
key text NOT NULL PRIMARY KEY,
value text NOT NULL
);`, opts.MetadataTableName))
if err != nil {
return err
Expand All @@ -124,7 +171,7 @@ func ensureTables(ctx context.Context, db pginterfaces.PGXPoolConn, opts postgre
}

func tableExists(ctx context.Context, db pginterfaces.PGXPoolConn, tableName string) (bool, error) {
exists := false
err := db.QueryRow(ctx, "SELECT EXISTS (SELECT * FROM pg_tables where tablename = $1)", tableName).Scan(&exists)
var exists bool
err := db.QueryRow(ctx, "SELECT EXISTS (SELECT * FROM pg_tables WHERE tablename = $1)", tableName).Scan(&exists)
return exists, err
}
2 changes: 2 additions & 0 deletions state/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const (
ETagMismatch ETagErrorKind = "mismatch"
)

var ErrKeysLikeEmptyPattern = errors.New("keys like pattern cannot be empty")

// ETagError is a custom error type for etag exceptions.
type ETagError struct {
err error
Expand Down
Loading