diff --git a/src/Adaptive.Aeron.Tests/ImageTest.cs b/src/Adaptive.Aeron.Tests/ImageTest.cs
index 28a60b1..7e6434a 100644
--- a/src/Adaptive.Aeron.Tests/ImageTest.cs
+++ b/src/Adaptive.Aeron.Tests/ImageTest.cs
@@ -578,6 +578,277 @@ public void ShouldPollFragmentsToBoundedFragmentHandlerWithMaxPositionAboveIntMa
.Then(A.CallTo(() => Position.SetRelease(TERM_BUFFER_LENGTH)).MustHaveHappened());
}
+ [Test]
+ public void BlockPollDeliversBlockAndAdvancesPosition()
+ {
+ var initialPosition = LogBufferDescriptor.ComputePosition(
+ INITIAL_TERM_ID, 0, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID);
+ Position.SetRelease(initialPosition);
+ var image = CreateImage();
+
+ InsertDataFrame(INITIAL_TERM_ID, OffsetForFrame(0));
+
+ int handlerCallCount = 0;
+ int receivedOffset = -1, receivedLength = -1, receivedSessionId = -1, receivedTermId = -1;
+ var bytes = image.BlockPoll(
+ (buffer, offset, length, sessionId, termId) =>
+ {
+ handlerCallCount++;
+ receivedOffset = offset;
+ receivedLength = length;
+ receivedSessionId = sessionId;
+ receivedTermId = termId;
+ },
+ int.MaxValue);
+
+ Assert.AreEqual(ALIGNED_FRAME_LENGTH, bytes);
+ Assert.AreEqual(1, handlerCallCount);
+ Assert.AreEqual(0, receivedOffset);
+ Assert.AreEqual(ALIGNED_FRAME_LENGTH, receivedLength);
+ Assert.AreEqual(SESSION_ID, receivedSessionId);
+ Assert.AreEqual(INITIAL_TERM_ID, receivedTermId);
+ Assert.AreEqual(initialPosition + ALIGNED_FRAME_LENGTH, image.Position);
+ }
+
+ [Test]
+ public void BlockPollReturnsZeroAndDoesNothingWhenClosed()
+ {
+ var initialPosition = LogBufferDescriptor.ComputePosition(
+ INITIAL_TERM_ID, 0, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID);
+ Position.SetRelease(initialPosition);
+ var image = CreateImage();
+
+ InsertDataFrame(INITIAL_TERM_ID, OffsetForFrame(0));
+ image.Close();
+
+ bool handlerCalled = false;
+ var bytes = image.BlockPoll((b, o, l, s, t) => handlerCalled = true, int.MaxValue);
+
+ Assert.AreEqual(0, bytes);
+ Assert.IsFalse(handlerCalled);
+ Assert.AreEqual(initialPosition, image.Position);
+ }
+
+ [Test]
+ public void BlockPollReturnsZeroWhenNoFramesAvailable()
+ {
+ var image = CreateImage();
+
+ bool handlerCalled = false;
+ var bytes = image.BlockPoll((b, o, l, s, t) => handlerCalled = true, int.MaxValue);
+
+ Assert.AreEqual(0, bytes);
+ Assert.IsFalse(handlerCalled);
+ }
+
+ [Test]
+ public void BlockPollRespectsBlockLengthLimit()
+ {
+ var initialPosition = LogBufferDescriptor.ComputePosition(
+ INITIAL_TERM_ID, 0, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID);
+ Position.SetRelease(initialPosition);
+ var image = CreateImage();
+
+ InsertDataFrame(INITIAL_TERM_ID, OffsetForFrame(0));
+ InsertDataFrame(INITIAL_TERM_ID, OffsetForFrame(1));
+
+ int handlerCallCount = 0;
+ int receivedLength = -1;
+ var bytes = image.BlockPoll(
+ (b, o, l, s, t) => { handlerCallCount++; receivedLength = l; },
+ ALIGNED_FRAME_LENGTH + 1);
+
+ Assert.AreEqual(ALIGNED_FRAME_LENGTH, bytes);
+ Assert.AreEqual(1, handlerCallCount);
+ Assert.AreEqual(ALIGNED_FRAME_LENGTH, receivedLength);
+ Assert.AreEqual(initialPosition + ALIGNED_FRAME_LENGTH, image.Position);
+ }
+
+ [Test]
+ public void BlockPollDeliversPaddingFrameWhenFirstFrameIsPadding()
+ {
+ // Padding frame placed near end of term so the helper's TermRebuilder.Insert call
+ // copies only ALIGNED_FRAME_LENGTH bytes (RcvBuffer's capacity). Scan starts at
+ // paddingOffset and the first frame is padding, exercising the corner case.
+ int paddingOffset = TERM_BUFFER_LENGTH - ALIGNED_FRAME_LENGTH;
+ var initialPosition = LogBufferDescriptor.ComputePosition(
+ INITIAL_TERM_ID, paddingOffset, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID);
+ Position.SetRelease(initialPosition);
+ var image = CreateImage();
+
+ InsertPaddingFrame(INITIAL_TERM_ID, paddingOffset);
+
+ int handlerCallCount = 0;
+ int receivedLength = -1;
+ var bytes = image.BlockPoll(
+ (b, o, l, s, t) => { handlerCallCount++; receivedLength = l; },
+ TERM_BUFFER_LENGTH);
+
+ Assert.AreEqual(ALIGNED_FRAME_LENGTH, bytes);
+ Assert.AreEqual(1, handlerCallCount);
+ Assert.AreEqual(ALIGNED_FRAME_LENGTH, receivedLength);
+ Assert.AreEqual(initialPosition + ALIGNED_FRAME_LENGTH, image.Position);
+ }
+
+ [Test]
+ public void RawPollDeliversBlockAndAdvancesPosition()
+ {
+ var initialPosition = LogBufferDescriptor.ComputePosition(
+ INITIAL_TERM_ID, 0, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID);
+ Position.SetRelease(initialPosition);
+ var image = CreateImage();
+
+ InsertDataFrame(INITIAL_TERM_ID, OffsetForFrame(0));
+
+ int handlerCallCount = 0;
+ long receivedFileOffset = -1;
+ int receivedTermOffset = -1, receivedLength = -1, receivedSessionId = -1, receivedTermId = -1;
+ var bytes = image.RawPoll(
+ (fs, fileOffset, buf, termOffset, length, sessionId, termId) =>
+ {
+ handlerCallCount++;
+ receivedFileOffset = fileOffset;
+ receivedTermOffset = termOffset;
+ receivedLength = length;
+ receivedSessionId = sessionId;
+ receivedTermId = termId;
+ },
+ int.MaxValue);
+
+ Assert.AreEqual(ALIGNED_FRAME_LENGTH, bytes);
+ Assert.AreEqual(1, handlerCallCount);
+ Assert.AreEqual(0L, receivedFileOffset);
+ Assert.AreEqual(0, receivedTermOffset);
+ Assert.AreEqual(ALIGNED_FRAME_LENGTH, receivedLength);
+ Assert.AreEqual(SESSION_ID, receivedSessionId);
+ Assert.AreEqual(INITIAL_TERM_ID, receivedTermId);
+ Assert.AreEqual(initialPosition + ALIGNED_FRAME_LENGTH, image.Position);
+ }
+
+ [Test]
+ public void RawPollComputesFileOffsetForNonZeroPartition()
+ {
+ // Place the subscriber position into partition index 2; fileOffset must be
+ // (capacity * activeIndex) + termOffset.
+ int activeTermId = INITIAL_TERM_ID + 2;
+ int termOffset = OffsetForFrame(0);
+ var initialPosition = LogBufferDescriptor.ComputePosition(
+ activeTermId, termOffset, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID);
+ Position.SetRelease(initialPosition);
+ var image = CreateImage();
+
+ InsertDataFrame(activeTermId, termOffset);
+
+ long receivedFileOffset = -1;
+ var bytes = image.RawPoll(
+ (fs, fileOffset, buf, to, l, s, t) => receivedFileOffset = fileOffset,
+ int.MaxValue);
+
+ Assert.AreEqual(ALIGNED_FRAME_LENGTH, bytes);
+ long expectedFileOffset = (long)TERM_BUFFER_LENGTH * 2 + termOffset;
+ Assert.AreEqual(expectedFileOffset, receivedFileOffset);
+ }
+
+ [Test]
+ public void RawPollReturnsZeroAndDoesNothingWhenClosed()
+ {
+ var initialPosition = LogBufferDescriptor.ComputePosition(
+ INITIAL_TERM_ID, 0, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID);
+ Position.SetRelease(initialPosition);
+ var image = CreateImage();
+
+ InsertDataFrame(INITIAL_TERM_ID, OffsetForFrame(0));
+ image.Close();
+
+ bool handlerCalled = false;
+ var bytes = image.RawPoll((fs, fo, buf, to, l, s, t) => handlerCalled = true, int.MaxValue);
+
+ Assert.AreEqual(0, bytes);
+ Assert.IsFalse(handlerCalled);
+ Assert.AreEqual(initialPosition, image.Position);
+ }
+
+ [Test]
+ public void RawPollReturnsZeroWhenNoFramesAvailable()
+ {
+ var image = CreateImage();
+
+ bool handlerCalled = false;
+ var bytes = image.RawPoll((fs, fo, buf, to, l, s, t) => handlerCalled = true, int.MaxValue);
+
+ Assert.AreEqual(0, bytes);
+ Assert.IsFalse(handlerCalled);
+ }
+
+ [Test]
+ public void RawPollDeliversPaddingFrameWhenFirstFrameIsPadding()
+ {
+ int paddingOffset = TERM_BUFFER_LENGTH - ALIGNED_FRAME_LENGTH;
+ var initialPosition = LogBufferDescriptor.ComputePosition(
+ INITIAL_TERM_ID, paddingOffset, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID);
+ Position.SetRelease(initialPosition);
+ var image = CreateImage();
+
+ InsertPaddingFrame(INITIAL_TERM_ID, paddingOffset);
+
+ int handlerCallCount = 0;
+ int receivedLength = -1;
+ var bytes = image.RawPoll(
+ (fs, fo, buf, to, l, s, t) => { handlerCallCount++; receivedLength = l; },
+ TERM_BUFFER_LENGTH);
+
+ Assert.AreEqual(ALIGNED_FRAME_LENGTH, bytes);
+ Assert.AreEqual(1, handlerCallCount);
+ Assert.AreEqual(ALIGNED_FRAME_LENGTH, receivedLength);
+ Assert.AreEqual(initialPosition + ALIGNED_FRAME_LENGTH, image.Position);
+ }
+
+ [Test]
+ public void BlockPollClampsBlockLengthLimitWithoutIntegerOverflow()
+ {
+ int frameOffset = ALIGNED_FRAME_LENGTH;
+ var initialPosition = LogBufferDescriptor.ComputePosition(
+ INITIAL_TERM_ID, frameOffset, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID);
+ Position.SetRelease(initialPosition);
+ var image = CreateImage();
+
+ InsertDataFrame(INITIAL_TERM_ID, frameOffset);
+
+ int handlerCallCount = 0;
+ int receivedLength = -1;
+ var bytes = image.BlockPoll(
+ (b, o, l, s, t) => { handlerCallCount++; receivedLength = l; },
+ int.MaxValue);
+
+ Assert.AreEqual(ALIGNED_FRAME_LENGTH, bytes);
+ Assert.AreEqual(1, handlerCallCount);
+ Assert.AreEqual(ALIGNED_FRAME_LENGTH, receivedLength);
+ Assert.AreEqual(initialPosition + ALIGNED_FRAME_LENGTH, image.Position);
+ }
+
+ [Test]
+ public void RawPollClampsBlockLengthLimitWithoutIntegerOverflow()
+ {
+ int frameOffset = ALIGNED_FRAME_LENGTH;
+ var initialPosition = LogBufferDescriptor.ComputePosition(
+ INITIAL_TERM_ID, frameOffset, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID);
+ Position.SetRelease(initialPosition);
+ var image = CreateImage();
+
+ InsertDataFrame(INITIAL_TERM_ID, frameOffset);
+
+ int handlerCallCount = 0;
+ int receivedLength = -1;
+ var bytes = image.RawPoll(
+ (fs, fo, buf, to, l, s, t) => { handlerCallCount++; receivedLength = l; },
+ int.MaxValue);
+
+ Assert.AreEqual(ALIGNED_FRAME_LENGTH, bytes);
+ Assert.AreEqual(1, handlerCallCount);
+ Assert.AreEqual(ALIGNED_FRAME_LENGTH, receivedLength);
+ Assert.AreEqual(initialPosition + ALIGNED_FRAME_LENGTH, image.Position);
+ }
+
private Image CreateImage()
{
return new Image(Subscription, SESSION_ID, Position, LogBuffers, ErrorHandler, SOURCE_IDENTITY,
diff --git a/src/Adaptive.Aeron/Image.cs b/src/Adaptive.Aeron/Image.cs
index 18072c7..c0cea93 100644
--- a/src/Adaptive.Aeron/Image.cs
+++ b/src/Adaptive.Aeron/Image.cs
@@ -1,5 +1,5 @@
/*
- * Copyright 2014 - 2017 Adaptive Financial Consulting Ltd
+ * Copyright 2014 - 2026 Adaptive Financial Consulting Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,6 +15,7 @@
*/
using System;
+using System.IO;
using System.Runtime.CompilerServices;
using Adaptive.Aeron.LogBuffer;
using Adaptive.Aeron.Protocol;
@@ -262,14 +263,11 @@ public bool PublicationRevoked
}
- /////
- ///// The to the raw log of the Image.
- /////
- ///// the to the raw log of the Image.
- //public FileChannel FileChannel()
- //{
- // return logBuffers.FileChannel();
- //}
+ ///
+ /// The for the raw log of the Image.
+ ///
+ /// the for the raw log of the Image.
+ public FileStream FileStream => _logBuffers.FileStream;
///
/// Poll for new messages in a stream. If new messages are found beyond the last consumed position then they
@@ -814,7 +812,9 @@ public int BlockPoll(BlockHandler handler, int blockLengthLimit)
var position = _subscriberPosition.Get();
var offset = (int) position & _termLengthMask;
- var limitOffset = Math.Min(offset + blockLengthLimit, _termLengthMask + 1);
+ var capacity = _termLengthMask + 1;
+ var highLimitOffset = (long)offset + blockLengthLimit;
+ var limitOffset = (long)capacity < highLimitOffset ? capacity : (int)highLimitOffset;
var termBuffer = ActiveTermBuffer(position);
var resultingOffset = TermBlockScanner.Scan(termBuffer, offset, limitOffset);
var length = resultingOffset - offset;
@@ -833,7 +833,68 @@ public int BlockPoll(BlockHandler handler, int blockLengthLimit)
}
finally
{
- if (_isClosed)
+ if (!_isClosed)
+ {
+ _subscriberPosition.SetRelease(position + length);
+ }
+ }
+ }
+
+ return length;
+ }
+
+ ///
+ /// Poll for new messages in a stream. If new messages are found beyond the last consumed position then they
+ /// will be delivered to the up to a limited number of bytes.
+ ///
+ /// This method is useful for operations like bulk archiving a stream to a file or a socket.
+ ///
+ ///
+ /// A scan will terminate if a padding frame is encountered. If first frame in a scan is padding then a block
+ /// for the padding is notified. If the padding comes after the first frame in a scan then the scan terminates
+ /// at the offset the padding frame begins. Padding frames are delivered singularly in a block.
+ ///
+ ///
+ /// Padding frames may be for a greater range than the limit offset but only the header needs to be valid so
+ /// relevant length of the frame is .
+ ///
+ ///
+ /// to which block is delivered.
+ /// up to which a block may be in length.
+ /// the number of bytes that have been consumed.
+ public int RawPoll(RawBlockHandler handler, int blockLengthLimit)
+ {
+ if (_isClosed)
+ {
+ return 0;
+ }
+
+ long position = _subscriberPosition.Get();
+ int offset = (int)position & _termLengthMask;
+ int activeIndex = LogBufferDescriptor.IndexByPosition(position, PositionBitsToShift);
+ UnsafeBuffer termBuffer = _termBuffers[activeIndex];
+ int capacity = termBuffer.Capacity;
+ long highLimitOffset = (long)offset + blockLengthLimit;
+ int limitOffset = (long)capacity < highLimitOffset ? capacity : (int)highLimitOffset;
+ int resultingOffset = TermBlockScanner.Scan(termBuffer, offset, limitOffset);
+ int length = resultingOffset - offset;
+
+ if (resultingOffset > offset)
+ {
+ try
+ {
+ long fileOffset = ((long)capacity * activeIndex) + offset;
+ int termId = termBuffer.GetInt(offset + TERM_ID_FIELD_OFFSET);
+
+ handler(_logBuffers.FileStream, fileOffset, termBuffer, offset, length, SessionId, termId);
+ }
+ catch (Exception ex)
+ {
+ _errorHandler.OnError(ex);
+ }
+ finally
+ {
+ if (!_isClosed)
{
_subscriberPosition.SetRelease(position + length);
}
diff --git a/src/Adaptive.Aeron/LogBuffer/RawBlockHandler.cs b/src/Adaptive.Aeron/LogBuffer/RawBlockHandler.cs
new file mode 100644
index 0000000..b67dd17
--- /dev/null
+++ b/src/Adaptive.Aeron/LogBuffer/RawBlockHandler.cs
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2026 Adaptive Financial Consulting Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.IO;
+using Adaptive.Agrona.Concurrent;
+
+namespace Adaptive.Aeron.LogBuffer
+{
+ ///
+ /// Handler for a raw block of fragments from the log that are contained in the underlying file.
+ ///
+ /// read-only stream over the log file containing the block.
+ /// at which the block begins, including any frame headers.
+ /// mapped over the block of fragments.
+ /// in at which the block begins, including any frame headers.
+ /// of the block in bytes, including any frame headers, aligned up to .
+ /// of the stream of fragments.
+ /// of the stream of fragments.
+ public delegate void RawBlockHandler(
+ FileStream fileStream,
+ long fileOffset,
+ UnsafeBuffer termBuffer,
+ int termOffset,
+ int length,
+ int sessionId,
+ int termId);
+}
diff --git a/src/Adaptive.Aeron/LogBuffers.cs b/src/Adaptive.Aeron/LogBuffers.cs
index 27845bf..e3950a8 100644
--- a/src/Adaptive.Aeron/LogBuffers.cs
+++ b/src/Adaptive.Aeron/LogBuffers.cs
@@ -1,5 +1,5 @@
/*
- * Copyright 2014 - 2017 Adaptive Financial Consulting Ltd
+ * Copyright 2014 - 2026 Adaptive Financial Consulting Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -39,6 +39,7 @@ public class LogBuffers : IDisposable
private readonly UnsafeBuffer[] _termBuffers = new UnsafeBuffer[LogBufferDescriptor.PARTITION_COUNT];
private readonly UnsafeBuffer _logMetaDataBuffer;
private readonly MappedByteBuffer[] _mappedByteBuffers;
+ private readonly FileStream _fileStream;
// ReSharper disable once UnusedMember.Global
//
@@ -57,12 +58,19 @@ public LogBuffers(string logFileName)
int termLength = 0;
UnsafeBuffer logMetaDataBuffer = null;
MappedByteBuffer[] mappedByteBuffers = null;
+ FileStream fileStream = null;
try
{
var fileInfo = new FileInfo(logFileName);
var logLength = fileInfo.Length;
+
+ fileStream = new FileStream(
+ logFileName,
+ FileMode.Open,
+ FileAccess.Read,
+ FileShare.ReadWrite | FileShare.Delete);
if (logLength < LogBufferDescriptor.LOG_META_DATA_LENGTH)
{
@@ -137,15 +145,23 @@ public LogBuffers(string logFileName)
}
catch (InvalidOperationException)
{
- Dispose(logMetaDataBuffer, mappedByteBuffers);
+ Dispose(logMetaDataBuffer, mappedByteBuffers, fileStream);
throw;
}
-
+
_termLength = termLength;
_logMetaDataBuffer = logMetaDataBuffer;
_mappedByteBuffers = mappedByteBuffers;
+ _fileStream = fileStream;
}
+ ///
+ /// Read-only stream over the log file. Shares the underlying file with the memory mapping;
+ /// callers must not write to it. Useful for raw-block consumers that want positioned reads
+ /// or zero-copy hand-off to a socket via Socket.SendFile.
+ ///
+ public FileStream FileStream => _fileStream;
+
public UnsafeBuffer[] DuplicateTermBuffers()
{
return _termBuffers;
@@ -182,7 +198,7 @@ public void PreTouch()
public void Dispose()
{
- Dispose(_logMetaDataBuffer, _mappedByteBuffers);
+ Dispose(_logMetaDataBuffer, _mappedByteBuffers, _fileStream);
}
///
@@ -230,7 +246,8 @@ public long LingerDeadlineNs()
return lingerDeadlineNs;
}
- private static void Dispose(UnsafeBuffer logMetaDataBuffer, MappedByteBuffer[] mappedByteBuffers)
+ private static void Dispose(
+ UnsafeBuffer logMetaDataBuffer, MappedByteBuffer[] mappedByteBuffers, FileStream fileStream)
{
if (null != logMetaDataBuffer)
{
@@ -247,6 +264,10 @@ private static void Dispose(UnsafeBuffer logMetaDataBuffer, MappedByteBuffer[] m
}
}
+ if (null != fileStream)
+ {
+ fileStream.Dispose();
+ }
}
}
}
\ No newline at end of file
diff --git a/src/Adaptive.Aeron/Subscription.cs b/src/Adaptive.Aeron/Subscription.cs
index ca82530..e6175b9 100644
--- a/src/Adaptive.Aeron/Subscription.cs
+++ b/src/Adaptive.Aeron/Subscription.cs
@@ -290,6 +290,27 @@ public long BlockPoll(BlockHandler blockHandler, int blockLengthLimit)
return bytesConsumed;
}
+ ///
+ /// Poll in a non-blocking manner for available message fragments on each in turn,
+ /// delivered to the supplied as raw blocks.
+ ///
+ /// This method is useful for operations like bulk archiving a stream to file.
+ ///
+ ///
+ /// to receive a block of fragments from each .
+ /// for each polled.
+ /// the number of bytes consumed across all images.
+ public long RawPoll(RawBlockHandler rawBlockHandler, int blockLengthLimit)
+ {
+ long bytesConsumed = 0;
+ foreach (var image in _fields.images)
+ {
+ bytesConsumed += image.RawPoll(rawBlockHandler, blockLengthLimit);
+ }
+
+ return bytesConsumed;
+ }
+
///
/// Is this subscription connected by having at least one open publication .
///