Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 271 additions & 0 deletions src/Adaptive.Aeron.Tests/ImageTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,277 @@ public void ShouldPollFragmentsToBoundedFragmentHandlerWithMaxPositionAboveIntMa
.Then(A.CallTo(() => Position.SetRelease(TERM_BUFFER_LENGTH)).MustHaveHappened());
}

[Test]
public void BlockPollDeliversBlockAndAdvancesPosition()
{
var initialPosition = LogBufferDescriptor.ComputePosition(
INITIAL_TERM_ID, 0, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID);
Position.SetRelease(initialPosition);
var image = CreateImage();

InsertDataFrame(INITIAL_TERM_ID, OffsetForFrame(0));

int handlerCallCount = 0;
int receivedOffset = -1, receivedLength = -1, receivedSessionId = -1, receivedTermId = -1;
var bytes = image.BlockPoll(
(buffer, offset, length, sessionId, termId) =>
{
handlerCallCount++;
receivedOffset = offset;
receivedLength = length;
receivedSessionId = sessionId;
receivedTermId = termId;
},
int.MaxValue);

Assert.AreEqual(ALIGNED_FRAME_LENGTH, bytes);
Assert.AreEqual(1, handlerCallCount);
Assert.AreEqual(0, receivedOffset);
Assert.AreEqual(ALIGNED_FRAME_LENGTH, receivedLength);
Assert.AreEqual(SESSION_ID, receivedSessionId);
Assert.AreEqual(INITIAL_TERM_ID, receivedTermId);
Assert.AreEqual(initialPosition + ALIGNED_FRAME_LENGTH, image.Position);
}

[Test]
public void BlockPollReturnsZeroAndDoesNothingWhenClosed()
{
var initialPosition = LogBufferDescriptor.ComputePosition(
INITIAL_TERM_ID, 0, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID);
Position.SetRelease(initialPosition);
var image = CreateImage();

InsertDataFrame(INITIAL_TERM_ID, OffsetForFrame(0));
image.Close();

bool handlerCalled = false;
var bytes = image.BlockPoll((b, o, l, s, t) => handlerCalled = true, int.MaxValue);

Assert.AreEqual(0, bytes);
Assert.IsFalse(handlerCalled);
Assert.AreEqual(initialPosition, image.Position);
}

[Test]
public void BlockPollReturnsZeroWhenNoFramesAvailable()
{
var image = CreateImage();

bool handlerCalled = false;
var bytes = image.BlockPoll((b, o, l, s, t) => handlerCalled = true, int.MaxValue);

Assert.AreEqual(0, bytes);
Assert.IsFalse(handlerCalled);
}

[Test]
public void BlockPollRespectsBlockLengthLimit()
{
var initialPosition = LogBufferDescriptor.ComputePosition(
INITIAL_TERM_ID, 0, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID);
Position.SetRelease(initialPosition);
var image = CreateImage();

InsertDataFrame(INITIAL_TERM_ID, OffsetForFrame(0));
InsertDataFrame(INITIAL_TERM_ID, OffsetForFrame(1));

int handlerCallCount = 0;
int receivedLength = -1;
var bytes = image.BlockPoll(
(b, o, l, s, t) => { handlerCallCount++; receivedLength = l; },
ALIGNED_FRAME_LENGTH + 1);

Assert.AreEqual(ALIGNED_FRAME_LENGTH, bytes);
Assert.AreEqual(1, handlerCallCount);
Assert.AreEqual(ALIGNED_FRAME_LENGTH, receivedLength);
Assert.AreEqual(initialPosition + ALIGNED_FRAME_LENGTH, image.Position);
}

[Test]
public void BlockPollDeliversPaddingFrameWhenFirstFrameIsPadding()
{
// Padding frame placed near end of term so the helper's TermRebuilder.Insert call
// copies only ALIGNED_FRAME_LENGTH bytes (RcvBuffer's capacity). Scan starts at
// paddingOffset and the first frame is padding, exercising the corner case.
int paddingOffset = TERM_BUFFER_LENGTH - ALIGNED_FRAME_LENGTH;
var initialPosition = LogBufferDescriptor.ComputePosition(
INITIAL_TERM_ID, paddingOffset, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID);
Position.SetRelease(initialPosition);
var image = CreateImage();

InsertPaddingFrame(INITIAL_TERM_ID, paddingOffset);

int handlerCallCount = 0;
int receivedLength = -1;
var bytes = image.BlockPoll(
(b, o, l, s, t) => { handlerCallCount++; receivedLength = l; },
TERM_BUFFER_LENGTH);

Assert.AreEqual(ALIGNED_FRAME_LENGTH, bytes);
Assert.AreEqual(1, handlerCallCount);
Assert.AreEqual(ALIGNED_FRAME_LENGTH, receivedLength);
Assert.AreEqual(initialPosition + ALIGNED_FRAME_LENGTH, image.Position);
}

[Test]
public void RawPollDeliversBlockAndAdvancesPosition()
{
var initialPosition = LogBufferDescriptor.ComputePosition(
INITIAL_TERM_ID, 0, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID);
Position.SetRelease(initialPosition);
var image = CreateImage();

InsertDataFrame(INITIAL_TERM_ID, OffsetForFrame(0));

int handlerCallCount = 0;
long receivedFileOffset = -1;
int receivedTermOffset = -1, receivedLength = -1, receivedSessionId = -1, receivedTermId = -1;
var bytes = image.RawPoll(
(fs, fileOffset, buf, termOffset, length, sessionId, termId) =>
{
handlerCallCount++;
receivedFileOffset = fileOffset;
receivedTermOffset = termOffset;
receivedLength = length;
receivedSessionId = sessionId;
receivedTermId = termId;
},
int.MaxValue);

Assert.AreEqual(ALIGNED_FRAME_LENGTH, bytes);
Assert.AreEqual(1, handlerCallCount);
Assert.AreEqual(0L, receivedFileOffset);
Assert.AreEqual(0, receivedTermOffset);
Assert.AreEqual(ALIGNED_FRAME_LENGTH, receivedLength);
Assert.AreEqual(SESSION_ID, receivedSessionId);
Assert.AreEqual(INITIAL_TERM_ID, receivedTermId);
Assert.AreEqual(initialPosition + ALIGNED_FRAME_LENGTH, image.Position);
}

[Test]
public void RawPollComputesFileOffsetForNonZeroPartition()
{
// Place the subscriber position into partition index 2; fileOffset must be
// (capacity * activeIndex) + termOffset.
int activeTermId = INITIAL_TERM_ID + 2;
int termOffset = OffsetForFrame(0);
var initialPosition = LogBufferDescriptor.ComputePosition(
activeTermId, termOffset, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID);
Position.SetRelease(initialPosition);
var image = CreateImage();

InsertDataFrame(activeTermId, termOffset);

long receivedFileOffset = -1;
var bytes = image.RawPoll(
(fs, fileOffset, buf, to, l, s, t) => receivedFileOffset = fileOffset,
int.MaxValue);

Assert.AreEqual(ALIGNED_FRAME_LENGTH, bytes);
long expectedFileOffset = (long)TERM_BUFFER_LENGTH * 2 + termOffset;
Assert.AreEqual(expectedFileOffset, receivedFileOffset);
}

[Test]
public void RawPollReturnsZeroAndDoesNothingWhenClosed()
{
var initialPosition = LogBufferDescriptor.ComputePosition(
INITIAL_TERM_ID, 0, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID);
Position.SetRelease(initialPosition);
var image = CreateImage();

InsertDataFrame(INITIAL_TERM_ID, OffsetForFrame(0));
image.Close();

bool handlerCalled = false;
var bytes = image.RawPoll((fs, fo, buf, to, l, s, t) => handlerCalled = true, int.MaxValue);

Assert.AreEqual(0, bytes);
Assert.IsFalse(handlerCalled);
Assert.AreEqual(initialPosition, image.Position);
}

[Test]
public void RawPollReturnsZeroWhenNoFramesAvailable()
{
var image = CreateImage();

bool handlerCalled = false;
var bytes = image.RawPoll((fs, fo, buf, to, l, s, t) => handlerCalled = true, int.MaxValue);

Assert.AreEqual(0, bytes);
Assert.IsFalse(handlerCalled);
}

[Test]
public void RawPollDeliversPaddingFrameWhenFirstFrameIsPadding()
{
int paddingOffset = TERM_BUFFER_LENGTH - ALIGNED_FRAME_LENGTH;
var initialPosition = LogBufferDescriptor.ComputePosition(
INITIAL_TERM_ID, paddingOffset, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID);
Position.SetRelease(initialPosition);
var image = CreateImage();

InsertPaddingFrame(INITIAL_TERM_ID, paddingOffset);

int handlerCallCount = 0;
int receivedLength = -1;
var bytes = image.RawPoll(
(fs, fo, buf, to, l, s, t) => { handlerCallCount++; receivedLength = l; },
TERM_BUFFER_LENGTH);

Assert.AreEqual(ALIGNED_FRAME_LENGTH, bytes);
Assert.AreEqual(1, handlerCallCount);
Assert.AreEqual(ALIGNED_FRAME_LENGTH, receivedLength);
Assert.AreEqual(initialPosition + ALIGNED_FRAME_LENGTH, image.Position);
}

[Test]
public void BlockPollClampsBlockLengthLimitWithoutIntegerOverflow()
{
int frameOffset = ALIGNED_FRAME_LENGTH;
var initialPosition = LogBufferDescriptor.ComputePosition(
INITIAL_TERM_ID, frameOffset, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID);
Position.SetRelease(initialPosition);
var image = CreateImage();

InsertDataFrame(INITIAL_TERM_ID, frameOffset);

int handlerCallCount = 0;
int receivedLength = -1;
var bytes = image.BlockPoll(
(b, o, l, s, t) => { handlerCallCount++; receivedLength = l; },
int.MaxValue);

Assert.AreEqual(ALIGNED_FRAME_LENGTH, bytes);
Assert.AreEqual(1, handlerCallCount);
Assert.AreEqual(ALIGNED_FRAME_LENGTH, receivedLength);
Assert.AreEqual(initialPosition + ALIGNED_FRAME_LENGTH, image.Position);
}

[Test]
public void RawPollClampsBlockLengthLimitWithoutIntegerOverflow()
{
int frameOffset = ALIGNED_FRAME_LENGTH;
var initialPosition = LogBufferDescriptor.ComputePosition(
INITIAL_TERM_ID, frameOffset, POSITION_BITS_TO_SHIFT, INITIAL_TERM_ID);
Position.SetRelease(initialPosition);
var image = CreateImage();

InsertDataFrame(INITIAL_TERM_ID, frameOffset);

int handlerCallCount = 0;
int receivedLength = -1;
var bytes = image.RawPoll(
(fs, fo, buf, to, l, s, t) => { handlerCallCount++; receivedLength = l; },
int.MaxValue);

Assert.AreEqual(ALIGNED_FRAME_LENGTH, bytes);
Assert.AreEqual(1, handlerCallCount);
Assert.AreEqual(ALIGNED_FRAME_LENGTH, receivedLength);
Assert.AreEqual(initialPosition + ALIGNED_FRAME_LENGTH, image.Position);
}

private Image CreateImage()
{
return new Image(Subscription, SESSION_ID, Position, LogBuffers, ErrorHandler, SOURCE_IDENTITY,
Expand Down
83 changes: 72 additions & 11 deletions src/Adaptive.Aeron/Image.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014 - 2017 Adaptive Financial Consulting Ltd
* Copyright 2014 - 2026 Adaptive Financial Consulting Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,6 +15,7 @@
*/

using System;
using System.IO;
using System.Runtime.CompilerServices;
using Adaptive.Aeron.LogBuffer;
using Adaptive.Aeron.Protocol;
Expand Down Expand Up @@ -262,14 +263,11 @@ public bool PublicationRevoked
}


///// <summary>
///// The <seealso cref="FileChannel"/> to the raw log of the Image.
///// </summary>
///// <returns> the <seealso cref="FileChannel"/> to the raw log of the Image. </returns>
//public FileChannel FileChannel()
//{
// return logBuffers.FileChannel();
//}
/// <summary>
/// The <see cref="FileStream"/> for the raw log of the Image.
/// </summary>
/// <returns> the <see cref="FileStream"/> for the raw log of the Image. </returns>
public FileStream FileStream => _logBuffers.FileStream;

/// <summary>
/// Poll for new messages in a stream. If new messages are found beyond the last consumed position then they
Expand Down Expand Up @@ -814,7 +812,9 @@ public int BlockPoll(BlockHandler handler, int blockLengthLimit)

var position = _subscriberPosition.Get();
var offset = (int) position & _termLengthMask;
var limitOffset = Math.Min(offset + blockLengthLimit, _termLengthMask + 1);
var capacity = _termLengthMask + 1;
var highLimitOffset = (long)offset + blockLengthLimit;
var limitOffset = (long)capacity < highLimitOffset ? capacity : (int)highLimitOffset;
var termBuffer = ActiveTermBuffer(position);
var resultingOffset = TermBlockScanner.Scan(termBuffer, offset, limitOffset);
var length = resultingOffset - offset;
Expand All @@ -833,7 +833,68 @@ public int BlockPoll(BlockHandler handler, int blockLengthLimit)
}
finally
{
if (_isClosed)
if (!_isClosed)
{
_subscriberPosition.SetRelease(position + length);
}
}
}

return length;
}

/// <summary>
/// Poll for new messages in a stream. If new messages are found beyond the last consumed position then they
/// will be delivered to the <see cref="RawBlockHandler"/> up to a limited number of bytes.
/// <para>
/// This method is useful for operations like bulk archiving a stream to a file or a socket.
/// </para>
/// <para>
/// A scan will terminate if a padding frame is encountered. If first frame in a scan is padding then a block
/// for the padding is notified. If the padding comes after the first frame in a scan then the scan terminates
/// at the offset the padding frame begins. Padding frames are delivered singularly in a block.
/// </para>
/// <para>
/// Padding frames may be for a greater range than the limit offset but only the header needs to be valid so
/// relevant length of the frame is <see cref="DataHeaderFlyweight.HEADER_LENGTH"/>.
/// </para>
/// </summary>
/// <param name="handler"> to which block is delivered. </param>
/// <param name="blockLengthLimit"> up to which a block may be in length. </param>
/// <returns> the number of bytes that have been consumed. </returns>
public int RawPoll(RawBlockHandler handler, int blockLengthLimit)
{
if (_isClosed)
{
return 0;
}

long position = _subscriberPosition.Get();
int offset = (int)position & _termLengthMask;
int activeIndex = LogBufferDescriptor.IndexByPosition(position, PositionBitsToShift);
UnsafeBuffer termBuffer = _termBuffers[activeIndex];
int capacity = termBuffer.Capacity;
long highLimitOffset = (long)offset + blockLengthLimit;
int limitOffset = (long)capacity < highLimitOffset ? capacity : (int)highLimitOffset;
int resultingOffset = TermBlockScanner.Scan(termBuffer, offset, limitOffset);
int length = resultingOffset - offset;

if (resultingOffset > offset)
{
try
{
long fileOffset = ((long)capacity * activeIndex) + offset;
int termId = termBuffer.GetInt(offset + TERM_ID_FIELD_OFFSET);

handler(_logBuffers.FileStream, fileOffset, termBuffer, offset, length, SessionId, termId);
}
catch (Exception ex)
{
_errorHandler.OnError(ex);
}
finally
{
if (!_isClosed)
{
_subscriberPosition.SetRelease(position + length);
}
Expand Down
Loading
Loading