diff --git a/esutil/bulk_indexer.go b/esutil/bulk_indexer.go index 5ed206e9f6..03453d9cfe 100644 --- a/esutil/bulk_indexer.go +++ b/esutil/bulk_indexer.go @@ -570,29 +570,24 @@ func (w *worker) flush(ctx context.Context) error { res, err := req.Do(ctx, w.bi.config.Client) if err != nil { atomic.AddUint64(&w.bi.stats.numFailed, uint64(len(w.items))) - if w.bi.config.OnError != nil { - w.bi.config.OnError(ctx, fmt.Errorf("flush: %s", err)) - } - return fmt.Errorf("flush: %s", err) + err := fmt.Errorf("flush: %w", err) + w.handleError(ctx, err) + return err } if res.Body != nil { defer res.Body.Close() } if res.IsError() { atomic.AddUint64(&w.bi.stats.numFailed, uint64(len(w.items))) - // TODO(karmi): Wrap error (include response struct) - if w.bi.config.OnError != nil { - w.bi.config.OnError(ctx, fmt.Errorf("flush: %s", res.String())) - } - return fmt.Errorf("flush: %s", res.String()) + err := fmt.Errorf("flush: %s", res.String()) + w.handleError(ctx, err) + return err } if err := w.bi.config.Decoder.UnmarshalFromReader(res.Body, &blk); err != nil { // TODO(karmi): Wrap error (include response struct) - if w.bi.config.OnError != nil { - w.bi.config.OnError(ctx, fmt.Errorf("flush: %s", err)) - } - return fmt.Errorf("flush: error parsing response body: %s", err) + w.handleError(ctx, fmt.Errorf("flush: %w", err)) + return fmt.Errorf("flush: error parsing response body: %w", err) } for i, blkItem := range blk.Items { @@ -638,6 +633,21 @@ func (w *worker) flush(ctx context.Context) error { return err } +func (w *worker) notifyItemsOnError(ctx context.Context, err error) { + for _, item := range w.items { + if item.OnFailure != nil { + item.OnFailure(ctx, item, BulkIndexerResponseItem{}, err) + } + } +} + +func (w *worker) handleError(ctx context.Context, err error) { + if w.bi.config.OnError != nil { + w.bi.config.OnError(ctx, err) + } + w.notifyItemsOnError(ctx, err) +} + type defaultJSONDecoder struct{} func (d defaultJSONDecoder) UnmarshalFromReader(r io.Reader, blk *BulkIndexerResponse) error { diff --git a/esutil/bulk_indexer_internal_test.go b/esutil/bulk_indexer_internal_test.go index da3a4463bc..9f4d81970d 100644 --- a/esutil/bulk_indexer_internal_test.go +++ b/esutil/bulk_indexer_internal_test.go @@ -572,6 +572,133 @@ func TestBulkIndexer(t *testing.T) { } }) + t.Run("TooManyRequests - Fail", func(t *testing.T) { + var ( + wg sync.WaitGroup + numItems = 2 + ) + + esCfg := elasticsearch.Config{ + Transport: &mockTransport{ + RoundTripFunc: func(*http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusTooManyRequests, + Status: "429 TooManyRequests", + Body: io.NopCloser(strings.NewReader(`{"took":1}`)), + }, nil + }, + }, + + MaxRetries: 5, + RetryOnStatus: []int{502, 503, 504, 429}, + RetryBackoff: func(i int) time.Duration { + if os.Getenv("DEBUG") != "" { + fmt.Printf("*** Retry #%d\n", i) + } + return time.Duration(i) * 100 * time.Millisecond + }, + } + if os.Getenv("DEBUG") != "" { + esCfg.Logger = &elastictransport.ColorLogger{Output: os.Stdout} + } + es, _ := elasticsearch.NewClient(esCfg) + + biCfg := BulkIndexerConfig{NumWorkers: 1, FlushBytes: 28 * 2, Client: es} + if os.Getenv("DEBUG") != "" { + biCfg.DebugLogger = log.New(os.Stdout, "", 0) + } + + bi, _ := NewBulkIndexer(biCfg) + + biiFailureCallbacksCalled := atomic.Uint32{} + biiSuccessCallbacksCalled := atomic.Uint32{} + + for i := 1; i <= numItems; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + err := bi.Add(context.Background(), BulkIndexerItem{ + Action: "foo", + Body: strings.NewReader(`{"title":"foo"}`), + OnFailure: func(ctx context.Context, item BulkIndexerItem, item2 BulkIndexerResponseItem, err error) { + _ = biiFailureCallbacksCalled.Add(1) + if err == nil { + t.Errorf("Unexpected nil error in BulkIndexerItem.OnFailure callback") + } + }, + OnSuccess: func(ctx context.Context, item BulkIndexerItem, item2 BulkIndexerResponseItem) { + _ = biiSuccessCallbacksCalled.Add(1) + }, + }) + if err != nil { + t.Errorf("Unexpected error: %s", err) + return + } + }(i) + } + wg.Wait() + + if err := bi.Close(context.Background()); err != nil { + t.Errorf("Unexpected error: %s", err) + } + + // BulksIndexerItem.OnFailure() callbacks are called for all items. + if biiFailureCallbacksCalled.Load() != uint32(numItems) { + t.Errorf("Unexpected NumFailedCallbacks: want=%d, got=%d", numItems, biiFailureCallbacksCalled.Load()) + } + + // BulkIndexerItem.OnSuccess() callbacks are not called. + if biiSuccessCallbacksCalled.Load() != 0 { + t.Errorf("Unexpected NumSuccessCallbacks: want=%d, got=%d", 0, biiSuccessCallbacksCalled.Load()) + } + }) + + t.Run("JSON Decoder Failure", func(t *testing.T) { + es, _ := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{}}) + + biFailureCallbacksCalled := atomic.Uint32{} + bi, _ := NewBulkIndexer(BulkIndexerConfig{ + Client: es, + Decoder: customJSONDecoder{ + err: fmt.Errorf("Custom JSON decoder error"), + }, + OnError: func(ctx context.Context, err error) { + _ = biFailureCallbacksCalled.Add(1) + }, + }) + + biiFailureCallbacksCalled := atomic.Uint32{} + + err := bi.Add(context.Background(), BulkIndexerItem{ + Action: "index", + DocumentID: "1", + Body: strings.NewReader(`{"title":"foo"}`), + OnFailure: func(ctx context.Context, item BulkIndexerItem, item2 BulkIndexerResponseItem, err error) { + _ = biiFailureCallbacksCalled.Add(1) + if err == nil { + t.Errorf("Unexpected nil error in BulkIndexerItem.OnFailure callback") + } + }, + }) + if err != nil { + t.Fatalf("Unexpected error, got %s", err) + } + + if err := bi.Close(context.Background()); err != nil { + t.Errorf("Unexpected error: %s", err) + } + + // BulksIndexerItem.OnFailure() callbacks are called only for failed items. + if biiFailureCallbacksCalled.Load() != 1 { + t.Errorf("Unexpected NumFailedCallbacks: want=%d, got=%d", 1, biiFailureCallbacksCalled.Load()) + } + + // BulkIndexer.OnError() callbacks are called for all errors. + if biFailureCallbacksCalled.Load() != 2 { + t.Errorf("Unexpected NumFailedCallbacks: want=%d, got=%d", 2, biFailureCallbacksCalled.Load()) + } + }) + t.Run("Custom JSON Decoder", func(t *testing.T) { es, _ := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{}}) bi, _ := NewBulkIndexer(BulkIndexerConfig{Client: es, Decoder: customJSONDecoder{}}) @@ -926,8 +1053,13 @@ func TestBulkIndexerItem(t *testing.T) { }) } -type customJSONDecoder struct{} +type customJSONDecoder struct { + err error +} func (d customJSONDecoder) UnmarshalFromReader(r io.Reader, blk *BulkIndexerResponse) error { + if d.err != nil { + return d.err + } return json.NewDecoder(r).Decode(blk) }