From bdb6cde6659e419085cabba983ca106cc9e50d2b Mon Sep 17 00:00:00 2001 From: tarunm Date: Fri, 19 Jun 2026 11:04:22 +0000 Subject: [PATCH 1/2] fix(minion): fail single-segment task when segment upload fails BaseSingleSegmentConversionExecutor caught upload exceptions, logged and metered them, then returned the conversion result normally -- so the minion task reported SUCCESS even though the converted segment was never uploaded. Helix marked the task COMPLETED and never retried, silently leaving the segment un-refreshed/un-purged/un-compacted. The swallow was an accidental control-flow change introduced in #10978 (observability for upload/download failures): the download branch metered, logged, and rethrew, but the upload branch omitted the rethrow. Rethrow the upload exception, mirroring the download path, so the task fails and is retried. Move the tarred-file cleanup into a finally block so it still runs on the failure path, and remove the now-dead uploadSuccessful flag. Affects all single-segment conversion tasks (RealtimeToOffline, Purge, RefreshSegment, UpsertCompaction, etc.). On upload failure these now report task FAILURE (and retry) instead of SUCCESS; operators alerting on task state will see failures that were previously hidden. --- .../BaseSingleSegmentConversionExecutor.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java index d3a0494dce7f..241069bcbc8f 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java @@ -195,7 +195,6 @@ public SegmentConversionResult executeTask(PinotTaskConfig pinotTaskConfig) BatchConfigProperties.SegmentPushType pushType = getSegmentPushType(configs); _eventObserver.notifyProgress(_pinotTaskConfig, "Uploading segment: " + segmentName + " (push mode: " + pushType + ")"); - boolean uploadSuccessful = true; try { switch (pushType) { case TAR: @@ -210,19 +209,17 @@ public SegmentConversionResult executeTask(PinotTaskConfig pinotTaskConfig) throw new UnsupportedOperationException("Unrecognized push mode: " + pushType); } } catch (Exception e) { - uploadSuccessful = false; _minionMetrics.addMeteredTableValue(tableNameWithType, MinionMeter.SEGMENT_UPLOAD_FAIL_COUNT, 1L); LOGGER.error("Segment upload failed for segment {}, table {}", segmentName, tableNameWithType, e); _eventObserver.notifyTaskError(_pinotTaskConfig, e); - } - if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) { - LOGGER.warn("Failed to delete tarred converted segment: {}", convertedTarredSegmentFile.getAbsolutePath()); - } - - if (uploadSuccessful) { - LOGGER.info("Done executing {} on table: {}, segment: {}", taskType, tableNameWithType, segmentName); + throw e; + } finally { + if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) { + LOGGER.warn("Failed to delete tarred converted segment: {}", convertedTarredSegmentFile.getAbsolutePath()); + } } + LOGGER.info("Done executing {} on table: {}, segment: {}", taskType, tableNameWithType, segmentName); return segmentConversionResult; } finally { FileUtils.deleteQuietly(tempDataDir); From bda2c72dc3928c57313d7bebf888c7a3fe94d272 Mon Sep 17 00:00:00 2001 From: tarunm Date: Fri, 19 Jun 2026 11:42:57 +0000 Subject: [PATCH 2/2] test(minion): add regression test for single-segment upload-failure handling Adds BaseSingleSegmentConversionExecutorTest covering executeTask's upload- failure path: a failed segment upload must propagate (task fails and retries) instead of being silently reported as success. The test static-mocks SegmentConversionUtils.uploadSegment and uses a test-only executor that stubs the download/CRC/convert/ZK-modifier hooks so executeTask reaches the upload step without a server, controller, or deep store. Includes a success-path control test. Verified to fail on the pre-fix executor and pass on the fixed one. --- ...seSingleSegmentConversionExecutorTest.java | 192 ++++++++++++++++++ 1 file changed, 192 insertions(+) create mode 100644 pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutorTest.java diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutorTest.java new file mode 100644 index 000000000000..f2110fdd2813 --- /dev/null +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutorTest.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.plugin.minion.tasks; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier; +import org.apache.pinot.common.metrics.MinionMetrics; +import org.apache.pinot.core.common.MinionConstants; +import org.apache.pinot.core.minion.PinotTaskConfig; +import org.apache.pinot.minion.MinionContext; +import org.apache.pinot.minion.event.MinionEventObservers; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader; +import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; +import org.apache.pinot.spi.config.instance.InstanceType; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +/** + * Tests the {@link BaseSingleSegmentConversionExecutor#executeTask} upload-failure handling: a segment-upload failure + * must propagate so the task is marked failed (and retried) rather than being silently reported as successful. + */ +public class BaseSingleSegmentConversionExecutorTest { + private static final File TEMP_DIR = + new File(FileUtils.getTempDirectory(), "BaseSingleSegmentConversionExecutorTest"); + private static final File SEGMENT_DIR = new File(TEMP_DIR, "segment"); + private static final File DATA_DIR = new File(TEMP_DIR, "minionData"); + + private static final int NUM_ROWS = 5; + private static final String RAW_TABLE_NAME = "testTable"; + private static final String TABLE_NAME_WITH_TYPE = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME); + private static final String SEGMENT_NAME = "testSegment"; + private static final String TASK_TYPE = "TestSingleSegmentConversionTask"; + private static final String TASK_ID = "Task_" + TASK_TYPE + "_0"; + private static final long SEGMENT_CRC = 100L; + private static final String D1 = "d1"; + + private File _segmentIndexDir; + + @BeforeClass + public void setUp() + throws Exception { + FileUtils.deleteDirectory(TEMP_DIR); + MinionMetrics.register(Mockito.mock(MinionMetrics.class)); + + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build(); + Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(D1, FieldSpec.DataType.INT).build(); + List rows = new ArrayList<>(NUM_ROWS); + for (int i = 0; i < NUM_ROWS; i++) { + GenericRow row = new GenericRow(); + row.putValue(D1, i); + rows.add(row); + } + + SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema); + config.setInstanceType(InstanceType.MINION); + config.setOutDir(SEGMENT_DIR.getPath()); + config.setSegmentName(SEGMENT_NAME); + SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); + driver.init(config, new GenericRowRecordReader(rows)); + driver.build(); + _segmentIndexDir = new File(SEGMENT_DIR, SEGMENT_NAME); + + Assert.assertTrue(DATA_DIR.mkdirs()); + MinionContext.getInstance().setDataDir(DATA_DIR); + // executeTask resolves the event observer from the registry by task id; register one so it is non-null. + MinionEventObservers.getInstance().addMinionEventObserver(TASK_ID, MinionTaskTestUtils.getMinionProgressObserver()); + } + + @Test + public void testExecuteTaskRethrowsWhenUploadFails() + throws Exception { + try (MockedStatic mocked = Mockito.mockStatic(SegmentConversionUtils.class)) { + mocked.when(() -> SegmentConversionUtils.uploadSegment(Mockito.any(), Mockito.any(), Mockito.any(), + Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.any(File.class))) + .thenThrow(new RuntimeException("simulated upload failure")); + + TestSingleSegmentConversionExecutor executor = new TestSingleSegmentConversionExecutor(); + try { + executor.executeTask(createTaskConfig()); + Assert.fail("executeTask must rethrow when segment upload fails, not report success"); + } catch (RuntimeException e) { + Assert.assertEquals(e.getMessage(), "simulated upload failure"); + } + } + } + + @Test + public void testExecuteTaskSucceedsWhenUploadSucceeds() + throws Exception { + try (MockedStatic mocked = Mockito.mockStatic(SegmentConversionUtils.class)) { + // uploadSegment is a no-op by default for the mocked static, simulating a successful upload. + TestSingleSegmentConversionExecutor executor = new TestSingleSegmentConversionExecutor(); + SegmentConversionResult result = executor.executeTask(createTaskConfig()); + Assert.assertEquals(result.getSegmentName(), SEGMENT_NAME); + Assert.assertEquals(result.getTableNameWithType(), TABLE_NAME_WITH_TYPE); + mocked.verify(() -> SegmentConversionUtils.uploadSegment(Mockito.any(), Mockito.any(), Mockito.any(), + Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.any(File.class))); + } + } + + private PinotTaskConfig createTaskConfig() { + Map configs = new HashMap<>(); + configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME_WITH_TYPE); + configs.put(MinionConstants.SEGMENT_NAME_KEY, SEGMENT_NAME); + configs.put(MinionConstants.DOWNLOAD_URL_KEY, "http://unused/download"); + configs.put(MinionConstants.UPLOAD_URL_KEY, "http://unused/upload"); + configs.put(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY, Long.toString(SEGMENT_CRC)); + configs.put("TASK_ID", TASK_ID); + return new PinotTaskConfig(TASK_TYPE, configs); + } + + @AfterClass + public void tearDown() + throws Exception { + // Restore the process-global state mutated in setUp so it does not leak into other test classes. + MinionEventObservers.getInstance().removeMinionEventObserver(TASK_ID); + MinionContext.getInstance().setDataDir(null); + FileUtils.deleteDirectory(TEMP_DIR); + } + + /** + * Minimal concrete executor that stubs out the infrastructure-dependent hooks (download, CRC check, conversion, ZK + * metadata modifier) so {@code executeTask} runs to the upload step without a server, controller, or deep store. + */ + private class TestSingleSegmentConversionExecutor extends BaseSingleSegmentConversionExecutor { + @Override + protected File downloadSegmentToLocalAndUntar(String tableNameWithType, String segmentName, String deepstoreURL, + String taskType, File tempDataDir, String suffix) + throws Exception { + File indexDir = new File(tempDataDir, "inputSegment"); + FileUtils.copyDirectory(_segmentIndexDir, indexDir); + return indexDir; + } + + @Override + protected long getSegmentCrc(String tableNameWithType, String segmentName) { + return SEGMENT_CRC; + } + + @Override + protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File indexDir, File workingDir) + throws Exception { + File convertedDir = new File(workingDir, SEGMENT_NAME); + FileUtils.copyDirectory(indexDir, convertedDir); + return new SegmentConversionResult.Builder().setFile(convertedDir) + .setTableNameWithType(pinotTaskConfig.getConfigs().get(MinionConstants.TABLE_NAME_KEY)) + .setSegmentName(SEGMENT_NAME).build(); + } + + @Override + protected SegmentZKMetadataCustomMapModifier getSegmentZKMetadataCustomMapModifier(PinotTaskConfig pinotTaskConfig, + SegmentConversionResult segmentConversionResult) { + return new SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE, + Collections.emptyMap()); + } + } +}