diff --git a/README.md b/README.md index d9fb493..55d5967 100644 --- a/README.md +++ b/README.md @@ -117,29 +117,69 @@ Qhronos manages database schema changes using embedded migration files. You can ## API Usage See [API documentation](docs/api.md) for full details. -**Create an event:** -```sh -curl -X POST http://localhost:8080/events \ - -H 'Authorization: Bearer ' \ - -H 'Content-Type: application/json' \ - -d '{ - "name": "Daily Backup", - "start_time": "2024-03-20T00:00:00Z", - "webhook": "https://example.com/webhook", - "schedule": { - "frequency": "weekly", - "interval": 1, - "by_day": ["MO", "WE", "FR"] - }, - "tags": ["system:backup"] - }' +## Event Creation Example + +You can now specify an action for event delivery. The `action` field supports both webhook and websocket types. + +### Webhook Example +```json +{ + "name": "My Event", + "description": "A test event", + "start_time": "2025-01-01T00:00:00Z", + "action": { + "type": "webhook", + "params": { "url": "https://example.com/webhook" } + }, + "tags": ["api"] +} +``` + +### Websocket Example +```json +{ + "name": "Websocket Event", + "description": "A test event for websocket client", + "start_time": "2025-01-01T00:00:00Z", + "action": { + "type": "websocket", + "params": { "client_name": "client1" } + }, + "tags": ["api"] +} +``` + +### API Call Example +```json +{ + "name": "API Call Event", + "description": "A test event for apicall action", + "start_time": "2025-01-01T00:00:00Z", + "action": { + "type": "apicall", + "params": { + "method": "POST", + "url": "https://api.example.com/endpoint", + "headers": { "Authorization": "Bearer token", "Content-Type": "application/json" }, + "body": "{ \"foo\": \"bar\" }" + } + }, + "tags": ["api"] +} ``` -> **Note:** -> - For one-time events, omit the `schedule` field and provide `start_time`. -> - For recurring events, provide both `start_time` and a `schedule` object. -> - The `webhook` field is required (not `webhook_url`). -> - The `Authorization` header is required for all requests. +### Backward Compatibility + +The legacy `webhook` field is still supported for backward compatibility. If you provide `webhook`, it will be automatically mapped to the appropriate `action`. + +## Action System + +Qhronos uses an extensible action system for event delivery. Each event can specify an `action` object with a `type` and `params`. Supported types: +- `webhook`: Delivers the event to an HTTP endpoint. +- `websocket`: Delivers the event to a connected websocket client. +- `apicall`: Makes a generic HTTP request with custom method, headers, body, and url. + +The system is designed to be extensible for future action types. ## Schedule Parameter Tutorial diff --git a/design.md b/design.md index 7d1d81f..126e8da 100644 --- a/design.md +++ b/design.md @@ -344,3 +344,72 @@ Qhronos provides a WebSocket server to enable real-time event delivery for two t --- +## Event Model + +Events now use an extensible action system for delivery. The `action` field specifies how the event is delivered. + +### Event Table (simplified) +| Field | Type | Description | +|-------------- |-------------------- |---------------------------------------------| +| id | UUID | Primary key | +| name | TEXT | Event name | +| description | TEXT | Event description | +| start_time | TIMESTAMP | When the event is scheduled to start | +| action | JSONB | Action object (see below) | +| webhook | TEXT | (Deprecated, for backward compatibility) | +| ... | ... | ... | + +### Action Structure + +The `action` field is a JSON object: +```json +{ + "type": "webhook" | "websocket" | "apicall", + "params": { ... } +} +``` +- **type**: The action type. Supported: `webhook`, `websocket`, `apicall`. +- **params**: Parameters for the action type. + - For `webhook`: `{ "url": "https://..." }` + - For `websocket`: `{ "client_name": "client1" }` + - For `apicall`: `{ "method": "POST", "url": "https://...", "headers": { ... }, "body": "..." }` + +#### Example: Webhook Action +```json +{ + "type": "webhook", + "params": { "url": "https://example.com/webhook" } +} +``` + +#### Example: Websocket Action +```json +{ + "type": "websocket", + "params": { "client_name": "client1" } +} +``` + +#### Example: API Call Action +```json +{ + "type": "apicall", + "params": { + "method": "POST", + "url": "https://api.example.com/endpoint", + "headers": { "Authorization": "Bearer token", "Content-Type": "application/json" }, + "body": "{ \"foo\": \"bar\" }" + } +} +``` + +### Backward Compatibility +- The legacy `webhook` field is still supported for legacy clients. If provided, it is mapped to the appropriate `action`. + +## Dispatcher and Action System + +The dispatcher no longer switches directly on webhook/websocket. Instead, it delegates event delivery to the action system: +- The dispatcher uses an `ActionsManager` to execute the action specified in the event. +- Each action type (webhook, websocket, apicall) is registered with the manager and can be extended in the future. +- This design allows for easy addition of new action types (e.g., email, SMS, etc.) without changing the dispatcher logic. + diff --git a/internal/actions/apicall.go b/internal/actions/apicall.go new file mode 100644 index 0000000..a0dada3 --- /dev/null +++ b/internal/actions/apicall.go @@ -0,0 +1,45 @@ +package actions + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + + "github.com/feedloop/qhronos/internal/models" +) + +func NewAPICallExecutor(httpClient HTTPClient) ActionExecutor { + return func(ctx context.Context, event *models.Event, params json.RawMessage) error { + var apiParams models.ApicallActionParams + if err := json.Unmarshal(params, &apiParams); err != nil { + return fmt.Errorf("failed to unmarshal apicall params: %w", err) + } + if apiParams.URL == "" || apiParams.Method == "" { + return fmt.Errorf("apicall action requires both url and method") + } + var bodyReader *bytes.Reader + if len(apiParams.Body) > 0 { + bodyReader = bytes.NewReader(apiParams.Body) + } else { + bodyReader = bytes.NewReader([]byte{}) + } + req, err := http.NewRequestWithContext(ctx, apiParams.Method, apiParams.URL, bodyReader) + if err != nil { + return fmt.Errorf("failed to build apicall request: %w", err) + } + for k, v := range apiParams.Headers { + req.Header.Set(k, v) + } + resp, err := httpClient.Do(req) + if err != nil { + return fmt.Errorf("apicall request failed: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("apicall returned non-2xx status: %d", resp.StatusCode) + } + return nil + } +} diff --git a/internal/actions/apicall_test.go b/internal/actions/apicall_test.go new file mode 100644 index 0000000..a303da2 --- /dev/null +++ b/internal/actions/apicall_test.go @@ -0,0 +1,97 @@ +package actions + +import ( + "context" + "encoding/json" + "errors" + "io/ioutil" + "net/http" + "testing" + + "github.com/feedloop/qhronos/internal/models" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +type MockHTTPClient struct { + mock.Mock +} + +func (m *MockHTTPClient) Do(req *http.Request) (*http.Response, error) { + args := m.Called(req) + resp, _ := args.Get(0).(*http.Response) + return resp, args.Error(1) +} + +func TestAPICallExecutor_Success(t *testing.T) { + mockHTTP := new(MockHTTPClient) + executor := NewAPICallExecutor(mockHTTP) + params := models.ApicallActionParams{ + Method: "POST", + URL: "https://api.example.com/endpoint", + Headers: map[string]string{"Authorization": "Bearer token", "Content-Type": "application/json"}, + Body: json.RawMessage(`{"foo":"bar"}`), + } + paramsBytes, _ := json.Marshal(params) + mockHTTP.On("Do", mock.AnythingOfType("*http.Request")).Return(&http.Response{ + StatusCode: 200, + Body: ioutil.NopCloser(nil), + }, nil) + event := &models.Event{ID: [16]byte{1}, Name: "API Call Event"} + err := executor(context.Background(), event, paramsBytes) + assert.NoError(t, err) + mockHTTP.AssertCalled(t, "Do", mock.AnythingOfType("*http.Request")) +} + +func TestAPICallExecutor_Non2xx(t *testing.T) { + mockHTTP := new(MockHTTPClient) + executor := NewAPICallExecutor(mockHTTP) + params := models.ApicallActionParams{ + Method: "POST", + URL: "https://api.example.com/endpoint", + Headers: map[string]string{}, + Body: json.RawMessage(`{"foo":"bar"}`), + } + paramsBytes, _ := json.Marshal(params) + mockHTTP.On("Do", mock.AnythingOfType("*http.Request")).Return(&http.Response{ + StatusCode: 500, + Body: ioutil.NopCloser(nil), + }, nil) + event := &models.Event{ID: [16]byte{1}, Name: "API Call Event"} + err := executor(context.Background(), event, paramsBytes) + assert.Error(t, err) + assert.Contains(t, err.Error(), "non-2xx") +} + +func TestAPICallExecutor_NetworkError(t *testing.T) { + mockHTTP := new(MockHTTPClient) + executor := NewAPICallExecutor(mockHTTP) + params := models.ApicallActionParams{ + Method: "POST", + URL: "https://api.example.com/endpoint", + Headers: map[string]string{}, + Body: json.RawMessage(`{"foo":"bar"}`), + } + paramsBytes, _ := json.Marshal(params) + mockHTTP.On("Do", mock.AnythingOfType("*http.Request")).Return((*http.Response)(nil), errors.New("network error")) + event := &models.Event{ID: [16]byte{1}, Name: "API Call Event"} + err := executor(context.Background(), event, paramsBytes) + assert.Error(t, err) + assert.Contains(t, err.Error(), "network error") +} + +func TestAPICallExecutor_MissingParams(t *testing.T) { + mockHTTP := new(MockHTTPClient) + executor := NewAPICallExecutor(mockHTTP) + params := models.ApicallActionParams{ + Method: "", + URL: "", + Headers: map[string]string{}, + Body: json.RawMessage(`{"foo":"bar"}`), + } + paramsBytes, _ := json.Marshal(params) + event := &models.Event{ID: [16]byte{1}, Name: "API Call Event"} + err := executor(context.Background(), event, paramsBytes) + assert.Error(t, err) + assert.Contains(t, err.Error(), "requires both url and method") +} diff --git a/internal/actions/manager.go b/internal/actions/manager.go new file mode 100644 index 0000000..0cdedec --- /dev/null +++ b/internal/actions/manager.go @@ -0,0 +1,37 @@ +package actions + +import ( + "context" + "encoding/json" + "errors" + "fmt" + + "github.com/feedloop/qhronos/internal/models" +) + +type ActionExecutor func(ctx context.Context, event *models.Event, params json.RawMessage) error + +type ActionsManager struct { + executors map[models.ActionType]ActionExecutor +} + +func NewActionsManager() *ActionsManager { + return &ActionsManager{ + executors: make(map[models.ActionType]ActionExecutor), + } +} + +func (am *ActionsManager) Register(actionType models.ActionType, executor ActionExecutor) { + am.executors[actionType] = executor +} + +func (am *ActionsManager) Execute(ctx context.Context, event *models.Event) error { + if event.Action == nil { + return errors.New("event action is nil") + } + exec, ok := am.executors[event.Action.Type] + if !ok { + return fmt.Errorf("no executor registered for action type: %s", event.Action.Type) + } + return exec(ctx, event, event.Action.Params) +} diff --git a/internal/actions/webhook.go b/internal/actions/webhook.go new file mode 100644 index 0000000..d0dea57 --- /dev/null +++ b/internal/actions/webhook.go @@ -0,0 +1,68 @@ +package actions + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + + "github.com/feedloop/qhronos/internal/models" + "github.com/feedloop/qhronos/internal/services" +) + +type HTTPClient interface { + Do(req *http.Request) (*http.Response, error) +} + +func NewWebhookExecutor(hmacService *services.HMACService, httpClient HTTPClient) ActionExecutor { + return func(ctx context.Context, event *models.Event, params json.RawMessage) error { + var whParams models.WebhookActionParams + if err := json.Unmarshal(params, &whParams); err != nil { + return fmt.Errorf("failed to unmarshal webhook params: %w", err) + } + if whParams.URL == "" { + return fmt.Errorf("webhook URL is empty in action params") + } + + payload := map[string]interface{}{ + "event_id": event.ID, + "name": event.Name, + "description": event.Description, + "scheduled_at": event.StartTime, + "metadata": event.Metadata, + "tags": event.Tags, + } + jsonPayload, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("error marshaling webhook payload: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, "POST", whParams.URL, bytes.NewBuffer(jsonPayload)) + if err != nil { + return fmt.Errorf("error creating webhook request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + if hmacService != nil { + secret := event.HMACSecret + if secret == nil || *secret == "" { + sig := hmacService.SignPayload(jsonPayload, "") + req.Header.Set("X-Qhronos-Signature", sig) + } else { + sig := hmacService.SignPayload(jsonPayload, *secret) + req.Header.Set("X-Qhronos-Signature", sig) + } + } + + resp, err := httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return fmt.Errorf("received non-2xx status code: %d", resp.StatusCode) + } + return nil + } +} diff --git a/internal/actions/websocket.go b/internal/actions/websocket.go new file mode 100644 index 0000000..d146601 --- /dev/null +++ b/internal/actions/websocket.go @@ -0,0 +1,40 @@ +package actions + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/feedloop/qhronos/internal/models" +) + +type ClientNotifier interface { + DispatchToClient(clientID string, payload []byte) error +} + +func NewWebsocketExecutor(clientNotifier ClientNotifier) ActionExecutor { + return func(ctx context.Context, event *models.Event, params json.RawMessage) error { + var wsParams models.WebsocketActionParams + if err := json.Unmarshal(params, &wsParams); err != nil { + return fmt.Errorf("failed to unmarshal websocket params: %w", err) + } + if wsParams.ClientName == "" { + return fmt.Errorf("client_name is empty in websocket action params") + } + + payload := map[string]interface{}{ + "event_id": event.ID, + "name": event.Name, + "description": event.Description, + "scheduled_at": event.StartTime, + "metadata": event.Metadata, + "tags": event.Tags, + "type": "event", + } + jsonPayload, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("error marshaling websocket payload: %w", err) + } + return clientNotifier.DispatchToClient(wsParams.ClientName, jsonPayload) + } +} diff --git a/internal/handlers/event_handler.go b/internal/handlers/event_handler.go index c6a13da..e3dd011 100644 --- a/internal/handlers/event_handler.go +++ b/internal/handlers/event_handler.go @@ -2,6 +2,8 @@ package handlers import ( "context" + "encoding/json" + "fmt" "net/http" "strings" "time" @@ -26,33 +28,106 @@ func NewEventHandler(repo *repository.EventRepository, expander *scheduler.Expan return &EventHandler{repo: repo, expander: expander} } +// deriveActionFromRequest determines the Action and webhook string based on request fields. +// It prioritizes reqAction. If reqAction is nil, it derives from reqWebhook. +// Returns the derived Action, the canonical webhook string, and an error if validation fails. +func deriveActionFromRequest(reqWebhook *string, reqAction *models.Action) (*models.Action, string, error) { + var finalAction *models.Action + var finalWebhookString string + var err error + + if reqAction != nil { + finalAction = reqAction + // Validate provided action + if finalAction.Type == "" { + return nil, "", fmt.Errorf("action type is required when action object is provided") + } + if finalAction.Params == nil || len(finalAction.Params) == 0 { + return nil, "", fmt.Errorf("action params are required when action object is provided") + } + + switch finalAction.Type { + case models.ActionTypeWebhook: + var whParams models.WebhookActionParams + if err := json.Unmarshal(finalAction.Params, &whParams); err != nil { + return nil, "", fmt.Errorf("invalid params for webhook action: %w", err) + } + if whParams.URL == "" { + return nil, "", fmt.Errorf("URL is required for webhook action params") + } + finalWebhookString = whParams.URL + case models.ActionTypeWebsocket: + var wsParams models.WebsocketActionParams + if err := json.Unmarshal(finalAction.Params, &wsParams); err != nil { + return nil, "", fmt.Errorf("invalid params for websocket action: %w", err) + } + if wsParams.ClientName == "" { + return nil, "", fmt.Errorf("client_name is required for websocket action params") + } + finalWebhookString = "q:" + wsParams.ClientName + default: + return nil, "", fmt.Errorf("unknown action type: %s", finalAction.Type) + } + } else if reqWebhook != nil && *reqWebhook != "" { + finalWebhookString = *reqWebhook + var paramsBytes []byte + var actionType models.ActionType + + if len(finalWebhookString) > 2 && finalWebhookString[:2] == "q:" { + actionType = models.ActionTypeWebsocket + wsParams := models.WebsocketActionParams{ClientName: finalWebhookString[2:]} + paramsBytes, err = json.Marshal(wsParams) + } else { + actionType = models.ActionTypeWebhook + whParams := models.WebhookActionParams{URL: finalWebhookString} + paramsBytes, err = json.Marshal(whParams) + } + if err != nil { + return nil, "", fmt.Errorf("failed to marshal derived action params: %w", err) + } + finalAction = &models.Action{ + Type: actionType, + Params: paramsBytes, + } + } else { + // Neither Action nor a valid Webhook string was provided. + return nil, "", fmt.Errorf("either action object or a non-empty webhook string is required") + } + + return finalAction, finalWebhookString, nil +} + func (h *EventHandler) CreateEvent(c *gin.Context) { logger := c.MustGet(middleware.LoggerKey).(*zap.Logger) - var req models.CreateEventRequest + var req models.EventCreateRequest if err := c.ShouldBindJSON(&req); err != nil { - logger.Warn("Invalid event creation request", zap.Error(err)) - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + logger.Warn("Invalid event creation request format", zap.Error(err)) + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid JSON format: " + err.Error()}) return } - // Validate required fields - if req.Name == "" { + // Validate required fields (name and start_time are validated by binding) + if req.Name == "" { // Redundant if binding:required works, but good for clarity c.JSON(http.StatusBadRequest, gin.H{"error": "name is required"}) return } - if req.Webhook == "" { - c.JSON(http.StatusBadRequest, gin.H{"error": "webhook is required"}) - return - } - if req.StartTime.IsZero() { - c.JSON(http.StatusBadRequest, gin.H{"error": "start_time is required"}) + // startTime is validated by binding:required + + derivedAction, webhookString, err := deriveActionFromRequest(req.Webhook, req.Action) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } + // Default metadata to empty object if not provided - if len(req.Metadata) == 0 { - emptyJSON := datatypes.JSON([]byte("{}")) - req.Metadata = emptyJSON + // req.Metadata is json.RawMessage from EventCreateRequest + var metadataToStore datatypes.JSON + if len(req.Metadata) > 0 { + metadataToStore = datatypes.JSON(req.Metadata) + } else { + metadataToStore = datatypes.JSON("{}") } + // Default tags to empty array if not provided if req.Tags == nil { req.Tags = []string{} @@ -66,12 +141,18 @@ func (h *EventHandler) CreateEvent(c *gin.Context) { event := &models.Event{ Name: req.Name, - Description: req.Description, + Description: "", StartTime: req.StartTime, - Webhook: req.Webhook, - Metadata: req.Metadata, + Webhook: webhookString, // Use the derived webhook string + Action: derivedAction, // Use the derived action + Metadata: metadataToStore, Schedule: req.Schedule, Tags: pq.StringArray(req.Tags), + HMACSecret: req.HMACSecret, + } + + if req.Description != nil { + event.Description = *req.Description } if err := h.repo.Create(c, event); err != nil { @@ -95,7 +176,7 @@ func (h *EventHandler) CreateEvent(c *gin.Context) { }() } - c.JSON(http.StatusCreated, event) + c.JSON(http.StatusCreated, event.ToEventResponse()) // Use ToEventResponse } func (h *EventHandler) GetEvent(c *gin.Context) { @@ -119,28 +200,34 @@ func (h *EventHandler) GetEvent(c *gin.Context) { } func (h *EventHandler) UpdateEvent(c *gin.Context) { - id, err := uuid.Parse(c.Param("id")) + logger := c.MustGet(middleware.LoggerKey).(*zap.Logger) + idStr := c.Param("id") + id, err := uuid.Parse(idStr) if err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": "invalid event ID"}) + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid event ID format"}) return } - var req models.UpdateEventRequest + var req models.EventUpdateRequest if err := c.ShouldBindJSON(&req); err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + logger.Warn("Invalid event update request format", zap.Error(err)) + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid JSON format: " + err.Error()}) return } event, err := h.repo.GetByID(c, id) if err != nil { - if err == models.ErrEventNotFound { - c.JSON(http.StatusNotFound, gin.H{"error": "event not found"}) - return - } - c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + // Assuming GetByID returns sql.ErrNoRows which is converted to nil by repo + // or a specific error like models.ErrEventNotFound + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to fetch event for update"}) + return + } + if event == nil { + c.JSON(http.StatusNotFound, gin.H{"error": "event not found"}) return } + // Apply updates from request if req.Name != nil { event.Name = *req.Name } @@ -150,12 +237,31 @@ func (h *EventHandler) UpdateEvent(c *gin.Context) { if req.StartTime != nil { event.StartTime = *req.StartTime } - if req.Webhook != nil { - event.Webhook = *req.Webhook - } - if req.Metadata != nil { - event.Metadata = req.Metadata + + // Handle Action and Webhook update logic + // Priority: req.Action > req.Webhook > existing value + if req.Action != nil { + derivedAction, derivedWebhookString, deriveErr := deriveActionFromRequest(nil, req.Action) + if deriveErr != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid action in update: " + deriveErr.Error()}) + return + } + event.Action = derivedAction + event.Webhook = derivedWebhookString + } else if req.Webhook != nil { // Only consider req.Webhook if req.Action was not provided + derivedAction, derivedWebhookString, deriveErr := deriveActionFromRequest(req.Webhook, nil) + if deriveErr != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "Invalid webhook string in update: " + deriveErr.Error()}) + return + } + event.Action = derivedAction + event.Webhook = derivedWebhookString + } // If neither is provided, event.Action and event.Webhook remain as fetched + + if req.Metadata != nil { // req.Metadata is json.RawMessage + event.Metadata = datatypes.JSON(req.Metadata) } + if req.Schedule != nil { if !isValidSchedule(req.Schedule) { c.JSON(http.StatusBadRequest, gin.H{"error": "invalid schedule format"}) @@ -167,31 +273,42 @@ func (h *EventHandler) UpdateEvent(c *gin.Context) { event.Tags = pq.StringArray(req.Tags) } if req.Status != nil { - event.Status = models.EventStatus(*req.Status) + event.Status = *req.Status + } + if req.HMACSecret != nil { + event.HMACSecret = req.HMACSecret } if err := h.repo.Update(c, event); err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + logger.Error("Failed to update event", zap.String("event_id", idStr), zap.Error(err)) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update event"}) return } - // Remove all scheduled occurrences for this event from Redis + // Remove all scheduled occurrences for this event from Redis and re-expand + // This is important if start_time or schedule changed + logger.Info("Event updated, re-expanding occurrences", zap.String("event_id", event.ID.String())) err = h.repo.RemoveEventOccurrencesFromRedis(c, event.ID) if err != nil { - // Log but do not fail the request - zap.L().Warn("Failed to remove old scheduled occurrences from Redis", zap.String("event_id", event.ID.String()), zap.Error(err)) + logger.Warn("Failed to remove old scheduled occurrences from Redis after update", zap.String("event_id", event.ID.String()), zap.Error(err)) } - - // Re-expand the event to schedule new occurrences go func() { + // Use a new context for background task + bgCtx := context.Background() if event.Schedule == nil { - _ = h.expander.ExpandNonRecurringEvent(context.Background(), event) + err := h.expander.ExpandNonRecurringEvent(bgCtx, event) + if err != nil { + logger.Error("Error re-expanding non-recurring event after update", zap.String("event_id", event.ID.String()), zap.Error(err)) + } } else { - _ = h.expander.ExpandRecurringEvent(context.Background(), event) + err := h.expander.ExpandRecurringEvent(bgCtx, event) + if err != nil { + logger.Error("Error re-expanding recurring event after update", zap.String("event_id", event.ID.String()), zap.Error(err)) + } } }() - c.JSON(http.StatusOK, event) + c.JSON(http.StatusOK, event.ToEventResponse()) // Use ToEventResponse } func (h *EventHandler) DeleteEvent(c *gin.Context) { diff --git a/internal/handlers/event_handler_test.go b/internal/handlers/event_handler_test.go index df05215..1dd208d 100644 --- a/internal/handlers/event_handler_test.go +++ b/internal/handlers/event_handler_test.go @@ -26,6 +26,8 @@ func timePtr(t time.Time) *time.Time { return &t } +func ptr[T any](v T) *T { return &v } + func TestEventHandler(t *testing.T) { db := testutils.TestDB(t) logger := zap.NewNop() @@ -69,7 +71,7 @@ func TestEventHandler(t *testing.T) { cleanup() req := models.CreateEventRequest{ Name: "Test Event", - Description: "Test Description", + Description: ptr("Test Description"), StartTime: time.Now(), Webhook: "https://example.com/webhook", Metadata: datatypes.JSON([]byte(`{"key": "value"}`)), @@ -94,7 +96,7 @@ func TestEventHandler(t *testing.T) { err = json.Unmarshal(w.Body.Bytes(), &response) require.NoError(t, err) assert.Equal(t, req.Name, response.Name) - assert.Equal(t, req.Description, response.Description) + assert.Equal(t, *req.Description, response.Description) assert.Equal(t, req.Webhook, response.Webhook) assertJSONEqual(t, req.Metadata, response.Metadata) assert.Equal(t, req.Schedule, response.Schedule) @@ -164,16 +166,13 @@ func TestEventHandler(t *testing.T) { require.NoError(t, err) updateReq := models.UpdateEventRequest{ - Name: stringPtr("Updated Event"), - Description: stringPtr("Updated Description"), - Webhook: stringPtr("https://example.com/updated"), + Name: ptr("Updated Event"), + Description: ptr("Updated Description"), + Webhook: ptr("https://example.com/updated"), Metadata: datatypes.JSON([]byte(`{"key": "updated"}`)), - Schedule: &models.ScheduleConfig{ - Frequency: "daily", - Interval: 1, - }, - Tags: []string{"updated"}, - Status: stringPtr(string(models.EventStatusInactive)), + Schedule: &models.ScheduleConfig{Frequency: "daily", Interval: 1}, + Tags: []string{"updated"}, + Status: ptr(string(models.EventStatusInactive)), } body, err := json.Marshal(updateReq) require.NoError(t, err) @@ -188,12 +187,13 @@ func TestEventHandler(t *testing.T) { var response models.Event err = json.Unmarshal(w.Body.Bytes(), &response) require.NoError(t, err) - assert.Equal(t, *updateReq.Name, response.Name) - assert.Equal(t, *updateReq.Description, response.Description) - assert.Equal(t, *updateReq.Webhook, response.Webhook) - assertJSONEqual(t, updateReq.Metadata, response.Metadata) - assert.Equal(t, updateReq.Schedule, response.Schedule) - assert.ElementsMatch(t, updateReq.Tags, response.Tags) + assert.Equal(t, "Updated Event", response.Name) + assert.Equal(t, "Updated Description", response.Description) + assert.Equal(t, "https://example.com/updated", response.Webhook) + assertJSONEqual(t, datatypes.JSON([]byte(`{"key": "updated"}`)), response.Metadata) + require.NotNil(t, response.Schedule) + assert.Equal(t, models.ScheduleConfig{Frequency: "daily", Interval: 1}, *response.Schedule) + assert.ElementsMatch(t, []string{"updated"}, response.Tags) assert.Equal(t, models.EventStatusInactive, response.Status) }) @@ -288,7 +288,7 @@ func TestEventHandler(t *testing.T) { cleanup() req := models.CreateEventRequest{ Name: "Test Event", - Description: "Test Description", + Description: ptr("Test Description"), StartTime: time.Now(), Webhook: "https://example.com/webhook", Metadata: datatypes.JSON([]byte(`{"key": "value"}`)), @@ -550,10 +550,105 @@ func TestEventHandler(t *testing.T) { router.ServeHTTP(w, r) assert.Equal(t, http.StatusNotFound, w.Code) }) -} -func stringPtr(s string) *string { - return &s + t.Run("Create Event with action:webhook", func(t *testing.T) { + cleanup() + body := `{ + "name": "Action Webhook Event", + "description": "Test event with webhook action", + "start_time": "2025-01-01T00:00:00Z", + "action": { + "type": "webhook", + "params": { "url": "https://example.com/webhook" } + }, + "tags": ["api"] + }` + w := httptest.NewRecorder() + r := httptest.NewRequest("POST", "/events", bytes.NewBufferString(body)) + r.Header.Set("Content-Type", "application/json") + router.ServeHTTP(w, r) + assert.Equal(t, http.StatusCreated, w.Code) + var resp map[string]interface{} + err := json.Unmarshal(w.Body.Bytes(), &resp) + assert.NoError(t, err) + assert.Equal(t, "Action Webhook Event", resp["name"]) + assert.Equal(t, "Test event with webhook action", resp["description"]) + action := resp["action"].(map[string]interface{}) + assert.Equal(t, "webhook", action["type"]) + params := action["params"].(map[string]interface{}) + assert.Equal(t, "https://example.com/webhook", params["url"]) + }) + + t.Run("Create Event with action:websocket", func(t *testing.T) { + cleanup() + body := `{ + "name": "Action Websocket Event", + "description": "Test event with websocket action", + "start_time": "2025-01-01T00:00:00Z", + "action": { + "type": "websocket", + "params": { "client_name": "client1" } + }, + "tags": ["api"] + }` + w := httptest.NewRecorder() + r := httptest.NewRequest("POST", "/events", bytes.NewBufferString(body)) + r.Header.Set("Content-Type", "application/json") + router.ServeHTTP(w, r) + assert.Equal(t, http.StatusCreated, w.Code) + var resp map[string]interface{} + err := json.Unmarshal(w.Body.Bytes(), &resp) + assert.NoError(t, err) + assert.Equal(t, "Action Websocket Event", resp["name"]) + assert.Equal(t, "Test event with websocket action", resp["description"]) + action := resp["action"].(map[string]interface{}) + assert.Equal(t, "websocket", action["type"]) + params := action["params"].(map[string]interface{}) + assert.Equal(t, "client1", params["client_name"]) + }) + + t.Run("Update Event with new action", func(t *testing.T) { + cleanup() + // Create initial event + body := `{ + "name": "Update Action Event", + "description": "Initial event", + "start_time": "2025-01-01T00:00:00Z", + "action": { + "type": "webhook", + "params": { "url": "https://example.com/old" } + }, + "tags": ["api"] + }` + w := httptest.NewRecorder() + r := httptest.NewRequest("POST", "/events", bytes.NewBufferString(body)) + r.Header.Set("Content-Type", "application/json") + router.ServeHTTP(w, r) + assert.Equal(t, http.StatusCreated, w.Code) + var resp map[string]interface{} + err := json.Unmarshal(w.Body.Bytes(), &resp) + assert.NoError(t, err) + id := resp["id"].(string) + // Update with new action + updateBody := `{ + "action": { + "type": "websocket", + "params": { "client_name": "client2" } + } + }` + w2 := httptest.NewRecorder() + r2 := httptest.NewRequest("PUT", "/events/"+id, bytes.NewBufferString(updateBody)) + r2.Header.Set("Content-Type", "application/json") + router.ServeHTTP(w2, r2) + assert.Equal(t, http.StatusOK, w2.Code) + var resp2 map[string]interface{} + err = json.Unmarshal(w2.Body.Bytes(), &resp2) + assert.NoError(t, err) + action := resp2["action"].(map[string]interface{}) + assert.Equal(t, "websocket", action["type"]) + params := action["params"].(map[string]interface{}) + assert.Equal(t, "client2", params["client_name"]) + }) } func assertJSONEqual(t *testing.T, expected, actual datatypes.JSON) { diff --git a/internal/models/action.go b/internal/models/action.go new file mode 100644 index 0000000..6ac3c41 --- /dev/null +++ b/internal/models/action.go @@ -0,0 +1,97 @@ +package models + +import ( + "database/sql/driver" + "encoding/json" + "fmt" +) + +// ActionType defines the type of action to be performed. +type ActionType string + +const ( + ActionTypeWebhook ActionType = "webhook" + ActionTypeWebsocket ActionType = "websocket" + ActionTypeAPICall ActionType = "apicall" + // ActionTypeUnknown represents an unsupported or undefined action type. + ActionTypeUnknown ActionType = "unknown" +) + +// ActionParams is an interface for action-specific parameters. +// We use json.RawMessage for Params in the Action struct to allow flexible +// marshalling/unmarshalling of type-specific parameter structs. +type ActionParams interface{} + +// WebhookActionParams contains parameters specific to webhook actions. +type WebhookActionParams struct { + URL string `json:"url"` + // Future enhancements: + // Method string `json:"method,omitempty"` + // Headers map[string]string `json:"headers,omitempty"` + // Body string `json:"body,omitempty"` // Template support could be added +} + +// WebsocketActionParams contains parameters specific to websocket actions. +// This refers to dispatching to a client connected to Qhronos's own WebSocket server. +type WebsocketActionParams struct { + ClientName string `json:"client_name"` +} + +// ApicallActionParams defines parameters for the apicall action type +type ApicallActionParams struct { + Method string `json:"method"` + Headers map[string]string `json:"headers,omitempty"` + Body json.RawMessage `json:"body,omitempty"` + URL string `json:"url"` +} + +// Action represents a generic action to be performed for an event. +type Action struct { + Type ActionType `json:"type"` + Params json.RawMessage `json:"params"` // Holds WebhookActionParams or WebsocketActionParams as JSON +} + +// Value implements the driver.Valuer interface for database serialization. +func (a *Action) Value() (driver.Value, error) { + if a == nil { + return nil, nil + } + return json.Marshal(a) +} + +// Scan implements the sql.Scanner interface for database deserialization. +func (a *Action) Scan(value interface{}) error { + if value == nil { + *a = Action{} + return nil + } + b, ok := value.([]byte) + if !ok { + return fmt.Errorf("type assertion to []byte failed for Action scan") + } + return json.Unmarshal(b, a) +} + +// GetWebhookParams attempts to unmarshal the action's params into WebhookActionParams. +func (a *Action) GetWebhookParams() (*WebhookActionParams, error) { + if a.Type != ActionTypeWebhook { + return nil, fmt.Errorf("action type is not '%s'", ActionTypeWebhook) + } + var params WebhookActionParams + if err := json.Unmarshal(a.Params, ¶ms); err != nil { + return nil, fmt.Errorf("failed to unmarshal webhook params: %w", err) + } + return ¶ms, nil +} + +// GetWebsocketParams attempts to unmarshal the action's params into WebsocketActionParams. +func (a *Action) GetWebsocketParams() (*WebsocketActionParams, error) { + if a.Type != ActionTypeWebsocket { + return nil, fmt.Errorf("action type is not '%s'", ActionTypeWebsocket) + } + var params WebsocketActionParams + if err := json.Unmarshal(a.Params, ¶ms); err != nil { + return nil, fmt.Errorf("failed to unmarshal websocket params: %w", err) + } + return ¶ms, nil +} diff --git a/internal/models/event.go b/internal/models/event.go index 442421e..44ab1c8 100644 --- a/internal/models/event.go +++ b/internal/models/event.go @@ -60,21 +60,22 @@ func (s *ScheduleConfig) Scan(value interface{}) error { type Event struct { ID uuid.UUID `json:"id" db:"id"` Name string `json:"name" db:"name"` - Description string `json:"description" db:"description"` + Description string `json:"description" db:"description,omitempty"` StartTime time.Time `json:"start_time" db:"start_time"` Webhook string `json:"webhook" db:"webhook"` - Metadata datatypes.JSON `json:"metadata" db:"metadata"` - Schedule *ScheduleConfig `json:"schedule,omitempty" db:"schedule"` - Tags pq.StringArray `json:"tags" db:"tags"` + Action *Action `json:"action" db:"action,omitempty"` + Metadata datatypes.JSON `json:"metadata" db:"metadata,omitempty"` + Schedule *ScheduleConfig `json:"schedule" db:"schedule,omitempty"` + Tags pq.StringArray `json:"tags" db:"tags,omitempty"` Status EventStatus `json:"status" db:"status"` - HMACSecret *string `json:"hmac_secret,omitempty" db:"hmac_secret"` + HMACSecret *string `json:"hmac_secret" db:"hmac_secret,omitempty"` CreatedAt time.Time `json:"created_at" db:"created_at"` - UpdatedAt *time.Time `json:"updated_at,omitempty" db:"updated_at"` + UpdatedAt *time.Time `json:"updated_at" db:"updated_at,omitempty"` } type CreateEventRequest struct { Name string `json:"name" validate:"required"` - Description string `json:"description"` + Description *string `json:"description,omitempty"` StartTime time.Time `json:"start_time" validate:"required"` Webhook string `json:"webhook" validate:"required"` Metadata datatypes.JSON `json:"metadata" validate:"required"` @@ -101,3 +102,74 @@ type EventFilter struct { StartTimeBefore *time.Time StartTimeAfter *time.Time } + +// TableName specifies the database table name for GORM (if used) +// For sqlx, this is typically not needed in the struct itself. +/*func (Event) TableName() string { + return "events" +}*/ + +// ToEventResponse converts an Event model to an EventResponse for API output. +// This can be expanded to include more fields or transform data as needed. +func (e *Event) ToEventResponse() EventResponse { + return EventResponse{ + ID: e.ID, + Name: e.Name, + Description: e.Description, + StartTime: e.StartTime, + Webhook: e.Webhook, + Action: e.Action, + Metadata: e.Metadata, + Schedule: e.Schedule, + Tags: e.Tags, + Status: string(e.Status), + HMACSecret: e.HMACSecret, + CreatedAt: e.CreatedAt, + UpdatedAt: e.UpdatedAt, + } +} + +// EventCreateRequest defines the structure for creating a new event +type EventCreateRequest struct { + Name string `json:"name" binding:"required"` + Description *string `json:"description,omitempty"` + StartTime time.Time `json:"start_time" binding:"required"` + Webhook *string `json:"webhook,omitempty"` // Kept for backward compatibility + Action *Action `json:"action,omitempty"` // New action field + Metadata json.RawMessage `json:"metadata,omitempty"` // Use json.RawMessage for flexible metadata + Schedule *ScheduleConfig `json:"schedule,omitempty"` + Tags []string `json:"tags,omitempty"` + HMACSecret *string `json:"hmac_secret,omitempty"` +} + +// EventUpdateRequest defines the structure for updating an existing event +// All fields are optional, so use pointers +type EventUpdateRequest struct { + Name *string `json:"name,omitempty"` + Description *string `json:"description,omitempty"` + StartTime *time.Time `json:"start_time,omitempty"` + Webhook *string `json:"webhook,omitempty"` // Kept for backward compatibility + Action *Action `json:"action,omitempty"` // New action field + Metadata json.RawMessage `json:"metadata,omitempty"` // Use json.RawMessage for flexible metadata + Schedule *ScheduleConfig `json:"schedule,omitempty"` + Tags []string `json:"tags,omitempty"` + Status *EventStatus `json:"status,omitempty"` + HMACSecret *string `json:"hmac_secret,omitempty"` +} + +// EventResponse defines the structure for API responses for an event +type EventResponse struct { + ID uuid.UUID `json:"id"` + Name string `json:"name"` + Description string `json:"description,omitempty"` + StartTime time.Time `json:"start_time"` + Webhook string `json:"webhook"` + Action *Action `json:"action,omitempty"` + Metadata datatypes.JSON `json:"metadata,omitempty"` + Schedule *ScheduleConfig `json:"schedule,omitempty"` + Tags pq.StringArray `json:"tags,omitempty"` + Status string `json:"status"` + HMACSecret *string `json:"hmac_secret,omitempty"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt *time.Time `json:"updated_at,omitempty"` +} diff --git a/internal/repository/event_repository.go b/internal/repository/event_repository.go index dca178b..32eecd2 100644 --- a/internal/repository/event_repository.go +++ b/internal/repository/event_repository.go @@ -30,10 +30,70 @@ func timePtr(t time.Time) *time.Time { return &t } +// deriveAndSetAction populates the event.Action field based on event.Webhook +// if Action is not already set. It also ensures Webhook is consistent if Action is set. +func (r *EventRepository) deriveAndSetAction(event *models.Event) error { + // Always re-derive Action from Webhook if Webhook is set + if event.Webhook != "" { + var paramsBytes []byte + var err error + var actionType models.ActionType + + if len(event.Webhook) > 2 && event.Webhook[:2] == "q:" { + actionType = models.ActionTypeWebsocket + wsParams := models.WebsocketActionParams{ClientName: event.Webhook[2:]} + paramsBytes, err = json.Marshal(wsParams) + } else { + actionType = models.ActionTypeWebhook + whParams := models.WebhookActionParams{URL: event.Webhook} + paramsBytes, err = json.Marshal(whParams) + } + + if err != nil { + return fmt.Errorf("failed to marshal action params: %w", err) + } + event.Action = &models.Action{ + Type: actionType, + Params: paramsBytes, + } + } else if event.Action != nil && event.Webhook == "" { + r.populateWebhookFromAction(event) + } + return nil +} + +// populateWebhookFromAction sets the event.Webhook string based on event.Action. +// This is for backward compatibility for code that might still read the Webhook field. +func (r *EventRepository) populateWebhookFromAction(event *models.Event) { + if event.Action == nil { + return + } + switch event.Action.Type { + case models.ActionTypeWebhook: + params, err := event.Action.GetWebhookParams() + if err != nil { + r.logger.Error("Failed to get webhook params for populating Webhook string", zap.String("event_id", event.ID.String()), zap.Error(err)) + return + } + event.Webhook = params.URL + case models.ActionTypeWebsocket: + params, err := event.Action.GetWebsocketParams() + if err != nil { + r.logger.Error("Failed to get websocket params for populating Webhook string", zap.String("event_id", event.ID.String()), zap.Error(err)) + return + } + event.Webhook = "q:" + params.ClientName + default: + // If action type is unknown or params are malformed, Webhook might remain as it is or be empty. + // Consider if event.Webhook should be cleared here if action type is unknown. + r.logger.Warn("Unknown action type for populating Webhook string", zap.String("event_id", event.ID.String()), zap.String("action_type", string(event.Action.Type))) + } +} + func (r *EventRepository) Create(ctx context.Context, event *models.Event) error { query := ` - INSERT INTO events (id, name, description, schedule, start_time, metadata, webhook, tags, status, created_at, updated_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + INSERT INTO events (id, name, description, schedule, start_time, metadata, webhook, tags, status, created_at, updated_at, action) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) RETURNING id` now := time.Now() @@ -46,6 +106,10 @@ func (r *EventRepository) Create(ctx context.Context, event *models.Event) error event.Status = models.EventStatusActive } + if err := r.deriveAndSetAction(event); err != nil { + return fmt.Errorf("error deriving action for event: %w", err) + } + err := r.db.QueryRowContext(ctx, query, event.ID, event.Name, @@ -53,11 +117,12 @@ func (r *EventRepository) Create(ctx context.Context, event *models.Event) error event.Schedule, event.StartTime, event.Metadata, - event.Webhook, + event.Webhook, // Still pass webhook for now, migration will handle old data / or ensure deriveAndSetAction fills it if action exists event.Tags, event.Status, event.CreatedAt, event.UpdatedAt, + event.Action, ).Scan(&event.ID) if err != nil { @@ -69,7 +134,7 @@ func (r *EventRepository) Create(ctx context.Context, event *models.Event) error func (r *EventRepository) GetByID(ctx context.Context, id uuid.UUID) (*models.Event, error) { query := ` - SELECT id, name, description, schedule, start_time, metadata, webhook, tags, status, created_at, updated_at + SELECT id, name, description, schedule, start_time, metadata, webhook, tags, status, created_at, updated_at, action FROM events WHERE id = $1` @@ -81,13 +146,13 @@ func (r *EventRepository) GetByID(ctx context.Context, id uuid.UUID) (*models.Ev if err != nil { return nil, fmt.Errorf("error getting event: %w", err) } - + r.populateWebhookFromAction(&event) return &event, nil } func (r *EventRepository) List(ctx context.Context) ([]models.Event, error) { query := ` - SELECT id, name, description, schedule, start_time, metadata, webhook, tags, status, created_at, updated_at + SELECT id, name, description, schedule, start_time, metadata, webhook, tags, status, created_at, updated_at, action FROM events ORDER BY created_at DESC` @@ -97,18 +162,26 @@ func (r *EventRepository) List(ctx context.Context) ([]models.Event, error) { return nil, fmt.Errorf("error listing events: %w", err) } + for i := range events { + r.populateWebhookFromAction(&events[i]) + } + return events, nil } func (r *EventRepository) Update(ctx context.Context, event *models.Event) error { query := ` UPDATE events - SET name = $1, description = $2, schedule = $3, start_time = $4, metadata = $5, webhook = $6, tags = $7, status = $8, updated_at = $9 - WHERE id = $10 + SET name = $1, description = $2, schedule = $3, start_time = $4, metadata = $5, webhook = $6, tags = $7, status = $8, updated_at = $9, action = $10 + WHERE id = $11 RETURNING id` event.UpdatedAt = timePtr(time.Now()) + if err := r.deriveAndSetAction(event); err != nil { + return fmt.Errorf("error deriving action for event update: %w", err) + } + result, err := r.db.ExecContext(ctx, query, event.Name, event.Description, @@ -119,6 +192,7 @@ func (r *EventRepository) Update(ctx context.Context, event *models.Event) error event.Tags, event.Status, event.UpdatedAt, + event.Action, event.ID, ) @@ -202,7 +276,7 @@ func (r *EventRepository) RemoveEventOccurrencesFromRedis(ctx context.Context, e func (r *EventRepository) ListByTags(ctx context.Context, tags []string) ([]*models.Event, error) { query := ` - SELECT id, name, description, schedule, start_time, metadata, webhook, tags, status, created_at, updated_at + SELECT id, name, description, schedule, start_time, metadata, webhook, tags, status, created_at, updated_at, action FROM events WHERE tags && $1 ORDER BY created_at DESC @@ -214,6 +288,12 @@ func (r *EventRepository) ListByTags(ctx context.Context, tags []string) ([]*mod return nil, fmt.Errorf("error listing events by tags: %w", err) } + for i := range events { + if events[i] != nil { + r.populateWebhookFromAction(events[i]) + } + } + return events, nil } @@ -246,7 +326,7 @@ func (r *EventRepository) DeleteOldOccurrences(ctx context.Context, cutoff time. func (r *EventRepository) ListActive(ctx context.Context) ([]*models.Event, error) { query := ` - SELECT id, name, description, schedule, start_time, metadata, webhook, tags, status, created_at, updated_at + SELECT id, name, description, schedule, start_time, metadata, webhook, tags, status, created_at, updated_at, action FROM events WHERE status = $1 ORDER BY created_at DESC` @@ -257,6 +337,12 @@ func (r *EventRepository) ListActive(ctx context.Context) ([]*models.Event, erro return nil, fmt.Errorf("error listing active events: %w", err) } + for i := range events { + if events[i] != nil { + r.populateWebhookFromAction(events[i]) + } + } + return events, nil } @@ -351,51 +437,16 @@ func (r *EventRepository) CreateEvent(ctx context.Context, event *models.Event) } func (r *EventRepository) GetEvent(ctx context.Context, id string) (*models.Event, error) { - query := ` - SELECT id, name, description, start_time, webhook, - metadata, schedule, tags, status, hmac_secret, created_at, updated_at - FROM events - WHERE id = $1 - ` - - var event models.Event - var scheduleJSON []byte - err := r.db.QueryRowContext(ctx, query, id).Scan( - &event.ID, - &event.Name, - &event.Description, - &event.StartTime, - &event.Webhook, - &event.Metadata, - &scheduleJSON, - pq.Array(&event.Tags), - &event.Status, - &event.HMACSecret, - &event.CreatedAt, - &event.UpdatedAt, - ) + eventID, err := uuid.Parse(id) if err != nil { - if err == sql.ErrNoRows { - return nil, fmt.Errorf("event not found: %w", err) - } - return nil, fmt.Errorf("failed to get event: %w", err) - } - - if len(scheduleJSON) > 0 { - var schedule models.ScheduleConfig - if err := json.Unmarshal(scheduleJSON, &schedule); err != nil { - return nil, fmt.Errorf("failed to unmarshal schedule: %w", err) - } - event.Schedule = &schedule + return nil, fmt.Errorf("invalid event ID format: %w", err) } - - return &event, nil + return r.GetByID(ctx, eventID) } func (r *EventRepository) ListEvents(ctx context.Context, filter models.EventFilter) ([]*models.Event, error) { query := ` - SELECT id, name, description, start_time, webhook, - metadata, schedule, tags, status, hmac_secret, created_at, updated_at + SELECT id, name, description, schedule, start_time, metadata, webhook, tags, status, created_at, updated_at, action FROM events WHERE 1=1 ` @@ -448,9 +499,9 @@ func (r *EventRepository) ListEvents(ctx context.Context, filter models.EventFil &scheduleJSON, pq.Array(&event.Tags), &event.Status, - &event.HMACSecret, &event.CreatedAt, &event.UpdatedAt, + &event.Action, ) if err != nil { return nil, fmt.Errorf("failed to scan event: %w", err) @@ -471,5 +522,11 @@ func (r *EventRepository) ListEvents(ctx context.Context, filter models.EventFil return nil, fmt.Errorf("error iterating events: %w", err) } + for i := range events { + if events[i] != nil { + r.populateWebhookFromAction(events[i]) + } + } + return events, nil } diff --git a/internal/scheduler/dispatcher.go b/internal/scheduler/dispatcher.go index 2b4f54d..a7cc2f5 100644 --- a/internal/scheduler/dispatcher.go +++ b/internal/scheduler/dispatcher.go @@ -1,15 +1,13 @@ package scheduler import ( - "bytes" "context" "encoding/json" "fmt" - "io/ioutil" "net/http" - "strings" "time" + "github.com/feedloop/qhronos/internal/actions" "github.com/feedloop/qhronos/internal/models" "github.com/feedloop/qhronos/internal/repository" "github.com/feedloop/qhronos/internal/services" @@ -33,9 +31,10 @@ type Dispatcher struct { maxRetries int retryDelay time.Duration logger *zap.Logger - clientNotifier ClientNotifier // optional, for q: webhooks - scheduler *Scheduler // new field for scheduler - redisPrefix string // added redisPrefix field + clientNotifier ClientNotifier // optional, for q: webhooks + scheduler *Scheduler // new field for scheduler + redisPrefix string // added redisPrefix field + actionsManager *actions.ActionsManager // new field for actions manager } // HTTPClient interface for mocking HTTP requests @@ -52,18 +51,29 @@ func (d *DefaultHTTPClient) Do(req *http.Request) (*http.Response, error) { return d.client.Do(req) } -func NewDispatcher(eventRepo *repository.EventRepository, occurrenceRepo *repository.OccurrenceRepository, hmacService *services.HMACService, logger *zap.Logger, maxRetries int, retryDelay time.Duration, clientNotifier ClientNotifier, scheduler *Scheduler) *Dispatcher { +func NewDispatcher(eventRepo *repository.EventRepository, occurrenceRepo *repository.OccurrenceRepository, hmacService *services.HMACService, logger *zap.Logger, maxRetries int, retryDelay time.Duration, clientNotifier ClientNotifier, scheduler *Scheduler, httpClient HTTPClient) *Dispatcher { + am := actions.NewActionsManager() + var webhookClient HTTPClient + if httpClient != nil { + webhookClient = httpClient + } else { + webhookClient = &DefaultHTTPClient{client: &http.Client{Timeout: 10 * time.Second}} + } + am.Register(models.ActionTypeWebhook, actions.NewWebhookExecutor(hmacService, webhookClient)) + am.Register(models.ActionTypeWebsocket, actions.NewWebsocketExecutor(clientNotifier)) + am.Register(models.ActionTypeAPICall, actions.NewAPICallExecutor(webhookClient)) return &Dispatcher{ eventRepo: eventRepo, occurrenceRepo: occurrenceRepo, hmacService: hmacService, - client: &DefaultHTTPClient{client: &http.Client{Timeout: 10 * time.Second}}, + client: webhookClient, maxRetries: maxRetries, retryDelay: retryDelay, logger: logger, clientNotifier: clientNotifier, scheduler: scheduler, redisPrefix: scheduler.redisPrefix, + actionsManager: am, } } @@ -72,151 +82,107 @@ func (d *Dispatcher) SetHTTPClient(client HTTPClient) { d.client = client } -// DispatchWebhook sends a webhook request and handles a single attempt (no in-process retry) -func (d *Dispatcher) DispatchWebhook(ctx context.Context, sched *models.Schedule) error { - // Defensive: Clean up orphaned schedule if event does not exist +// DispatchAction sends a webhook request or dispatches a websocket message based on the event's action type. +func (d *Dispatcher) DispatchAction(ctx context.Context, sched *models.Schedule) error { + // Fetch the full event details, as sched from Redis might be minimal event, err := d.eventRepo.GetByID(ctx, sched.EventID) - if err != nil || event == nil { + if err != nil { + d.logger.Error("Failed to get event for dispatch", zap.String("event_id", sched.EventID.String()), zap.String("occurrence_id", sched.OccurrenceID.String()), zap.Error(err)) + // Log occurrence as failed if event cannot be fetched + logErrOccurrence(ctx, d.occurrenceRepo, sched, 0, "", fmt.Sprintf("event not found for dispatch: %v", err)) + // Defensive: Clean up orphaned schedule if event does not exist (similar to old logic) key := fmt.Sprintf("schedule:%s:%d", sched.EventID.String(), sched.ScheduledAt.Unix()) if d.scheduler != nil && d.scheduler.redis != nil { - _, zremErr := d.scheduler.redis.ZRem(ctx, d.scheduler.redisPrefix+ScheduleKey, key).Result() - _, hdelErr := d.scheduler.redis.HDel(ctx, d.scheduler.redisPrefix+"schedule:data", key).Result() - d.logger.Warn("Orphaned schedule found and removed", - zap.String("event_id", sched.EventID.String()), - zap.String("schedule_key", key), - zap.Error(err), - zap.Error(zremErr), - zap.Error(hdelErr), - ) - } - return fmt.Errorf("event not found: %s", sched.EventID) - } - if strings.HasPrefix(sched.Webhook, "q:") { - if d.clientNotifier == nil { - return fmt.Errorf("client notifier not configured for q: webhooks") - } - clientID := strings.TrimPrefix(sched.Webhook, "q:") - payload := map[string]interface{}{ - "event_id": sched.EventID, - "occurrence_id": sched.OccurrenceID, - "name": sched.Name, - "description": sched.Description, - "scheduled_at": sched.ScheduledAt, - "metadata": sched.Metadata, + _, _ = d.scheduler.redis.ZRem(ctx, d.scheduler.redisPrefix+ScheduleKey, key).Result() // Best effort + _, _ = d.scheduler.redis.HDel(ctx, d.scheduler.redisPrefix+"schedule:data", key).Result() // Best effort + d.logger.Warn("Orphaned schedule possibly removed after failing to fetch event for dispatch", zap.String("event_id", sched.EventID.String()), zap.String("schedule_key", key)) } - jsonPayload, err := json.Marshal(payload) - if err != nil { - return fmt.Errorf("error marshaling payload: %w", err) - } - err = d.clientNotifier.DispatchToClient(clientID, jsonPayload) - finalStatus := models.OccurrenceStatusCompleted - if err != nil { - finalStatus = models.OccurrenceStatusFailed - } - logOccurrence := &models.Occurrence{ - OccurrenceID: sched.OccurrenceID, - EventID: sched.EventID, - ScheduledAt: sched.ScheduledAt, - Status: finalStatus, - AttemptCount: sched.AttemptCount, - Timestamp: time.Now(), - StatusCode: 0, - ResponseBody: "", - ErrorMessage: fmt.Sprintf("client hook dispatch: %v", err), - } - _ = d.occurrenceRepo.Create(ctx, logOccurrence) - return err + return fmt.Errorf("event not found for dispatch: %s, error: %w", sched.EventID, err) } - // Prepare webhook payload with rich information - payload := map[string]interface{}{ - "event_id": sched.EventID, - "occurrence_id": sched.OccurrenceID, - "name": sched.Name, - "description": sched.Description, - "scheduled_at": sched.ScheduledAt, - "metadata": sched.Metadata, + if event == nil { // Should be covered by err != nil if repo returns sql.ErrNoRows correctly mapped + d.logger.Error("Event is nil after GetByID for dispatch", zap.String("event_id", sched.EventID.String()), zap.String("occurrence_id", sched.OccurrenceID.String())) + logErrOccurrence(ctx, d.occurrenceRepo, sched, 0, "", "event is nil after GetByID") + return fmt.Errorf("event is nil after GetByID: %s", sched.EventID) } - jsonPayload, err := json.Marshal(payload) - if err != nil { - return fmt.Errorf("error marshaling payload: %w", err) - } - baseReq, err := http.NewRequestWithContext(ctx, "POST", sched.Webhook, nil) - if err != nil { - return fmt.Errorf("error creating request: %w", err) - } - baseReq.Header.Set("Content-Type", "application/json") - baseReq.Header.Set("Content-Length", fmt.Sprintf("%d", len(jsonPayload))) - if d.hmacService != nil { - secret := "" - signature := d.hmacService.SignPayload(jsonPayload, secret) - baseReq.Header.Set("X-Qhronos-Signature", signature) - } - // Only one attempt per call - attemptCount := sched.AttemptCount - if attemptCount == 0 { - attemptCount = 1 - } else { - attemptCount++ + + // Ensure Action is present. The repository should have populated it. + // If not, log a warning. The migration should handle old data. + if event.Action == nil { + d.logger.Error("Event action is nil, cannot dispatch", zap.String("event_id", event.ID.String()), zap.String("occurrence_id", sched.OccurrenceID.String())) + logErrOccurrence(ctx, d.occurrenceRepo, sched, 0, "", "event action is nil") + return fmt.Errorf("event action is nil for event %s", event.ID) } - req := baseReq.Clone(ctx) - req.Body = ioutil.NopCloser(bytes.NewBuffer(jsonPayload)) - var finalStatus models.OccurrenceStatus + + // Increment attempt count for the schedule from Redis before dispatching + // Note: sched.AttemptCount is from Redis and might not reflect the absolute truth if there were prior system crashes. + // The occurrence table logging will reflect the true attempt for that specific logged dispatch. + currentAttempt := sched.AttemptCount + 1 // This is the attempt number for *this* dispatch cycle from Redis data. + + var dispatchError error var statusCode int var responseBody string - var errorMessage string - resp, err := d.client.Do(req) - if err != nil { - finalStatus = models.OccurrenceStatusFailed - statusCode = 0 - responseBody = "" - errorMessage = err.Error() - } else if resp == nil { + var finalStatus models.OccurrenceStatus + + dispatchError = d.actionsManager.Execute(ctx, event) + // Optionally, you can enhance ActionsManager.Execute to return statusCode/responseBody if needed in the future. + // For now, just handle error/success as before. + + // Log Occurrence + errorMessage := "" + if dispatchError != nil { finalStatus = models.OccurrenceStatusFailed - statusCode = 0 - responseBody = "" - errorMessage = "empty response from server" + errorMessage = dispatchError.Error() } else { - defer func() { - if resp.Body != nil { - resp.Body.Close() - } - }() - statusCode = resp.StatusCode - if resp.StatusCode >= 200 && resp.StatusCode < 300 { - finalStatus = models.OccurrenceStatusCompleted - responseBody = "" - errorMessage = "" - } else { - finalStatus = models.OccurrenceStatusFailed - responseBody = "" - errorMessage = fmt.Sprintf("received non-2xx status code: %d", resp.StatusCode) - } + finalStatus = models.OccurrenceStatusCompleted } + logOccurrence := &models.Occurrence{ OccurrenceID: sched.OccurrenceID, - EventID: sched.EventID, + EventID: event.ID, ScheduledAt: sched.ScheduledAt, Status: finalStatus, - AttemptCount: attemptCount, + AttemptCount: currentAttempt, // Use the attempt count for this dispatch cycle Timestamp: time.Now(), StatusCode: statusCode, - ResponseBody: responseBody, + ResponseBody: responseBody, // May be empty for websocket ErrorMessage: errorMessage, + StartedAt: sched.ScheduledAt, // Assign time.Time directly + CompletedAt: time.Now(), // Assign time.Time directly } - _ = d.occurrenceRepo.Create(ctx, logOccurrence) - // Auto-inactivate one-time events after dispatch (success or max retries) - event, err = d.eventRepo.GetByID(ctx, sched.EventID) - if event == nil { - return fmt.Errorf("event not found: %s", sched.EventID) + if err := d.occurrenceRepo.Create(ctx, logOccurrence); err != nil { + d.logger.Error("Failed to log occurrence", zap.Error(err), zap.String("occurrence_id", sched.OccurrenceID.String())) + // Even if logging fails, the original dispatchError determines the return for retry logic. } - if err == nil && event.Schedule == nil && event.Status == "active" { - event.Status = "inactive" - _ = d.eventRepo.Update(ctx, event) + + // Auto-inactivate one-time events after dispatch (success or max retries - this check is now implicit in worker) + if event.Schedule == nil && event.Status == models.EventStatusActive && finalStatus == models.OccurrenceStatusCompleted { + // If it's a one-time event and completed successfully, mark as inactive. + // Retry logic in Run() handles maxRetries failures. + event.Status = models.EventStatusInactive + if err := d.eventRepo.Update(ctx, event); err != nil { + d.logger.Error("Failed to auto-inactivate event", zap.String("event_id", event.ID.String()), zap.Error(err)) + } } - if finalStatus == models.OccurrenceStatusCompleted { - return nil + + return dispatchError // This error determines if it goes to retry queue +} + +// Helper to log an error occurrence +func logErrOccurrence(ctx context.Context, repo *repository.OccurrenceRepository, sched *models.Schedule, attempt int, respBody, errMsg string) { + if repo == nil || sched == nil { + return } - return fmt.Errorf("dispatch failed: %s", errorMessage) + log := &models.Occurrence{ + OccurrenceID: sched.OccurrenceID, + EventID: sched.EventID, + ScheduledAt: sched.ScheduledAt, + Status: models.OccurrenceStatusFailed, + AttemptCount: attempt, + Timestamp: time.Now(), + ResponseBody: respBody, + ErrorMessage: errMsg, + } + _ = repo.Create(ctx, log) // Best effort logging } const retryQueueKey = "retry:queue" @@ -248,7 +214,7 @@ return due now := fmt.Sprintf("%f", float64(time.Now().Unix())) // Use Lua script for atomic move res, err := scheduler.redis.Eval(ctx, retryLua, []string{d.scheduler.redisPrefix + retryQueueKey, d.scheduler.redisPrefix + dispatchQueueKey}, now).Result() - if err != nil { + if err != nil && err != redis.Nil { // redis.Nil can happen if script returns empty array but is not an actual error d.logger.Error("[RETRY POLLER] Lua script failed", zap.Error(err)) continue } @@ -263,87 +229,80 @@ return due for { select { case <-ctx.Done(): + d.logger.Info("Dispatcher worker shutting down", zap.Int("worker_id", workerID)) return default: - itemStart := time.Now() - d.logger.Debug("[DISPATCHER] Worker waiting for item", zap.Int("worker_id", workerID), zap.Time("ts", itemStart)) - // Pop from dispatch queue (no processing queue) - popStart := time.Now() + // Pop from dispatch queue data, err := scheduler.redis.BRPop(ctx, 5*time.Second, d.scheduler.redisPrefix+dispatchQueueKey).Result() - popEnd := time.Now() - d.logger.Debug("[DISPATCHER] BRPop duration", zap.Int("worker_id", workerID), zap.Duration("duration", popEnd.Sub(popStart)), zap.Error(err)) if err == redis.Nil { - d.logger.Debug("[DISPATCHER] No item found, continuing", zap.Int("worker_id", workerID), zap.Time("ts", time.Now())) continue // No item, keep waiting } else if err != nil { - d.logger.Error("Worker failed to BRPOP", zap.Int("worker_id", workerID), zap.Error(err)) + d.logger.Error("Worker failed to BRPop from dispatch queue", zap.Int("worker_id", workerID), zap.Error(err)) + time.Sleep(1 * time.Second) // Avoid fast loop on persistent BRPop errors continue } - // BRPop returns [queue, value] - item := data[1] - unmarshalStart := time.Now() + + item := data[1] // BRPop returns [queue, value] var sched models.Schedule if err := json.Unmarshal([]byte(item), &sched); err != nil { - d.logger.Error("Worker failed to unmarshal schedule", zap.Int("worker_id", workerID), zap.Error(err), zap.String("data", item)) + d.logger.Error("Worker failed to unmarshal schedule from dispatch queue", zap.Int("worker_id", workerID), zap.Error(err), zap.String("data", item)) continue } - unmarshalEnd := time.Now() - d.logger.Debug("[DISPATCHER] Unmarshal duration", zap.Int("worker_id", workerID), zap.Duration("duration", unmarshalEnd.Sub(unmarshalStart))) - d.logger.Debug("[DISPATCHER] Worker unmarshalled schedule", zap.Int("worker_id", workerID), zap.Any("schedule", sched)) - d.logger.Debug("[DISPATCHER] Worker dispatching webhook", zap.Int("worker_id", workerID), zap.String("occurrence_id", sched.OccurrenceID.String())) - // Track and increment attempt count - if sched.AttemptCount == 0 { - sched.AttemptCount = 1 - } else { - sched.AttemptCount++ - } + d.logger.Debug("Worker picked up schedule for dispatch", zap.Int("worker_id", workerID), zap.String("occurrence_id", sched.OccurrenceID.String())) - dispatchStart := time.Now() - err = d.DispatchWebhook(ctx, &sched) - dispatchEnd := time.Now() - d.logger.Debug("[DISPATCHER] DispatchWebhook duration", zap.Int("worker_id", workerID), zap.Duration("duration", dispatchEnd.Sub(dispatchStart)), zap.Error(err)) + // DispatchAction will handle logging the occurrence and fetching the full event. + // The sched.AttemptCount from Redis is passed to give context on retries for this queue item. + dispatchErr := d.DispatchAction(ctx, &sched) - if err != nil { - d.logger.Error("Worker failed to dispatch webhook", zap.Int("worker_id", workerID), zap.String("occurrence_id", sched.OccurrenceID.String()), zap.Error(err), zap.Int("attempt_count", sched.AttemptCount)) - if sched.AttemptCount >= d.maxRetries { - d.logger.Debug("[DISPATCHER] Max retries exceeded, dropping item", zap.Int("worker_id", workerID), zap.Any("schedule", sched)) - // No further action needed, item is dropped + if dispatchErr != nil { + d.logger.Error("DispatchAction failed", + zap.Int("worker_id", workerID), + zap.String("occurrence_id", sched.OccurrenceID.String()), + zap.Int("redis_attempt_count", sched.AttemptCount+1), + zap.Error(dispatchErr)) + + if sched.AttemptCount+1 >= d.maxRetries { + d.logger.Warn("Max retries exceeded for occurrence, dropping item", + zap.Int("worker_id", workerID), + zap.String("occurrence_id", sched.OccurrenceID.String()), + zap.Int("attempts", sched.AttemptCount+1)) + // Final failure is logged by DispatchAction itself via logErrOccurrence or by successful creation with failed status. } else { - zaddStart := time.Now() - nextRetry := time.Now().Add(d.retryDelay).Unix() - updatedData, _ := json.Marshal(sched) - d.logger.Debug("[DISPATCHER] Before ZAdd to retry queue", zap.Int("worker_id", workerID), zap.Time("ts", zaddStart)) - err := scheduler.redis.ZAdd(ctx, d.scheduler.redisPrefix+retryQueueKey, redis.Z{ - Score: float64(nextRetry), + // Re-queue for retry + sched.AttemptCount++ // Increment attempt for next try + updatedData, marshalErr := json.Marshal(sched) + if marshalErr != nil { + d.logger.Error("Failed to marshal schedule for retry queue", zap.Error(marshalErr), zap.String("occurrence_id", sched.OccurrenceID.String())) + continue // Skip re-queueing if marshaling fails + } + nextRetryTime := time.Now().Add(d.retryDelay).Unix() + if zerr := scheduler.redis.ZAdd(ctx, d.scheduler.redisPrefix+retryQueueKey, redis.Z{ + Score: float64(nextRetryTime), Member: updatedData, - }).Err() - zaddEnd := time.Now() - d.logger.Debug("[DISPATCHER] After ZAdd to retry queue", zap.Int("worker_id", workerID), zap.Time("ts", zaddEnd), zap.Duration("duration", zaddEnd.Sub(zaddStart)), zap.Error(err)) - if err != nil { - d.logger.Error("[DISPATCHER] Failed to add to retry queue", zap.Int("worker_id", workerID), zap.Error(err)) + }).Err(); zerr != nil { + d.logger.Error("Failed to add item to retry queue", zap.Error(zerr), zap.String("occurrence_id", sched.OccurrenceID.String())) + } else { + d.logger.Info("Item moved to retry queue", zap.String("occurrence_id", sched.OccurrenceID.String()), zap.Int("next_attempt", sched.AttemptCount)) } - d.logger.Debug("[DISPATCHER] Item moved to retry queue", zap.Int("worker_id", workerID), zap.Any("schedule", sched), zap.Int64("next_retry", nextRetry)) } - itemEnd := time.Now() - d.logger.Debug("[DISPATCHER] Total time for item", zap.Int("worker_id", workerID), zap.Duration("duration", itemEnd.Sub(itemStart))) - continue + } else { + d.logger.Info("DispatchAction successful", zap.Int("worker_id", workerID), zap.String("occurrence_id", sched.OccurrenceID.String())) } - d.logger.Debug("[DISPATCHER] Worker successfully dispatched webhook", zap.Int("worker_id", workerID), zap.String("occurrence_id", sched.OccurrenceID.String())) - itemEnd := time.Now() - d.logger.Debug("[DISPATCHER] Total time for item", zap.Int("worker_id", workerID), zap.Duration("duration", itemEnd.Sub(itemStart))) } } } for i := 0; i < workerCount; i++ { go workerFn(i) } + <-ctx.Done() pollerCancel() - d.logger.Info("Dispatcher worker pool shutting down") - return ctx.Err() + d.logger.Info("Dispatcher worker pool shutting down.") + return nil // Or ctx.Err() if you want to propagate it } +// timePtr helper, can be removed if not used elsewhere. func timePtr(t time.Time) *time.Time { return &t } diff --git a/internal/scheduler/dispatcher_test.go b/internal/scheduler/dispatcher_test.go index 96e494b..7e37b06 100644 --- a/internal/scheduler/dispatcher_test.go +++ b/internal/scheduler/dispatcher_test.go @@ -83,8 +83,7 @@ func TestDispatcher(t *testing.T) { mockHTTP := new(MockHTTPClient) scheduler := NewScheduler(redisClient, logger, namespace) - dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Second, nil, scheduler) - dispatcher.SetHTTPClient(mockHTTP) + dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Second, nil, scheduler, mockHTTP) // Add cleanup function cleanup := func() { @@ -99,24 +98,26 @@ func TestDispatcher(t *testing.T) { t.Run("successful dispatch", func(t *testing.T) { cleanup() - - // Create test event + mockHTTP := new(MockHTTPClient) + dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Second, nil, scheduler, mockHTTP) + mockHTTP.On("Do", mock.AnythingOfType("*http.Request")).Return(&http.Response{ + StatusCode: 200, + Body: ioutil.NopCloser(bytes.NewBuffer([]byte{})), + }, nil) + whParams, _ := json.Marshal(models.WebhookActionParams{URL: "http://example.com/webhook"}) event := &models.Event{ ID: uuid.New(), Name: "Test Event", Description: "Test Description", StartTime: time.Now(), - Webhook: "http://example.com/webhook", + Action: &models.Action{Type: models.ActionTypeWebhook, Params: whParams}, Status: models.EventStatusActive, Metadata: []byte(`{"key": "value"}`), Tags: pq.StringArray{"test"}, CreatedAt: time.Now(), } - err := eventRepo.Create(ctx, event) require.NoError(t, err) - - // Create test schedule (simulate scheduling in Redis only) schedule := &models.Schedule{ Occurrence: models.Occurrence{ OccurrenceID: uuid.New(), @@ -125,25 +126,13 @@ func TestDispatcher(t *testing.T) { }, Name: event.Name, Description: event.Description, - Webhook: event.Webhook, + Webhook: "http://example.com/webhook", Metadata: event.Metadata, Tags: event.Tags, } - - // Setup expectations - mockHTTP.On("Do", mock.AnythingOfType("*http.Request")).Return(&http.Response{ - StatusCode: 200, - Body: ioutil.NopCloser(bytes.NewBuffer([]byte{})), - }, nil) - - // Run dispatch - err = dispatcher.DispatchWebhook(ctx, schedule) - - // Verify + err = dispatcher.DispatchAction(ctx, schedule) assert.NoError(t, err) mockHTTP.AssertExpectations(t) - - // Verify schedule log in Postgres (append-only) logged, err := occurrenceRepo.GetLatestByOccurrenceID(ctx, schedule.OccurrenceID) require.NoError(t, err) require.NotNil(t, logged) @@ -151,11 +140,10 @@ func TestDispatcher(t *testing.T) { }) t.Run("event not found", func(t *testing.T) { - start := time.Now() - cleanupStart := time.Now() cleanup() - fmt.Printf("[PROFILE] cleanup: %v\n", time.Since(cleanupStart)) - scheduleStart := time.Now() + mockHTTP := new(MockHTTPClient) + dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Second, nil, scheduler, mockHTTP) + mockHTTP.On("Do", mock.AnythingOfType("*http.Request")).Return((*http.Response)(nil), errors.New("event not found")) schedule := &models.Schedule{ Occurrence: models.Occurrence{ OccurrenceID: uuid.New(), @@ -168,38 +156,31 @@ func TestDispatcher(t *testing.T) { Metadata: []byte(`{"key": "value"}`), Tags: pq.StringArray{"test"}, } - fmt.Printf("[PROFILE] schedule creation: %v\n", time.Since(scheduleStart)) - // Setup expectations for HTTP call (even if event not found, dispatcher may attempt HTTP call) - mockHTTP.On("Do", mock.AnythingOfType("*http.Request")).Return((*http.Response)(nil), errors.New("event not found")) - callStart := time.Now() - err := dispatcher.DispatchWebhook(ctx, schedule) - fmt.Printf("[PROFILE] DispatchWebhook call: %v\n", time.Since(callStart)) + err := dispatcher.DispatchAction(ctx, schedule) assert.Error(t, err) - assertionStart := time.Now() assert.Contains(t, err.Error(), "event not found") mockHTTP.AssertExpectations(t) - fmt.Printf("[PROFILE] final assertion: %v\n", time.Since(assertionStart)) - fmt.Printf("[PROFILE] total test duration: %v\n", time.Since(start)) }) t.Run("webhook request failure", func(t *testing.T) { cleanup() - // Create test event + mockHTTP := new(MockHTTPClient) + dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Second, nil, scheduler, mockHTTP) + mockHTTP.On("Do", mock.AnythingOfType("*http.Request")).Return((*http.Response)(nil), errors.New("connection failed")) + whParams, _ := json.Marshal(models.WebhookActionParams{URL: "http://example.com/webhook"}) event := &models.Event{ ID: uuid.New(), Name: "Test Event", Description: "Test Description", StartTime: time.Now(), - Webhook: "http://example.com/webhook", + Action: &models.Action{Type: models.ActionTypeWebhook, Params: whParams}, Status: models.EventStatusActive, Metadata: []byte(`{"key": "value"}`), Tags: pq.StringArray{"test"}, CreatedAt: time.Now(), } - err := eventRepo.Create(ctx, event) require.NoError(t, err) - schedule := &models.Schedule{ Occurrence: models.Occurrence{ OccurrenceID: uuid.New(), @@ -208,88 +189,75 @@ func TestDispatcher(t *testing.T) { }, Name: event.Name, Description: event.Description, - Webhook: event.Webhook, + Webhook: "http://example.com/webhook", Metadata: event.Metadata, Tags: event.Tags, } - - err = occurrenceRepo.Create(ctx, &models.Occurrence{ - OccurrenceID: schedule.OccurrenceID, - EventID: schedule.EventID, - ScheduledAt: schedule.ScheduledAt, - Status: models.OccurrenceStatusPending, - Timestamp: time.Now(), - StatusCode: 0, - ResponseBody: "", - ErrorMessage: "", - StartedAt: time.Time{}, - CompletedAt: time.Time{}, - }) - require.NoError(t, err) - - // Setup expectations - mockHTTP.On("Do", mock.AnythingOfType("*http.Request")).Return((*http.Response)(nil), errors.New("connection failed")) - - // Run dispatch - err = dispatcher.DispatchWebhook(ctx, schedule) - - // Verify + err = dispatcher.DispatchAction(ctx, schedule) assert.Error(t, err) - assert.Contains(t, err.Error(), "dispatch failed") mockHTTP.AssertExpectations(t) - - // Verify schedule status updatedSchedule, err := occurrenceRepo.GetLatestByOccurrenceID(ctx, schedule.OccurrenceID) require.NoError(t, err) assert.Equal(t, models.OccurrenceStatusFailed, updatedSchedule.Status) }) - // Add client hook tests t.Run("client hook dispatch - single client", func(t *testing.T) { - start := time.Now() - cleanupStart := time.Now() cleanup() - fmt.Printf("[PROFILE] cleanup: %v\n", time.Since(cleanupStart)) - dispatcherStart := time.Now() + mockHTTP := new(MockHTTPClient) mockNotifier := NewMockClientNotifier() mockNotifier.connected["client1"] = []string{"c1"} - dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Second, mockNotifier, scheduler) - fmt.Printf("[PROFILE] dispatcher creation: %v\n", time.Since(dispatcherStart)) - scheduleStart := time.Now() + dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Second, mockNotifier, scheduler, mockHTTP) + mockHTTP.On("Do", mock.AnythingOfType("*http.Request")).Return(&http.Response{ + StatusCode: 200, + Body: ioutil.NopCloser(bytes.NewBuffer([]byte{})), + }, nil) + wsParams, _ := json.Marshal(models.WebsocketActionParams{ClientName: "client1"}) + event := &models.Event{ + ID: uuid.New(), + Name: "Client Hook Event", + Description: "Test client hook", + StartTime: time.Now(), + Action: &models.Action{Type: models.ActionTypeWebsocket, Params: wsParams}, + Status: models.EventStatusActive, + Metadata: []byte(`{"key": "value"}`), + Tags: pq.StringArray{"test"}, + CreatedAt: time.Now(), + } + err := eventRepo.Create(ctx, event) + require.NoError(t, err) schedule := &models.Schedule{ Occurrence: models.Occurrence{ OccurrenceID: uuid.New(), - EventID: uuid.New(), + EventID: event.ID, ScheduledAt: time.Now(), }, - Name: "Client Hook Event", - Description: "Test client hook", + Name: event.Name, + Description: event.Description, Webhook: "q:client1", - Metadata: []byte(`{"key": "value"}`), - Tags: pq.StringArray{"test"}, + Metadata: event.Metadata, + Tags: event.Tags, } - fmt.Printf("[PROFILE] schedule creation: %v\n", time.Since(scheduleStart)) - callStart := time.Now() - err := dispatcher.DispatchWebhook(ctx, schedule) - fmt.Printf("[PROFILE] DispatchWebhook call: %v\n", time.Since(callStart)) + err = dispatcher.DispatchAction(ctx, schedule) assert.NoError(t, err) - assertionStart := time.Now() assert.Equal(t, []string{"client1:c1"}, mockNotifier.calls) - fmt.Printf("[PROFILE] final assertion: %v\n", time.Since(assertionStart)) - fmt.Printf("[PROFILE] total test duration: %v\n", time.Since(start)) }) t.Run("client hook dispatch - no client connected", func(t *testing.T) { cleanup() + mockHTTP := new(MockHTTPClient) mockNotifier := NewMockClientNotifier() - dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 2, 1*time.Millisecond, mockNotifier, scheduler) - // Insert the event into the database + dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 2, 1*time.Millisecond, mockNotifier, scheduler, mockHTTP) + mockHTTP.On("Do", mock.AnythingOfType("*http.Request")).Return(&http.Response{ + StatusCode: 200, + Body: ioutil.NopCloser(bytes.NewBuffer([]byte{})), + }, nil) + wsParams, _ := json.Marshal(models.WebsocketActionParams{ClientName: "client2"}) event := &models.Event{ ID: uuid.New(), Name: "Client Hook Event", Description: "Test client hook", StartTime: time.Now(), - Webhook: "q:client2", + Action: &models.Action{Type: models.ActionTypeWebsocket, Params: wsParams}, Status: models.EventStatusActive, Metadata: []byte(`{"key": "value"}`), Tags: pq.StringArray{"test"}, @@ -305,7 +273,7 @@ func TestDispatcher(t *testing.T) { }, Name: event.Name, Description: event.Description, - Webhook: event.Webhook, + Webhook: "q:client2", Metadata: event.Metadata, Tags: event.Tags, } @@ -313,46 +281,51 @@ func TestDispatcher(t *testing.T) { require.NoError(t, err) err = redisClient.RPush(ctx, namespace+dispatchQueueKey, data).Err() require.NoError(t, err) - // Run the worker for a short time to process retries runWorkerAndWait(ctx, dispatcher, scheduler, 20*time.Millisecond) - // Now assert the number of calls - assert.Equal(t, 3, len(mockNotifier.calls)) // 3 attempts (initial + 2 retries) + assert.Equal(t, 3, len(mockNotifier.calls)) }) t.Run("client hook dispatch - round robin", func(t *testing.T) { - start := time.Now() - cleanupStart := time.Now() cleanup() - fmt.Printf("[PROFILE] cleanup: %v\n", time.Since(cleanupStart)) - dispatcherStart := time.Now() + mockHTTP := new(MockHTTPClient) mockNotifier := NewMockClientNotifier() mockNotifier.connected["client3"] = []string{"c1", "c2"} - dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Millisecond, mockNotifier, scheduler) - fmt.Printf("[PROFILE] dispatcher creation: %v\n", time.Since(dispatcherStart)) - scheduleStart := time.Now() + dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Second, mockNotifier, scheduler, mockHTTP) + mockHTTP.On("Do", mock.AnythingOfType("*http.Request")).Return(&http.Response{ + StatusCode: 200, + Body: ioutil.NopCloser(bytes.NewBuffer([]byte{})), + }, nil) + wsParams, _ := json.Marshal(models.WebsocketActionParams{ClientName: "client3"}) + event := &models.Event{ + ID: uuid.New(), + Name: "Client Hook Event", + Description: "Test client hook", + StartTime: time.Now(), + Action: &models.Action{Type: models.ActionTypeWebsocket, Params: wsParams}, + Status: models.EventStatusActive, + Metadata: []byte(`{"key": "value"}`), + Tags: pq.StringArray{"test"}, + CreatedAt: time.Now(), + } + err := eventRepo.Create(ctx, event) + require.NoError(t, err) schedule := &models.Schedule{ Occurrence: models.Occurrence{ OccurrenceID: uuid.New(), - EventID: uuid.New(), + EventID: event.ID, ScheduledAt: time.Now(), }, - Name: "Client Hook Event", - Description: "Test client hook", + Name: event.Name, + Description: event.Description, Webhook: "q:client3", - Metadata: []byte(`{"key": "value"}`), - Tags: pq.StringArray{"test"}, + Metadata: event.Metadata, + Tags: event.Tags, } - fmt.Printf("[PROFILE] schedule creation: %v\n", time.Since(scheduleStart)) for i := 0; i < 4; i++ { - callStart := time.Now() - err := dispatcher.DispatchWebhook(ctx, schedule) - fmt.Printf("[PROFILE] DispatchWebhook call %d: %v\n", i+1, time.Since(callStart)) + err := dispatcher.DispatchAction(ctx, schedule) assert.NoError(t, err) } - assertionStart := time.Now() assert.Equal(t, []string{"client3:c1", "client3:c2", "client3:c1", "client3:c2"}, mockNotifier.calls) - fmt.Printf("[PROFILE] final assertion: %v\n", time.Since(assertionStart)) - fmt.Printf("[PROFILE] total test duration: %v\n", time.Since(start)) }) } @@ -368,8 +341,7 @@ func TestDispatcher_RedisOnlyDispatch(t *testing.T) { hmacService := services.NewHMACService("test-secret") mockHTTP := new(MockHTTPClient) scheduler := NewScheduler(redisClient, logger, namespace) - dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Second, nil, scheduler) - dispatcher.SetHTTPClient(mockHTTP) + dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Second, nil, scheduler, mockHTTP) // Create Scheduler instance scheduler = NewScheduler(redisClient, logger, namespace) @@ -443,8 +415,7 @@ func TestDispatcher_GetDueSchedules(t *testing.T) { hmacService := services.NewHMACService("test-secret") mockHTTP := new(MockHTTPClient) scheduler := NewScheduler(redisClient, logger, namespace) - dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Second, nil, scheduler) - dispatcher.SetHTTPClient(mockHTTP) + dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Second, nil, scheduler, mockHTTP) // Create Scheduler instance scheduler = NewScheduler(redisClient, logger, namespace) @@ -535,11 +506,7 @@ func TestDispatcher_DispatchQueueWorker(t *testing.T) { eventRepo := repository.NewEventRepository(db, logger, redisClient, namespace) occurrenceRepo := repository.NewOccurrenceRepository(db, logger) hmacService := services.NewHMACService("test-secret") - mockHTTP := new(MockHTTPClient) scheduler := NewScheduler(redisClient, logger, namespace) - dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Second, nil, scheduler) - dispatcher.SetHTTPClient(mockHTTP) - cleanup := func() { _, err := db.ExecContext(ctx, "TRUNCATE TABLE occurrences CASCADE") require.NoError(t, err) @@ -550,7 +517,8 @@ func TestDispatcher_DispatchQueueWorker(t *testing.T) { t.Run("worker_processes_and_removes_from_processing_queue_on_success", func(t *testing.T) { cleanup() - mockHTTP = new(MockHTTPClient) + mockHTTP := new(MockHTTPClient) + dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Second, nil, scheduler, mockHTTP) mockHTTP.On("Do", mock.AnythingOfType("*http.Request")).Return(&http.Response{ StatusCode: 200, Body: ioutil.NopCloser(bytes.NewBuffer([]byte{})), @@ -605,7 +573,8 @@ func TestDispatcher_DispatchQueueWorker(t *testing.T) { t.Run("worker_leaves_item_in_retry_queue_on_failure", func(t *testing.T) { cleanup() - mockHTTP = new(MockHTTPClient) + mockHTTP := new(MockHTTPClient) + dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Second, nil, scheduler, mockHTTP) mockHTTP.On("Do", mock.AnythingOfType("*http.Request")).Return((*http.Response)(nil), errors.New("fail")).Maybe() dispatcher.SetHTTPClient(mockHTTP) // Set retryDelay to 2s to ensure item stays in retry queue for test @@ -663,7 +632,8 @@ func TestDispatcher_DispatchQueueWorker(t *testing.T) { t.Run("worker_removes_item_after_max_retries", func(t *testing.T) { cleanup() - mockHTTP = new(MockHTTPClient) + mockHTTP := new(MockHTTPClient) + dispatcher := NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, 3, 5*time.Second, nil, scheduler, mockHTTP) mockHTTP.On("Do", mock.AnythingOfType("*http.Request")).Return((*http.Response)(nil), errors.New("fail")) dispatcher.SetHTTPClient(mockHTTP) // Set retryDelay to 10ms for fast test diff --git a/main.go b/main.go index 4366449..77d136c 100644 --- a/main.go +++ b/main.go @@ -223,7 +223,7 @@ func main() { // Initialize services tokenService := services.NewTokenService(cfg.Auth.MasterToken, cfg.Auth.JWTSecret) hmacService := services.NewHMACService(cfg.HMAC.DefaultSecret) - dispatcher := scheduler.NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, cfg.DispatchMaxRetries, cfg.DispatchRetryBackoff, wsHandler, schedulerService) + dispatcher := scheduler.NewDispatcher(eventRepo, occurrenceRepo, hmacService, logger, cfg.DispatchMaxRetries, cfg.DispatchRetryBackoff, wsHandler, schedulerService, nil) // Initialize handlers eventHandler := handlers.NewEventHandler(eventRepo, expander) diff --git a/migrations/002_add_action_system.sql b/migrations/002_add_action_system.sql new file mode 100644 index 0000000..bdde723 --- /dev/null +++ b/migrations/002_add_action_system.sql @@ -0,0 +1,29 @@ +-- Add action system to events and archived_events + +-- Add action column to events table +ALTER TABLE events +ADD COLUMN IF NOT EXISTS action JSONB; + +-- Add action column to archived_events table +ALTER TABLE archived_events +ADD COLUMN IF NOT EXISTS action JSONB; + +-- Update existing events to populate the action column +UPDATE events +SET action = CASE + WHEN webhook LIKE 'q:%' THEN jsonb_build_object('type', 'websocket', 'params', jsonb_build_object('client_name', substring(webhook from 3))) + ELSE jsonb_build_object('type', 'webhook', 'params', jsonb_build_object('url', webhook)) +END +WHERE action IS NULL; -- Only update if not already populated + +-- Update existing archived_events to populate the action column +UPDATE archived_events +SET action = CASE + WHEN webhook LIKE 'q:%' THEN jsonb_build_object('type', 'websocket', 'params', jsonb_build_object('client_name', substring(webhook from 3))) + ELSE jsonb_build_object('type', 'webhook', 'params', jsonb_build_object('url', webhook)) +END +WHERE action IS NULL; -- Only update if not already populated + +-- Add comments for the new column if desired (optional) +COMMENT ON COLUMN events.action IS 'Stores the generalized action (type and params) for the event'; +COMMENT ON COLUMN archived_events.action IS 'Stores the generalized action (type and params) for the archived event'; \ No newline at end of file diff --git a/scripts/test.sh b/scripts/test.sh index 8901f14..ddcf416 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -41,9 +41,12 @@ BEGIN END $$;" -# Run migrations -echo "Running migrations..." -cat migrations/001_initial_schema.sql | docker exec -i qhronos_db psql -U postgres -d qhronos_test +# Run all migrations in order +echo "Running all migrations..." +for f in migrations/*.sql; do + echo "Applying migration $f" + cat "$f" | docker exec -i qhronos_db psql -U postgres -d qhronos_test +done # List all internal packages PKGS=$(go list ./internal/...)