Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,25 @@ public class ConfigOptions {
.withDescription("The amount of time to sleep when fetch bucket error occurs.")
.withFallbackKeys("log.replica.fetch-backoff-interval");

public static final ConfigOption<Duration> COORDINATOR_REQUEST_RETRY_BACKOFF =
key("coordinator.request-retry.backoff-interval")
.durationType()
.defaultValue(Duration.ofMillis(100))
.withDescription(
"The backoff duration the coordinator waits before retrying a "
+ "control-plane request to a tablet server after a "
+ "transient RPC-layer failure. Mirrors Kafka's "
+ "ControllerChannelManager retry backoff (hardcoded 100ms).");

public static final ConfigOption<Duration> COORDINATOR_REQUEST_TIMEOUT =
key("coordinator.request.timeout")
.durationType()
.defaultValue(Duration.ofSeconds(30))
.withDescription(
"The timeout the sender thread waits for a response to a "
+ "control-plane request before treating it as failed "
+ "and retrying.");

public static final ConfigOption<MemorySize> LOG_REPLICA_FETCH_MAX_BYTES =
key("log.replica.fetch.max-bytes")
.memoryType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ public class MetricNames {
public static final String PARTITION_COUNT = "partitionCount";
public static final String REPLICAS_TO_DELETE_COUNT = "replicasToDeleteCount";

// for coordinator sender (per-tablet-server control request sender threads)
public static final String SENDER_TOTAL_QUEUE_SIZE = "senderTotalQueueSize";
public static final String SENDER_QUEUE_SIZE = "senderQueueSize";
public static final String SENDER_QUEUE_TIME_MS = "senderQueueTimeMs";
public static final String SENDER_RETRY_COUNT = "senderRetryCount";
public static final String SENDER_STALE_DROP_COUNT = "senderStaleDropCount";
public static final String SENDER_ALIVE = "senderAlive";

// for coordinator event processor
public static final String EVENT_QUEUE_SIZE = "eventQueueSize";
public static final String EVENT_QUEUE_TIME_MS = "eventQueueTimeMs";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.slf4j.LoggerFactory;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** An abstract thread that is shutdownable . */
public abstract class ShutdownableThread extends Thread {
Expand Down Expand Up @@ -110,6 +111,16 @@ public void run() {
log.info("Stopped");
}

/**
* Causes the current thread to wait until the shutdown is initiated, or the specified waiting
* time elapses.
*/
public void pause(long timeout, TimeUnit unit) throws InterruptedException {
if (shutdownInitiated.await(timeout, unit)) {
log.trace("shutdownInitiated latch count reached zero. Shutdown called.");
}
}

public boolean isRunning() {
return !isShutdownInitiated();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import org.apache.fluss.metrics.Gauge;
import org.apache.fluss.metrics.Histogram;
import org.apache.fluss.metrics.Meter;
import org.apache.fluss.metrics.Metric;
import org.apache.fluss.metrics.SimpleCounter;
import org.apache.fluss.metrics.groups.MetricGroup;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;
Expand All @@ -37,6 +39,7 @@ public class TestMetricGroup implements MetricGroup {
private final Map<String, String> variables;
private final BiFunction<String, Optional<CharacterFilter>, String> metricIdentifierFunction;
private final BiFunction<CharacterFilter, Optional<Character>, String> logicalScopeFunction;
private final Map<String, Metric> metrics = new HashMap<>();

public TestMetricGroup(
String[] scopeComponents,
Expand All @@ -49,32 +52,48 @@ public TestMetricGroup(
this.logicalScopeFunction = logicalScopeFunction;
}

/** Creates a default {@link TestMetricGroup} suitable for unit tests. */
public static TestMetricGroup createTestMetricGroup() {
return newBuilder().build();
}

public static TestMetricGroupBuilder newBuilder() {
return new TestMetricGroupBuilder();
}

/** Returns a metric previously registered under the given name. */
public Metric getMetric(String name) {
return metrics.get(name);
}

@Override
public Counter counter(String name) {
return new SimpleCounter();
SimpleCounter counter = new SimpleCounter();
metrics.put(name, counter);
return counter;
}

@Override
public <C extends Counter> C counter(String name, C counter) {
metrics.put(name, counter);
return counter;
}

@Override
public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
metrics.put(name, gauge);
return gauge;
}

@Override
public <H extends Histogram> H histogram(String name, H histogram) {
metrics.put(name, histogram);
return histogram;
}

@Override
public <M extends Meter> M meter(String name, M meter) {
metrics.put(name, meter);
return meter;
}

Expand Down
Loading