Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package com.scalar.db.dataloader.cli.command.dataexport;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* A simple utility class to report export progress to the console.
*
* <p>This class is intended to be used in scenarios where there is no event-driven listener for
* export progress, but feedback to the user is still valuable.
*
* <p>It displays:
*
* <ul>
* <li>A starting message when export begins
* <li>A completion message with total records exported and time taken
* <li>Error messages in case of failures (via {@link #reportError(String, Throwable)})
* </ul>
*/
public class ConsoleExportProgressReporter {

private final long startTime;
private final AtomicBoolean completed = new AtomicBoolean(false);
private final String outputFile;

/**
* Constructs a reporter and logs the export start.
*
* @param outputFile the file to which data will be exported
*/
public ConsoleExportProgressReporter(String outputFile) {
this.outputFile = outputFile;
this.startTime = System.currentTimeMillis();
System.out.println("📤 Starting export...");
System.out.println("📁 Exporting data to file: " + outputFile);
}

/**
* Reports the completion of the export process, including total records exported and time taken.
*
* @param totalExported the total number of records exported
*/
public void reportCompletion(long totalExported) {
if (completed.getAndSet(true)) {
return;
}
long elapsed = System.currentTimeMillis() - startTime;
System.out.printf(
"%n✅ Export completed: %,d records exported to %s in %s%n",
totalExported, outputFile, formatElapsed(elapsed));
}

/**
* Prints a formatted error message to the console.
*
* @param message the error description
* @param throwable the associated exception (can be null)
*/
public static void reportError(String message, Throwable throwable) {
System.err.println("%n❌ Export failed: " + message);
if (throwable != null) {
System.err.println("Cause: " + throwable.getMessage());
}
}

/**
* Prints a formatted waring message to the console.
*
* @param message the error description
*/
public static void reportWarning(String message) {
System.err.printf("%n⚠️ Warning: %s%n", message);
}

/**
* Formats elapsed time in "Xm Ys" format.
*
* @param elapsedMillis the elapsed time in milliseconds
* @return a human-readable string of the elapsed time
*/
private String formatElapsed(long elapsedMillis) {
long seconds = (elapsedMillis / 1000) % 60;
long minutes = (elapsedMillis / 1000) / 60;
return String.format("%dm %ds", minutes, seconds);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.scalar.db.dataloader.core.dataexport.CsvExportManager;
import com.scalar.db.dataloader.core.dataexport.ExportManager;
import com.scalar.db.dataloader.core.dataexport.ExportOptions;
import com.scalar.db.dataloader.core.dataexport.ExportReport;
import com.scalar.db.dataloader.core.dataexport.JsonExportManager;
import com.scalar.db.dataloader.core.dataexport.JsonLineExportManager;
import com.scalar.db.dataloader.core.dataexport.producer.ProducerTaskFactory;
Expand All @@ -38,17 +39,13 @@
import java.util.Objects;
import java.util.concurrent.Callable;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;
import picocli.CommandLine.Model.CommandSpec;
import picocli.CommandLine.Spec;

@CommandLine.Command(name = "export", description = "export data from a ScalarDB table")
public class ExportCommand extends ExportCommandOptions implements Callable<Integer> {

private static final Logger logger = LoggerFactory.getLogger(ExportCommand.class);

@Spec CommandSpec spec;

@Override
Expand Down Expand Up @@ -97,23 +94,23 @@ public Integer call() throws Exception {
String filePath =
getOutputAbsoluteFilePath(
outputDirectory, outputFileName, exportOptions.getOutputFileFormat());
logger.info("Exporting data to file: {}", filePath);

try (BufferedWriter writer =
Files.newBufferedWriter(Paths.get(filePath), Charset.defaultCharset(), CREATE, APPEND)) {
exportManager.startExport(exportOptions, tableMetadata, writer);
ConsoleExportProgressReporter reporter = new ConsoleExportProgressReporter(filePath);
ExportReport report = exportManager.startExport(exportOptions, tableMetadata, writer);
reporter.reportCompletion(report.getExportedRowCount());
}

} catch (DirectoryValidationException e) {
logger.error("Invalid output directory path: {}", outputDirectory);
ConsoleExportProgressReporter.reportError("Invalid output directory: " + outputDirectory, e);
return 1;
} catch (InvalidFilePathException e) {
logger.error(
"The ScalarDB connection settings file path is invalid or the file is missing: {}",
scalarDbPropertiesFilePath);
ConsoleExportProgressReporter.reportError(
"Invalid ScalarDB connection file path: " + scalarDbPropertiesFilePath, e);
return 1;
} catch (TableMetadataException e) {
logger.error("Failed to retrieve table metadata: {}", e.getMessage());
ConsoleExportProgressReporter.reportError("Failed to retrieve table metadata", e);
return 1;
}
return 0;
Expand Down Expand Up @@ -160,7 +157,8 @@ private void warnAboutIgnoredDeprecatedOptions() {
+ "Use the 'scalar.db.consensus_commit.include_metadata.enabled' configuration property "
+ "in your ScalarDB properties file to control whether transaction metadata is included in scan operations.|@");

logger.warn(warning);
// logger.warn(warning);
ConsoleExportProgressReporter.reportWarning(warning);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package com.scalar.db.dataloader.cli.command.dataimport;

import com.scalar.db.dataloader.core.dataimport.ImportEventListener;
import com.scalar.db.dataloader.core.dataimport.datachunk.ImportDataChunkStatus;
import com.scalar.db.dataloader.core.dataimport.task.result.ImportTaskResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchResult;
import com.scalar.db.dataloader.core.dataimport.transactionbatch.ImportTransactionBatchStatus;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ConsoleImportProgressListener implements ImportEventListener {

private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final long startTime;
private final Map<Integer, String> chunkLogs = new ConcurrentHashMap<>();
private final Map<Integer, String> chunkFailureLogs = new ConcurrentHashMap<>();
private final AtomicLong totalRecords = new AtomicLong();
private final AtomicLong totalSuccess = new AtomicLong();
private final AtomicLong totalFailures = new AtomicLong();
private volatile boolean completed = false;

@SuppressWarnings("FutureReturnValueIgnored")
public ConsoleImportProgressListener(Duration updateInterval) {
startTime = System.currentTimeMillis();
scheduler.scheduleAtFixedRate(
this::render, 0, updateInterval.toMillis(), TimeUnit.MILLISECONDS);
}

@Override
public void onDataChunkStarted(ImportDataChunkStatus status) {
chunkLogs.put(
status.getDataChunkId(),
String.format(
"🔄 Chunk %d: Processing... %d records so far",
status.getDataChunkId(), totalRecords.get()));
}

@Override
public void onDataChunkCompleted(ImportDataChunkStatus status) {
long elapsedMillis = status.getEndTime().toEpochMilli() - status.getStartTime().toEpochMilli();
double elapsedSeconds = elapsedMillis / 1000.0;

int chunkId = status.getDataChunkId();
int total = status.getTotalRecords();
int success = status.getSuccessCount();
int failure = status.getFailureCount();

totalRecords.addAndGet(total);
totalSuccess.addAndGet(success);
totalFailures.addAndGet(failure);

String message =
(failure == 0)
? String.format(
"✓ Chunk %d: %d records imported (%.1fs), %d records imported successfully",
chunkId, total, elapsedSeconds, success)
: String.format(
"✓ Chunk %d: %d records imported (%.1fs), %d records imported successfully, import of %d records failed",
chunkId, total, elapsedSeconds, success, failure);

chunkLogs.put(chunkId, message);
}

@Override
public void onTransactionBatchStarted(ImportTransactionBatchStatus batchStatus) {
// Not used currently, but could be extended for detailed batch-level progress
}

@Override
public void onTransactionBatchCompleted(ImportTransactionBatchResult batchResult) {
if (!batchResult.isSuccess()) {
chunkFailureLogs.put(
batchResult.getDataChunkId(),
String.format(
"❌ Chunk id: %d, Transaction batch id: %d failed: %d records could not be imported",
batchResult.getDataChunkId(),
batchResult.getTransactionBatchId(),
batchResult.getRecords().size()));
}
}

@Override
public void onTaskComplete(ImportTaskResult taskResult) {
// No-op currently, could be extended to summarize task-level results
}

@Override
public void onAllDataChunksCompleted() {
completed = true;
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(2, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
scheduler.shutdownNow();
}
render(); // Final render after shutdown
}

private void render() {
StringBuilder builder = new StringBuilder();
long now = System.currentTimeMillis();
long elapsed = now - startTime;
double recPerSec = (totalRecords.get() * 1000.0) / (elapsed == 0 ? 1 : elapsed);

builder.append(
String.format(
"\rImporting... %d records | %.0f rec/s | %s%n",
totalRecords.get(), recPerSec, formatElapsed(elapsed)));

chunkLogs.values().stream().sorted().forEach(line -> builder.append(line).append("\n"));
chunkFailureLogs.values().stream().sorted().forEach(line -> builder.append(line).append("\n"));

if (completed) {
builder.append(
String.format(
"%n✅ Import completed: %d records succeeded, %d failed%n",
totalSuccess.get(), totalFailures.get()));
}
clearConsole();
System.out.print(builder);
System.out.flush();
}

private String formatElapsed(long elapsedMillis) {
long seconds = (elapsedMillis / 1000) % 60;
long minutes = (elapsedMillis / 1000) / 60;
return String.format("%dm %02ds elapsed", minutes, seconds);
}

private void clearConsole() {
System.out.print("\033[H\033[2J"); // ANSI escape to clear screen
System.out.flush();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -97,6 +98,7 @@ public Integer call() throws Exception {
importLoggerConfig,
logWriterFactory,
transactionFactory);
importManager.addListener(new ConsoleImportProgressListener(Duration.ofSeconds(1)));
importManager.startImport();
}
return 0;
Expand Down
Loading