diff --git a/cmd/deploy_azure.go b/cmd/deploy_azure.go index 14e9a12..ff4a7ba 100644 --- a/cmd/deploy_azure.go +++ b/cmd/deploy_azure.go @@ -3,7 +3,6 @@ package cmd import ( "encoding/json" "fmt" - "net/http" "os" "path/filepath" "strings" @@ -304,14 +303,10 @@ func runDeployAzure(cmd *cobra.Command, args []string) error { if backendReady { fmt.Println(" āœ… Backend is responding!") - fmt.Println("\nšŸ”„ Triggering database migration...") - httpClient := &http.Client{Timeout: 5 * time.Second} - resp, err := httpClient.Get(deployment.BackendEndpoint + "/proceed-db-migration") - if err == nil { - resp.Body.Close() - fmt.Println(" āœ… Migration triggered") - } else { - fmt.Printf(" āš ļø Migration may need manual trigger: %v\n", err) + if err := triggerAndWaitForMigration(deployment.BackendEndpoint); err != nil { + fmt.Printf(" āš ļø %v\n", err) + fmt.Printf(" Trigger migration manually if needed: GET %s/proceed-db-migration\n", deployment.BackendEndpoint) + fmt.Println(" Migration may still be running — proceeding anyway") } } else { fmt.Println(" Backend not ready after 30 attempts.") diff --git a/cmd/deploy_local.go b/cmd/deploy_local.go index 917282c..61104ac 100644 --- a/cmd/deploy_local.go +++ b/cmd/deploy_local.go @@ -8,7 +8,6 @@ import ( "strings" "time" - "github.com/DevExpGBB/gh-devlake/internal/devlake" dockerpkg "github.com/DevExpGBB/gh-devlake/internal/docker" "github.com/DevExpGBB/gh-devlake/internal/download" "github.com/DevExpGBB/gh-devlake/internal/gitclone" @@ -196,17 +195,10 @@ func runDeployLocal(cmd *cobra.Command, args []string) error { } cfgURL = backendURL - fmt.Println("\nšŸ”„ Triggering database migration...") - migClient := devlake.NewClient(backendURL) - if err := migClient.TriggerMigration(); err != nil { - fmt.Printf(" āš ļø Migration may need manual trigger: %v\n", err) - } else { - fmt.Println(" āœ… Migration triggered") - fmt.Println("\nā³ Waiting for migration to complete...") - if err := waitForMigration(backendURL, 60, 5*time.Second); err != nil { - fmt.Printf(" āš ļø %v\n", err) - fmt.Println(" Migration may still be running — proceeding anyway") - } + if err := triggerAndWaitForMigration(backendURL); err != nil { + fmt.Printf(" āš ļø %v\n", err) + fmt.Printf(" Trigger migration manually if needed: GET %s/proceed-db-migration\n", backendURL) + fmt.Println(" Migration may still be running — proceeding anyway") } if !deployLocalQuiet { diff --git a/cmd/helpers.go b/cmd/helpers.go index edfe91a..4218a81 100644 --- a/cmd/helpers.go +++ b/cmd/helpers.go @@ -2,7 +2,9 @@ package cmd import ( "encoding/json" + "errors" "fmt" + "io" "net/http" "os" "strings" @@ -221,19 +223,73 @@ func waitForReadyAny(baseURLs []string, maxAttempts int, interval time.Duration) // During migration the API returns 428 (Precondition Required). func waitForMigration(baseURL string, maxAttempts int, interval time.Duration) error { httpClient := &http.Client{Timeout: 5 * time.Second} + lastStatus := 0 for attempt := 1; attempt <= maxAttempts; attempt++ { resp, err := httpClient.Get(baseURL + "/ping") if err == nil { + lastStatus = resp.StatusCode + _, _ = io.Copy(io.Discard, resp.Body) resp.Body.Close() if resp.StatusCode == http.StatusOK { fmt.Println(" āœ… Migration complete!") return nil } + } else if resp != nil { + lastStatus = resp.StatusCode + _, _ = io.Copy(io.Discard, resp.Body) + resp.Body.Close() + } + statusSuffix := "" + if lastStatus != 0 { + statusSuffix = fmt.Sprintf(", status=%d", lastStatus) } - fmt.Printf(" Migrating... (%d/%d)\n", attempt, maxAttempts) + fmt.Printf(" Migrating... (%d/%d%s)\n", attempt, maxAttempts, statusSuffix) time.Sleep(interval) } - return fmt.Errorf("migration did not complete after %d attempts", maxAttempts) + statusSuffix := "" + if lastStatus != 0 { + statusSuffix = fmt.Sprintf(" (last status %d)", lastStatus) + } + return fmt.Errorf("migration did not complete after %d attempts%s", maxAttempts, statusSuffix) +} + +func triggerAndWaitForMigration(baseURL string) error { + return triggerAndWaitForMigrationWithClient(devlake.NewClientWithTimeout(baseURL, 5*time.Second), 3, 10*time.Second, 60, 5*time.Second) +} + +func triggerAndWaitForMigrationWithClient(devlakeClient *devlake.Client, triggerAttempts int, triggerInterval time.Duration, waitAttempts int, waitInterval time.Duration) error { + fmt.Println("\nšŸ”„ Triggering database migration...") + + var lastErr error + for attempt := 1; attempt <= triggerAttempts; attempt++ { + err := devlakeClient.TriggerMigration() + if err == nil { + lastErr = nil + fmt.Println(" āœ… Migration triggered") + break + } + lastErr = err + fmt.Printf(" āš ļø Trigger attempt %d/%d failed: %v\n", attempt, triggerAttempts, err) + if attempt < triggerAttempts { + fmt.Println(" DevLake may still be starting or migration may already be running — retrying...") + time.Sleep(triggerInterval) + } + } + + fmt.Println("\nā³ Waiting for migration to complete...") + if lastErr != nil { + fmt.Println(" Continuing to monitor migration status anyway...") + } + if err := waitForMigration(devlakeClient.BaseURL, waitAttempts, waitInterval); err != nil { + if lastErr != nil { + return errors.Join( + fmt.Errorf("migration trigger failed earlier: %w", lastErr), + fmt.Errorf("waiting for migration completion: %w", err), + ) + } + return err + } + return nil } // ── Scope orchestration ───────────────────────────────────────── diff --git a/cmd/helpers_migration_test.go b/cmd/helpers_migration_test.go new file mode 100644 index 0000000..ed11d63 --- /dev/null +++ b/cmd/helpers_migration_test.go @@ -0,0 +1,189 @@ +package cmd + +import ( + "errors" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + "github.com/DevExpGBB/gh-devlake/internal/devlake" +) + +func TestTriggerAndWaitForMigrationWithClient_CompletesAfterTriggerTimeout(t *testing.T) { + triggerCalls := 0 + pingCalls := 0 + var mu sync.Mutex + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/proceed-db-migration": + mu.Lock() + triggerCalls++ + mu.Unlock() + time.Sleep(25 * time.Millisecond) + w.WriteHeader(http.StatusOK) + case "/ping": + mu.Lock() + pingCalls++ + currentPingCalls := pingCalls + mu.Unlock() + if currentPingCalls == 1 { + w.WriteHeader(http.StatusPreconditionRequired) + return + } + w.WriteHeader(http.StatusOK) + default: + http.NotFound(w, r) + } + })) + defer srv.Close() + + client := &devlake.Client{ + BaseURL: srv.URL, + HTTPClient: &http.Client{ + Timeout: 5 * time.Millisecond, + }, + } + + err := triggerAndWaitForMigrationWithClient(client, 1, time.Millisecond, 3, time.Millisecond) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + mu.Lock() + gotTriggerCalls := triggerCalls + gotPingCalls := pingCalls + mu.Unlock() + if gotTriggerCalls != 1 { + t.Fatalf("trigger calls = %d, want 1", gotTriggerCalls) + } + if gotPingCalls != 2 { + t.Fatalf("ping calls = %d, want 2", gotPingCalls) + } +} + +func TestTriggerAndWaitForMigrationWithClient_RetriesBeforeWaiting(t *testing.T) { + triggerCalls := 0 + pingCalls := 0 + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/proceed-db-migration": + triggerCalls++ + if triggerCalls == 1 { + w.WriteHeader(http.StatusServiceUnavailable) + return + } + w.WriteHeader(http.StatusOK) + case "/ping": + pingCalls++ + w.WriteHeader(http.StatusOK) + default: + http.NotFound(w, r) + } + })) + defer srv.Close() + + client := devlake.NewClient(srv.URL) + + err := triggerAndWaitForMigrationWithClient(client, 2, time.Millisecond, 2, time.Millisecond) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if triggerCalls != 2 { + t.Fatalf("trigger calls = %d, want 2", triggerCalls) + } + if pingCalls != 1 { + t.Fatalf("ping calls = %d, want 1", pingCalls) + } +} + +func TestTriggerAndWaitForMigrationWithClient_TriggerEventuallySucceedsBeforeWaitFails(t *testing.T) { + triggerCalls := 0 + pingCalls := 0 + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/proceed-db-migration": + triggerCalls++ + if triggerCalls == 1 { + w.WriteHeader(http.StatusServiceUnavailable) + return + } + w.WriteHeader(http.StatusOK) + case "/ping": + pingCalls++ + w.WriteHeader(http.StatusPreconditionRequired) + default: + http.NotFound(w, r) + } + })) + defer srv.Close() + + client := devlake.NewClient(srv.URL) + + err := triggerAndWaitForMigrationWithClient(client, 2, 5*time.Millisecond, 2, 5*time.Millisecond) + if err == nil { + t.Fatal("expected error, got nil") + } + if strings.Contains(err.Error(), "migration trigger failed earlier") { + t.Fatalf("unexpected trigger failure in error: %v", err) + } + if !strings.Contains(err.Error(), "migration did not complete after 2 attempts") { + t.Fatalf("expected wait failure in error, got: %v", err) + } + if triggerCalls != 2 { + t.Fatalf("trigger calls = %d, want 2", triggerCalls) + } + if pingCalls != 2 { + t.Fatalf("ping calls = %d, want 2", pingCalls) + } +} + +func TestTriggerAndWaitForMigrationWithClient_JoinsTriggerAndWaitFailures(t *testing.T) { + triggerCalls := 0 + pingCalls := 0 + + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/proceed-db-migration": + triggerCalls++ + w.WriteHeader(http.StatusServiceUnavailable) + case "/ping": + pingCalls++ + w.WriteHeader(http.StatusPreconditionRequired) + default: + http.NotFound(w, r) + } + })) + defer srv.Close() + + client := devlake.NewClient(srv.URL) + + err := triggerAndWaitForMigrationWithClient(client, 1, 5*time.Millisecond, 1, 5*time.Millisecond) + if err == nil { + t.Fatal("expected error, got nil") + } + if !strings.Contains(err.Error(), "migration trigger failed earlier: GET /proceed-db-migration") { + t.Fatalf("expected joined trigger failure in error, got: %v", err) + } + if !strings.Contains(err.Error(), "waiting for migration completion: migration did not complete after 1 attempts") { + t.Fatalf("expected joined wait failure in error, got: %v", err) + } + + var joined interface{ Unwrap() []error } + if !errors.As(err, &joined) { + t.Fatalf("expected joined error, got: %T", err) + } + if len(joined.Unwrap()) != 2 { + t.Fatalf("joined error count = %d, want 2", len(joined.Unwrap())) + } + if triggerCalls != 1 { + t.Fatalf("trigger calls = %d, want 1", triggerCalls) + } + if pingCalls != 1 { + t.Fatalf("ping calls = %d, want 1", pingCalls) + } +} diff --git a/internal/devlake/client.go b/internal/devlake/client.go index 9831e3a..494da36 100644 --- a/internal/devlake/client.go +++ b/internal/devlake/client.go @@ -1,545 +1,581 @@ -// Package devlake provides an HTTP client for the DevLake REST API. -package devlake - -import ( - "bytes" - "encoding/json" - "fmt" - "io" - "net/http" - "net/url" - "time" -) - -// Client wraps HTTP calls to the DevLake backend API. -type Client struct { - BaseURL string - HTTPClient *http.Client -} - -// NewClient creates a Client for the given base URL. -func NewClient(baseURL string) *Client { - return &Client{ - BaseURL: baseURL, - HTTPClient: &http.Client{ - Timeout: 90 * time.Second, - }, - } -} - -// Ping checks if the DevLake backend is reachable. -func (c *Client) Ping() error { - resp, err := c.HTTPClient.Get(c.BaseURL + "/ping") - if err != nil { - return fmt.Errorf("cannot reach DevLake at %s/ping: %w", c.BaseURL, err) - } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("DevLake returned status %d from /ping", resp.StatusCode) - } - return nil -} - -// Connection represents a DevLake plugin connection. -type Connection struct { - ID int `json:"id"` - Name string `json:"name"` - Endpoint string `json:"endpoint,omitempty"` - Proxy string `json:"proxy,omitempty"` - Token string `json:"token,omitempty"` - Organization string `json:"organization,omitempty"` - Enterprise string `json:"enterprise,omitempty"` -} - -// ConnectionUpdateRequest is the payload for PATCH /plugins/{plugin}/connections/{id}. -// Fields with omitempty are only included in the request when non-empty, -// enabling sparse updates (only changed fields are sent). -type ConnectionUpdateRequest struct { - Name string `json:"name,omitempty"` - Endpoint string `json:"endpoint,omitempty"` - Proxy string `json:"proxy,omitempty"` - AuthMethod string `json:"authMethod,omitempty"` - Token string `json:"token,omitempty"` - Organization string `json:"organization,omitempty"` - Enterprise string `json:"enterprise,omitempty"` -} - -// ConnectionCreateRequest is the payload for creating a plugin connection. -type ConnectionCreateRequest struct { - Name string `json:"name"` - Endpoint string `json:"endpoint"` - Proxy string `json:"proxy,omitempty"` - AuthMethod string `json:"authMethod"` - Token string `json:"token,omitempty"` - Username string `json:"username,omitempty"` - Password string `json:"password,omitempty"` - EnableGraphql bool `json:"enableGraphql,omitempty"` - RateLimitPerHour int `json:"rateLimitPerHour"` - Organization string `json:"organization,omitempty"` - Enterprise string `json:"enterprise,omitempty"` - TokenExpiresAt string `json:"tokenExpiresAt,omitempty"` - RefreshTokenExpiresAt string `json:"refreshTokenExpiresAt,omitempty"` -} - -// ConnectionTestRequest is the payload for testing a connection before creating. -type ConnectionTestRequest struct { - Name string `json:"name"` - Endpoint string `json:"endpoint"` - AuthMethod string `json:"authMethod"` - Token string `json:"token,omitempty"` - Username string `json:"username,omitempty"` - Password string `json:"password,omitempty"` - EnableGraphql bool `json:"enableGraphql,omitempty"` - RateLimitPerHour int `json:"rateLimitPerHour"` - Proxy string `json:"proxy"` - Organization string `json:"organization,omitempty"` - Enterprise string `json:"enterprise,omitempty"` -} - -// ConnectionTestResult is the response from testing a connection. -type ConnectionTestResult struct { - Success bool `json:"success"` - Message string `json:"message"` -} - -// ListConnections returns all connections for a plugin (e.g. "github", "gh-copilot"). -func (c *Client) ListConnections(plugin string) ([]Connection, error) { - resp, err := c.HTTPClient.Get(fmt.Sprintf("%s/plugins/%s/connections", c.BaseURL, plugin)) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - body, err := io.ReadAll(resp.Body) - if err != nil { - return nil, fmt.Errorf("reading response: %w", err) - } - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("list connections returned %d: %s", resp.StatusCode, body) - } - - var conns []Connection - if err := json.Unmarshal(body, &conns); err != nil { - return nil, err - } - return conns, nil -} - -// FindConnectionByName returns the first connection matching the given name, or nil. -func (c *Client) FindConnectionByName(plugin, name string) (*Connection, error) { - conns, err := c.ListConnections(plugin) - if err != nil { - return nil, err - } - for _, conn := range conns { - if conn.Name == name { - return &conn, nil - } - } - return nil, nil -} - -// TestConnection tests connection parameters before creating. -func (c *Client) TestConnection(plugin string, req *ConnectionTestRequest) (*ConnectionTestResult, error) { - return doPost[ConnectionTestResult](c, fmt.Sprintf("/plugins/%s/test", plugin), req) -} - -// CreateConnection creates a new connection for the given plugin. -func (c *Client) CreateConnection(plugin string, req *ConnectionCreateRequest) (*Connection, error) { - return doPost[Connection](c, fmt.Sprintf("/plugins/%s/connections", plugin), req) -} - -// DeleteConnection deletes a plugin connection by ID. -func (c *Client) DeleteConnection(plugin string, connID int) error { - url := fmt.Sprintf("%s/plugins/%s/connections/%d", c.BaseURL, plugin, connID) - req, err := http.NewRequest(http.MethodDelete, url, nil) - if err != nil { - return err - } - resp, err := c.HTTPClient.Do(req) - if err != nil { - return err - } - defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) - if err != nil { - return fmt.Errorf("reading response: %w", err) - } - if resp.StatusCode == http.StatusNotFound { - return fmt.Errorf("connection not found: plugin=%s id=%d", plugin, connID) - } - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { - return fmt.Errorf("DELETE /plugins/%s/connections/%d returned %d: %s", plugin, connID, resp.StatusCode, body) - } - return nil -} - -// TestSavedConnection tests an already-created connection by ID. -func (c *Client) TestSavedConnection(plugin string, connID int) (*ConnectionTestResult, error) { - url := fmt.Sprintf("%s/plugins/%s/connections/%d/test", c.BaseURL, plugin, connID) - - reqBody := bytes.NewBufferString("{}") - resp, err := c.HTTPClient.Post(url, "application/json", reqBody) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - body, err := io.ReadAll(resp.Body) - if err != nil { - return nil, fmt.Errorf("reading response: %w", err) - } - var result ConnectionTestResult - if err := json.Unmarshal(body, &result); err != nil { - // Non-JSON response is ok — treat as success if status 200 - if resp.StatusCode == http.StatusOK { - return &ConnectionTestResult{Success: true}, nil - } - return nil, fmt.Errorf("test connection returned %d: %s", resp.StatusCode, body) - } - return &result, nil -} - -// GetConnection retrieves a single connection by plugin and ID. -func (c *Client) GetConnection(plugin string, connID int) (*Connection, error) { - return doGet[Connection](c, fmt.Sprintf("/plugins/%s/connections/%d", plugin, connID)) -} - -// UpdateConnection patches an existing connection for the given plugin. -func (c *Client) UpdateConnection(plugin string, connID int, req *ConnectionUpdateRequest) (*Connection, error) { - return doPatch[Connection](c, fmt.Sprintf("/plugins/%s/connections/%d", plugin, connID), req) -} - -// HealthStatus represents the response from /health or /ping. -type HealthStatus struct { - Status string `json:"status"` -} - -// Health returns the DevLake health status. -func (c *Client) Health() (*HealthStatus, error) { - resp, err := c.HTTPClient.Get(c.BaseURL + "/ping") - if err != nil { - return nil, err - } - defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) - if err != nil { - return nil, fmt.Errorf("reading response: %w", err) - } - var hs HealthStatus - _ = json.Unmarshal(body, &hs) - if resp.StatusCode == http.StatusOK { - if hs.Status == "" { - hs.Status = "ok" - } - return &hs, nil - } - return nil, fmt.Errorf("health check returned %d: %s", resp.StatusCode, body) -} - -// doPost is a generic helper for POST requests that return JSON. -func doPost[T any](c *Client, path string, payload any) (*T, error) { - jsonBody, err := json.Marshal(payload) - if err != nil { - return nil, err - } - - url := c.BaseURL + path - resp, err := c.HTTPClient.Post(url, "application/json", bytes.NewReader(jsonBody)) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - body, err := io.ReadAll(resp.Body) - if err != nil { - return nil, fmt.Errorf("reading response: %w", err) - } - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { - return nil, fmt.Errorf("POST %s returned %d: %s", path, resp.StatusCode, body) - } - - var result T - if err := json.Unmarshal(body, &result); err != nil { - return nil, err - } - return &result, nil -} - -// doGet is a generic helper for GET requests that return JSON. -func doGet[T any](c *Client, path string) (*T, error) { - resp, err := c.HTTPClient.Get(c.BaseURL + path) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - body, err := io.ReadAll(resp.Body) - if err != nil { - return nil, fmt.Errorf("reading response: %w", err) - } - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("GET %s returned %d: %s", path, resp.StatusCode, body) - } - - var result T - if err := json.Unmarshal(body, &result); err != nil { - return nil, err - } - return &result, nil -} - -// doPut is a generic helper for PUT requests that return JSON. -func doPut[T any](c *Client, path string, payload any) (*T, error) { - jsonBody, err := json.Marshal(payload) - if err != nil { - return nil, err - } - - req, err := http.NewRequest(http.MethodPut, c.BaseURL+path, bytes.NewReader(jsonBody)) - if err != nil { - return nil, err - } - req.Header.Set("Content-Type", "application/json") - - resp, err := c.HTTPClient.Do(req) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - body, err := io.ReadAll(resp.Body) - if err != nil { - return nil, fmt.Errorf("reading response: %w", err) - } - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { - return nil, fmt.Errorf("PUT %s returned %d: %s", path, resp.StatusCode, body) - } - - var result T - if err := json.Unmarshal(body, &result); err != nil { - return nil, err - } - return &result, nil -} - -// doPatch is a generic helper for PATCH requests that return JSON. -func doPatch[T any](c *Client, path string, payload any) (*T, error) { - jsonBody, err := json.Marshal(payload) - if err != nil { - return nil, err - } - - req, err := http.NewRequest(http.MethodPatch, c.BaseURL+path, bytes.NewReader(jsonBody)) - if err != nil { - return nil, err - } - req.Header.Set("Content-Type", "application/json") - - resp, err := c.HTTPClient.Do(req) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - body, err := io.ReadAll(resp.Body) - if err != nil { - return nil, fmt.Errorf("reading response: %w", err) - } - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("PATCH %s returned %d: %s", path, resp.StatusCode, body) - } - - var result T - if err := json.Unmarshal(body, &result); err != nil { - return nil, err - } - return &result, nil -} - -// CreateScopeConfig creates a scope config for a plugin connection. -func (c *Client) CreateScopeConfig(plugin string, connID int, cfg *ScopeConfig) (*ScopeConfig, error) { - return doPost[ScopeConfig](c, fmt.Sprintf("/plugins/%s/connections/%d/scope-configs", plugin, connID), cfg) -} - -// ListScopeConfigs returns all scope configs for a plugin connection. -func (c *Client) ListScopeConfigs(plugin string, connID int) ([]ScopeConfig, error) { - result, err := doGet[[]ScopeConfig](c, fmt.Sprintf("/plugins/%s/connections/%d/scope-configs", plugin, connID)) - if err != nil { - return nil, err - } - return *result, nil -} - -// PutScopes batch-upserts scopes for a plugin connection. -func (c *Client) PutScopes(plugin string, connID int, req *ScopeBatchRequest) error { - _, err := doPut[json.RawMessage](c, fmt.Sprintf("/plugins/%s/connections/%d/scopes", plugin, connID), req) - return err -} - -// ListScopes returns the scopes configured on a plugin connection. -func (c *Client) ListScopes(plugin string, connID int) (*ScopeListResponse, error) { - return doGet[ScopeListResponse](c, fmt.Sprintf("/plugins/%s/connections/%d/scopes?pageSize=100&page=1", plugin, connID)) -} - -// ListProjects returns all DevLake projects. -func (c *Client) ListProjects() ([]Project, error) { - result, err := doGet[ProjectListResponse](c, "/projects") - if err != nil { - return nil, err - } - return result.Projects, nil -} - -// DeleteProject deletes a project by name. -func (c *Client) DeleteProject(name string) error { - url := fmt.Sprintf("%s/projects/%s", c.BaseURL, name) - req, err := http.NewRequest(http.MethodDelete, url, nil) - if err != nil { - return err - } - resp, err := c.HTTPClient.Do(req) - if err != nil { - return err - } - defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) - if err != nil { - return fmt.Errorf("reading response: %w", err) - } - if resp.StatusCode == http.StatusNotFound { - return fmt.Errorf("project not found: %s", name) - } - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { - return fmt.Errorf("DELETE /projects/%s returned %d: %s", name, resp.StatusCode, body) - } - return nil -} - -// DeleteScope removes a scope from a plugin connection. -func (c *Client) DeleteScope(plugin string, connID int, scopeID string) error { - url := fmt.Sprintf("%s/plugins/%s/connections/%d/scopes/%s", c.BaseURL, plugin, connID, url.PathEscape(scopeID)) - req, err := http.NewRequest(http.MethodDelete, url, nil) - if err != nil { - return err - } - resp, err := c.HTTPClient.Do(req) - if err != nil { - return err - } - defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) - if err != nil { - return fmt.Errorf("reading response: %w", err) - } - if resp.StatusCode == http.StatusNotFound { - return fmt.Errorf("scope not found: plugin=%s connID=%d scopeID=%s", plugin, connID, scopeID) - } - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { - return fmt.Errorf("DELETE /plugins/%s/connections/%d/scopes/%s returned %d: %s", plugin, connID, scopeID, resp.StatusCode, body) - } - return nil -} - -// CreateProject creates a new DevLake project. -func (c *Client) CreateProject(project *Project) (*Project, error) { - return doPost[Project](c, "/projects", project) -} - -// GetProject retrieves a project by name. -func (c *Client) GetProject(name string) (*Project, error) { - return doGet[Project](c, fmt.Sprintf("/projects/%s", name)) -} - -// PatchBlueprint updates a blueprint by ID. -func (c *Client) PatchBlueprint(id int, patch *BlueprintPatch) (*Blueprint, error) { - return doPatch[Blueprint](c, fmt.Sprintf("/blueprints/%d", id), patch) -} - -// TriggerBlueprint triggers a blueprint to run and returns the pipeline. -func (c *Client) TriggerBlueprint(id int) (*Pipeline, error) { - return doPost[Pipeline](c, fmt.Sprintf("/blueprints/%d/trigger", id), struct{}{}) -} - -// GetPipeline retrieves a pipeline by ID. -func (c *Client) GetPipeline(id int) (*Pipeline, error) { - return doGet[Pipeline](c, fmt.Sprintf("/pipelines/%d", id)) -} - -// ListRemoteScopes queries the DevLake remote-scope API for a plugin connection. -// groupID and pageToken are optional (pass "" to omit). -func (c *Client) ListRemoteScopes(plugin string, connID int, groupID, pageToken string) (*RemoteScopeResponse, error) { - path := fmt.Sprintf("/plugins/%s/connections/%d/remote-scopes", plugin, connID) - q := url.Values{} - if groupID != "" { - q.Set("groupId", groupID) - } - if pageToken != "" { - q.Set("pageToken", pageToken) - } - if len(q) > 0 { - path += "?" + q.Encode() - } - return doGet[RemoteScopeResponse](c, path) -} - -// SearchRemoteScopes queries the DevLake search-remote-scopes API for a plugin connection. -// page and pageSize control pagination; pass 0 to use DevLake defaults. -func (c *Client) SearchRemoteScopes(plugin string, connID int, search string, page, pageSize int) (*RemoteScopeResponse, error) { - path := fmt.Sprintf("/plugins/%s/connections/%d/search-remote-scopes", plugin, connID) - q := url.Values{} - if search != "" { - q.Set("search", search) - } - if page > 0 { - q.Set("page", fmt.Sprintf("%d", page)) - } - if pageSize > 0 { - q.Set("pageSize", fmt.Sprintf("%d", pageSize)) - } - if len(q) > 0 { - path += "?" + q.Encode() - } - return doGet[RemoteScopeResponse](c, path) -} - -// TriggerMigration triggers the DevLake database migration endpoint. -func (c *Client) TriggerMigration() error { - resp, err := c.HTTPClient.Get(c.BaseURL + "/proceed-db-migration") - if err != nil { - return err - } - resp.Body.Close() - return nil -} - -// PipelineListResponse is the response from GET /pipelines. -type PipelineListResponse struct { - Pipelines []Pipeline `json:"pipelines"` - Count int64 `json:"count"` -} - -// ListPipelines returns pipelines with optional query parameters. -// status can be empty, "TASK_CREATED", "TASK_RUNNING", "TASK_COMPLETED", "TASK_FAILED", etc. -// blueprintID filters by blueprint (0 = no filter). -// page and pageSize control pagination (0 = use defaults). -func (c *Client) ListPipelines(status string, blueprintID, page, pageSize int) (*PipelineListResponse, error) { - path := "/pipelines" - q := url.Values{} - if status != "" { - q.Set("status", status) - } - if blueprintID > 0 { - q.Set("blueprint_id", fmt.Sprintf("%d", blueprintID)) - } - if page > 0 { - q.Set("page", fmt.Sprintf("%d", page)) - } - if pageSize > 0 { - q.Set("pagesize", fmt.Sprintf("%d", pageSize)) - } - if len(q) > 0 { - path += "?" + q.Encode() - } - return doGet[PipelineListResponse](c, path) -} +// Package devlake provides an HTTP client for the DevLake REST API. +package devlake + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strings" + "time" +) + +// Client wraps HTTP calls to the DevLake backend API. +type Client struct { + BaseURL string + HTTPClient *http.Client +} + +// NewClient creates a Client for the given base URL. +func NewClient(baseURL string) *Client { + return NewClientWithTimeout(baseURL, 90*time.Second) +} + +// NewClientWithTimeout creates a Client for the given base URL and HTTP timeout. +func NewClientWithTimeout(baseURL string, timeout time.Duration) *Client { + return &Client{ + BaseURL: baseURL, + HTTPClient: &http.Client{ + Timeout: timeout, + }, + } +} + +// Ping checks if the DevLake backend is reachable. +func (c *Client) Ping() error { + resp, err := c.HTTPClient.Get(c.BaseURL + "/ping") + if err != nil { + return fmt.Errorf("cannot reach DevLake at %s/ping: %w", c.BaseURL, err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("DevLake returned status %d from /ping", resp.StatusCode) + } + return nil +} + +// Connection represents a DevLake plugin connection. +type Connection struct { + ID int `json:"id"` + Name string `json:"name"` + Endpoint string `json:"endpoint,omitempty"` + Proxy string `json:"proxy,omitempty"` + Token string `json:"token,omitempty"` + Organization string `json:"organization,omitempty"` + Enterprise string `json:"enterprise,omitempty"` +} + +// ConnectionUpdateRequest is the payload for PATCH /plugins/{plugin}/connections/{id}. +// Fields with omitempty are only included in the request when non-empty, +// enabling sparse updates (only changed fields are sent). +type ConnectionUpdateRequest struct { + Name string `json:"name,omitempty"` + Endpoint string `json:"endpoint,omitempty"` + Proxy string `json:"proxy,omitempty"` + AuthMethod string `json:"authMethod,omitempty"` + Token string `json:"token,omitempty"` + Organization string `json:"organization,omitempty"` + Enterprise string `json:"enterprise,omitempty"` +} + +// ConnectionCreateRequest is the payload for creating a plugin connection. +type ConnectionCreateRequest struct { + Name string `json:"name"` + Endpoint string `json:"endpoint"` + Proxy string `json:"proxy,omitempty"` + AuthMethod string `json:"authMethod"` + Token string `json:"token,omitempty"` + Username string `json:"username,omitempty"` + Password string `json:"password,omitempty"` + EnableGraphql bool `json:"enableGraphql,omitempty"` + RateLimitPerHour int `json:"rateLimitPerHour"` + Organization string `json:"organization,omitempty"` + Enterprise string `json:"enterprise,omitempty"` + TokenExpiresAt string `json:"tokenExpiresAt,omitempty"` + RefreshTokenExpiresAt string `json:"refreshTokenExpiresAt,omitempty"` +} + +// ConnectionTestRequest is the payload for testing a connection before creating. +type ConnectionTestRequest struct { + Name string `json:"name"` + Endpoint string `json:"endpoint"` + AuthMethod string `json:"authMethod"` + Token string `json:"token,omitempty"` + Username string `json:"username,omitempty"` + Password string `json:"password,omitempty"` + EnableGraphql bool `json:"enableGraphql,omitempty"` + RateLimitPerHour int `json:"rateLimitPerHour"` + Proxy string `json:"proxy"` + Organization string `json:"organization,omitempty"` + Enterprise string `json:"enterprise,omitempty"` +} + +// ConnectionTestResult is the response from testing a connection. +type ConnectionTestResult struct { + Success bool `json:"success"` + Message string `json:"message"` +} + +// ListConnections returns all connections for a plugin (e.g. "github", "gh-copilot"). +func (c *Client) ListConnections(plugin string) ([]Connection, error) { + resp, err := c.HTTPClient.Get(fmt.Sprintf("%s/plugins/%s/connections", c.BaseURL, plugin)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("reading response: %w", err) + } + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("list connections returned %d: %s", resp.StatusCode, body) + } + + var conns []Connection + if err := json.Unmarshal(body, &conns); err != nil { + return nil, err + } + return conns, nil +} + +// FindConnectionByName returns the first connection matching the given name, or nil. +func (c *Client) FindConnectionByName(plugin, name string) (*Connection, error) { + conns, err := c.ListConnections(plugin) + if err != nil { + return nil, err + } + for _, conn := range conns { + if conn.Name == name { + return &conn, nil + } + } + return nil, nil +} + +// TestConnection tests connection parameters before creating. +func (c *Client) TestConnection(plugin string, req *ConnectionTestRequest) (*ConnectionTestResult, error) { + return doPost[ConnectionTestResult](c, fmt.Sprintf("/plugins/%s/test", plugin), req) +} + +// CreateConnection creates a new connection for the given plugin. +func (c *Client) CreateConnection(plugin string, req *ConnectionCreateRequest) (*Connection, error) { + return doPost[Connection](c, fmt.Sprintf("/plugins/%s/connections", plugin), req) +} + +// DeleteConnection deletes a plugin connection by ID. +func (c *Client) DeleteConnection(plugin string, connID int) error { + url := fmt.Sprintf("%s/plugins/%s/connections/%d", c.BaseURL, plugin, connID) + req, err := http.NewRequest(http.MethodDelete, url, nil) + if err != nil { + return err + } + resp, err := c.HTTPClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("reading response: %w", err) + } + if resp.StatusCode == http.StatusNotFound { + return fmt.Errorf("connection not found: plugin=%s id=%d", plugin, connID) + } + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { + return fmt.Errorf("DELETE /plugins/%s/connections/%d returned %d: %s", plugin, connID, resp.StatusCode, body) + } + return nil +} + +// TestSavedConnection tests an already-created connection by ID. +func (c *Client) TestSavedConnection(plugin string, connID int) (*ConnectionTestResult, error) { + url := fmt.Sprintf("%s/plugins/%s/connections/%d/test", c.BaseURL, plugin, connID) + + reqBody := bytes.NewBufferString("{}") + resp, err := c.HTTPClient.Post(url, "application/json", reqBody) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("reading response: %w", err) + } + var result ConnectionTestResult + if err := json.Unmarshal(body, &result); err != nil { + // Non-JSON response is ok — treat as success if status 200 + if resp.StatusCode == http.StatusOK { + return &ConnectionTestResult{Success: true}, nil + } + return nil, fmt.Errorf("test connection returned %d: %s", resp.StatusCode, body) + } + return &result, nil +} + +// GetConnection retrieves a single connection by plugin and ID. +func (c *Client) GetConnection(plugin string, connID int) (*Connection, error) { + return doGet[Connection](c, fmt.Sprintf("/plugins/%s/connections/%d", plugin, connID)) +} + +// UpdateConnection patches an existing connection for the given plugin. +func (c *Client) UpdateConnection(plugin string, connID int, req *ConnectionUpdateRequest) (*Connection, error) { + return doPatch[Connection](c, fmt.Sprintf("/plugins/%s/connections/%d", plugin, connID), req) +} + +// HealthStatus represents the response from /health or /ping. +type HealthStatus struct { + Status string `json:"status"` +} + +// Health returns the DevLake health status. +func (c *Client) Health() (*HealthStatus, error) { + resp, err := c.HTTPClient.Get(c.BaseURL + "/ping") + if err != nil { + return nil, err + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("reading response: %w", err) + } + var hs HealthStatus + _ = json.Unmarshal(body, &hs) + if resp.StatusCode == http.StatusOK { + if hs.Status == "" { + hs.Status = "ok" + } + return &hs, nil + } + return nil, fmt.Errorf("health check returned %d: %s", resp.StatusCode, body) +} + +// doPost is a generic helper for POST requests that return JSON. +func doPost[T any](c *Client, path string, payload any) (*T, error) { + jsonBody, err := json.Marshal(payload) + if err != nil { + return nil, err + } + + url := c.BaseURL + path + resp, err := c.HTTPClient.Post(url, "application/json", bytes.NewReader(jsonBody)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("reading response: %w", err) + } + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { + return nil, fmt.Errorf("POST %s returned %d: %s", path, resp.StatusCode, body) + } + + var result T + if err := json.Unmarshal(body, &result); err != nil { + return nil, err + } + return &result, nil +} + +// doGet is a generic helper for GET requests that return JSON. +func doGet[T any](c *Client, path string) (*T, error) { + resp, err := c.HTTPClient.Get(c.BaseURL + path) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("reading response: %w", err) + } + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("GET %s returned %d: %s", path, resp.StatusCode, body) + } + + var result T + if err := json.Unmarshal(body, &result); err != nil { + return nil, err + } + return &result, nil +} + +// doGetNoBody is a helper for GET requests that only need a successful 2xx status. +func doGetNoBody(c *Client, path string) error { + resp, err := c.HTTPClient.Get(c.BaseURL + path) + if err != nil { + if resp != nil { + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + } + return fmt.Errorf("GET %s: %w", path, err) + } + + drainAndClose := func() { + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + } + + if resp.StatusCode >= http.StatusOK && resp.StatusCode < http.StatusMultipleChoices { + drainAndClose() + return nil + } + + body, err := io.ReadAll(io.LimitReader(resp.Body, 512)) + if err != nil { + drainAndClose() + return fmt.Errorf("GET %s: reading response: %w", path, err) + } + drainAndClose() + + bodyText := strings.TrimSpace(string(body)) + if bodyText != "" { + return fmt.Errorf("GET %s: DevLake returned %s: %s", path, resp.Status, bodyText) + } + return fmt.Errorf("GET %s: DevLake returned %s", path, resp.Status) +} + +// doPut is a generic helper for PUT requests that return JSON. +func doPut[T any](c *Client, path string, payload any) (*T, error) { + jsonBody, err := json.Marshal(payload) + if err != nil { + return nil, err + } + + req, err := http.NewRequest(http.MethodPut, c.BaseURL+path, bytes.NewReader(jsonBody)) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json") + + resp, err := c.HTTPClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("reading response: %w", err) + } + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { + return nil, fmt.Errorf("PUT %s returned %d: %s", path, resp.StatusCode, body) + } + + var result T + if err := json.Unmarshal(body, &result); err != nil { + return nil, err + } + return &result, nil +} + +// doPatch is a generic helper for PATCH requests that return JSON. +func doPatch[T any](c *Client, path string, payload any) (*T, error) { + jsonBody, err := json.Marshal(payload) + if err != nil { + return nil, err + } + + req, err := http.NewRequest(http.MethodPatch, c.BaseURL+path, bytes.NewReader(jsonBody)) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json") + + resp, err := c.HTTPClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("reading response: %w", err) + } + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("PATCH %s returned %d: %s", path, resp.StatusCode, body) + } + + var result T + if err := json.Unmarshal(body, &result); err != nil { + return nil, err + } + return &result, nil +} + +// CreateScopeConfig creates a scope config for a plugin connection. +func (c *Client) CreateScopeConfig(plugin string, connID int, cfg *ScopeConfig) (*ScopeConfig, error) { + return doPost[ScopeConfig](c, fmt.Sprintf("/plugins/%s/connections/%d/scope-configs", plugin, connID), cfg) +} + +// ListScopeConfigs returns all scope configs for a plugin connection. +func (c *Client) ListScopeConfigs(plugin string, connID int) ([]ScopeConfig, error) { + result, err := doGet[[]ScopeConfig](c, fmt.Sprintf("/plugins/%s/connections/%d/scope-configs", plugin, connID)) + if err != nil { + return nil, err + } + return *result, nil +} + +// PutScopes batch-upserts scopes for a plugin connection. +func (c *Client) PutScopes(plugin string, connID int, req *ScopeBatchRequest) error { + _, err := doPut[json.RawMessage](c, fmt.Sprintf("/plugins/%s/connections/%d/scopes", plugin, connID), req) + return err +} + +// ListScopes returns the scopes configured on a plugin connection. +func (c *Client) ListScopes(plugin string, connID int) (*ScopeListResponse, error) { + return doGet[ScopeListResponse](c, fmt.Sprintf("/plugins/%s/connections/%d/scopes?pageSize=100&page=1", plugin, connID)) +} + +// ListProjects returns all DevLake projects. +func (c *Client) ListProjects() ([]Project, error) { + result, err := doGet[ProjectListResponse](c, "/projects") + if err != nil { + return nil, err + } + return result.Projects, nil +} + +// DeleteProject deletes a project by name. +func (c *Client) DeleteProject(name string) error { + url := fmt.Sprintf("%s/projects/%s", c.BaseURL, name) + req, err := http.NewRequest(http.MethodDelete, url, nil) + if err != nil { + return err + } + resp, err := c.HTTPClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("reading response: %w", err) + } + if resp.StatusCode == http.StatusNotFound { + return fmt.Errorf("project not found: %s", name) + } + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { + return fmt.Errorf("DELETE /projects/%s returned %d: %s", name, resp.StatusCode, body) + } + return nil +} + +// DeleteScope removes a scope from a plugin connection. +func (c *Client) DeleteScope(plugin string, connID int, scopeID string) error { + url := fmt.Sprintf("%s/plugins/%s/connections/%d/scopes/%s", c.BaseURL, plugin, connID, url.PathEscape(scopeID)) + req, err := http.NewRequest(http.MethodDelete, url, nil) + if err != nil { + return err + } + resp, err := c.HTTPClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("reading response: %w", err) + } + if resp.StatusCode == http.StatusNotFound { + return fmt.Errorf("scope not found: plugin=%s connID=%d scopeID=%s", plugin, connID, scopeID) + } + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { + return fmt.Errorf("DELETE /plugins/%s/connections/%d/scopes/%s returned %d: %s", plugin, connID, scopeID, resp.StatusCode, body) + } + return nil +} + +// CreateProject creates a new DevLake project. +func (c *Client) CreateProject(project *Project) (*Project, error) { + return doPost[Project](c, "/projects", project) +} + +// GetProject retrieves a project by name. +func (c *Client) GetProject(name string) (*Project, error) { + return doGet[Project](c, fmt.Sprintf("/projects/%s", name)) +} + +// PatchBlueprint updates a blueprint by ID. +func (c *Client) PatchBlueprint(id int, patch *BlueprintPatch) (*Blueprint, error) { + return doPatch[Blueprint](c, fmt.Sprintf("/blueprints/%d", id), patch) +} + +// TriggerBlueprint triggers a blueprint to run and returns the pipeline. +func (c *Client) TriggerBlueprint(id int) (*Pipeline, error) { + return doPost[Pipeline](c, fmt.Sprintf("/blueprints/%d/trigger", id), struct{}{}) +} + +// GetPipeline retrieves a pipeline by ID. +func (c *Client) GetPipeline(id int) (*Pipeline, error) { + return doGet[Pipeline](c, fmt.Sprintf("/pipelines/%d", id)) +} + +// ListRemoteScopes queries the DevLake remote-scope API for a plugin connection. +// groupID and pageToken are optional (pass "" to omit). +func (c *Client) ListRemoteScopes(plugin string, connID int, groupID, pageToken string) (*RemoteScopeResponse, error) { + path := fmt.Sprintf("/plugins/%s/connections/%d/remote-scopes", plugin, connID) + q := url.Values{} + if groupID != "" { + q.Set("groupId", groupID) + } + if pageToken != "" { + q.Set("pageToken", pageToken) + } + if len(q) > 0 { + path += "?" + q.Encode() + } + return doGet[RemoteScopeResponse](c, path) +} + +// SearchRemoteScopes queries the DevLake search-remote-scopes API for a plugin connection. +// page and pageSize control pagination; pass 0 to use DevLake defaults. +func (c *Client) SearchRemoteScopes(plugin string, connID int, search string, page, pageSize int) (*RemoteScopeResponse, error) { + path := fmt.Sprintf("/plugins/%s/connections/%d/search-remote-scopes", plugin, connID) + q := url.Values{} + if search != "" { + q.Set("search", search) + } + if page > 0 { + q.Set("page", fmt.Sprintf("%d", page)) + } + if pageSize > 0 { + q.Set("pageSize", fmt.Sprintf("%d", pageSize)) + } + if len(q) > 0 { + path += "?" + q.Encode() + } + return doGet[RemoteScopeResponse](c, path) +} + +// TriggerMigration triggers the DevLake database migration endpoint. +func (c *Client) TriggerMigration() error { + return doGetNoBody(c, "/proceed-db-migration") +} + +// PipelineListResponse is the response from GET /pipelines. +type PipelineListResponse struct { + Pipelines []Pipeline `json:"pipelines"` + Count int64 `json:"count"` +} + +// ListPipelines returns pipelines with optional query parameters. +// status can be empty, "TASK_CREATED", "TASK_RUNNING", "TASK_COMPLETED", "TASK_FAILED", etc. +// blueprintID filters by blueprint (0 = no filter). +// page and pageSize control pagination (0 = use defaults). +func (c *Client) ListPipelines(status string, blueprintID, page, pageSize int) (*PipelineListResponse, error) { + path := "/pipelines" + q := url.Values{} + if status != "" { + q.Set("status", status) + } + if blueprintID > 0 { + q.Set("blueprint_id", fmt.Sprintf("%d", blueprintID)) + } + if page > 0 { + q.Set("page", fmt.Sprintf("%d", page)) + } + if pageSize > 0 { + q.Set("pagesize", fmt.Sprintf("%d", pageSize)) + } + if len(q) > 0 { + path += "?" + q.Encode() + } + return doGet[PipelineListResponse](c, path) +} diff --git a/internal/devlake/client_test.go b/internal/devlake/client_test.go index ea5efec..870b435 100644 --- a/internal/devlake/client_test.go +++ b/internal/devlake/client_test.go @@ -2,9 +2,11 @@ package devlake import ( "encoding/json" + "net" "net/http" "net/http/httptest" "strings" + "sync" "testing" ) @@ -909,6 +911,142 @@ func TestHealth(t *testing.T) { } } +func TestTriggerMigration(t *testing.T) { + tests := []struct { + name string + statusCode int + body string + wantErr bool + wantErrText string + }{ + { + name: "success", + statusCode: http.StatusOK, + }, + { + name: "no content", + statusCode: http.StatusNoContent, + }, + { + name: "server error with body", + statusCode: http.StatusServiceUnavailable, + body: "warming up", + wantErr: true, + wantErrText: "GET /proceed-db-migration: DevLake returned 503 Service Unavailable: warming up", + }, + { + name: "server error without body", + statusCode: http.StatusBadGateway, + wantErr: true, + wantErrText: "GET /proceed-db-migration: DevLake returned 502 Bad Gateway", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/proceed-db-migration" { + t.Errorf("path = %s, want /proceed-db-migration", r.URL.Path) + } + w.WriteHeader(tt.statusCode) + _, _ = w.Write([]byte(tt.body)) + })) + defer srv.Close() + + client := NewClient(srv.URL) + err := client.TriggerMigration() + + if tt.wantErr { + if err == nil { + t.Fatal("expected error, got nil") + } + if err.Error() != tt.wantErrText { + t.Fatalf("error = %q, want %q", err.Error(), tt.wantErrText) + } + return + } + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + } +} + +func TestTriggerMigration_DrainsBodyForConnectionReuse(t *testing.T) { + t.Run("success body", func(t *testing.T) { + testTriggerMigrationConnectionReuse(t, http.StatusOK) + }) + t.Run("error body", func(t *testing.T) { + testTriggerMigrationConnectionReuse(t, http.StatusServiceUnavailable) + }) +} + +func testTriggerMigrationConnectionReuse(t *testing.T, firstStatus int) { + t.Helper() + + var ( + mu sync.Mutex + callCount int + newConnCount int + unexpectedCallCount int + ) + + largeBody := strings.Repeat("x", 2048) + srv := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + mu.Lock() + callCount++ + currentCall := callCount + mu.Unlock() + + switch currentCall { + case 1: + w.WriteHeader(firstStatus) + _, _ = w.Write([]byte(largeBody)) + case 2: + w.WriteHeader(http.StatusNoContent) + default: + mu.Lock() + unexpectedCallCount = currentCall + mu.Unlock() + http.Error(w, "unexpected call", http.StatusInternalServerError) + } + })) + srv.Config.ConnState = func(_ net.Conn, state http.ConnState) { + if state != http.StateNew { + return + } + mu.Lock() + newConnCount++ + mu.Unlock() + } + srv.Start() + defer srv.Close() + + client := NewClient(srv.URL) + + firstErr := client.TriggerMigration() + if firstStatus >= http.StatusOK && firstStatus < http.StatusMultipleChoices { + if firstErr != nil { + t.Fatalf("first TriggerMigration() error = %v, want nil", firstErr) + } + } else if firstErr == nil { + t.Fatal("first TriggerMigration() error = nil, want non-nil") + } + + if err := client.TriggerMigration(); err != nil { + t.Fatalf("second TriggerMigration() error = %v, want nil", err) + } + + mu.Lock() + defer mu.Unlock() + if unexpectedCallCount != 0 { + t.Fatalf("unexpected call count: %d", unexpectedCallCount) + } + if newConnCount != 1 { + t.Fatalf("newConnCount = %d, want 1", newConnCount) + } +} + // TestTestSavedConnection tests the TestSavedConnection method. func TestTestSavedConnection(t *testing.T) { tests := []struct {