From 58184011066300feef7e60ca41b5fb35e87300ad Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Fri, 29 May 2026 14:43:12 -0700 Subject: [PATCH] feat(indexer): bridge on-chain plays into the plays table MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The vendored ETL play processor only writes etl_plays, which nothing in api/ reads. Restore the legacy Python index_core_plays behavior with a PlaysHook (go-openaudio #322) that writes each on-chain play into the `plays` table — the row every downstream consumer depends on (the on_play trigger's aggregates/milestones/notifications, the challenge processors, trending, hourly play counts). Field mapping mirrors index_core_plays exactly: play_item_id = int(track_id) (skip non-integer), user_id = int(user_id) or NULL for anonymous listens, source = "relay", created_at = play timestamp, slot = Core block height. Challenge-event dispatch is intentionally omitted — the new challenge processors reconcile from `plays` by polling. Runs in the same DB tx as etl_plays, so the rows commit atomically. Bumps the go-openaudio pin to the commit that adds RegisterPlaysHook. 
Co-Authored-By: Claude Opus 4.7 --- go.mod | 4 +- go.sum | 8 +- indexer/indexer.go | 9 +++ indexer/plays_hook.go | 120 ++++++++++++++++++++++++++++++ indexer/plays_hook_test.go | 145 +++++++++++++++++++++++++++++++++++++ 5 files changed, 280 insertions(+), 6 deletions(-) create mode 100644 indexer/plays_hook.go create mode 100644 indexer/plays_hook_test.go diff --git a/go.mod b/go.mod index cea03e0c..1e6ad6c6 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,8 @@ require ( connectrpc.com/connect v1.18.1 github.com/AlecAivazis/survey/v2 v2.3.7 github.com/Doist/unfurlist v0.0.0-20250409100812-515f2735f8e5 - github.com/OpenAudio/go-openaudio v1.3.1-0.20260529194448-35ad422c0f27 - github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260529194448-35ad422c0f27 + github.com/OpenAudio/go-openaudio v1.3.1-0.20260529221831-4d1c9dfdfb52 + github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260529221831-4d1c9dfdfb52 github.com/aquasecurity/esquery v0.2.0 github.com/axiomhq/axiom-go v0.23.0 github.com/axiomhq/hyperloglog v0.2.5 diff --git a/go.sum b/go.sum index 9940fc7f..ceab1798 100644 --- a/go.sum +++ b/go.sum @@ -20,10 +20,10 @@ github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2 h1:+vx7roKuyA63n github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2/go.mod h1:HBCaDeC1lPdgDeDbhX8XFpy1jqjK0IBG8W5K+xYqA0w= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= -github.com/OpenAudio/go-openaudio v1.3.1-0.20260529194448-35ad422c0f27 h1:h42jz04hL+3kxICalInzqppFPUvll0BpckPJp/ei86w= -github.com/OpenAudio/go-openaudio v1.3.1-0.20260529194448-35ad422c0f27/go.mod h1:wiFXmVbIUkN2D5lRshknaARCKhzbHtCBKRCZe6UOnVs= -github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260529194448-35ad422c0f27 h1:OC9CDVmc5EtEcXLLfTfkFXRMNjQLgeCV4+qDlWZzSVU= -github.com/OpenAudio/go-openaudio/pkg/etl 
v1.3.1-0.20260529194448-35ad422c0f27/go.mod h1:LZKiU9vBYzlZzn6oPRHHLPXteBtMKQPegNH9bX9JuH8= +github.com/OpenAudio/go-openaudio v1.3.1-0.20260529221831-4d1c9dfdfb52 h1:uT5PXDx/R398ufAYswcUjaavc4OuYIrhf4DhWFRKlaY= +github.com/OpenAudio/go-openaudio v1.3.1-0.20260529221831-4d1c9dfdfb52/go.mod h1:wiFXmVbIUkN2D5lRshknaARCKhzbHtCBKRCZe6UOnVs= +github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260529221831-4d1c9dfdfb52 h1:cwLCsQGeiJbLBa2tEK1fZrg4r+bmnv7ZuJ8PrWHlwPE= +github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260529221831-4d1c9dfdfb52/go.mod h1:LZKiU9vBYzlZzn6oPRHHLPXteBtMKQPegNH9bX9JuH8= github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA= github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8= github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI= diff --git a/indexer/indexer.go b/indexer/indexer.go index f7f03203..94721b5f 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -89,6 +89,15 @@ func NewIndexer(cfg config.Config) *CoreIndexer { etlIndexer.SetUserCreatedHook(userEventsHook) etlIndexer.RegisterPostHook(em.EntityTypeUser, em.ActionUpdate, userEventsHook) + // Write each on-chain play into the `plays` table, restoring the legacy + // Python `index_core_plays` behavior. The vendored ETL play processor + // only writes `etl_plays` (which nothing in api/ reads); this hook + // bridges plays into the `plays` table every downstream consumer (the + // on_play trigger's aggregates/milestones/notifications, the challenge + // processors, trending, hourly-play-count) depends on. Runs in the same + // DB tx as etl_plays, so the rows commit atomically. 
+	etlIndexer.RegisterPlaysHook(newPlaysHook(logger))
+
 	return &CoreIndexer{
 		aggregatesCalculator: aggregatesCalculator,
 		etlIndexer:           etlIndexer,
diff --git a/indexer/plays_hook.go b/indexer/plays_hook.go
new file mode 100644
index 00000000..871fc6e3
--- /dev/null
+++ b/indexer/plays_hook.go
@@ -0,0 +1,151 @@
+package indexer
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"strings"
+	"time"
+
+	etl "github.com/OpenAudio/go-openaudio/pkg/etl"
+	"go.uber.org/zap"
+)
+
+const (
+	// playInsertColumns is the number of bind parameters each inserted row
+	// uses; it must match the column list in insertPlaysPrefix.
+	playInsertColumns = 10
+
+	// maxPlaysPerInsert caps the rows sent in one INSERT statement.
+	// PostgreSQL allows at most 65535 bind parameters per statement, so an
+	// unbounded multi-row VALUES list would make a sufficiently large Plays
+	// tx fail as a whole. 5000 rows x 10 parameters stays under that limit.
+	maxPlaysPerInsert = 5000
+
+	// insertPlaysPrefix is the fixed head of the multi-row INSERT; the
+	// per-row placeholder tuples are appended after VALUES.
+	insertPlaysPrefix = `INSERT INTO plays
+		(user_id, source, play_item_id, created_at, updated_at, slot, signature, city, region, country)
+		VALUES `
+)
+
+// newPlaysHook returns a pkg/etl PlaysHook that writes each on-chain play
+// into api/'s `plays` table, restoring the behavior of the legacy Python
+// discovery-provider task `index_core_plays`.
+//
+// Background: the vendored ETL play processor only writes its own
+// `etl_plays` table; nothing in api/ reads `etl_plays`. The `plays` table is
+// the one every downstream consumer depends on — the `on_play` trigger fans
+// a row out to aggregate_plays / aggregate_monthly_plays / milestones /
+// notifications / user_distinct_play_*; the challenge processors
+// (listen_streak, play_count_milestones) poll `plays` directly; trending and
+// hourly-play-count jobs read it. This hook bridges that gap.
+//
+// The hook runs in the same DB transaction (savepoint) the play processor
+// used for etl_plays (etl.PlaysParams.DBTX), so the `plays` rows commit
+// atomically with etl_plays and the rest of the block.
+//
+// Field mapping mirrors index_core_plays exactly:
+//   - play_item_id = int(track_id); the play is SKIPPED if track_id is not an
+//     integer (matches Python's `try: int(track_id) except: continue`).
+//   - user_id = int(user_id), or NULL for an anonymous listen when user_id is
+//     not an integer (Python's "Recording anonymous listen" path).
+//   - source = "relay", created_at = play timestamp, updated_at = now().
+//   - city/region/country/signature copied through.
+//   - slot = Core block height: a monotonically increasing integer shared by
+//     all plays in a block, the same shape as Python's shared per-tx
+//     next_slot. (`plays.slot` is read by no api/ Go consumer; the `on_play`
+//     trigger only forwards it onto milestone/notification rows.)
+//
+// Unlike Python, this hook does NOT dispatch challenge events — the new
+// challenge processors reconcile from `plays` by polling, so the bridge only
+// needs to land the rows.
+//
+// Hook errors are logged but not propagated (etl.PlaysHook contract): a
+// malformed play must not roll back etl_plays or halt the indexer.
+func newPlaysHook(logger *zap.Logger) etl.PlaysHook {
+	hookLogger := logger.Named("PlaysHook")
+	return func(ctx context.Context, params *etl.PlaysParams) error {
+		if err := indexPlays(ctx, params); err != nil {
+			hookLogger.Warn("failed to index plays into plays table",
+				zap.String("tx_hash", params.TxHash),
+				zap.Int64("block_height", params.BlockHeight),
+				zap.Error(err))
+		}
+		return nil
+	}
+}
+
+// indexPlays writes the plays in params into the `plays` table through
+// params.DBTX. A play whose track_id is not an integer is skipped; a play
+// whose user_id is not an integer is stored with a NULL user_id.
+//
+// Rows go out as multi-row INSERTs of at most maxPlaysPerInsert rows each.
+// It returns nil when there is nothing to insert, otherwise the first Exec
+// error wrapped with the index range of the batch that failed. On such a
+// failure earlier batches have already been executed on params.DBTX;
+// whether they persist is up to the enclosing transaction.
+func indexPlays(ctx context.Context, params *etl.PlaysParams) error {
+	plays := params.Plays
+	if len(plays) == 0 {
+		return nil
+	}
+
+	// slot is shared by every play in this block (mirrors Python's shared
+	// per-tx next_slot). Block height is monotonic and comfortably within
+	// int range for the foreseeable life of the chain.
+	slot := int(params.BlockHeight)
+	now := time.Now()
+
+	for start := 0; start < len(plays); start += maxPlaysPerInsert {
+		end := start + maxPlaysPerInsert
+		if end > len(plays) {
+			end = len(plays)
+		}
+
+		// Columns per row: user_id, source, play_item_id, created_at,
+		// updated_at, slot, signature, city, region, country.
+		tuples := make([]string, 0, end-start)
+		args := make([]any, 0, (end-start)*playInsertColumns)
+		for _, p := range plays[start:end] {
+			trackID, err := strconv.Atoi(p.GetTrackId())
+			if err != nil {
+				continue // non-integer track id: skip (matches Python)
+			}
+
+			// Anonymous listens carry a non-integer user_id; the nil
+			// interface value is sent as NULL.
+			var userID any
+			if uid, err := strconv.Atoi(p.GetUserId()); err == nil {
+				userID = uid
+			}
+
+			// Placeholders are numbered per statement, so they restart
+			// at $1 in every batch.
+			n := len(args)
+			tuples = append(tuples, fmt.Sprintf("($%d,$%d,$%d,$%d,$%d,$%d,$%d,$%d,$%d,$%d)",
+				n+1, n+2, n+3, n+4, n+5, n+6, n+7, n+8, n+9, n+10))
+			args = append(args,
+				userID,                    // user_id (nullable)
+				"relay",                   // source
+				trackID,                   // play_item_id
+				p.GetTimestamp().AsTime(), // created_at
+				now,                       // updated_at
+				slot,                      // slot
+				p.GetSignature(),          // signature
+				p.GetCity(),               // city
+				p.GetRegion(),             // region
+				p.GetCountry(),            // country
+			)
+		}
+		if len(tuples) == 0 {
+			continue // every play in this batch was skipped
+		}
+
+		query := insertPlaysPrefix + strings.Join(tuples, ",")
+		if _, err := params.DBTX.Exec(ctx, query, args...); err != nil {
+			return fmt.Errorf("inserting plays [%d,%d) of %d: %w", start, end, len(plays), err)
+		}
+	}
+	return nil
+}
diff --git a/indexer/plays_hook_test.go b/indexer/plays_hook_test.go
new file mode 100644
index 00000000..4f611575
--- /dev/null
+++ b/indexer/plays_hook_test.go
@@ -0,0 +1,145 @@
+package indexer
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"api.audius.co/database"
+	corev1 "github.com/OpenAudio/go-openaudio/pkg/api/core/v1"
+	etl "github.com/OpenAudio/go-openaudio/pkg/etl"
+	"github.com/jackc/pgx/v5/pgxpool"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"google.golang.org/protobuf/types/known/timestamppb"
+)
+
+// runPlaysHook drives the plays indexing logic for one Plays tx against a
+// real DB. It calls indexPlays directly (not the hook wrapper) so a DB error
+// surfaces as a test failure instead of being swallowed by the PlaysHook
+// "log and continue" contract.
+func runPlaysHook(t *testing.T, pool *pgxpool.Pool, height int64, plays []*corev1.TrackPlay) {
+	t.Helper()
+	err := indexPlays(context.Background(), &etl.PlaysParams{
+		Plays:       plays,
+		BlockHeight: height,
+		BlockTime:   time.Unix(1700000000, 0),
+		BlockHash:   "blockhash",
+		TxHash:      "txhash",
+		DBTX:        pool, // the pool itself is passed as the executor
+	})
+	require.NoError(t, err)
+}
+
+func play(userID, trackID string) *corev1.TrackPlay {
+	return &corev1.TrackPlay{
+		UserId:    userID,
+		TrackId:   trackID,
+		Signature: "sig-" + userID + "-" + trackID,
+		City:      "Brooklyn",
+		Region:    "NY",
+		Country:   "US",
+		Timestamp: timestamppb.New(time.Unix(1700000000, 0)), // same instant runPlaysHook uses for BlockTime
+	}
+}
+
+func TestPlaysHook_InsertsPlay(t *testing.T) {
+	pool := database.CreateTestDatabase(t, "test_indexer")
+	database.Seed(pool, database.FixtureMap{})
+
+	runPlaysHook(t, pool, 555, []*corev1.TrackPlay{play("100", "200")}) // height 555; one play: user 100, track 200
+
+	var (
+		userID    *int64
+		playItem  int64
+		source    string
+		slot      *int64
+		signature *string
+		city      *string
+	)
+	err := pool.QueryRow(context.Background(),
+		`SELECT user_id, play_item_id, source, slot, signature, city
+		 FROM plays WHERE play_item_id = 200`).
+		Scan(&userID, &playItem, &source, &slot, &signature, &city)
+	require.NoError(t, err)
+
+	require.NotNil(t, userID)
+	assert.Equal(t, int64(100), *userID)
+	assert.Equal(t, int64(200), playItem)
+	assert.Equal(t, "relay", source) // indexPlays hard-codes the source column
+	require.NotNil(t, slot)
+	assert.Equal(t, int64(555), *slot, "slot should be the Core block height")
+	require.NotNil(t, signature)
+	assert.Equal(t, "sig-100-200", *signature) // the value the play fixture builds from both ids
+	require.NotNil(t, city)
+	assert.Equal(t, "Brooklyn", *city)
+}
+
+// A non-integer user_id is an anonymous listen: the row is still written but
+// user_id is NULL (mirrors index_core_plays).
+func TestPlaysHook_AnonymousListen(t *testing.T) {
+	pool := database.CreateTestDatabase(t, "test_indexer")
+	database.Seed(pool, database.FixtureMap{})
+
+	runPlaysHook(t, pool, 1, []*corev1.TrackPlay{play("anon-device-uuid", "300")})
+
+	var userID *int64 // stays nil if max(user_id) comes back NULL
+	var count int
+	err := pool.QueryRow(context.Background(),
+		`SELECT count(*), max(user_id) FROM plays WHERE play_item_id = 300`).
+		Scan(&count, &userID)
+	require.NoError(t, err)
+	assert.Equal(t, 1, count, "anonymous play is still recorded")
+	assert.Nil(t, userID, "anonymous play has NULL user_id")
+}
+
+// A non-integer track_id is skipped entirely (Python's try/except continue).
+func TestPlaysHook_SkipsNonIntegerTrackId(t *testing.T) {
+	pool := database.CreateTestDatabase(t, "test_indexer")
+	database.Seed(pool, database.FixtureMap{})
+
+	runPlaysHook(t, pool, 1, []*corev1.TrackPlay{
+		play("100", "not-a-number"), // track_id does not parse as an integer
+		play("100", "400"),          // valid one in the same tx still lands
+	})
+
+	var total, valid int
+	require.NoError(t, pool.QueryRow(context.Background(),
+		`SELECT count(*) FROM plays`).Scan(&total))
+	require.NoError(t, pool.QueryRow(context.Background(),
+		`SELECT count(*) FROM plays WHERE play_item_id = 400`).Scan(&valid))
+	assert.Equal(t, 1, total, "only the integer-track_id play is written")
+	assert.Equal(t, 1, valid)
+}
+
+func TestPlaysHook_EmptyIsNoOp(t *testing.T) {
+	pool := database.CreateTestDatabase(t, "test_indexer")
+	database.Seed(pool, database.FixtureMap{})
+
+	runPlaysHook(t, pool, 1, nil)                    // nil slice
+	runPlaysHook(t, pool, 1, []*corev1.TrackPlay{}) // empty, non-nil slice
+
+	var total int
+	require.NoError(t, pool.QueryRow(context.Background(),
+		`SELECT count(*) FROM plays`).Scan(&total))
+	assert.Equal(t, 0, total)
+}
+
+// The on_play trigger fans a play out to aggregate_plays; verify the bridge
+// row actually drives those downstream aggregates (the whole reason we write
+// `plays` and not just `etl_plays`).
+func TestPlaysHook_DrivesAggregatePlays(t *testing.T) {
+	pool := database.CreateTestDatabase(t, "test_indexer")
+	database.Seed(pool, database.FixtureMap{})
+
+	runPlaysHook(t, pool, 1, []*corev1.TrackPlay{
+		play("100", "200"), // two different users ...
+		play("101", "200"), // ... playing the same track
+	})
+
+	var count int64 // aggregate_plays.count for track 200
+	err := pool.QueryRow(context.Background(),
+		`SELECT count FROM aggregate_plays WHERE play_item_id = 200`).Scan(&count)
+	require.NoError(t, err)
+	assert.Equal(t, int64(2), count, "on_play trigger should have counted both plays")
+}