Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@

package org.apache.ignite.internal;

import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointMetricsTracker;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.IgniteSystemProperties.IGNITE_JVM_PAUSE_DETECTOR_DISABLED;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_JVM_PAUSE_DETECTOR_LAST_EVENTS_COUNT;
Expand Down Expand Up @@ -56,9 +59,12 @@ public class LongJVMPauseDetector {
public static final int DFLT_JVM_PAUSE_DETECTOR_LAST_EVENTS_COUNT = 20;

/** Precision. */
private static final int PRECISION =
private static final long PRECISION =
getInteger(IGNITE_JVM_PAUSE_DETECTOR_PRECISION, DFLT_JVM_PAUSE_DETECTOR_PRECISION);

/** Precision converted to nanoseconds. */
private static final long PRECISION_NANOS = PRECISION * 1_000_000L;

/** Threshold. */
private static final int THRESHOLD =
getInteger(IGNITE_JVM_PAUSE_DETECTOR_THRESHOLD, DEFAULT_JVM_PAUSE_DETECTOR_THRESHOLD);
Expand All @@ -83,15 +89,18 @@ public class LongJVMPauseDetector {
private long longPausesCnt;

/** Long pause total duration. */
private long longPausesTotalDuration;
private long longPausesTotalDurationNanos;

/** Last detector's wake up time. */
private long lastWakeUpTime;
private long lastWakeUpTimeNanos = System.nanoTime();

/** Long pauses timestamps. */
@GridToStringInclude
private final long[] longPausesTimestamps = new long[EVT_CNT];

@GridToStringExclude
private final long[] longPausesMonotonicTimestamps = new long[EVT_CNT];

/** Long pauses durations. */
@GridToStringInclude
private final long[] longPausesDurations = new long[EVT_CNT];
Expand All @@ -117,53 +126,41 @@ public void start() {
}

final Thread worker = new IgniteThread(igniteInstanceName, "jvm-pause-detector-worker", () -> {
synchronized (this) {
lastWakeUpTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
}

if (log.isDebugEnabled())
log.debug(Thread.currentThread().getName() + " has been started.");

while (true) {
try {
Thread.sleep(PRECISION);

final long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
final long pause = now - PRECISION - lastWakeUpTime;

if (pause >= THRESHOLD) {
log.warning("Possible too long JVM pause: " + pause + " milliseconds.");

try {
// don't worry, wait will release monitor and all props will be accessible
synchronized (this) {
final int next = (int)(longPausesCnt % EVT_CNT);

longPausesCnt++;

longPausesTotalDuration += pause;

longPausesTimestamps[next] = now;

longPausesDurations[next] = pause;

lastWakeUpTime = now;
}
}
else {
synchronized (this) {
lastWakeUpTime = now;
lastWakeUpTimeNanos = System.nanoTime();
long awaitDeadline = lastWakeUpTimeNanos + PRECISION_NANOS;
long awaitDeadlineMillis = awaitDeadline / 1_000_000L;
int awaitDeadlineNanos = Math.toIntExact(awaitDeadline % 1_000_000);
while (System.nanoTime() <= awaitDeadline)
wait(awaitDeadlineMillis, awaitDeadlineNanos);
long nanoTime = System.nanoTime();
long pause = nanoTime - awaitDeadline;
long pauseMillis = TimeUnit.NANOSECONDS.toMillis(pause);
if (pauseMillis >= THRESHOLD) {
log.warning("Possible too long JVM pause: " + pauseMillis + " ms. " +
"Precision: " + PRECISION + " ms.");
final int next = (int) (longPausesCnt++ % EVT_CNT);
longPausesTotalDurationNanos += pause;
longPausesTimestamps[next] = System.currentTimeMillis();
longPausesMonotonicTimestamps[next] = nanoTime;
longPausesDurations[next] = pauseMillis;
}
}
}
}
catch (InterruptedException e) {
Thread locThread = Thread.currentThread();
} catch (InterruptedException e) {
Thread locThread = Thread.currentThread();

if (workerRef.compareAndSet(locThread, null))
log.error(locThread.getName() + " has been interrupted.", e);
else if (log.isDebugEnabled())
log.debug(locThread.getName() + " has been stopped.");
if (workerRef.compareAndSet(locThread, null))
log.error(locThread.getName() + " has been interrupted.", e);
else if (log.isDebugEnabled())
log.debug(locThread.getName() + " has been stopped.");

break;
}
break;
}
}
});

Expand Down Expand Up @@ -209,14 +206,14 @@ synchronized long longPausesCount() {
* @return Long JVM pauses total duration.
*/
synchronized long longPausesTotalDuration() {
return longPausesTotalDuration;
return TimeUnit.NANOSECONDS.toMillis(longPausesTotalDurationNanos);
}

/**
* @return Last checker's wake up time.
*/
public synchronized long getLastWakeUpTime() {
return lastWakeUpTime;
public synchronized long getLastWakeUpTimeNanos() {
return lastWakeUpTimeNanos;
}

/**
Expand All @@ -232,16 +229,41 @@ synchronized Map<Long, Long> longPauseEvents() {
}

/**
* @return Pair ({@code last long pause event time}, {@code pause time duration}) or {@code null}, if long pause
* wasn't occurred.
* @return Duration in milliseconds of the last long pause spotted, or {@code -1} if no long pause was recorded.
*/
public synchronized @Nullable IgniteBiTuple<Long, Long> getLastLongPause() {
int lastPauseIdx = (int)((EVT_CNT + longPausesCnt - 1) % EVT_CNT);

if (longPausesTimestamps[lastPauseIdx] == 0)
return null;
public synchronized long getLastLongPause() {
    // The worker stores an event at (longPausesCnt % EVT_CNT) and then increments the counter,
    // so the most recent event lives one slot behind the current counter position.
    // Adding EVT_CNT keeps the expression non-negative while the counter is still zero.
    int lastPauseIdx = Math.toIntExact((EVT_CNT + longPausesCnt - 1) % EVT_CNT);

    long lastLongPause = longPausesDurations[lastPauseIdx];

    // A zero duration marks a slot that has never been written.
    return lastLongPause == 0 ? -1 : lastLongPause;
}

return new IgniteBiTuple<>(longPausesTimestamps[lastPauseIdx], longPausesDurations[lastPauseIdx]);
/**
 * Builds a human readable summary of the long pauses recorded after the checkpoint start,
 * i.e. after {@link CheckpointMetricsTracker#checkPointStartNanos()}.
 * Pauses are listed from the most recent one to the oldest one.
 *
 * @param tracker Checkpoint tracker.
 * @return Summary string with the number of pauses found (possibly zero), the duration and
 *      wall-clock timestamp of each of them, and the detector precision.
 */
public synchronized String getTotalSpottedPausesExplain(CheckpointMetricsTracker tracker) {
    long checkPointStartNanos = tracker.checkPointStartNanos();

    List<String> pausesExplains = new LinkedList<>();

    // Only slots that were actually written may be inspected.
    long recorded = Math.min(longPausesCnt, EVT_CNT);

    // The newest event is stored one slot behind the counter, so walk backwards from there.
    for (long back = 1; back <= recorded; back++) {
        int idx = (int)((longPausesCnt - back) % EVT_CNT);

        // System.nanoTime() values must be compared by difference, not directly.
        // Events are ordered in time, so the first one at or before the checkpoint start ends the scan.
        if (longPausesMonotonicTimestamps[idx] - checkPointStartNanos <= 0)
            break;

        pausesExplains.add(String.format(
            "%d ms at %d",
            longPausesDurations[idx],
            longPausesTimestamps[idx]));
    }

    return String.format("Pause detecor spotted %d pauses: %s. Each with precision %d ms",
        pausesExplains.size(),
        pausesExplains,
        PRECISION);
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@
package org.apache.ignite.internal.processors.cache.persistence.checkpoint;

import java.io.File;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
Expand All @@ -32,6 +37,10 @@
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import com.sun.management.GcInfo;
import com.sun.management.internal.GarbageCollectorExtImpl;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
Expand Down Expand Up @@ -122,6 +131,9 @@ public class Checkpointer extends GridWorker {
/** Timeout between partition file destroy and checkpoint to handle it. */
private static final long PARTITION_DESTROY_CHECKPOINT_TIMEOUT = 30 * 1000; // 30 Seconds.

/** Jvm pause explain pattern. */
private static final String JVM_PAUSE_EXPLAIN_PATTERN = "Possible JVM Pause explaination: [ %s ]";

/** Avoid the start checkpoint if checkpointer was canceled. */
private volatile boolean skipCheckpointOnNodeStop = getBoolean(IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP, false);

Expand Down Expand Up @@ -465,8 +477,6 @@ private void doCheckpoint() {

if (chp.hasDelta()) {
if (log.isInfoEnabled()) {
long possibleJvmPauseDur = possibleLongJvmPauseDuration(tracker);

log.info(
String.format(
CHECKPOINT_STARTED_LOG_FORMAT,
Expand All @@ -480,7 +490,7 @@ private void doCheckpoint() {
tracker.splitAndSortCpPagesDuration(),
tracker.recoveryDataWriteDuration(),
tracker.writeCheckpointEntryDuration(),
possibleJvmPauseDur > 0 ? "possibleJvmPauseDuration=" + possibleJvmPauseDur + "ms, " : "",
possibleLongJvmPauseExplaination(tracker),
chp.pagesSize,
chp.progress.reason()
)
Expand Down Expand Up @@ -928,27 +938,54 @@ private void waitCheckpointEvent() {

/**
* @param tracker Checkpoint metrics tracker.
* @return Duration of possible JVM pause, if it was detected, or {@code -1} otherwise.
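* @return Explanation of a possible JVM pause, or an empty string if the pause detector is disabled or the checkpoint lock duration did not exceed the threshold.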
*/
private long possibleLongJvmPauseDuration(CheckpointMetricsTracker tracker) {
if (LongJVMPauseDetector.enabled()) {
if (tracker.lockWaitDuration() + tracker.lockHoldDuration() > longJvmPauseThreshold) {
long now = System.currentTimeMillis();

// We must get last wake up time before search possible pause in events map.
long wakeUpTime = pauseDetector.getLastWakeUpTime();

IgniteBiTuple<Long, Long> lastLongPause = pauseDetector.getLastLongPause();

if (lastLongPause != null && tracker.checkpointStartTime() < lastLongPause.get1())
return lastLongPause.get2();

if (now - wakeUpTime > longJvmPauseThreshold)
return now - wakeUpTime;
}
private String possibleLongJvmPauseExplaination(CheckpointMetricsTracker tracker) {
    long lockMs = tracker.lockDurationMillis();

    // Nothing to explain when the detector is off or the lock was not held suspiciously long.
    if (!LongJVMPauseDetector.enabled() || lockMs <= longJvmPauseThreshold)
        return "";

    // Collect the individual hints in the order they are printed.
    List<String> hints = new LinkedList<>();

    hints.add(String.format("Checkpoint lock took %d ms", lockMs));
    hints.addAll(getGCExplains(tracker));
    hints.add(pauseDetector.getTotalSpottedPausesExplain(tracker));

    String joinedHints = String.join("; ", hints);

    // Trailing separator lets the result be embedded into the checkpoint log line.
    return String.format(JVM_PAUSE_EXPLAIN_PATTERN, joinedHints) + ", ";
}

return -1L;
/**
 * Collects a description of the most recent collection of every garbage collector whose
 * last GC overlaps the checkpoint time window.
 *
 * @param tracker Checkpoint tracker.
 * @return Descriptions of the overlapping collections; empty if there are none.
 */
private Collection<String> getGCExplains(CheckpointMetricsTracker tracker) {
    // The public com.sun.management.GarbageCollectorMXBean interface exposes getLastGcInfo(),
    // so there is no need to depend on the JDK-internal implementation class.
    // NOTE(review): the GarbageCollectorExtImpl import of this file becomes unused and can be dropped.
    return ManagementFactory.getGarbageCollectorMXBeans()
        .stream()
        .filter(com.sun.management.GarbageCollectorMXBean.class::isInstance)
        .map(com.sun.management.GarbageCollectorMXBean.class::cast)
        .map(mxBean -> getGCExplain(mxBean, tracker))
        .filter(Optional::isPresent)
        .map(Optional::get)
        .collect(Collectors.toList());
}

/**
 * Describes the most recent collection of the given collector if it overlaps the checkpoint
 * time window.
 *
 * @param mxBean GC MX bean.
 * @param tracker Checkpoint tracker.
 * @return GC description, or {@link Optional#empty()} if the collector has not run yet or its
 *      last collection lies completely outside the checkpoint window.
 */
private Optional<String> getGCExplain(
    com.sun.management.GarbageCollectorMXBean mxBean,
    CheckpointMetricsTracker tracker
) {
    GcInfo lastGcInfo = mxBean.getLastGcInfo();

    // getLastGcInfo() returns null when no collection has happened for this collector.
    if (lastGcInfo == null)
        return Optional.empty();

    RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();

    long uptimeNanos = runtimeMXBean.getUptime() * 1_000_000L;

    // GcInfo times are milliseconds since JVM start; translate them to the System.nanoTime() scale.
    long monotonicStartTimeNanos = System.nanoTime() - uptimeNanos;
    long monotonicGCStartTimeNanos = monotonicStartTimeNanos + lastGcInfo.getStartTime() * 1_000_000L;
    long monotonicGCEndTimeNanos = monotonicStartTimeNanos + lastGcInfo.getEndTime() * 1_000_000L;

    long checkPointStartNanos = tracker.checkPointStartNanos();
    long checkPointEndNanos = tracker.checkPointEndNanos();

    // System.nanoTime() based values are compared by difference to stay correct on overflow.
    boolean gcEndedBeforeCheckpoint = checkPointStartNanos - monotonicGCEndTimeNanos >= 0;
    boolean gcStartedAfterCheckpoint = checkPointEndNanos - monotonicGCStartTimeNanos <= 0;

    if (gcEndedBeforeCheckpoint || gcStartedAfterCheckpoint)
        return Optional.empty();

    return Optional.of(String.format("%s most recent GC took %d ms", mxBean.getName(), lastGcInfo.getDuration()));
}

/**
Expand Down Expand Up @@ -995,6 +1032,7 @@ private void startCheckpointProgress() {
*
* @deprecated Should be rewritten to public API.
*/
@Deprecated
public IgniteInternalFuture<Void> enableCheckpoints(boolean enable) {
GridFutureAdapter<Void> fut = new GridFutureAdapter<>();

Expand Down
Loading