36 changes: 23 additions & 13 deletions esutil/bulk_indexer.go
@@ -570,29 +570,24 @@ func (w *worker) flush(ctx context.Context) error {
    res, err := req.Do(ctx, w.bi.config.Client)
    if err != nil {
        atomic.AddUint64(&w.bi.stats.numFailed, uint64(len(w.items)))
-        if w.bi.config.OnError != nil {
-            w.bi.config.OnError(ctx, fmt.Errorf("flush: %s", err))
-        }
-        return fmt.Errorf("flush: %s", err)
+        err := fmt.Errorf("flush: %w", err)
+        w.handleError(ctx, err)
+        return err
    }
    if res.Body != nil {
        defer res.Body.Close()
    }
    if res.IsError() {
        atomic.AddUint64(&w.bi.stats.numFailed, uint64(len(w.items)))
        // TODO(karmi): Wrap error (include response struct)
-        if w.bi.config.OnError != nil {
-            w.bi.config.OnError(ctx, fmt.Errorf("flush: %s", res.String()))
-        }
-        return fmt.Errorf("flush: %s", res.String())
+        err := fmt.Errorf("flush: %s", res.String())
+        w.handleError(ctx, err)
+        return err
    }

    if err := w.bi.config.Decoder.UnmarshalFromReader(res.Body, &blk); err != nil {
-        // TODO(karmi): Wrap error (include response struct)
-        if w.bi.config.OnError != nil {
-            w.bi.config.OnError(ctx, fmt.Errorf("flush: %s", err))
-        }
-        return fmt.Errorf("flush: error parsing response body: %s", err)
+        w.handleError(ctx, fmt.Errorf("flush: %w", err))
+        return fmt.Errorf("flush: error parsing response body: %w", err)
    }

    for i, blkItem := range blk.Items {
@@ -638,6 +633,21 @@ func (w *worker) flush(ctx context.Context) error {
    return err
}

+func (w *worker) notifyItemsOnError(ctx context.Context, err error) {
+    for _, item := range w.items {
+        if item.OnFailure != nil {
+            item.OnFailure(ctx, item, BulkIndexerResponseItem{}, err)
+        }
+    }
+}
+
+func (w *worker) handleError(ctx context.Context, err error) {
+    if w.bi.config.OnError != nil {
+        w.bi.config.OnError(ctx, err)
+    }
+    w.notifyItemsOnError(ctx, err)
+}
+
type defaultJSONDecoder struct{}

func (d defaultJSONDecoder) UnmarshalFromReader(r io.Reader, blk *BulkIndexerResponse) error {
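To make the new error-propagation path concrete, here is a minimal, illustrative sketch of how a caller wires up both callbacks through the public esutil API. The module path assumes go-elasticsearch v8, and the index name, client defaults, and log messages are placeholders, not part of this change:

```go
package main

import (
	"context"
	"log"
	"strings"

	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esutil"
)

func main() {
	es, err := elasticsearch.NewClient(elasticsearch.Config{})
	if err != nil {
		log.Fatalf("error creating the client: %s", err)
	}

	bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Client: es,
		Index:  "test-index", // placeholder index name
		// OnError receives indexer-level errors, e.g. a failed flush.
		OnError: func(ctx context.Context, err error) {
			log.Printf("bulk indexer error: %s", err)
		},
	})
	if err != nil {
		log.Fatalf("error creating the bulk indexer: %s", err)
	}

	err = bi.Add(context.Background(), esutil.BulkIndexerItem{
		Action: "index",
		Body:   strings.NewReader(`{"title":"foo"}`),
		// After this change, OnFailure also fires when the whole flush
		// fails (transport error, HTTP error response, or a response
		// that cannot be decoded), not only when an individual item is
		// rejected in the bulk response.
		OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
			log.Printf("failed to index item: %s", err)
		},
	})
	if err != nil {
		log.Fatalf("error adding an item: %s", err)
	}

	if err := bi.Close(context.Background()); err != nil {
		log.Fatalf("error closing the bulk indexer: %s", err)
	}
}
```

Note that when the whole flush fails there is no per-item response, so notifyItemsOnError passes a zero-value BulkIndexerResponseItem to each item's OnFailure callback alongside the flush error.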
134 changes: 133 additions & 1 deletion esutil/bulk_indexer_internal_test.go
@@ -572,6 +572,133 @@ func TestBulkIndexer(t *testing.T) {
        }
    })

+    t.Run("TooManyRequests - Fail", func(t *testing.T) {
+        var (
+            wg       sync.WaitGroup
+            numItems = 2
+        )
+
+        esCfg := elasticsearch.Config{
+            Transport: &mockTransport{
+                RoundTripFunc: func(*http.Request) (*http.Response, error) {
+                    return &http.Response{
+                        StatusCode: http.StatusTooManyRequests,
+                        Status: "429 TooManyRequests",
+                        Body: io.NopCloser(strings.NewReader(`{"took":1}`)),
+                    }, nil
+                },
+            },
+
+            MaxRetries: 5,
+            RetryOnStatus: []int{502, 503, 504, 429},
+            RetryBackoff: func(i int) time.Duration {
+                if os.Getenv("DEBUG") != "" {
+                    fmt.Printf("*** Retry #%d\n", i)
+                }
+                return time.Duration(i) * 100 * time.Millisecond
+            },
+        }
+        if os.Getenv("DEBUG") != "" {
+            esCfg.Logger = &elastictransport.ColorLogger{Output: os.Stdout}
+        }
+        es, _ := elasticsearch.NewClient(esCfg)
+
+        biCfg := BulkIndexerConfig{NumWorkers: 1, FlushBytes: 28 * 2, Client: es}
+        if os.Getenv("DEBUG") != "" {
+            biCfg.DebugLogger = log.New(os.Stdout, "", 0)
+        }
+
+        bi, _ := NewBulkIndexer(biCfg)
+
+        biiFailureCallbacksCalled := atomic.Uint32{}
+        biiSuccessCallbacksCalled := atomic.Uint32{}
+
+        for i := 1; i <= numItems; i++ {
+            wg.Add(1)
+            go func(i int) {
+                defer wg.Done()
+                err := bi.Add(context.Background(), BulkIndexerItem{
+                    Action: "foo",
+                    Body: strings.NewReader(`{"title":"foo"}`),
+                    OnFailure: func(ctx context.Context, item BulkIndexerItem, item2 BulkIndexerResponseItem, err error) {
+                        _ = biiFailureCallbacksCalled.Add(1)
+                        if err == nil {
+                            t.Errorf("Unexpected nil error in BulkIndexerItem.OnFailure callback")
+                        }
+                    },
+                    OnSuccess: func(ctx context.Context, item BulkIndexerItem, item2 BulkIndexerResponseItem) {
+                        _ = biiSuccessCallbacksCalled.Add(1)
+                    },
+                })
+                if err != nil {
+                    t.Errorf("Unexpected error: %s", err)
+                    return
+                }
+            }(i)
+        }
+        wg.Wait()
+
+        if err := bi.Close(context.Background()); err != nil {
+            t.Errorf("Unexpected error: %s", err)
+        }
+
+        // BulkIndexerItem.OnFailure() callbacks are called for all items.
+        if biiFailureCallbacksCalled.Load() != uint32(numItems) {
+            t.Errorf("Unexpected NumFailedCallbacks: want=%d, got=%d", numItems, biiFailureCallbacksCalled.Load())
+        }
+
+        // BulkIndexerItem.OnSuccess() callbacks are not called.
+        if biiSuccessCallbacksCalled.Load() != 0 {
+            t.Errorf("Unexpected NumSuccessCallbacks: want=%d, got=%d", 0, biiSuccessCallbacksCalled.Load())
+        }
+    })
+
+    t.Run("JSON Decoder Failure", func(t *testing.T) {
+        es, _ := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{}})
+
+        biFailureCallbacksCalled := atomic.Uint32{}
+        bi, _ := NewBulkIndexer(BulkIndexerConfig{
+            Client: es,
+            Decoder: customJSONDecoder{
+                err: fmt.Errorf("Custom JSON decoder error"),
+            },
+            OnError: func(ctx context.Context, err error) {
+                _ = biFailureCallbacksCalled.Add(1)
+            },
+        })
+
+        biiFailureCallbacksCalled := atomic.Uint32{}
+
+        err := bi.Add(context.Background(), BulkIndexerItem{
+            Action: "index",
+            DocumentID: "1",
+            Body: strings.NewReader(`{"title":"foo"}`),
+            OnFailure: func(ctx context.Context, item BulkIndexerItem, item2 BulkIndexerResponseItem, err error) {
+                _ = biiFailureCallbacksCalled.Add(1)
+                if err == nil {
+                    t.Errorf("Unexpected nil error in BulkIndexerItem.OnFailure callback")
+                }
+            },
+        })
+        if err != nil {
+            t.Fatalf("Unexpected error, got %s", err)
+        }
+
+        if err := bi.Close(context.Background()); err != nil {
+            t.Errorf("Unexpected error: %s", err)
+        }
+
+        // BulkIndexerItem.OnFailure() callbacks are called only for failed items.
+        if biiFailureCallbacksCalled.Load() != 1 {
+            t.Errorf("Unexpected NumFailedCallbacks: want=%d, got=%d", 1, biiFailureCallbacksCalled.Load())
+        }
+
+        // BulkIndexer.OnError() callbacks are called for all errors.
+        if biFailureCallbacksCalled.Load() != 2 {
+            t.Errorf("Unexpected NumFailedCallbacks: want=%d, got=%d", 2, biFailureCallbacksCalled.Load())
+        }
+    })
+
    t.Run("Custom JSON Decoder", func(t *testing.T) {
        es, _ := elasticsearch.NewClient(elasticsearch.Config{Transport: &mockTransport{}})
        bi, _ := NewBulkIndexer(BulkIndexerConfig{Client: es, Decoder: customJSONDecoder{}})
@@ -926,8 +1053,13 @@ func TestBulkIndexerItem(t *testing.T) {
    })
}

-type customJSONDecoder struct{}
+type customJSONDecoder struct {
+    err error
+}

func (d customJSONDecoder) UnmarshalFromReader(r io.Reader, blk *BulkIndexerResponse) error {
+    if d.err != nil {
+        return d.err
+    }
    return json.NewDecoder(r).Decode(blk)
}
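The error-injecting decoder above works because the response decoder is a configuration point; a caller can use the same hook outside the test suite. A minimal sketch under that assumption: BulkIndexerConfig.Decoder accepts any type with this UnmarshalFromReader method, and failingDecoder, its error message, and the client defaults are illustrative, not part of this change.

```go
package main

import (
	"context"
	"errors"
	"io"
	"log"
	"strings"

	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esutil"
)

// failingDecoder always returns its configured error, which sends the
// bulk indexer down the same error path the new test exercises.
type failingDecoder struct{ err error }

func (d failingDecoder) UnmarshalFromReader(r io.Reader, blk *esutil.BulkIndexerResponse) error {
	return d.err
}

func main() {
	es, err := elasticsearch.NewClient(elasticsearch.Config{})
	if err != nil {
		log.Fatalf("error creating the client: %s", err)
	}

	bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
		Client:  es,
		Decoder: failingDecoder{err: errors.New("decode failed")},
		// The decoding failure surfaces here as a wrapped flush error.
		OnError: func(ctx context.Context, err error) {
			log.Printf("bulk indexer error: %s", err)
		},
	})
	if err != nil {
		log.Fatalf("error creating the bulk indexer: %s", err)
	}

	// Add one item so Close has something to flush through the decoder.
	_ = bi.Add(context.Background(), esutil.BulkIndexerItem{
		Action: "index",
		Body:   strings.NewReader(`{"title":"foo"}`),
	})

	if err := bi.Close(context.Background()); err != nil {
		log.Printf("close: %s", err)
	}
}
```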