From 2d819dda4d16189843bc4d47806ab53c11365287 Mon Sep 17 00:00:00 2001 From: Eric Bowden Date: Thu, 30 Apr 2026 16:15:16 -0500 Subject: [PATCH] New: Added Image.RawPoll; same semantics as Java. New: Added a public FileStream accessor for LogBuffers, enabling use of zero-copy Socket.SendFile (via SafeFileHandle) if desired. Bug: Image.BlockPoll was missing a "!" in "if (_isClosed)", so it previously would only advance the subscriber position if the image _was_ closed (and then maybe crash). Fixed. Bug: Image.BlockPoll was missing 32-bit int overflow handling for the case of (offset + blockLengthLimit) being > int.MaxValue. Fixed. --- src/Adaptive.Aeron.Tests/ImageTest.cs | 271 ++++++++++++++++++ src/Adaptive.Aeron/Image.cs | 83 +++++- .../LogBuffer/RawBlockHandler.cs | 40 +++ src/Adaptive.Aeron/LogBuffers.cs | 31 +- src/Adaptive.Aeron/Subscription.cs | 21 ++ 5 files changed, 430 insertions(+), 16 deletions(-) create mode 100644 src/Adaptive.Aeron/LogBuffer/RawBlockHandler.cs diff --git a/src/Adaptive.Aeron.Tests/ImageTest.cs b/src/Adaptive.Aeron.Tests/ImageTest.cs index 28a60b10..7e6434a3 100644 --- a/src/Adaptive.Aeron.Tests/ImageTest.cs +++ b/src/Adaptive.Aeron.Tests/ImageTest.cs @@ -578,6 +578,277 @@ public void ShouldPollFragmentsToBoundedFragmentHandlerWithMaxPositionAboveIntMa .Then(A.CallTo(() => Position.SetRelease(TERM_BUFFER_LENGTH)).MustHaveHappened()); } + [Test] + public void BlockPollDeliversBlockAndAdvancesPosition() + { + var initialPosition = LogBufferDescriptor.ComputePosition( + INITIAL_TERM_ID, 0, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID); + Position.SetRelease(initialPosition); + var image = CreateImage(); + + InsertDataFrame(INITIAL_TERM_ID, OffsetForFrame(0)); + + int handlerCallCount = 0; + int receivedOffset = -1, receivedLength = -1, receivedSessionId = -1, receivedTermId = -1; + var bytes = image.BlockPoll( + (buffer, offset, length, sessionId, termId) => + { + handlerCallCount++; + receivedOffset = offset; + receivedLength = length; + receivedSessionId = sessionId; + receivedTermId = termId; + }, + int.MaxValue); + + Assert.AreEqual(ALIGNED_FRAME_LENGTH, bytes); + Assert.AreEqual(1, handlerCallCount); + Assert.AreEqual(0, receivedOffset); + Assert.AreEqual(ALIGNED_FRAME_LENGTH, receivedLength); + Assert.AreEqual(SESSION_ID, receivedSessionId); + Assert.AreEqual(INITIAL_TERM_ID, receivedTermId); + Assert.AreEqual(initialPosition + ALIGNED_FRAME_LENGTH, image.Position); + } + + [Test] + public void BlockPollReturnsZeroAndDoesNothingWhenClosed() + { + var initialPosition = LogBufferDescriptor.ComputePosition( + INITIAL_TERM_ID, 0, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID); + Position.SetRelease(initialPosition); + var image = CreateImage(); + + InsertDataFrame(INITIAL_TERM_ID, OffsetForFrame(0)); + image.Close(); + + bool handlerCalled = false; + var bytes = image.BlockPoll((b, o, l, s, t) => handlerCalled = true, int.MaxValue); + + Assert.AreEqual(0, bytes); + Assert.IsFalse(handlerCalled); + Assert.AreEqual(initialPosition, image.Position); + } + + [Test] + public void BlockPollReturnsZeroWhenNoFramesAvailable() + { + var image = CreateImage(); + + bool handlerCalled = false; + var bytes = image.BlockPoll((b, o, l, s, t) => handlerCalled = true, int.MaxValue); + + Assert.AreEqual(0, bytes); + Assert.IsFalse(handlerCalled); + } + + [Test] + public void BlockPollRespectsBlockLengthLimit() + { + var initialPosition = LogBufferDescriptor.ComputePosition( + INITIAL_TERM_ID, 0, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID); + Position.SetRelease(initialPosition); + var image = CreateImage(); + + InsertDataFrame(INITIAL_TERM_ID, OffsetForFrame(0)); + InsertDataFrame(INITIAL_TERM_ID, OffsetForFrame(1)); + + int handlerCallCount = 0; + int receivedLength = -1; + var bytes = image.BlockPoll( + (b, o, l, s, t) => { handlerCallCount++; receivedLength = l; }, + ALIGNED_FRAME_LENGTH + 1); + + Assert.AreEqual(ALIGNED_FRAME_LENGTH, bytes); + Assert.AreEqual(1, handlerCallCount); + Assert.AreEqual(ALIGNED_FRAME_LENGTH, receivedLength); + Assert.AreEqual(initialPosition + ALIGNED_FRAME_LENGTH, image.Position); + } + + [Test] + public void BlockPollDeliversPaddingFrameWhenFirstFrameIsPadding() + { + // Padding frame placed near end of term so the helper's TermRebuilder.Insert call + // copies only ALIGNED_FRAME_LENGTH bytes (RcvBuffer's capacity). Scan starts at + // paddingOffset and the first frame is padding, exercising the corner case. + int paddingOffset = TERM_BUFFER_LENGTH - ALIGNED_FRAME_LENGTH; + var initialPosition = LogBufferDescriptor.ComputePosition( + INITIAL_TERM_ID, paddingOffset, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID); + Position.SetRelease(initialPosition); + var image = CreateImage(); + + InsertPaddingFrame(INITIAL_TERM_ID, paddingOffset); + + int handlerCallCount = 0; + int receivedLength = -1; + var bytes = image.BlockPoll( + (b, o, l, s, t) => { handlerCallCount++; receivedLength = l; }, + TERM_BUFFER_LENGTH); + + Assert.AreEqual(ALIGNED_FRAME_LENGTH, bytes); + Assert.AreEqual(1, handlerCallCount); + Assert.AreEqual(ALIGNED_FRAME_LENGTH, receivedLength); + Assert.AreEqual(initialPosition + ALIGNED_FRAME_LENGTH, image.Position); + } + + [Test] + public void RawPollDeliversBlockAndAdvancesPosition() + { + var initialPosition = LogBufferDescriptor.ComputePosition( + INITIAL_TERM_ID, 0, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID); + Position.SetRelease(initialPosition); + var image = CreateImage(); + + InsertDataFrame(INITIAL_TERM_ID, OffsetForFrame(0)); + + int handlerCallCount = 0; + long receivedFileOffset = -1; + int receivedTermOffset = -1, receivedLength = -1, receivedSessionId = -1, receivedTermId = -1; + var bytes = image.RawPoll( + (fs, fileOffset, buf, termOffset, length, sessionId, termId) => + { + handlerCallCount++; + receivedFileOffset = fileOffset; + receivedTermOffset = termOffset; + receivedLength = length; + receivedSessionId = sessionId; + receivedTermId = termId; + }, + int.MaxValue); + + Assert.AreEqual(ALIGNED_FRAME_LENGTH, bytes); + Assert.AreEqual(1, handlerCallCount); + Assert.AreEqual(0L, receivedFileOffset); + Assert.AreEqual(0, receivedTermOffset); + Assert.AreEqual(ALIGNED_FRAME_LENGTH, receivedLength); + Assert.AreEqual(SESSION_ID, receivedSessionId); + Assert.AreEqual(INITIAL_TERM_ID, receivedTermId); + Assert.AreEqual(initialPosition + ALIGNED_FRAME_LENGTH, image.Position); + } + + [Test] + public void RawPollComputesFileOffsetForNonZeroPartition() + { + // Place the subscriber position into partition index 2; fileOffset must be + // (capacity * activeIndex) + termOffset. + int activeTermId = INITIAL_TERM_ID + 2; + int termOffset = OffsetForFrame(0); + var initialPosition = LogBufferDescriptor.ComputePosition( + activeTermId, termOffset, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID); + Position.SetRelease(initialPosition); + var image = CreateImage(); + + InsertDataFrame(activeTermId, termOffset); + + long receivedFileOffset = -1; + var bytes = image.RawPoll( + (fs, fileOffset, buf, to, l, s, t) => receivedFileOffset = fileOffset, + int.MaxValue); + + Assert.AreEqual(ALIGNED_FRAME_LENGTH, bytes); + long expectedFileOffset = (long)TERM_BUFFER_LENGTH * 2 + termOffset; + Assert.AreEqual(expectedFileOffset, receivedFileOffset); + } + + [Test] + public void RawPollReturnsZeroAndDoesNothingWhenClosed() + { + var initialPosition = LogBufferDescriptor.ComputePosition( + INITIAL_TERM_ID, 0, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID); + Position.SetRelease(initialPosition); + var image = CreateImage(); + + InsertDataFrame(INITIAL_TERM_ID, OffsetForFrame(0)); + image.Close(); + + bool handlerCalled = false; + var bytes = image.RawPoll((fs, fo, buf, to, l, s, t) => handlerCalled = true, int.MaxValue); + + Assert.AreEqual(0, bytes); + Assert.IsFalse(handlerCalled); + Assert.AreEqual(initialPosition, image.Position); + } + + [Test] + public void RawPollReturnsZeroWhenNoFramesAvailable() + { + var image = CreateImage(); + + bool handlerCalled = false; + var bytes = image.RawPoll((fs, fo, buf, to, l, s, t) => handlerCalled = true, int.MaxValue); + + Assert.AreEqual(0, bytes); + Assert.IsFalse(handlerCalled); + } + + [Test] + public void RawPollDeliversPaddingFrameWhenFirstFrameIsPadding() + { + int paddingOffset = TERM_BUFFER_LENGTH - ALIGNED_FRAME_LENGTH; + var initialPosition = LogBufferDescriptor.ComputePosition( + INITIAL_TERM_ID, paddingOffset, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID); + Position.SetRelease(initialPosition); + var image = CreateImage(); + + InsertPaddingFrame(INITIAL_TERM_ID, paddingOffset); + + int handlerCallCount = 0; + int receivedLength = -1; + var bytes = image.RawPoll( + (fs, fo, buf, to, l, s, t) => { handlerCallCount++; receivedLength = l; }, + TERM_BUFFER_LENGTH); + + Assert.AreEqual(ALIGNED_FRAME_LENGTH, bytes); + Assert.AreEqual(1, handlerCallCount); + Assert.AreEqual(ALIGNED_FRAME_LENGTH, receivedLength); + Assert.AreEqual(initialPosition + ALIGNED_FRAME_LENGTH, image.Position); + } + + [Test] + public void BlockPollClampsBlockLengthLimitWithoutIntegerOverflow() + { + int frameOffset = ALIGNED_FRAME_LENGTH; + var initialPosition = LogBufferDescriptor.ComputePosition( + INITIAL_TERM_ID, frameOffset, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID); + Position.SetRelease(initialPosition); + var image = CreateImage(); + + InsertDataFrame(INITIAL_TERM_ID, frameOffset); + + int handlerCallCount = 0; + int receivedLength = -1; + var bytes = image.BlockPoll( + (b, o, l, s, t) => { handlerCallCount++; receivedLength = l; }, + int.MaxValue); + + Assert.AreEqual(ALIGNED_FRAME_LENGTH, bytes); + Assert.AreEqual(1, handlerCallCount); + Assert.AreEqual(ALIGNED_FRAME_LENGTH, receivedLength); + Assert.AreEqual(initialPosition + ALIGNED_FRAME_LENGTH, image.Position); + } + + [Test] + public void RawPollClampsBlockLengthLimitWithoutIntegerOverflow() + { + int frameOffset = ALIGNED_FRAME_LENGTH; + var initialPosition = LogBufferDescriptor.ComputePosition( + INITIAL_TERM_ID, frameOffset, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID); + Position.SetRelease(initialPosition); + var image = CreateImage(); + + InsertDataFrame(INITIAL_TERM_ID, frameOffset); + + int handlerCallCount = 0; + int receivedLength = -1; + var bytes = image.RawPoll( + (fs, fo, buf, to, l, s, t) => { handlerCallCount++; receivedLength = l; }, + int.MaxValue); + + Assert.AreEqual(ALIGNED_FRAME_LENGTH, bytes); + Assert.AreEqual(1, handlerCallCount); + Assert.AreEqual(ALIGNED_FRAME_LENGTH, receivedLength); + Assert.AreEqual(initialPosition + ALIGNED_FRAME_LENGTH, image.Position); + } + private Image CreateImage() { return new Image(Subscription, SESSION_ID, Position, LogBuffers, ErrorHandler, SOURCE_IDENTITY, diff --git a/src/Adaptive.Aeron/Image.cs b/src/Adaptive.Aeron/Image.cs index 18072c77..c0cea932 100644 --- a/src/Adaptive.Aeron/Image.cs +++ b/src/Adaptive.Aeron/Image.cs @@ -1,5 +1,5 @@ /* - * Copyright 2014 - 2017 Adaptive Financial Consulting Ltd + * Copyright 2014 - 2026 Adaptive Financial Consulting Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,6 +15,7 @@ */ using System; +using System.IO; using System.Runtime.CompilerServices; using Adaptive.Aeron.LogBuffer; using Adaptive.Aeron.Protocol; @@ -262,14 +263,11 @@ public bool PublicationRevoked } - ///// - ///// The to the raw log of the Image. - ///// - ///// the to the raw log of the Image. - //public FileChannel FileChannel() - //{ - // return logBuffers.FileChannel(); - //} + /// + /// The for the raw log of the Image. + /// + /// the for the raw log of the Image. + public FileStream FileStream => _logBuffers.FileStream; /// /// Poll for new messages in a stream. If new messages are found beyond the last consumed position then they @@ -814,7 +812,9 @@ public int BlockPoll(BlockHandler handler, int blockLengthLimit) var position = _subscriberPosition.Get(); var offset = (int) position & _termLengthMask; - var limitOffset = Math.Min(offset + blockLengthLimit, _termLengthMask + 1); + var capacity = _termLengthMask + 1; + var highLimitOffset = (long)offset + blockLengthLimit; + var limitOffset = (long)capacity < highLimitOffset ? capacity : (int)highLimitOffset; var termBuffer = ActiveTermBuffer(position); var resultingOffset = TermBlockScanner.Scan(termBuffer, offset, limitOffset); var length = resultingOffset - offset; @@ -833,7 +833,68 @@ public int BlockPoll(BlockHandler handler, int blockLengthLimit) } finally { - if (_isClosed) + if (!_isClosed) + { + _subscriberPosition.SetRelease(position + length); + } + } + } + + return length; + } + + /// + /// Poll for new messages in a stream. If new messages are found beyond the last consumed position then they + /// will be delivered to the up to a limited number of bytes. + /// + /// This method is useful for operations like bulk archiving a stream to a file or a socket. + /// + /// + /// A scan will terminate if a padding frame is encountered. If first frame in a scan is padding then a block + /// for the padding is notified. If the padding comes after the first frame in a scan then the scan terminates + /// at the offset the padding frame begins. Padding frames are delivered singularly in a block. + /// + /// + /// Padding frames may be for a greater range than the limit offset but only the header needs to be valid so + /// relevant length of the frame is . + /// + /// + /// to which block is delivered. + /// up to which a block may be in length. + /// the number of bytes that have been consumed. + public int RawPoll(RawBlockHandler handler, int blockLengthLimit) + { + if (_isClosed) + { + return 0; + } + + long position = _subscriberPosition.Get(); + int offset = (int)position & _termLengthMask; + int activeIndex = LogBufferDescriptor.IndexByPosition(position, PositionBitsToShift); + UnsafeBuffer termBuffer = _termBuffers[activeIndex]; + int capacity = termBuffer.Capacity; + long highLimitOffset = (long)offset + blockLengthLimit; + int limitOffset = (long)capacity < highLimitOffset ? capacity : (int)highLimitOffset; + int resultingOffset = TermBlockScanner.Scan(termBuffer, offset, limitOffset); + int length = resultingOffset - offset; + + if (resultingOffset > offset) + { + try + { + long fileOffset = ((long)capacity * activeIndex) + offset; + int termId = termBuffer.GetInt(offset + TERM_ID_FIELD_OFFSET); + + handler(_logBuffers.FileStream, fileOffset, termBuffer, offset, length, SessionId, termId); + } + catch (Exception ex) + { + _errorHandler.OnError(ex); + } + finally + { + if (!_isClosed) { _subscriberPosition.SetRelease(position + length); } diff --git a/src/Adaptive.Aeron/LogBuffer/RawBlockHandler.cs b/src/Adaptive.Aeron/LogBuffer/RawBlockHandler.cs new file mode 100644 index 00000000..b67dd175 --- /dev/null +++ b/src/Adaptive.Aeron/LogBuffer/RawBlockHandler.cs @@ -0,0 +1,40 @@ +/* + * Copyright 2026 Adaptive Financial Consulting Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System.IO; +using Adaptive.Agrona.Concurrent; + +namespace Adaptive.Aeron.LogBuffer +{ + /// + /// Handler for a raw block of fragments from the log that are contained in the underlying file. + /// + /// read-only stream over the log file containing the block. + /// at which the block begins, including any frame headers. + /// mapped over the block of fragments. + /// in at which the block begins, including any frame headers. + /// of the block in bytes, including any frame headers, aligned up to . + /// of the stream of fragments. + /// of the stream of fragments. + public delegate void RawBlockHandler( + FileStream fileStream, + long fileOffset, + UnsafeBuffer termBuffer, + int termOffset, + int length, + int sessionId, + int termId); +} diff --git a/src/Adaptive.Aeron/LogBuffers.cs b/src/Adaptive.Aeron/LogBuffers.cs index 27845bfa..e3950a8c 100644 --- a/src/Adaptive.Aeron/LogBuffers.cs +++ b/src/Adaptive.Aeron/LogBuffers.cs @@ -1,5 +1,5 @@ /* - * Copyright 2014 - 2017 Adaptive Financial Consulting Ltd + * Copyright 2014 - 2026 Adaptive Financial Consulting Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,6 +39,7 @@ public class LogBuffers : IDisposable private readonly UnsafeBuffer[] _termBuffers = new UnsafeBuffer[LogBufferDescriptor.PARTITION_COUNT]; private readonly UnsafeBuffer _logMetaDataBuffer; private readonly MappedByteBuffer[] _mappedByteBuffers; + private readonly FileStream _fileStream; // ReSharper disable once UnusedMember.Global // @@ -57,12 +58,19 @@ public LogBuffers(string logFileName) int termLength = 0; UnsafeBuffer logMetaDataBuffer = null; MappedByteBuffer[] mappedByteBuffers = null; + FileStream fileStream = null; try { var fileInfo = new FileInfo(logFileName); var logLength = fileInfo.Length; + + fileStream = new FileStream( + logFileName, + FileMode.Open, + FileAccess.Read, + FileShare.ReadWrite | FileShare.Delete); if (logLength < LogBufferDescriptor.LOG_META_DATA_LENGTH) { @@ -137,15 +145,23 @@ public LogBuffers(string logFileName) } catch (InvalidOperationException) { - Dispose(logMetaDataBuffer, mappedByteBuffers); + Dispose(logMetaDataBuffer, mappedByteBuffers, fileStream); throw; } - + _termLength = termLength; _logMetaDataBuffer = logMetaDataBuffer; _mappedByteBuffers = mappedByteBuffers; + _fileStream = fileStream; } + /// + /// Read-only stream over the log file. Shares the underlying file with the memory mapping; + /// callers must not write to it. Useful for raw-block consumers that want positioned reads + /// or zero-copy hand-off to a socket via Socket.SendFile. + /// + public FileStream FileStream => _fileStream; + public UnsafeBuffer[] DuplicateTermBuffers() { return _termBuffers; @@ -182,7 +198,7 @@ public void PreTouch() public void Dispose() { - Dispose(_logMetaDataBuffer, _mappedByteBuffers); + Dispose(_logMetaDataBuffer, _mappedByteBuffers, _fileStream); } /// @@ -230,7 +246,8 @@ public long LingerDeadlineNs() return lingerDeadlineNs; } - private static void Dispose(UnsafeBuffer logMetaDataBuffer, MappedByteBuffer[] mappedByteBuffers) + private static void Dispose( + UnsafeBuffer logMetaDataBuffer, MappedByteBuffer[] mappedByteBuffers, FileStream fileStream) { if (null != logMetaDataBuffer) { @@ -247,6 +264,10 @@ private static void Dispose(UnsafeBuffer logMetaDataBuffer, MappedByteBuffer[] m } } + if (null != fileStream) + { + fileStream.Dispose(); + } } } } \ No newline at end of file diff --git a/src/Adaptive.Aeron/Subscription.cs b/src/Adaptive.Aeron/Subscription.cs index ca82530a..e6175b95 100644 --- a/src/Adaptive.Aeron/Subscription.cs +++ b/src/Adaptive.Aeron/Subscription.cs @@ -290,6 +290,27 @@ public long BlockPoll(BlockHandler blockHandler, int blockLengthLimit) return bytesConsumed; } + /// + /// Poll in a non-blocking manner for available message fragments on each in turn, + /// delivered to the supplied as raw blocks. + /// + /// This method is useful for operations like bulk archiving a stream to file. + /// + /// + /// to receive a block of fragments from each . + /// for each polled. + /// the number of bytes consumed across all images. + public long RawPoll(RawBlockHandler rawBlockHandler, int blockLengthLimit) + { + long bytesConsumed = 0; + foreach (var image in _fields.images) + { + bytesConsumed += image.RawPoll(rawBlockHandler, blockLengthLimit); + } + + return bytesConsumed; + } + /// /// Is this subscription connected by having at least one open publication . ///