Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ help:
@echo "Available targets:"
@echo " build Build the qhronosd binary"
@echo " clean Remove the qhronosd binary"
@echo " test Run the test script (requires docker-up)"
@echo " test [name] Run the test script (requires docker-up). Optionally pass a test or subtest name, e.g. 'make test TestScheduler/successful scheduling'"
@echo " migrate-up Run migrations up using scripts/migrate.sh"
@echo " migrate-down Run migrations down using scripts/migrate.sh"
@echo " migrate-clean-slate Drop and recreate the database, then run all migrations from scratch"
Expand All @@ -28,7 +28,7 @@ clean:
rm -f $(BINARY_NAME)

test:
bash scripts/test.sh
bash scripts/test.sh $(filter-out $@,$(MAKECMDGOALS))

migrate-up:
bash scripts/migrate.sh up
Expand Down Expand Up @@ -56,3 +56,6 @@ redis-cleanup:

docker-qup:
docker compose up -d postgres redis qhronosd

%::
@:
6 changes: 4 additions & 2 deletions internal/handlers/event_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ func TestEventHandler(t *testing.T) {
db := testutils.TestDB(t)
logger := zap.NewNop()
redisClient := testutils.TestRedis(t)
eventRepo := repository.NewEventRepository(db, logger, redisClient)
// Use test namespace for Redis keys in tests
namespace := testutils.GetRedisNamespace()
eventRepo := repository.NewEventRepository(db, logger, redisClient, namespace)
occurrenceRepo := repository.NewOccurrenceRepository(db, logger)
schedulerService := scheduler.NewScheduler(redisClient, logger)
schedulerService := scheduler.NewScheduler(redisClient, logger, namespace)
expander := scheduler.NewExpander(
schedulerService,
eventRepo,
Expand Down
3 changes: 2 additions & 1 deletion internal/handlers/occurrence_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ func TestOccurrenceHandler(t *testing.T) {
db := testutils.TestDB(t)
logger := zap.NewNop()
redisClient := testutils.TestRedis(t)
eventRepo := repository.NewEventRepository(db, logger, redisClient)
namespace := testutils.GetRedisNamespace()
eventRepo := repository.NewEventRepository(db, logger, redisClient, namespace)
occurrenceRepo := repository.NewOccurrenceRepository(db, logger)
handler := NewOccurrenceHandler(eventRepo, occurrenceRepo)

Expand Down
13 changes: 7 additions & 6 deletions internal/integration/event_redis_cleanup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ func TestEventRepository_RedisCleanupOnDelete(t *testing.T) {
logger := zap.NewNop()
// Use a real DB for event repo, but test Redis
db := testutils.TestDB(t)
repo := repository.NewEventRepository(db, logger, redisClient)
namespace := testutils.GetRedisNamespace()
repo := repository.NewEventRepository(db, logger, redisClient, namespace)

// Create event and schedule occurrences in Redis (inline logic)
event := &models.Event{
Expand Down Expand Up @@ -65,17 +66,17 @@ func TestEventRepository_RedisCleanupOnDelete(t *testing.T) {
require.NoError(t, err)
key := "schedule:" + event.ID.String() + ":" + fmt.Sprintf("%d", occ.ScheduledAt.Unix())
score := float64(occ.ScheduledAt.UnixMilli())
_, err = redisClient.ZAdd(ctx, "schedules", redis.Z{
_, err = redisClient.ZAdd(ctx, namespace+"schedules", redis.Z{
Score: score,
Member: key,
}).Result()
require.NoError(t, err)
_, err = redisClient.HSet(ctx, "schedule:data", key, data).Result()
_, err = redisClient.HSet(ctx, namespace+"schedule:data", key, data).Result()
require.NoError(t, err)
}

// Ensure occurrences are in Redis
results, err := redisClient.ZRange(ctx, "schedules", 0, -1).Result()
results, err := redisClient.ZRange(ctx, namespace+"schedules", 0, -1).Result()
require.NoError(t, err)
assert.NotEmpty(t, results)

Expand All @@ -84,11 +85,11 @@ func TestEventRepository_RedisCleanupOnDelete(t *testing.T) {
require.NoError(t, err)

// Ensure occurrences for this event are removed from Redis
results, err = redisClient.ZRange(ctx, "schedules", 0, -1).Result()
results, err = redisClient.ZRange(ctx, namespace+"schedules", 0, -1).Result()
require.NoError(t, err)
for _, res := range results {
// Fetch from hash instead of decoding directly
data, err := redisClient.HGet(ctx, "schedule:data", res).Result()
data, err := redisClient.HGet(ctx, namespace+"schedule:data", res).Result()
if err != nil {
continue // skip if not found
}
Expand Down
19 changes: 10 additions & 9 deletions internal/repository/event_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ import (
)

type EventRepository struct {
db *sqlx.DB
logger *zap.Logger
redis *redis.Client
db *sqlx.DB
logger *zap.Logger
redis *redis.Client
redisPrefix string
}

func NewEventRepository(db *sqlx.DB, logger *zap.Logger, redis *redis.Client) *EventRepository {
return &EventRepository{db: db, logger: logger, redis: redis}
func NewEventRepository(db *sqlx.DB, logger *zap.Logger, redis *redis.Client, prefix string) *EventRepository {
return &EventRepository{db: db, logger: logger, redis: redis, redisPrefix: prefix}
}

func timePtr(t time.Time) *time.Time {
Expand Down Expand Up @@ -174,13 +175,13 @@ func (r *EventRepository) Delete(ctx context.Context, id uuid.UUID) error {

// RemoveEventOccurrencesFromRedis removes all scheduled occurrences for an event from Redis
func (r *EventRepository) RemoveEventOccurrencesFromRedis(ctx context.Context, eventID uuid.UUID) error {
results, err := r.redis.ZRange(ctx, "schedules", 0, -1).Result()
results, err := r.redis.ZRange(ctx, r.redisPrefix+"schedules", 0, -1).Result()
if err != nil {
return err
}
for _, key := range results {
// Fetch the schedule data from the hash
data, err := r.redis.HGet(ctx, "schedule:data", key).Result()
data, err := r.redis.HGet(ctx, r.redisPrefix+"schedule:data", key).Result()
if err != nil {
continue // skip if not found or error
}
Expand All @@ -192,8 +193,8 @@ func (r *EventRepository) RemoveEventOccurrencesFromRedis(ctx context.Context, e
}
if sched.EventID == eventID {
// Remove from both sorted set and hash
r.redis.ZRem(ctx, "schedules", key)
r.redis.HDel(ctx, "schedule:data", key)
r.redis.ZRem(ctx, r.redisPrefix+"schedules", key)
r.redis.HDel(ctx, r.redisPrefix+"schedule:data", key)
}
}
return nil
Expand Down
12 changes: 8 additions & 4 deletions internal/repository/event_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ func TestEventRepository(t *testing.T) {
db := testutils.TestDB(t)
logger := zap.NewNop()
redisClient := testutils.TestRedis(t)
repo := NewEventRepository(db, logger, redisClient)
namespace := testutils.GetRedisNamespace()
repo := NewEventRepository(db, logger, redisClient, namespace)

// Add cleanup function
cleanup := func() {
Expand Down Expand Up @@ -398,7 +399,8 @@ func TestDeleteOldEvents(t *testing.T) {

logger := zap.NewNop()
redisClient := testutils.TestRedis(t)
repo := NewEventRepository(db, logger, redisClient)
namespace := testutils.GetRedisNamespace()
repo := NewEventRepository(db, logger, redisClient, namespace)
ctx := context.Background()

// Create test data
Expand Down Expand Up @@ -467,7 +469,8 @@ func TestDeleteOldOccurrences(t *testing.T) {

logger := zap.NewNop()
redisClient := testutils.TestRedis(t)
repo := NewEventRepository(db, logger, redisClient)
namespace := testutils.GetRedisNamespace()
repo := NewEventRepository(db, logger, redisClient, namespace)
ctx := context.Background()

// Create test event
Expand Down Expand Up @@ -544,6 +547,7 @@ func TestArchiveOldData(t *testing.T) {

logger := zap.NewNop()
redisClient := testutils.TestRedis(t)
namespace := testutils.GetRedisNamespace()

// Insert an old event and occurrence
oldEvent := &models.Event{
Expand All @@ -558,7 +562,7 @@ func TestArchiveOldData(t *testing.T) {
Status: models.EventStatusActive,
CreatedAt: time.Now().Add(-48 * time.Hour),
}
eventRepo := NewEventRepository(db, logger, redisClient)
eventRepo := NewEventRepository(db, logger, redisClient, namespace)
err := eventRepo.Create(ctx, oldEvent)
require.NoError(t, err)

Expand Down
3 changes: 2 additions & 1 deletion internal/repository/occurrence_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ func TestOccurrenceRepository(t *testing.T) {
db := testutils.TestDB(t)
logger := zap.NewNop()
redisClient := testutils.TestRedis(t)
eventRepo := NewEventRepository(db, logger, redisClient)
namespace := testutils.GetRedisNamespace()
eventRepo := NewEventRepository(db, logger, redisClient, namespace)
repo := NewOccurrenceRepository(db, logger)

cleanup := func() {
Expand Down
3 changes: 2 additions & 1 deletion internal/scheduler/archiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ func TestArchivalScheduler(t *testing.T) {
Status: models.EventStatusActive,
CreatedAt: time.Now().Add(-48 * time.Hour),
}
eventRepo := repository.NewEventRepository(db, logger, redisClient)
namespace := testutils.GetRedisNamespace()
eventRepo := repository.NewEventRepository(db, logger, redisClient, namespace)
err = eventRepo.Create(ctx, oldEvent)
require.NoError(t, err)

Expand Down
12 changes: 7 additions & 5 deletions internal/scheduler/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Dispatcher struct {
logger *zap.Logger
clientNotifier ClientNotifier // optional, for q: webhooks
scheduler *Scheduler // new field for scheduler
redisPrefix string // added redisPrefix field
}

// HTTPClient interface for mocking HTTP requests
Expand Down Expand Up @@ -62,6 +63,7 @@ func NewDispatcher(eventRepo *repository.EventRepository, occurrenceRepo *reposi
logger: logger,
clientNotifier: clientNotifier,
scheduler: scheduler,
redisPrefix: scheduler.redisPrefix,
}
}

Expand All @@ -77,8 +79,8 @@ func (d *Dispatcher) DispatchWebhook(ctx context.Context, sched *models.Schedule
if err != nil || event == nil {
key := fmt.Sprintf("schedule:%s:%d", sched.EventID.String(), sched.ScheduledAt.Unix())
if d.scheduler != nil && d.scheduler.redis != nil {
_, zremErr := d.scheduler.redis.ZRem(ctx, ScheduleKey, key).Result()
_, hdelErr := d.scheduler.redis.HDel(ctx, "schedule:data", key).Result()
_, zremErr := d.scheduler.redis.ZRem(ctx, d.scheduler.redisPrefix+ScheduleKey, key).Result()
_, hdelErr := d.scheduler.redis.HDel(ctx, d.scheduler.redisPrefix+"schedule:data", key).Result()
d.logger.Warn("Orphaned schedule found and removed",
zap.String("event_id", sched.EventID.String()),
zap.String("schedule_key", key),
Expand Down Expand Up @@ -245,7 +247,7 @@ return due
case <-ticker.C:
now := fmt.Sprintf("%f", float64(time.Now().Unix()))
// Use Lua script for atomic move
res, err := scheduler.redis.Eval(ctx, retryLua, []string{retryQueueKey, dispatchQueueKey}, now).Result()
res, err := scheduler.redis.Eval(ctx, retryLua, []string{d.scheduler.redisPrefix + retryQueueKey, d.scheduler.redisPrefix + dispatchQueueKey}, now).Result()
if err != nil {
d.logger.Error("[RETRY POLLER] Lua script failed", zap.Error(err))
continue
Expand All @@ -267,7 +269,7 @@ return due
d.logger.Debug("[DISPATCHER] Worker waiting for item", zap.Int("worker_id", workerID), zap.Time("ts", itemStart))
// Pop from dispatch queue (no processing queue)
popStart := time.Now()
data, err := scheduler.redis.BRPop(ctx, 5*time.Second, dispatchQueueKey).Result()
data, err := scheduler.redis.BRPop(ctx, 5*time.Second, d.scheduler.redisPrefix+dispatchQueueKey).Result()
popEnd := time.Now()
d.logger.Debug("[DISPATCHER] BRPop duration", zap.Int("worker_id", workerID), zap.Duration("duration", popEnd.Sub(popStart)), zap.Error(err))
if err == redis.Nil {
Expand Down Expand Up @@ -312,7 +314,7 @@ return due
nextRetry := time.Now().Add(d.retryDelay).Unix()
updatedData, _ := json.Marshal(sched)
d.logger.Debug("[DISPATCHER] Before ZAdd to retry queue", zap.Int("worker_id", workerID), zap.Time("ts", zaddStart))
err := scheduler.redis.ZAdd(ctx, retryQueueKey, redis.Z{
err := scheduler.redis.ZAdd(ctx, d.scheduler.redisPrefix+retryQueueKey, redis.Z{
Score: float64(nextRetry),
Member: updatedData,
}).Err()
Expand Down
Loading
Loading