From 07e11ecf49f5375cdb83c6bb1d9bf8f9c135dad1 Mon Sep 17 00:00:00 2001 From: Anderson Ribeiro Date: Mon, 6 Mar 2023 23:28:51 +0000 Subject: [PATCH 1/4] notify items if an error occurs in bulk indexer --- esutil/bulk_indexer.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/esutil/bulk_indexer.go b/esutil/bulk_indexer.go index 5ed206e9f6..d6e592077c 100644 --- a/esutil/bulk_indexer.go +++ b/esutil/bulk_indexer.go @@ -573,7 +573,8 @@ func (w *worker) flush(ctx context.Context) error { if w.bi.config.OnError != nil { w.bi.config.OnError(ctx, fmt.Errorf("flush: %s", err)) } - return fmt.Errorf("flush: %s", err) + w.notifyItemsOnError(ctx, err) + return fmt.Errorf("flush: %s", fmt.Errorf("flush: %s", err)) } if res.Body != nil { defer res.Body.Close() @@ -584,6 +585,7 @@ func (w *worker) flush(ctx context.Context) error { if w.bi.config.OnError != nil { w.bi.config.OnError(ctx, fmt.Errorf("flush: %s", res.String())) } + w.notifyItemsOnError(ctx, err) return fmt.Errorf("flush: %s", res.String()) } @@ -592,6 +594,7 @@ func (w *worker) flush(ctx context.Context) error { if w.bi.config.OnError != nil { w.bi.config.OnError(ctx, fmt.Errorf("flush: %s", err)) } + w.notifyItemsOnError(ctx, err) return fmt.Errorf("flush: error parsing response body: %s", err) } @@ -638,6 +641,14 @@ func (w *worker) flush(ctx context.Context) error { return err } +func (w *worker) notifyItemsOnError(ctx context.Context, err error) { + for _, item := range w.items { + if item.OnFailure != nil { + item.OnFailure(ctx, item, BulkIndexerResponseItem{}, fmt.Errorf("flush: %w", err)) + } + } +} + type defaultJSONDecoder struct{} func (d defaultJSONDecoder) UnmarshalFromReader(r io.Reader, blk *BulkIndexerResponse) error { From 9811779be56ed25c44e6e1f8374160e9eff0fd5d Mon Sep 17 00:00:00 2001 From: Anderson Ribeiro Date: Mon, 6 Mar 2023 23:48:16 +0000 Subject: [PATCH 2/4] fix message error on line 577 --- esutil/bulk_indexer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/esutil/bulk_indexer.go b/esutil/bulk_indexer.go index d6e592077c..909492f669 100644 --- a/esutil/bulk_indexer.go +++ b/esutil/bulk_indexer.go @@ -574,7 +574,7 @@ func (w *worker) flush(ctx context.Context) error { w.bi.config.OnError(ctx, fmt.Errorf("flush: %s", err)) } w.notifyItemsOnError(ctx, err) - return fmt.Errorf("flush: %s", fmt.Errorf("flush: %s", err)) + return fmt.Errorf("flush: %s", err) } if res.Body != nil { defer res.Body.Close() From be6fc9630b3be400557d898094d78e62abca8ef1 Mon Sep 17 00:00:00 2001 From: Matt Devy Date: Wed, 12 Nov 2025 16:20:19 +0000 Subject: [PATCH 3/4] test: OnFailure is called per item --- esutil/bulk_indexer_internal_test.go | 128 ++++++++++++++++++++++++++- 1 file changed, 127 insertions(+), 1 deletion(-) diff --git a/esutil/bulk_indexer_internal_test.go b/esutil/bulk_indexer_internal_test.go index da3a4463bc..be8ae7427e 100644 --- a/esutil/bulk_indexer_internal_test.go +++ b/esutil/bulk_indexer_internal_test.go @@ -572,6 +572,127 @@ func TestBulkIndexer(t *testing.T) { } }) + t.Run("TooManyRequests - Fail", func(t *testing.T) { + var ( + wg sync.WaitGroup + numItems = 2 + ) + + esCfg := elasticsearch.Config{ + Transport: &mockTransport{ + RoundTripFunc: func(*http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusTooManyRequests, + Status: "429 TooManyRequests", + Body: io.NopCloser(strings.NewReader(`{"took":1}`)), + }, nil + }, + }, + + MaxRetries: 5, + RetryOnStatus: []int{502, 503, 504, 429}, + RetryBackoff: func(i int) time.Duration { + if os.Getenv("DEBUG") != "" { + fmt.Printf("*** Retry #%d\n", i) + } + return time.Duration(i) * 100 * time.Millisecond + }, + } + if os.Getenv("DEBUG") != "" { + esCfg.Logger = &elastictransport.ColorLogger{Output: os.Stdout} + } + es, _ := elasticsearch.NewClient(esCfg) + + biCfg := BulkIndexerConfig{NumWorkers: 1, FlushBytes: 28 * 2, Client: es} + if os.Getenv("DEBUG") != "" { + biCfg.DebugLogger = log.New(os.Stdout, "", 0) + } + + bi, _ := NewBulkIndexer(biCfg) + + biiFailureCallbacksCalled := atomic.Uint32{} + biiSuccessCallbacksCalled := atomic.Uint32{} + + for i := 1; i <= numItems; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + err := bi.Add(context.Background(), BulkIndexerItem{ + Action: "foo", + Body: strings.NewReader(`{"title":"foo"}`), + OnFailure: func(ctx context.Context, item BulkIndexerItem, item2 BulkIndexerResponseItem, err error) { + _ = biiFailureCallbacksCalled.Add(1) + }, + OnSuccess: func(ctx context.Context, item BulkIndexerItem, item2 BulkIndexerResponseItem) { + _ = biiSuccessCallbacksCalled.Add(1) + }, + }) + if err != nil { + t.Errorf("Unexpected error: %s", err) + return + } + }(i) + } + wg.Wait() + + if err := bi.Close(context.Background()); err != nil { + t.Errorf("Unexpected error: %s", err) + } + + // BulksIndexerItem.OnFailure() callbacks are called for all items. + if biiFailureCallbacksCalled.Load() != uint32(numItems) { + t.Errorf("Unexpected NumFailedCallbacks: want=%d, got=%d", numItems, biiFailureCallbacksCalled.Load()) + } + + // BulkIndexerItem.OnSuccess() callbacks are not called. + if biiSuccessCallbacksCalled.Load() != 0 { + t.Errorf("Unexpected NumSuccessCallbacks: want=%d, got=%d", 0, biiSuccessCallbacksCalled.Load()) + } + }) + + t.Run("JSON Decoder Failure", func(t *testing.T) { + es, _ := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{}}) + + biFailureCallbacksCalled := atomic.Uint32{} + bi, _ := NewBulkIndexer(BulkIndexerConfig{ + Client: es, + Decoder: customJSONDecoder{ + err: fmt.Errorf("Custom JSON decoder error"), + }, + OnError: func(ctx context.Context, err error) { + _ = biFailureCallbacksCalled.Add(1) + }, + }) + + biiFailureCallbacksCalled := atomic.Uint32{} + + err := bi.Add(context.Background(), BulkIndexerItem{ + Action: "index", + DocumentID: "1", + Body: strings.NewReader(`{"title":"foo"}`), + OnFailure: func(ctx context.Context, item BulkIndexerItem, item2 BulkIndexerResponseItem, err error) { + _ = biiFailureCallbacksCalled.Add(1) + }, + }) + if err != nil { + t.Fatalf("Unexpected error, got %s", err) + } + + if err := bi.Close(context.Background()); err != nil { + t.Errorf("Unexpected error: %s", err) + } + + // BulksIndexerItem.OnFailure() callbacks are called only for failed items. + if biiFailureCallbacksCalled.Load() != 1 { + t.Errorf("Unexpected NumFailedCallbacks: want=%d, got=%d", 1, biiFailureCallbacksCalled.Load()) + } + + // BulkIndexer.OnError() callbacks are called for all errors. + if biFailureCallbacksCalled.Load() != 2 { + t.Errorf("Unexpected NumFailedCallbacks: want=%d, got=%d", 2, biFailureCallbacksCalled.Load()) + } + }) + t.Run("Custom JSON Decoder", func(t *testing.T) { es, _ := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{}}) bi, _ := NewBulkIndexer(BulkIndexerConfig{Client: es, Decoder: customJSONDecoder{}}) @@ -926,8 +1047,13 @@ func TestBulkIndexerItem(t *testing.T) { }) } -type customJSONDecoder struct{} +type customJSONDecoder struct { + err error +} func (d customJSONDecoder) UnmarshalFromReader(r io.Reader, blk *BulkIndexerResponse) error { + if d.err != nil { + return d.err + } return json.NewDecoder(r).Decode(blk) } From 0b48a8907c7ec4be1e3663038bbf6de397f2073f Mon Sep 17 00:00:00 2001 From: Matt Devy Date: Thu, 13 Nov 2025 13:12:11 +0000 Subject: [PATCH 4/4] refactor: use a method to handle errors * makes code DRYer * fixes bug where `res.IsError()` resulted in BulkIndexerItem.OnError receiving a nil `err` --- esutil/bulk_indexer.go | 33 ++++++++++++++-------------- esutil/bulk_indexer_internal_test.go | 6 +++++ 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/esutil/bulk_indexer.go b/esutil/bulk_indexer.go index 909492f669..03453d9cfe 100644 --- a/esutil/bulk_indexer.go +++ b/esutil/bulk_indexer.go @@ -570,32 +570,24 @@ func (w *worker) flush(ctx context.Context) error { res, err := req.Do(ctx, w.bi.config.Client) if err != nil { atomic.AddUint64(&w.bi.stats.numFailed, uint64(len(w.items))) - if w.bi.config.OnError != nil { - w.bi.config.OnError(ctx, fmt.Errorf("flush: %s", err)) - } - w.notifyItemsOnError(ctx, err) - return fmt.Errorf("flush: %s", err) + err := fmt.Errorf("flush: %w", err) + w.handleError(ctx, err) + return err } if res.Body != nil { defer res.Body.Close() } if res.IsError() { atomic.AddUint64(&w.bi.stats.numFailed, uint64(len(w.items))) - // TODO(karmi): Wrap error (include response struct) - if w.bi.config.OnError != nil { - w.bi.config.OnError(ctx, fmt.Errorf("flush: %s", res.String())) - } - w.notifyItemsOnError(ctx, err) - return fmt.Errorf("flush: %s", res.String()) + err := fmt.Errorf("flush: %s", res.String()) + w.handleError(ctx, err) + return err } if err := w.bi.config.Decoder.UnmarshalFromReader(res.Body, &blk); err != nil { // TODO(karmi): Wrap error (include response struct) - if w.bi.config.OnError != nil { - w.bi.config.OnError(ctx, fmt.Errorf("flush: %s", err)) - } - w.notifyItemsOnError(ctx, err) - return fmt.Errorf("flush: error parsing response body: %s", err) + w.handleError(ctx, fmt.Errorf("flush: %w", err)) + return fmt.Errorf("flush: error parsing response body: %w", err) } for i, blkItem := range blk.Items { @@ -644,11 +636,18 @@ func (w *worker) flush(ctx context.Context) error { func (w *worker) notifyItemsOnError(ctx context.Context, err error) { for _, item := range w.items { if item.OnFailure != nil { - item.OnFailure(ctx, item, BulkIndexerResponseItem{}, fmt.Errorf("flush: %w", err)) + item.OnFailure(ctx, item, BulkIndexerResponseItem{}, err) } } } +func (w *worker) handleError(ctx context.Context, err error) { + if w.bi.config.OnError != nil { + w.bi.config.OnError(ctx, err) + } + w.notifyItemsOnError(ctx, err) +} + type defaultJSONDecoder struct{} func (d defaultJSONDecoder) UnmarshalFromReader(r io.Reader, blk *BulkIndexerResponse) error { diff --git a/esutil/bulk_indexer_internal_test.go b/esutil/bulk_indexer_internal_test.go index be8ae7427e..9f4d81970d 100644 --- a/esutil/bulk_indexer_internal_test.go +++ b/esutil/bulk_indexer_internal_test.go @@ -622,6 +622,9 @@ func TestBulkIndexer(t *testing.T) { Body: strings.NewReader(`{"title":"foo"}`), OnFailure: func(ctx context.Context, item BulkIndexerItem, item2 BulkIndexerResponseItem, err error) { _ = biiFailureCallbacksCalled.Add(1) + if err == nil { + t.Errorf("Unexpected nil error in BulkIndexerItem.OnFailure callback") + } }, OnSuccess: func(ctx context.Context, item BulkIndexerItem, item2 BulkIndexerResponseItem) { _ = biiSuccessCallbacksCalled.Add(1) @@ -672,6 +675,9 @@ func TestBulkIndexer(t *testing.T) { Body: strings.NewReader(`{"title":"foo"}`), OnFailure: func(ctx context.Context, item BulkIndexerItem, item2 BulkIndexerResponseItem, err error) { _ = biiFailureCallbacksCalled.Add(1) + if err == nil { + t.Errorf("Unexpected nil error in BulkIndexerItem.OnFailure callback") + } }, }) if err != nil {