Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ require (
connectrpc.com/connect v1.18.1
github.com/AlecAivazis/survey/v2 v2.3.7
github.com/Doist/unfurlist v0.0.0-20250409100812-515f2735f8e5
github.com/OpenAudio/go-openaudio v1.3.1-0.20260529194448-35ad422c0f27
github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260529194448-35ad422c0f27
github.com/OpenAudio/go-openaudio v1.3.1-0.20260529221831-4d1c9dfdfb52
github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260529221831-4d1c9dfdfb52
github.com/aquasecurity/esquery v0.2.0
github.com/axiomhq/axiom-go v0.23.0
github.com/axiomhq/hyperloglog v0.2.5
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2 h1:+vx7roKuyA63n
github.com/Netflix/go-expect v0.0.0-20220104043353-73e0943537d2/go.mod h1:HBCaDeC1lPdgDeDbhX8XFpy1jqjK0IBG8W5K+xYqA0w=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk=
github.com/OpenAudio/go-openaudio v1.3.1-0.20260529194448-35ad422c0f27 h1:h42jz04hL+3kxICalInzqppFPUvll0BpckPJp/ei86w=
github.com/OpenAudio/go-openaudio v1.3.1-0.20260529194448-35ad422c0f27/go.mod h1:wiFXmVbIUkN2D5lRshknaARCKhzbHtCBKRCZe6UOnVs=
github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260529194448-35ad422c0f27 h1:OC9CDVmc5EtEcXLLfTfkFXRMNjQLgeCV4+qDlWZzSVU=
github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260529194448-35ad422c0f27/go.mod h1:LZKiU9vBYzlZzn6oPRHHLPXteBtMKQPegNH9bX9JuH8=
github.com/OpenAudio/go-openaudio v1.3.1-0.20260529221831-4d1c9dfdfb52 h1:uT5PXDx/R398ufAYswcUjaavc4OuYIrhf4DhWFRKlaY=
github.com/OpenAudio/go-openaudio v1.3.1-0.20260529221831-4d1c9dfdfb52/go.mod h1:wiFXmVbIUkN2D5lRshknaARCKhzbHtCBKRCZe6UOnVs=
github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260529221831-4d1c9dfdfb52 h1:cwLCsQGeiJbLBa2tEK1fZrg4r+bmnv7ZuJ8PrWHlwPE=
github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260529221831-4d1c9dfdfb52/go.mod h1:LZKiU9vBYzlZzn6oPRHHLPXteBtMKQPegNH9bX9JuH8=
github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA=
github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8=
github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI=
Expand Down
9 changes: 9 additions & 0 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ func NewIndexer(cfg config.Config) *CoreIndexer {
etlIndexer.SetUserCreatedHook(userEventsHook)
etlIndexer.RegisterPostHook(em.EntityTypeUser, em.ActionUpdate, userEventsHook)

// Write each on-chain play into the `plays` table, restoring the legacy
// Python `index_core_plays` behavior. The vendored ETL play processor
// only writes `etl_plays` (which nothing in api/ reads); this hook
// bridges plays into the `plays` table every downstream consumer (the
// on_play trigger's aggregates/milestones/notifications, the challenge
// processors, trending, hourly-play-count) depends on. Runs in the same
// DB tx as etl_plays, so the rows commit atomically.
etlIndexer.RegisterPlaysHook(newPlaysHook(logger))

return &CoreIndexer{
aggregatesCalculator: aggregatesCalculator,
etlIndexer: etlIndexer,
Expand Down
120 changes: 120 additions & 0 deletions indexer/plays_hook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package indexer

import (
"context"
"fmt"
"strconv"
"strings"
"time"

etl "github.com/OpenAudio/go-openaudio/pkg/etl"
"go.uber.org/zap"
)

// newPlaysHook returns a pkg/etl PlaysHook that writes each on-chain play
// into api/'s `plays` table, restoring the behavior of the legacy Python
// discovery-provider task `index_core_plays`.
//
// Background: the vendored ETL play processor only writes its own
// `etl_plays` table; nothing in api/ reads `etl_plays`. The `plays` table is
// the one every downstream consumer depends on — the `on_play` trigger fans
// a row out to aggregate_plays / aggregate_monthly_plays / milestones /
// notifications / user_distinct_play_*; the challenge processors
// (listen_streak, play_count_milestones) poll `plays` directly; trending and
// hourly-play-count jobs read it. This hook bridges that gap.
//
// The hook runs in the same DB transaction (savepoint) the play processor
// used for etl_plays (etl.PlaysParams.DBTX), so the `plays` rows commit
// atomically with etl_plays and the rest of the block.
//
// Field mapping mirrors index_core_plays exactly:
// - play_item_id = int(track_id); the play is SKIPPED if track_id is not an
// integer (matches Python's `try: int(track_id) except: continue`).
// - user_id = int(user_id), or NULL for an anonymous listen when user_id is
// not an integer (Python's "Recording anonymous listen" path).
// - source = "relay", created_at = play timestamp, updated_at = now().
// - city/region/country/signature copied through.
// - slot = Core block height: a monotonically increasing integer shared by
// all plays in a block, the same shape as Python's shared per-tx
// next_slot. (`plays.slot` is read by no api/ Go consumer; the `on_play`
// trigger only forwards it onto milestone/notification rows.)
//
// Unlike Python, this hook does NOT dispatch challenge events — the new
// challenge processors reconcile from `plays` by polling, so the bridge only
// needs to land the rows.
//
// Hook errors are logged but not propagated (etl.PlaysHook contract): a
// malformed play must not roll back etl_plays or halt the indexer.
func newPlaysHook(logger *zap.Logger) etl.PlaysHook {
	log := logger.Named("PlaysHook")

	hook := func(ctx context.Context, params *etl.PlaysParams) error {
		err := indexPlays(ctx, params)
		if err == nil {
			return nil
		}

		// Deliberately swallowed: the failure is reported as a warning and
		// the hook still returns nil so the caller never sees the error.
		log.Warn("failed to index plays into plays table",
			zap.String("tx_hash", params.TxHash),
			zap.Int64("block_height", params.BlockHeight),
			zap.Error(err))
		return nil
	}
	return hook
}

// indexPlays inserts the plays carried by params into the `plays` table
// through params.DBTX.
//
// Row mapping:
//   - A play whose track id is not an integer is skipped.
//   - A play whose user id is not an integer is stored with a NULL user_id.
//   - source is always "relay", created_at is the play's timestamp,
//     updated_at is the current time, and slot is params.BlockHeight (shared
//     by every row written by this call).
//   - signature, city, region and country are copied through unchanged.
//
// Rows are written with multi-row INSERT statements. Each statement is capped
// so that it never needs more than 65535 bind parameters, the maximum
// PostgreSQL's extended query protocol can carry; inputs below that cap are
// written with exactly one statement.
//
// It returns nil when there is nothing to insert. Otherwise it returns the
// first Exec error, wrapped with the range of rows that failed; later
// batches are not attempted after a failure.
func indexPlays(ctx context.Context, params *etl.PlaysParams) error {
	const (
		// Columns per row: user_id, source, play_item_id, created_at,
		// updated_at, slot, signature, city, region, country.
		colsPerRow = 10
		// The Bind message encodes the parameter count as a 16-bit value.
		maxBindParams  = 65535
		maxRowsPerStmt = maxBindParams / colsPerRow

		insertPrefix = `INSERT INTO plays
		(user_id, source, play_item_id, created_at, updated_at, slot, signature, city, region, country)
		VALUES `
	)

	plays := params.Plays
	if len(plays) == 0 {
		return nil
	}

	// slot is shared by every play in this block (mirrors Python's shared
	// per-tx next_slot). Block height is monotonic and comfortably within
	// int range for the foreseeable life of the chain.
	slot := int(params.BlockHeight)
	now := time.Now()

	// Flat list of bind values, colsPerRow entries per accepted play.
	args := make([]any, 0, len(plays)*colsPerRow)
	for _, p := range plays {
		trackID, err := strconv.Atoi(p.GetTrackId())
		if err != nil {
			continue // non-integer track id: skip (matches Python)
		}

		// Anonymous listens carry a non-integer user_id; a nil interface is
		// sent to the database as NULL.
		var userID any
		if uid, err := strconv.Atoi(p.GetUserId()); err == nil {
			userID = uid
		}

		args = append(args,
			userID,                    // user_id (nullable)
			"relay",                   // source
			trackID,                   // play_item_id
			p.GetTimestamp().AsTime(), // created_at
			now,                       // updated_at
			slot,                      // slot
			p.GetSignature(),          // signature
			p.GetCity(),               // city
			p.GetRegion(),             // region
			p.GetCountry(),            // country
		)
	}

	rows := len(args) / colsPerRow
	for start := 0; start < rows; start += maxRowsPerStmt {
		end := start + maxRowsPerStmt
		if end > rows {
			end = rows
		}

		// Placeholders are numbered from $1 within each statement.
		var query strings.Builder
		query.WriteString(insertPrefix)
		for r := 0; r < end-start; r++ {
			if r > 0 {
				query.WriteByte(',')
			}
			query.WriteByte('(')
			for c := 1; c <= colsPerRow; c++ {
				if c > 1 {
					query.WriteByte(',')
				}
				query.WriteByte('$')
				query.WriteString(strconv.Itoa(r*colsPerRow + c))
			}
			query.WriteByte(')')
		}

		batch := args[start*colsPerRow : end*colsPerRow]
		if _, err := params.DBTX.Exec(ctx, query.String(), batch...); err != nil {
			return fmt.Errorf("inserting plays rows %d-%d of %d: %w", start, end, rows, err)
		}
	}
	return nil
}
145 changes: 145 additions & 0 deletions indexer/plays_hook_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package indexer

import (
"context"
"testing"
"time"

"api.audius.co/database"
corev1 "github.com/OpenAudio/go-openaudio/pkg/api/core/v1"
etl "github.com/OpenAudio/go-openaudio/pkg/etl"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/timestamppb"
)

// runPlaysHook feeds one Plays tx through indexPlays against a real database
// and fails the test on any error. It bypasses the PlaysHook wrapper on
// purpose: the wrapper logs and discards errors, which would hide a failed
// insert from the test.
func runPlaysHook(t *testing.T, pool *pgxpool.Pool, height int64, plays []*corev1.TrackPlay) {
	t.Helper()

	params := &etl.PlaysParams{
		Plays:       plays,
		BlockHeight: height,
		BlockTime:   time.Unix(1700000000, 0),
		BlockHash:   "blockhash",
		TxHash:      "txhash",
		DBTX:        pool,
	}
	require.NoError(t, indexPlays(context.Background(), params))
}

// play builds a TrackPlay fixture for the given user and track ids. The
// signature is derived from both ids; location and timestamp are fixed.
func play(userID, trackID string) *corev1.TrackPlay {
	listenedAt := time.Unix(1700000000, 0)
	sig := "sig-" + userID + "-" + trackID

	return &corev1.TrackPlay{
		UserId:    userID,
		TrackId:   trackID,
		Signature: sig,
		City:      "Brooklyn",
		Region:    "NY",
		Country:   "US",
		Timestamp: timestamppb.New(listenedAt),
	}
}

// TestPlaysHook_InsertsPlay checks that a single well-formed play is written
// with the expected user, track, source, slot, signature and city.
func TestPlaysHook_InsertsPlay(t *testing.T) {
	ctx := context.Background()
	pool := database.CreateTestDatabase(t, "test_indexer")
	database.Seed(pool, database.FixtureMap{})

	runPlaysHook(t, pool, 555, []*corev1.TrackPlay{play("100", "200")})

	// Nullable columns are scanned into pointers so NULL is distinguishable.
	var row struct {
		userID    *int64
		playItem  int64
		source    string
		slot      *int64
		signature *string
		city      *string
	}
	scanErr := pool.QueryRow(ctx,
		`SELECT user_id, play_item_id, source, slot, signature, city
		FROM plays WHERE play_item_id = 200`).
		Scan(&row.userID, &row.playItem, &row.source, &row.slot, &row.signature, &row.city)
	require.NoError(t, scanErr)

	require.NotNil(t, row.userID)
	assert.Equal(t, int64(100), *row.userID)
	assert.Equal(t, int64(200), row.playItem)
	assert.Equal(t, "relay", row.source)
	require.NotNil(t, row.slot)
	assert.Equal(t, int64(555), *row.slot, "slot should be the Core block height")
	require.NotNil(t, row.signature)
	assert.Equal(t, "sig-100-200", *row.signature)
	require.NotNil(t, row.city)
	assert.Equal(t, "Brooklyn", *row.city)
}

// TestPlaysHook_AnonymousListen covers the anonymous-listen path: when the
// user_id is not an integer the play is still recorded, with a NULL user_id.
func TestPlaysHook_AnonymousListen(t *testing.T) {
	ctx := context.Background()
	pool := database.CreateTestDatabase(t, "test_indexer")
	database.Seed(pool, database.FixtureMap{})

	anonymous := []*corev1.TrackPlay{play("anon-device-uuid", "300")}
	runPlaysHook(t, pool, 1, anonymous)

	var (
		rowCount int
		maxUser  *int64
	)
	result := pool.QueryRow(ctx,
		`SELECT count(*), max(user_id) FROM plays WHERE play_item_id = 300`)
	require.NoError(t, result.Scan(&rowCount, &maxUser))

	assert.Equal(t, 1, rowCount, "anonymous play is still recorded")
	assert.Nil(t, maxUser, "anonymous play has NULL user_id")
}

// TestPlaysHook_SkipsNonIntegerTrackId verifies that a play with a
// non-integer track_id is dropped while a valid play in the same tx is
// still written.
func TestPlaysHook_SkipsNonIntegerTrackId(t *testing.T) {
	ctx := context.Background()
	pool := database.CreateTestDatabase(t, "test_indexer")
	database.Seed(pool, database.FixtureMap{})

	batch := []*corev1.TrackPlay{
		play("100", "not-a-number"),
		play("100", "400"), // valid one in the same tx still lands
	}
	runPlaysHook(t, pool, 1, batch)

	// countRows runs a single-value count query and fails the test on error.
	countRows := func(query string) int {
		var n int
		require.NoError(t, pool.QueryRow(ctx, query).Scan(&n))
		return n
	}
	total := countRows(`SELECT count(*) FROM plays`)
	valid := countRows(`SELECT count(*) FROM plays WHERE play_item_id = 400`)

	assert.Equal(t, 1, total, "only the integer-track_id play is written")
	assert.Equal(t, 1, valid)
}

// TestPlaysHook_EmptyIsNoOp verifies that both a nil and an empty play list
// succeed without writing any rows.
func TestPlaysHook_EmptyIsNoOp(t *testing.T) {
	pool := database.CreateTestDatabase(t, "test_indexer")
	database.Seed(pool, database.FixtureMap{})

	// First a nil slice, then an empty non-nil slice.
	for _, batch := range [][]*corev1.TrackPlay{nil, {}} {
		runPlaysHook(t, pool, 1, batch)
	}

	var rowCount int
	countErr := pool.QueryRow(context.Background(),
		`SELECT count(*) FROM plays`).Scan(&rowCount)
	require.NoError(t, countErr)
	assert.Equal(t, 0, rowCount)
}

// TestPlaysHook_DrivesAggregatePlays checks the downstream effect of the
// bridge: after two plays of the same track are written to `plays`, the
// aggregate_plays row for that track is expected to report a count of 2
// (maintained by the on_play trigger).
func TestPlaysHook_DrivesAggregatePlays(t *testing.T) {
	ctx := context.Background()
	pool := database.CreateTestDatabase(t, "test_indexer")
	database.Seed(pool, database.FixtureMap{})

	sameTrack := []*corev1.TrackPlay{
		play("100", "200"),
		play("101", "200"),
	}
	runPlaysHook(t, pool, 1, sameTrack)

	var aggregated int64
	require.NoError(t, pool.QueryRow(ctx,
		`SELECT count FROM aggregate_plays WHERE play_item_id = 200`).Scan(&aggregated))
	assert.Equal(t, int64(2), aggregated, "on_play trigger should have counted both plays")
}
Loading