From 5b7708cd93a4c8ff429beb03333c5d1b9cc931ea Mon Sep 17 00:00:00 2001 From: Eric Bowden Date: Mon, 4 May 2026 14:45:31 -0500 Subject: [PATCH] Ported Aeron client unit tests from Java Changed StorageSpaceException.IsStorageSpaceError and some log messages and added the ChannelUri.Diff function to better match Java implementation Fixed a bug in ChannelUriStringBuilder.NakDelay setter that caused it to not actually set the NakDelay Various other small changes for testability --- src/Adaptive.Aeron.Tests/AeronCountersTest.cs | 221 ++++++++++ .../ChannelUriStringBuilderTest.cs | 391 ++++++++++++++++++ .../Command/CounterMessageFlyweightTest.cs | 50 +++ .../Command/TerminateDriverFlyweightTest.cs | 35 ++ .../Exceptions/StorageSpaceExceptionTest.cs | 49 +++ .../LogBuffer/HeaderTest.cs | 128 ++++++ .../LogBuffer/HeaderWriterTest.cs | 59 +++ .../LogBuffer/LogBufferDescriptorTest.cs | 76 ++++ src/Adaptive.Aeron.Tests/LogBuffersTest.cs | 87 ++++ src/Adaptive.Aeron/Adaptive.Aeron.csproj | 3 + src/Adaptive.Aeron/AeronCounters.cs | 45 ++ src/Adaptive.Aeron/ChannelUri.cs | 55 +++ src/Adaptive.Aeron/ChannelUriStringBuilder.cs | 66 ++- .../Command/CounterMessageFlyweight.cs | 2 +- .../Command/TerminateDriverFlyweight.cs | 2 +- .../Exceptions/StorageSpaceException.cs | 15 +- .../LogBuffer/LogBufferDescriptor.cs | 2 +- .../Concurrent/Status/CountersReader.cs | 8 +- src/Adaptive.Agrona/SystemUtil.cs | 92 +++++ 19 files changed, 1367 insertions(+), 19 deletions(-) create mode 100644 src/Adaptive.Aeron.Tests/AeronCountersTest.cs create mode 100644 src/Adaptive.Aeron.Tests/ChannelUriStringBuilderTest.cs create mode 100644 src/Adaptive.Aeron.Tests/Command/CounterMessageFlyweightTest.cs create mode 100644 src/Adaptive.Aeron.Tests/Command/TerminateDriverFlyweightTest.cs create mode 100644 src/Adaptive.Aeron.Tests/Exceptions/StorageSpaceExceptionTest.cs create mode 100644 src/Adaptive.Aeron.Tests/LogBuffer/HeaderTest.cs create mode 100644 src/Adaptive.Aeron.Tests/LogBuffer/HeaderWriterTest.cs create mode 100644 src/Adaptive.Aeron.Tests/LogBuffer/LogBufferDescriptorTest.cs create mode 100644 src/Adaptive.Aeron.Tests/LogBuffersTest.cs diff --git a/src/Adaptive.Aeron.Tests/AeronCountersTest.cs b/src/Adaptive.Aeron.Tests/AeronCountersTest.cs new file mode 100644 index 00000000..8cc2058c --- /dev/null +++ b/src/Adaptive.Aeron.Tests/AeronCountersTest.cs @@ -0,0 +1,221 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Text; +using Adaptive.Agrona; +using Adaptive.Agrona.Concurrent; +using Adaptive.Agrona.Concurrent.Status; +using NUnit.Framework; +using static Adaptive.Agrona.Concurrent.Status.CountersReader; + +namespace Adaptive.Aeron.Tests +{ + public class AeronCountersTest + { + [Test] + public void ShouldNotHaveOverlappingCounterTypeIds() + { + var fieldByTypeId = new Dictionary(); + var duplicates = new Dictionary>(); + + var typeIdFields = typeof(AeronCounters).GetFields(BindingFlags.Public | BindingFlags.Static) + .Where(f => f.IsLiteral || f.IsInitOnly) + .Where(f => f.Name.EndsWith("_TYPE_ID")) + .Where(f => f.FieldType == typeof(int)); + + foreach (var f in typeIdFields) + { + int typeId = (int)f.GetValue(null); + if (fieldByTypeId.TryGetValue(typeId, out var existing)) + { + if (!duplicates.TryGetValue(typeId, out var list)) + { + list = new List(); + duplicates[typeId] = list; + } + if (!list.Contains(f)) list.Add(f); + list.Add(existing); + } + else + { + fieldByTypeId[typeId] = f; + } + } + + if (duplicates.Count > 0) + { + var lines = duplicates.Select(kv => kv.Key + " -> " + + string.Join(", ", kv.Value.Select(f => f.DeclaringType?.Name + "." + f.Name))); + Assert.Fail("Duplicate typeIds: " + string.Join("; ", lines)); + } + } + + [TestCase( + "1.42.1", + "8165495befc07e997a7f2f7743beab9d3846b0a5", + "version=1.42.1 commit=8165495befc07e997a7f2f7743beab9d3846b0a5")] + [TestCase("1.43.0-SNAPSHOT", "abc", "version=1.43.0-SNAPSHOT commit=abc")] + [TestCase("NIL", "12345678", "version=NIL commit=12345678")] + public void ShouldFormatVersionInfo(string fullVersion, string commitHash, string expected) + { + Assert.AreEqual(expected, AeronCounters.FormatVersionInfo(fullVersion, commitHash)); + } + + [TestCase("xyz", "1234567890", "version=xyz commit=1234567890")] + [TestCase("1.43.0-SNAPSHOT", "abc", "version=1.43.0-SNAPSHOT commit=abc")] + public void ShouldAppendVersionInfo(string fullVersion, string commitHash, string formatted) + { + string expected = " " + formatted; + var buffer = new ExpandableArrayBuffer(32); + const int offset = 5; + buffer.SetMemory(0, buffer.Capacity, 0xFF); + + int length = AeronCounters.AppendVersionInfo(buffer, offset, fullVersion, commitHash); + + Assert.AreEqual(expected.Length, length); + Assert.AreEqual(expected, buffer.GetStringWithoutLengthAscii(offset, length)); + } + + [TestCase(int.MinValue)] + [TestCase(-1)] + public void AppendToLabelThrowsArgumentExceptionIfCounterIsNegative(int counterId) + { + var exception = Assert.Throws( + () => AeronCounters.AppendToLabel(new UnsafeBuffer(new byte[0]), counterId, "test")); + Assert.AreEqual("counter id " + counterId + " is negative", exception.Message); + } + + [Test] + public void AppendToLabelThrowsArgumentNullExceptionIfBufferIsNull() + { + Assert.Throws( + () => AeronCounters.AppendToLabel(null, 5, "test")); + } + + [TestCase(1_000_000)] + [TestCase(int.MaxValue)] + public void AppendToLabelThrowsArgumentExceptionIfCounterIsOutOfRange(int counterId) + { + var metaDataBuffer = new UnsafeBuffer(new byte[METADATA_LENGTH * 3]); + + var exception = Assert.Throws( + () => AeronCounters.AppendToLabel(metaDataBuffer, counterId, "test")); + Assert.AreEqual("counter id " + counterId + " out of range: 0 - maxCounterId=2", exception.Message); + } + + [TestCase(RECORD_UNUSED)] + [TestCase(RECORD_RECLAIMED)] + public void AppendToLabelThrowsArgumentExceptionIfCounterIsInWrongState(int state) + { + var metaDataBuffer = new UnsafeBuffer(new byte[METADATA_LENGTH * 2]); + const int counterId = 1; + int metaDataOffset = MetaDataOffset(counterId); + metaDataBuffer.PutInt(metaDataOffset, state); + + var exception = Assert.Throws( + () => AeronCounters.AppendToLabel(metaDataBuffer, counterId, "test")); + Assert.AreEqual("counter id 1 is not allocated, state: " + state, exception.Message); + } + + [Test] + public void AppendToLabelShouldAddSuffix() + { + var countersManager = new CountersManager( + new UnsafeBuffer(new byte[METADATA_LENGTH]), + new UnsafeBuffer(new byte[COUNTER_LENGTH]), + Encoding.ASCII); + int counterId = countersManager.Allocate("initial value: "); + + int length = AeronCounters.AppendToLabel(countersManager.MetaDataBuffer, counterId, "test"); + + Assert.AreEqual(4, length); + Assert.AreEqual("initial value: test", countersManager.GetCounterLabel(counterId)); + } + + [Test] + public void AppendToLabelShouldAddAPortionOfSuffixUpToTheMaxLength() + { + var countersManager = new CountersManager( + new UnsafeBuffer(new byte[METADATA_LENGTH]), + new UnsafeBuffer(new byte[COUNTER_LENGTH]), + Encoding.ASCII); + const string initialLabel = "this is a test counter"; + int counterId = countersManager.Allocate(initialLabel); + string hugeSuffix = " - 42" + new string('x', MAX_LABEL_LENGTH); + + int length = AeronCounters.AppendToLabel(countersManager.MetaDataBuffer, counterId, hugeSuffix); + + Assert.AreNotEqual(hugeSuffix.Length, length); + Assert.AreEqual(MAX_LABEL_LENGTH - initialLabel.Length, length); + Assert.AreEqual(initialLabel + hugeSuffix.Substring(0, length), countersManager.GetCounterLabel(counterId)); + } + + [Test] + public void AppendToLabelIsANoOpIfThereIsNoSpaceInTheLabel() + { + var countersManager = new CountersManager( + new UnsafeBuffer(new byte[METADATA_LENGTH]), + new UnsafeBuffer(new byte[COUNTER_LENGTH]), + Encoding.ASCII); + string label = new string('a', MAX_LABEL_LENGTH); + int counterId = countersManager.Allocate(label); + + int length = AeronCounters.AppendToLabel(countersManager.MetaDataBuffer, counterId, "test"); + + Assert.AreEqual(0, length); + Assert.AreEqual(label, countersManager.GetCounterLabel(counterId)); + } + + [Test] + public void SetReferenceIdShouldThrowArgumentNullExceptionIfMetadataBufferIsNull() + { + Assert.Throws( + () => AeronCounters.SetReferenceId(null, new UnsafeBuffer(new byte[0]), 1, 123)); + } + + [Test] + public void SetReferenceIdShouldThrowArgumentNullExceptionIfValuesBufferIsNull() + { + Assert.Throws( + () => AeronCounters.SetReferenceId(new UnsafeBuffer(new byte[0]), null, 1, 123)); + } + + [Test] + public void SetReferenceIdShouldRejectNegativeCounterId() + { + var exception = Assert.Throws( + () => AeronCounters.SetReferenceId( + new UnsafeBuffer(new byte[0]), new UnsafeBuffer(new byte[0]), -4, 123)); + Assert.AreEqual("counter id -4 is negative", exception.Message); + } + + [Test] + public void SetReferenceIdShouldRejectCounterIdWhichIsOutOfRange() + { + var exception = Assert.Throws( + () => AeronCounters.SetReferenceId( + new UnsafeBuffer(new byte[2 * METADATA_LENGTH]), + new UnsafeBuffer(new byte[0]), + 42, + 777)); + Assert.AreEqual("counter id 42 out of range: 0 - maxCounterId=1", exception.Message); + } + + [TestCase(long.MinValue)] + [TestCase(0L)] + [TestCase(54375943437284L)] + [TestCase(long.MaxValue)] + public void SetReferenceIdShouldSetSpecifiedValue(long referenceId) + { + const int counterId = 7; + + var metadataBuffer = new UnsafeBuffer(new byte[(counterId + 1) * METADATA_LENGTH]); + var valuesBuffer = new UnsafeBuffer(new byte[(counterId + 1) * COUNTER_LENGTH]); + + AeronCounters.SetReferenceId(metadataBuffer, valuesBuffer, counterId, referenceId); + + Assert.AreEqual(referenceId, valuesBuffer.GetLong(CounterOffset(counterId) + REFERENCE_ID_OFFSET)); + } + } +} diff --git a/src/Adaptive.Aeron.Tests/ChannelUriStringBuilderTest.cs b/src/Adaptive.Aeron.Tests/ChannelUriStringBuilderTest.cs new file mode 100644 index 00000000..f8db45eb --- /dev/null +++ b/src/Adaptive.Aeron.Tests/ChannelUriStringBuilderTest.cs @@ -0,0 +1,391 @@ +using System; +using NUnit.Framework; + +namespace Adaptive.Aeron.Tests +{ + public class ChannelUriStringBuilderTest + { + // Constants moved here to avoid pulling private nested constants from Aeron.Configuration. + private const string MAX_RESEND_PARAM_NAME = "max-resend"; + private const string PUBLICATION_WINDOW_LENGTH_PARAM_NAME = "pub-wnd"; + private const string STREAM_ID_PARAM_NAME = "stream-id"; + private const string UNTETHERED_LINGER_TIMEOUT_PARAM_NAME = "untethered-linger-timeout"; + private const string UNTETHERED_RESTING_TIMEOUT_PARAM_NAME = "untethered-resting-timeout"; + private const string UNTETHERED_WINDOW_LIMIT_TIMEOUT_PARAM_NAME = "untethered-window-limit-timeout"; + private const string RESPONSE_CORRELATION_ID_PARAM_NAME = "response-correlation-id"; + + [Test] + public void ShouldValidateMedia() + { + Assert.Throws(() => new ChannelUriStringBuilder().Validate()); + } + + [Test] + public void ShouldValidateEndpointOrControl() + { + Assert.Throws(() => new ChannelUriStringBuilder().Media("udp").Validate()); + } + + [Test] + public void ShouldValidateInitialPosition() + { + Assert.Throws( + () => new ChannelUriStringBuilder().Media("udp").Endpoint("address:port").TermId(999).Validate()); + } + + [Test] + public void ShouldGenerateBasicIpcChannel() + { + var builder = new ChannelUriStringBuilder().Media("ipc"); + Assert.AreEqual("aeron:ipc", builder.Build()); + } + + [Test] + public void ShouldGenerateBasicUdpChannel() + { + var builder = new ChannelUriStringBuilder() + .Media("udp") + .Endpoint("localhost:9999"); + Assert.AreEqual("aeron:udp?endpoint=localhost:9999", builder.Build()); + } + + [Test] + public void ShouldGenerateBasicUdpChannelSpy() + { + var builder = new ChannelUriStringBuilder() + .Prefix("aeron-spy") + .Media("udp") + .Endpoint("localhost:9999"); + Assert.AreEqual("aeron-spy:aeron:udp?endpoint=localhost:9999", builder.Build()); + } + + [Test] + public void ShouldGenerateComplexUdpChannel() + { + var builder = new ChannelUriStringBuilder() + .Media("udp") + .Endpoint("localhost:9999") + .Ttl(9) + .TermLength(1024 * 128); + Assert.AreEqual("aeron:udp?endpoint=localhost:9999|term-length=128k|ttl=9", builder.Build()); + } + + [Test] + public void ShouldGenerateReplayUdpChannel() + { + var builder = new ChannelUriStringBuilder() + .Media("udp") + .Endpoint("address:9999") + .TermLength(1024 * 128) + .InitialTermId(777) + .TermId(999) + .TermOffset(64); + Assert.AreEqual( + "aeron:udp?endpoint=address:9999|term-length=128k|init-term-id=777|term-id=999|term-offset=64", + builder.Build()); + } + + [Test] + public void ShouldGenerateChannelWithSocketParameters() + { + var builder = new ChannelUriStringBuilder() + .Media("udp") + .Endpoint("address:9999") + .SocketSndbufLength(8192) + .SocketRcvbufLength(4096); + Assert.AreEqual( + "aeron:udp?endpoint=address:9999|so-sndbuf=8k|so-rcvbuf=4k", + builder.Build()); + } + + [Test] + public void ShouldGenerateChannelWithReceiverWindow() + { + var builder = new ChannelUriStringBuilder() + .Media("udp") + .Endpoint("address:9999") + .ReceiverWindowLength(8192); + Assert.AreEqual( + "aeron:udp?endpoint=address:9999|rcv-wnd=8k", + builder.Build()); + } + + [Test] + public void ShouldGenerateChannelWithLingerTimeout() + { + const long lingerNs = 987654321123456789L; + var builder = new ChannelUriStringBuilder() + .Media("ipc") + .Linger(lingerNs); + + Assert.AreEqual(lingerNs, builder.Linger()); + Assert.AreEqual("aeron:ipc?linger=987654321123456789ns", builder.Build()); + } + + [Test] + public void ShouldGenerateChannelWithoutLingerTimeoutIfNullIsPassed() + { + var builder = new ChannelUriStringBuilder() + .Media("udp") + .Endpoint("address:9999") + .Linger((long?)null); + + Assert.IsNull(builder.Linger()); + Assert.AreEqual("aeron:udp?endpoint=address:9999", builder.Build()); + } + + [Test] + public void ShouldRejectNegativeLingerTimeout() + { + var exception = Assert.Throws( + () => new ChannelUriStringBuilder().Media("udp").Endpoint("address:9999").Linger(-1L)); + Assert.AreEqual("`linger` value cannot be negative: -1", exception.Message); + } + + [Test] + public void ShouldCopyLingerTimeoutFromChannelUriHumanForm() + { + var builder = new ChannelUriStringBuilder(); + builder.Linger(ChannelUri.Parse("aeron:ipc?linger=7200s")); + Assert.AreEqual(2L * 60 * 60 * 1_000_000_000, builder.Linger()); + } + + [Test] + public void ShouldCopyLingerTimeoutFromChannelUriNanoseconds() + { + var builder = new ChannelUriStringBuilder(); + builder.Linger(ChannelUri.Parse("aeron:udp?linger=19191919191919191")); + Assert.AreEqual(19191919191919191L, builder.Linger()); + } + + [Test] + public void ShouldCopyLingerTimeoutFromChannelUriNoValue() + { + var builder = new ChannelUriStringBuilder(); + builder.Linger(ChannelUri.Parse("aeron:udp?endpoint=localhost:8080")); + Assert.IsNull(builder.Linger()); + } + + [Test] + public void ShouldCopyLingerTimeoutFromChannelUriNegativeValue() + { + var channelUri = ChannelUri.Parse("aeron:udp?linger=-1000"); + Assert.Throws(() => new ChannelUriStringBuilder().Linger(channelUri)); + } + + [Test] + public void ShouldRejectInvalidOffsets() + { + Assert.Throws( + () => new ChannelUriStringBuilder().MediaReceiveTimestampOffset("breserved")); + Assert.Throws( + () => new ChannelUriStringBuilder().ChannelReceiveTimestampOffset("breserved")); + Assert.Throws( + () => new ChannelUriStringBuilder().ChannelSendTimestampOffset("breserved")); + } + + [Test] + public void ShouldRejectInvalidNakDelay() + { + // .NET throws FormatException (its parsing-error convention) where Java throws NumberFormatException. + // Test passes either, since both indicate "could not parse". + Assert.Catch(() => new ChannelUriStringBuilder().NakDelay("foo")); + } + + [Test] + public void ShouldHandleNakDelayWithUnits() + { + Assert.AreEqual(1000L, new ChannelUriStringBuilder().NakDelay("1us").NakDelay()); + Assert.AreEqual(1L, new ChannelUriStringBuilder().NakDelay("1ns").NakDelay()); + Assert.AreEqual(1000000L, new ChannelUriStringBuilder().NakDelay("1ms").NakDelay()); + } + + [Test] + public void ShouldHandleUntetheredWindowLimitTimeoutWithUnits() + { + Assert.AreEqual(1000L, new ChannelUriStringBuilder() + .UntetheredWindowLimitTimeout("1us").UntetheredWindowLimitTimeoutNs()); + Assert.AreEqual(1L, new ChannelUriStringBuilder() + .UntetheredWindowLimitTimeout("1ns").UntetheredWindowLimitTimeoutNs()); + Assert.AreEqual(1000000L, new ChannelUriStringBuilder() + .UntetheredWindowLimitTimeout("1ms").UntetheredWindowLimitTimeoutNs()); + } + + [Test] + public void ShouldHandleUntetheredRestingTimeoutWithUnits() + { + Assert.AreEqual(1000L, new ChannelUriStringBuilder() + .UntetheredRestingTimeout("1us").UntetheredRestingTimeoutNs()); + Assert.AreEqual(1L, new ChannelUriStringBuilder() + .UntetheredRestingTimeout("1ns").UntetheredRestingTimeoutNs()); + Assert.AreEqual(1000000L, new ChannelUriStringBuilder() + .UntetheredRestingTimeout("1ms").UntetheredRestingTimeoutNs()); + } + + [Test] + public void ShouldHandleMaxRetransmits() + { + Assert.AreEqual(20, new ChannelUriStringBuilder().MaxResend(20).MaxResend()); + Assert.IsTrue(new ChannelUriStringBuilder().MaxResend(20).Build() + .Contains(MAX_RESEND_PARAM_NAME + "=20")); + Assert.AreEqual(30, new ChannelUriStringBuilder() + .MaxResend(ChannelUri.Parse(new ChannelUriStringBuilder().MaxResend(30).Build())) + .MaxResend()); + } + + [Test] + public void ShouldHandleStreamId() + { + Assert.IsNull(new ChannelUriStringBuilder().StreamId()); + + const int streamId = 1234; + Assert.AreEqual(streamId, new ChannelUriStringBuilder().StreamId(streamId).StreamId()); + + string uri = new ChannelUriStringBuilder().StreamId(streamId).Build(); + Assert.AreEqual(streamId.ToString(), ChannelUri.Parse(uri).Get(STREAM_ID_PARAM_NAME)); + } + + [Test] + public void ShouldRejectInvalidStreamId() + { + var uri = ChannelUri.Parse("aeron:ipc?stream-id=abc"); + Assert.Throws(() => new ChannelUriStringBuilder().StreamId(uri)); + } + + [Test] + public void ShouldHandlePublicationWindowLength() + { + Assert.IsNull(new ChannelUriStringBuilder().PublicationWindowLength()); + + const int pubWindowLength = 7777; + Assert.AreEqual(pubWindowLength, + new ChannelUriStringBuilder().PublicationWindowLength(pubWindowLength).PublicationWindowLength()); + + string uri = new ChannelUriStringBuilder().PublicationWindowLength(pubWindowLength).Build(); + Assert.AreEqual( + pubWindowLength.ToString(), + ChannelUri.Parse(uri).Get(PUBLICATION_WINDOW_LENGTH_PARAM_NAME)); + } + + [TestCase("abc")] + [TestCase("1000000000000")] + public void ShouldRejectInvalidPublicationWindowLength(string pubWnd) + { + var uri = ChannelUri.Parse("aeron:ipc"); + uri.Put(PUBLICATION_WINDOW_LENGTH_PARAM_NAME, pubWnd); + // .NET throws FormatException for "abc", OverflowException for the too-big value. + Assert.Catch(() => new ChannelUriStringBuilder().PublicationWindowLength(uri)); + } + + [TestCase("this.will.not.work")] + [TestCase("-2")] + public void ShouldThrowAnExceptionOnInvalidResponseCorrelationId(string responseCorrelationId) + { + var channelUri = ChannelUri.Parse("aeron:udp?" + RESPONSE_CORRELATION_ID_PARAM_NAME + + "=" + responseCorrelationId); + Assert.Throws( + () => new ChannelUriStringBuilder().ResponseCorrelationId(channelUri)); + } + + [TestCase("prototype")] + [TestCase("2")] + public void ShouldNotThrowAnExceptionOnValidResponseCorrelationId(string responseCorrelationId) + { + var channelUri = ChannelUri.Parse("aeron:udp?" + RESPONSE_CORRELATION_ID_PARAM_NAME + + "=" + responseCorrelationId); + Assert.DoesNotThrow(() => new ChannelUriStringBuilder().ResponseCorrelationId(channelUri)); + } + + [Test] + public void ShouldBuildChannelBuilderUsingExistingStringWithAllTheFields() + { + const string uri = "aeron-spy:aeron:udp?endpoint=127.0.0.1:0|interface=127.0.0.1|control=127.0.0.2:0|" + + "control-mode=manual|tags=2,4|alias=foo|cc=cubic|fc=min|reliable=false|ttl=16|mtu=8992|" + + "term-length=1m|init-term-id=5|term-offset=64|term-id=4353|session-id=2314234|gtag=3|" + + "linger=100000055000001ns|sparse=true|eos=true|tether=false|group=false|ssc=true|so-sndbuf=8m|" + + "so-rcvbuf=2m|rcv-wnd=1m|media-rcv-ts-offset=reserved|channel-rcv-ts-offset=0|" + + "channel-snd-ts-offset=8|response-endpoint=127.0.0.3:0|response-correlation-id=12345|nak-delay=100us|" + + "untethered-window-limit-timeout=1us|untethered-resting-timeout=5us|stream-id=87|pub-wnd=10224"; + + var fromString = ChannelUri.Parse(uri); + var fromBuilder = ChannelUri.Parse(new ChannelUriStringBuilder(uri).Build()); + + CollectionAssert.IsEmpty(fromString.Diff(fromBuilder)); + } + + [Test] + public void ShouldBuildChannelBuilderUsingExistingStringWithTaggedSessionIdAndIpc() + { + const string uri = "aeron:ipc?session-id=tag:123456"; + + var fromString = ChannelUri.Parse(uri); + var fromBuilder = ChannelUri.Parse(new ChannelUriStringBuilder(uri).Build()); + + CollectionAssert.IsEmpty(fromString.Diff(fromBuilder)); + } + + [TestCase(1000L, 666L, 2002L)] + [TestCase(50L, 40L, 30L)] + public void ShouldHandleUntetheredParameters( + long untetheredWindowLimitTimeoutNs, + long untetheredLingerTimeoutNs, + long untetheredRestingTimeoutNs) + { + var builder = new ChannelUriStringBuilder("aeron:ipc") + .UntetheredWindowLimitTimeoutNs(untetheredWindowLimitTimeoutNs) + .UntetheredLingerTimeoutNs(untetheredLingerTimeoutNs) + .UntetheredRestingTimeoutNs(untetheredRestingTimeoutNs); + + Assert.AreEqual(untetheredWindowLimitTimeoutNs, builder.UntetheredWindowLimitTimeoutNs()); + Assert.AreEqual(untetheredLingerTimeoutNs, builder.UntetheredLingerTimeoutNs()); + Assert.AreEqual(untetheredRestingTimeoutNs, builder.UntetheredRestingTimeoutNs()); + + var uri = ChannelUri.Parse(builder.Build()); + Assert.AreEqual( + Adaptive.Agrona.SystemUtil.FormatDuration(untetheredWindowLimitTimeoutNs), + uri.Get(UNTETHERED_WINDOW_LIMIT_TIMEOUT_PARAM_NAME)); + Assert.AreEqual( + Adaptive.Agrona.SystemUtil.FormatDuration(untetheredLingerTimeoutNs), + uri.Get(UNTETHERED_LINGER_TIMEOUT_PARAM_NAME)); + Assert.AreEqual( + Adaptive.Agrona.SystemUtil.FormatDuration(untetheredRestingTimeoutNs), + uri.Get(UNTETHERED_RESTING_TIMEOUT_PARAM_NAME)); + } + + [Test] + public void ShouldFormatSizeAndDurationsWhenCreatingChannelString() + { + const long microsecond = 1000L; + const long millisecond = 1_000_000L; + const long second = 1_000_000_000L; + + string channel = new ChannelUriStringBuilder() + .Media("udp") + .Endpoint("localhost:5050") + .ReceiverWindowLength(1024) + .Mtu(8192) + .TermLength(4 * 1024 * 1024) + .SocketSndbufLength(64 * 1024) + .SocketRcvbufLength(32 * 1024) + .PublicationWindowLength(1024 * 1024) + .UntetheredWindowLimitTimeoutNs(100 * microsecond) + .UntetheredLingerTimeoutNs(3 * millisecond) + .UntetheredRestingTimeoutNs(1 * second) + .Linger(50 * millisecond) + .NakDelay(123456789L) + .MaxResend(1000) + .Tether(true) + .Rejoin(false) + .StreamId(-87) + .InitialPosition(17 * 1024 * 1024, -9, 4 * 1024 * 1024) + .Build(); + + CollectionAssert.IsEmpty( + ChannelUri.Parse(channel).Diff( + ChannelUri.Parse("aeron:udp?endpoint=localhost:5050|mtu=8k|term-length=4m|rcv-wnd=1k|so-sndbuf=64k|" + + "so-rcvbuf=32k|pub-wnd=1m|untethered-linger-timeout=3ms|untethered-window-limit-timeout=100us|" + + "untethered-resting-timeout=1s|linger=50ms|nak-delay=123456789ns|max-resend=1000|rejoin=false|" + + "tether=true|stream-id=-87|term-id=-5|init-term-id=-9|term-offset=1048576"))); + } + } +} diff --git a/src/Adaptive.Aeron.Tests/Command/CounterMessageFlyweightTest.cs b/src/Adaptive.Aeron.Tests/Command/CounterMessageFlyweightTest.cs new file mode 100644 index 00000000..41393de8 --- /dev/null +++ b/src/Adaptive.Aeron.Tests/Command/CounterMessageFlyweightTest.cs @@ -0,0 +1,50 @@ +using Adaptive.Aeron.Command; +using Adaptive.Agrona; +using Adaptive.Agrona.Concurrent; +using NUnit.Framework; + +namespace Adaptive.Aeron.Tests.Command +{ + public class CounterMessageFlyweightTest + { + private readonly UnsafeBuffer buffer = new UnsafeBuffer(new byte[128]); + private readonly CounterMessageFlyweight flyweight = new CounterMessageFlyweight(); + + [Test] + public void KeyBuffer() + { + const int offset = 24; + buffer.SetMemory(0, offset, 15); + flyweight.Wrap(buffer, offset); + + flyweight.KeyBuffer(NewBuffer(16), 4, 8); + + Assert.AreEqual(8, flyweight.KeyBufferLength()); + Assert.AreEqual(CounterMessageFlyweight.KEY_BUFFER_OFFSET, flyweight.KeyBufferOffset()); + } + + [Test] + public void LabelBuffer() + { + const int offset = 40; + buffer.SetMemory(0, offset, 0xFF); + flyweight.Wrap(buffer, offset); + flyweight.KeyBuffer(NewBuffer(16), 6, 9); + + flyweight.LabelBuffer(NewBuffer(32), 2, 21); + + Assert.AreEqual(21, flyweight.LabelBufferLength()); + Assert.AreEqual(CounterMessageFlyweight.KEY_BUFFER_OFFSET + 16, flyweight.LabelBufferOffset()); + Assert.AreEqual(CounterMessageFlyweight.KEY_BUFFER_OFFSET + 37, flyweight.Length()); + } + + private static IDirectBuffer NewBuffer(int length) + { + var bytes = new byte[length]; + for (int i = 0; i < length; i++) bytes[i] = 1; + var buffer = new UnsafeBuffer(new byte[4 + length]); + buffer.PutBytes(4, bytes); + return buffer; + } + } +} diff --git a/src/Adaptive.Aeron.Tests/Command/TerminateDriverFlyweightTest.cs b/src/Adaptive.Aeron.Tests/Command/TerminateDriverFlyweightTest.cs new file mode 100644 index 00000000..b5d81858 --- /dev/null +++ b/src/Adaptive.Aeron.Tests/Command/TerminateDriverFlyweightTest.cs @@ -0,0 +1,35 @@ +using Adaptive.Aeron.Command; +using Adaptive.Agrona; +using Adaptive.Agrona.Concurrent; +using NUnit.Framework; + +namespace Adaptive.Aeron.Tests.Command +{ + public class TerminateDriverFlyweightTest + { + [Test] + public void TokenBuffer() + { + const int offset = 24; + var buffer = new UnsafeBuffer(new byte[128]); + buffer.SetMemory(0, offset, 15); + var flyweight = new TerminateDriverFlyweight(); + flyweight.Wrap(buffer, offset); + + flyweight.TokenBuffer(NewBuffer(16), 4, 8); + + Assert.AreEqual(8, flyweight.TokenBufferLength()); + Assert.AreEqual(TerminateDriverFlyweight.TOKEN_BUFFER_OFFSET, flyweight.TokenBufferOffset()); + Assert.AreEqual(TerminateDriverFlyweight.TOKEN_BUFFER_OFFSET + 8, flyweight.Length()); + } + + private static IDirectBuffer NewBuffer(int length) + { + var bytes = new byte[length]; + for (int i = 0; i < length; i++) bytes[i] = 1; + var buffer = new UnsafeBuffer(new byte[4 + length]); + buffer.PutBytes(4, bytes); + return buffer; + } + } +} diff --git a/src/Adaptive.Aeron.Tests/Exceptions/StorageSpaceExceptionTest.cs b/src/Adaptive.Aeron.Tests/Exceptions/StorageSpaceExceptionTest.cs new file mode 100644 index 00000000..1b747fa9 --- /dev/null +++ b/src/Adaptive.Aeron.Tests/Exceptions/StorageSpaceExceptionTest.cs @@ -0,0 +1,49 @@ +using System; +using System.Collections.Generic; +using System.IO; +using Adaptive.Aeron.Exceptions; +using NUnit.Framework; + +namespace Adaptive.Aeron.Tests.Exceptions +{ + public class StorageSpaceExceptionTest + { + [Test] + public void IsStorageSpaceErrorReturnsFalseIfNull() + { + Assert.IsFalse(StorageSpaceException.IsStorageSpaceError(null)); + } + + [Test] + public void IsStorageSpaceErrorReturnsFalseIfNotIOException() + { + Assert.IsFalse(StorageSpaceException.IsStorageSpaceError( + new ArgumentException("No space left on device"))); + } + + [Test] + public void IsStorageSpaceErrorReturnsFalseIfWrongMessage() + { + Assert.IsFalse(StorageSpaceException.IsStorageSpaceError( + new ArgumentException("Es steht nicht genug Speicherplatz auf dem Datenträger zur Verfügung"))); + } + + [TestCaseSource(nameof(StorageSpaceErrors))] + public void IsStorageSpaceErrorReturnsTrueWhenIOExceptionWithAParticularMessage(Exception exception) + { + Assert.IsTrue(StorageSpaceException.IsStorageSpaceError(exception)); + } + + public static IEnumerable StorageSpaceErrors() + { + yield return new TestCaseData(new StorageSpaceException("test")); + yield return new TestCaseData(new AeronException(new StorageSpaceException("test2"))); + yield return new TestCaseData(new IOException("No space left on device")); + yield return new TestCaseData(new IOException("There is not enough space on the disk")); + yield return new TestCaseData(new IOException( + "something else", new IOException("No space left on device"))); + yield return new TestCaseData(new AeronException( + new ArgumentException("wrap", new IOException("There is not enough space on the disk")))); + } + } +} diff --git a/src/Adaptive.Aeron.Tests/LogBuffer/HeaderTest.cs b/src/Adaptive.Aeron.Tests/LogBuffer/HeaderTest.cs new file mode 100644 index 00000000..cc83059a --- /dev/null +++ b/src/Adaptive.Aeron.Tests/LogBuffer/HeaderTest.cs @@ -0,0 +1,128 @@ +using Adaptive.Aeron.LogBuffer; +using Adaptive.Aeron.Protocol; +using Adaptive.Agrona.Concurrent; +using NUnit.Framework; + +namespace Adaptive.Aeron.Tests.LogBuffer +{ + public class HeaderTest + { + [Test] + public void ConstructorInitializedData() + { + const int initialTermId = 18; + object context = "context-value-Long.MaxValue"; + const int positionBitsToShift = 2; + var header = new Header(initialTermId, positionBitsToShift, context); + + Assert.AreEqual(initialTermId, header.InitialTermId); + Assert.AreEqual(positionBitsToShift, header.PositionBitsToShift); + Assert.AreEqual(context, header.Context); + } + + [TestCase(0, 2, 100, 1024, 5, 1172L)] + [TestCase(42, 16, 13, 4096, 46, 266272L)] + [TestCase(1, 30, 1024, 1073741824, 111, 119185343488L)] + public void PositionCalculationTheEndOfTheMessageInTheLog( + int initialTermId, + int positionBitsToShift, + int frameLength, + int termOffset, + int termId, + long expectedPosition) + { + var dataHeaderFlyweight = new DataHeaderFlyweight(); + var header = new Header(initialTermId, positionBitsToShift); + header.Buffer = dataHeaderFlyweight; + header.Offset = 0; + dataHeaderFlyweight.Wrap(new byte[64], 16, 32); + dataHeaderFlyweight.FrameLength(frameLength); + dataHeaderFlyweight.TermId(termId); + dataHeaderFlyweight.TermOffset(termOffset); + + Assert.AreEqual(expectedPosition, header.Position); + Assert.AreEqual(Aeron.NULL_VALUE, header.FragmentedFrameLength); + } + + [Test] + public void OffsetIsRelativeToTheBufferStart() + { + var header = new Header(42, 3, "xyz"); + + Assert.AreEqual(0, header.Offset); + + header.Offset = 142; + + Assert.AreEqual(142, header.Offset); + } + + [TestCase(103, (byte)0x3, (byte)0x1A, (short)0x6, 2080, -46234, 333, 5, 909090909090909L)] + [TestCase(512, (byte)0x1, (byte)0xC, (short)0x1, 1073741824, 42, -876, 1543, -4632842384627834687L)] + public void ShouldReadDataFromTheBuffer( + int frameLength, + byte version, + byte flags, + short type, + int termOffset, + int sessionId, + int streamId, + int termId, + long reservedValue) + { + var array = new byte[100]; + const int offset = 16; + var dataHeaderFlyweight = new DataHeaderFlyweight(); + dataHeaderFlyweight.Wrap(array, offset, 64); + + dataHeaderFlyweight + .FrameLength(frameLength) + .Version(version) + .Flags(flags) + .HeaderType(type); + + dataHeaderFlyweight + .TermOffset(termOffset) + .SessionId(sessionId) + .StreamId(streamId) + .TermId(termId) + .ReservedValue(reservedValue); + + var header = new Header(5, 22); + header.Buffer = new UnsafeBuffer(array); + header.Offset = offset; + + Assert.AreEqual(frameLength, header.FrameLength); + Assert.AreEqual(flags, header.Flags); + Assert.AreEqual(type, header.Type); + Assert.AreEqual(termOffset, header.TermOffset); + Assert.AreEqual(sessionId, header.SessionId); + Assert.AreEqual(streamId, header.StreamId); + Assert.AreEqual(termId, header.TermId); + Assert.AreEqual(reservedValue, header.ReservedValue); + } + + [Test] + public void ShouldOverrideInitialTermId() + { + const int initialTermId = -178; + const int newInitialTermId = 871; + var header = new Header(initialTermId, 3); + Assert.AreEqual(initialTermId, header.InitialTermId); + + header.InitialTermId = newInitialTermId; + Assert.AreEqual(newInitialTermId, header.InitialTermId); + } + + [Test] + public void ShouldOverridePositionBitsToShift() + { + const int positionBitsToShift = -6; + const int newPositionBitsToShift = 20; + var header = new Header(42, positionBitsToShift); + Assert.AreEqual(positionBitsToShift, header.PositionBitsToShift); + + header.PositionBitsToShift = newPositionBitsToShift; + Assert.AreEqual(newPositionBitsToShift, header.PositionBitsToShift); + } + } +} diff --git a/src/Adaptive.Aeron.Tests/LogBuffer/HeaderWriterTest.cs b/src/Adaptive.Aeron.Tests/LogBuffer/HeaderWriterTest.cs new file mode 100644 index 00000000..42245cd8 --- /dev/null +++ b/src/Adaptive.Aeron.Tests/LogBuffer/HeaderWriterTest.cs @@ -0,0 +1,59 @@ +using Adaptive.Aeron.LogBuffer; +using Adaptive.Aeron.Protocol; +using Adaptive.Agrona.Concurrent; +using NUnit.Framework; + +namespace Adaptive.Aeron.Tests.LogBuffer +{ + public class HeaderWriterTest + { + // Java's HeaderWriter.newInstance() picks NativeBigEndianHeaderWriter on big-endian machines. + // .NET only ships the little-endian writer, since .NET runs on LE platforms (x86, x64, ARM64). + // Tests therefore exercise only the little-endian variant. + + private readonly UnsafeBuffer defaultHeaderBuffer = new UnsafeBuffer(new byte[32]); + private readonly UnsafeBuffer termBuffer = new UnsafeBuffer(new byte[1024]); + + [SetUp] + public void Before() + { + for (int i = 0; i < defaultHeaderBuffer.Capacity; i++) defaultHeaderBuffer.PutByte(i, 0xFF); + for (int i = 0; i < termBuffer.Capacity; i++) termBuffer.PutByte(i, 0xFF); + } + + // Java byte values are signed (-128..127); .NET byte is unsigned (0..255). + // Bit patterns map: Java -2 -> 0xFE -> 254; Java -128 -> 0x80 -> 128. + [TestCase(100, (byte)8, (byte)5, (short)9, 352, -777, -1000, -33)] + [TestCase(-99, (byte)254, (byte)7, (short)1, 8, 42, 3, 89)] + [TestCase(123, (byte)0, (byte)0, (short)0, 0, 0, 0, 0)] + [TestCase(32, (byte)1, (byte)128, (short)4, 96, int.MaxValue, int.MinValue, int.MinValue)] + public void ShouldEncodeHeaderUsingLittleEndianByteOrder( + int frameLength, + byte version, + byte flags, + short headerType, + int termOffset, + int sessionId, + int streamId, + int termId) + { + defaultHeaderBuffer.PutByte(HeaderFlyweight.VERSION_FIELD_OFFSET, version); + defaultHeaderBuffer.PutByte(HeaderFlyweight.FLAGS_FIELD_OFFSET, flags); + defaultHeaderBuffer.PutShort(HeaderFlyweight.TYPE_FIELD_OFFSET, headerType); + defaultHeaderBuffer.PutInt(DataHeaderFlyweight.SESSION_ID_FIELD_OFFSET, sessionId); + defaultHeaderBuffer.PutInt(DataHeaderFlyweight.STREAM_ID_FIELD_OFFSET, streamId); + + var headerWriter = new HeaderWriter(defaultHeaderBuffer); + headerWriter.Write(termBuffer, termOffset, frameLength, termId); + + Assert.AreEqual(-frameLength, termBuffer.GetInt(termOffset + HeaderFlyweight.FRAME_LENGTH_FIELD_OFFSET)); + Assert.AreEqual(version, termBuffer.GetByte(termOffset + HeaderFlyweight.VERSION_FIELD_OFFSET)); + Assert.AreEqual(flags, termBuffer.GetByte(termOffset + HeaderFlyweight.FLAGS_FIELD_OFFSET)); + Assert.AreEqual(headerType, termBuffer.GetShort(termOffset + HeaderFlyweight.TYPE_FIELD_OFFSET)); + Assert.AreEqual(termOffset, termBuffer.GetInt(termOffset + DataHeaderFlyweight.TERM_OFFSET_FIELD_OFFSET)); + Assert.AreEqual(sessionId, termBuffer.GetInt(termOffset + DataHeaderFlyweight.SESSION_ID_FIELD_OFFSET)); + Assert.AreEqual(streamId, termBuffer.GetInt(termOffset + DataHeaderFlyweight.STREAM_ID_FIELD_OFFSET)); + Assert.AreEqual(termId, termBuffer.GetInt(termOffset + DataHeaderFlyweight.TERM_ID_FIELD_OFFSET)); + } + } +} diff --git a/src/Adaptive.Aeron.Tests/LogBuffer/LogBufferDescriptorTest.cs b/src/Adaptive.Aeron.Tests/LogBuffer/LogBufferDescriptorTest.cs new file mode 100644 index 00000000..1c95a80f --- /dev/null +++ b/src/Adaptive.Aeron.Tests/LogBuffer/LogBufferDescriptorTest.cs @@ -0,0 +1,76 @@ +using Adaptive.Aeron.LogBuffer; +using Adaptive.Agrona.Concurrent; +using NUnit.Framework; +using static Adaptive.Aeron.LogBuffer.LogBufferDescriptor; + +namespace Adaptive.Aeron.Tests.LogBuffer +{ + public class LogBufferDescriptorTest + { + private readonly UnsafeBuffer metadataBuffer = new UnsafeBuffer(new byte[LOG_META_DATA_LENGTH]); + + [Test] + public void RotateLogShouldCasActiveTermCountEvenWhenTermIdDoesNotMatch() + { + const int termId = 5; + const int termCount = 1; + RawTail(metadataBuffer, 2, PackTail(termId, 1024)); + RawTail(metadataBuffer, 0, PackTail(termId + 1, 2048)); + RawTail(metadataBuffer, 1, PackTail(termId + 2, 4096)); + ActiveTermCount(metadataBuffer, termCount); + + Assert.IsTrue(RotateLog(metadataBuffer, termCount, termId)); + + Assert.AreEqual(PackTail(termId, 1024), RawTail(metadataBuffer, 2)); + Assert.AreEqual(PackTail(termId + 1, 2048), RawTail(metadataBuffer, 0)); + Assert.AreEqual(PackTail(termId + 2, 4096), RawTail(metadataBuffer, 1)); + Assert.AreEqual(termCount + 1, ActiveTermCount(metadataBuffer)); + } + + [Test] + public void RotateLogShouldCasActiveTermCountAfterSettingTailForTheNextTerm() + { + const int termId = 51; + const int termCount = 19; + RawTail(metadataBuffer, 1, PackTail(termId, 1024)); + RawTail(metadataBuffer, 2, PackTail(termId + 1 - PARTITION_COUNT, 2048)); + RawTail(metadataBuffer, 0, PackTail(termId + 2 - PARTITION_COUNT, 4096)); + ActiveTermCount(metadataBuffer, termCount); + + Assert.IsTrue(RotateLog(metadataBuffer, termCount, termId)); + + Assert.AreEqual(PackTail(termId, 1024), RawTail(metadataBuffer, 1)); + Assert.AreEqual(PackTail(termId + 1, 0), RawTail(metadataBuffer, 2)); + Assert.AreEqual(PackTail(termId + 2 - PARTITION_COUNT, 4096), RawTail(metadataBuffer, 0)); + Assert.AreEqual(termCount + 1, ActiveTermCount(metadataBuffer)); + } + + [Test] + public void RotateLogIsANoOpIfNeitherTailNorActiveTermCountCanBeChanged() + { + const int termId = 23; + const int termCount = 42; + RawTail(metadataBuffer, 0, PackTail(termId, 1024)); + RawTail(metadataBuffer, 1, PackTail(termId + 18, 2048)); + RawTail(metadataBuffer, 2, PackTail(termId - 19, 4096)); + ActiveTermCount(metadataBuffer, termCount); + + Assert.IsFalse(RotateLog(metadataBuffer, 3, termId)); + + Assert.AreEqual(PackTail(termId, 1024), RawTail(metadataBuffer, 0)); + Assert.AreEqual(PackTail(termId + 18, 2048), RawTail(metadataBuffer, 1)); + Assert.AreEqual(PackTail(termId - 19, 4096), RawTail(metadataBuffer, 2)); + Assert.AreEqual(termCount, ActiveTermCount(metadataBuffer)); + } + + [TestCase(0, 1376, 0)] + [TestCase(10, 1024, 64)] + [TestCase(2048, 2048, 2080)] + [TestCase(4096, 1024, 4224)] + [TestCase(7997, 992, 8288)] + public void ShouldComputeFragmentedFrameLength(int length, int maxPayloadLength, int frameLength) + { + Assert.AreEqual(LogBufferDescriptor.ComputeFragmentedFrameLength(length, maxPayloadLength), frameLength); + } + } +} diff --git a/src/Adaptive.Aeron.Tests/LogBuffersTest.cs b/src/Adaptive.Aeron.Tests/LogBuffersTest.cs new file mode 100644 index 00000000..aaa2ac58 --- /dev/null +++ b/src/Adaptive.Aeron.Tests/LogBuffersTest.cs @@ -0,0 +1,87 @@ +using System; +using System.IO; +using Adaptive.Aeron.LogBuffer; +using Adaptive.Agrona.Concurrent; +using NUnit.Framework; +using static Adaptive.Aeron.LogBuffer.LogBufferDescriptor; + +namespace Adaptive.Aeron.Tests +{ + public class LogBuffersTest + { + private string tempDir; + + [SetUp] + public void SetUp() + { + tempDir = Path.Combine(Path.GetTempPath(), "Aeron.NET-LogBuffersTest-" + Guid.NewGuid()); + Directory.CreateDirectory(tempDir); + } + + [TearDown] + public void TearDown() + { + try { Directory.Delete(tempDir, true); } catch { /* ignore */ } + } + + [TestCase(-100)] + [TestCase(0)] + [TestCase(TERM_MIN_LENGTH >> 1)] + [TestCase(TERM_MAX_LENGTH + 1)] + [TestCase(TERM_MAX_LENGTH - 1)] + public void ThrowsIllegalStateExceptionIfTermLengthIsInvalid(int termLength) + { + var logFile = Path.Combine(tempDir, "test.log"); + var contents = new byte[LOG_META_DATA_LENGTH]; + var buffer = new UnsafeBuffer(contents); + TermLength(buffer, termLength); + File.WriteAllBytes(logFile, contents); + Assert.AreEqual(contents.Length, new FileInfo(logFile).Length); + + var exception = Assert.Throws(() => new LogBuffers(logFile)); + Assert.IsTrue( + exception.Message.StartsWith("Term length") && exception.Message.EndsWith("length=" + termLength), + "Unexpected message: " + exception.Message); + } + + [TestCase(-100)] + [TestCase(0)] + [TestCase(PAGE_MIN_SIZE >> 1)] + [TestCase(PAGE_MAX_SIZE + 1)] + [TestCase(PAGE_MAX_SIZE - 1)] + public void ThrowsIllegalStateExceptionIfPageSizeIsInvalid(int pageSize) + { + var logFile = Path.Combine(tempDir, "test.log"); + var contents = new byte[LOG_META_DATA_LENGTH]; + var buffer = new UnsafeBuffer(contents); + TermLength(buffer, TERM_MIN_LENGTH); + PageSize(buffer, pageSize); + File.WriteAllBytes(logFile, contents); + Assert.AreEqual(contents.Length, new FileInfo(logFile).Length); + + var exception = Assert.Throws(() => new LogBuffers(logFile)); + Assert.IsTrue( + exception.Message.StartsWith("Page size") && exception.Message.EndsWith("page size=" + pageSize), + "Unexpected message: " + exception.Message); + } + + [Test] + public void ThrowsIllegalStateExceptionIfLogFileSizeIsLessThanLogMetaDataLength() + { + var logFile = Path.Combine(tempDir, "test.log"); + const int extraShort = 5; + int fileLength = LOG_META_DATA_LENGTH - extraShort; + var contents = new byte[fileLength]; + var buffer = new UnsafeBuffer(contents); + TermLength(buffer, TERM_MIN_LENGTH); + PageSize(buffer, PAGE_MIN_SIZE); + File.WriteAllBytes(logFile, contents); + Assert.AreEqual(contents.Length, new FileInfo(logFile).Length); + + var exception = Assert.Throws(() => new LogBuffers(logFile)); + Assert.AreEqual( + "Log file length less than min length of " + LOG_META_DATA_LENGTH + ": length=" + fileLength, + exception.Message); + } + } +} diff --git a/src/Adaptive.Aeron/Adaptive.Aeron.csproj b/src/Adaptive.Aeron/Adaptive.Aeron.csproj index db0b5f68..de10659d 100644 --- a/src/Adaptive.Aeron/Adaptive.Aeron.csproj +++ b/src/Adaptive.Aeron/Adaptive.Aeron.csproj @@ -31,4 +31,7 @@ + + + \ No newline at end of file diff --git a/src/Adaptive.Aeron/AeronCounters.cs b/src/Adaptive.Aeron/AeronCounters.cs index 69f6a0eb..41ee7f97 100644 --- a/src/Adaptive.Aeron/AeronCounters.cs +++ b/src/Adaptive.Aeron/AeronCounters.cs @@ -836,6 +836,23 @@ public static int AppendVersionInfo(IMutableDirectBuffer tempBuffer, int offset, return length; } + /// + /// Append version information at the end of the counter's label, including a commit hash. + /// + /// to append label to. + /// at which current label data ends. + /// of the component. + /// identifying the commit. + /// length of the suffix appended. + public static int AppendVersionInfo( + IMutableDirectBuffer tempBuffer, int offset, string fullVersion, string commitHashCode) + { + int length = tempBuffer.PutStringWithoutLengthAscii(offset, " "); + length += tempBuffer.PutStringWithoutLengthAscii( + offset + length, FormatVersionInfo(fullVersion, commitHashCode)); + return length; + } + /// /// Append specified {@code value} at the end of the counter's label as ASCII encoded value up to the /// . @@ -884,6 +901,34 @@ public static string FormatVersionInfo(string fullVersion) return "version=" + fullVersion; } + /// + /// Format version information together with a commit hash for display purposes. + /// + /// of the component. + /// identifying the commit. + /// formatted String. + public static string FormatVersionInfo(string fullVersion, string commitHash) + { + return "version=" + fullVersion + " commit=" + commitHash; + } + + /// + /// Set a reference id for a given counter id. + /// + /// containing the counter metadata. + /// containing the counter values. + /// to be set. + /// to set for the counter. + public static void SetReferenceId( + IAtomicBuffer metaDataBuffer, IAtomicBuffer valuesBuffer, int counterId, long referenceId) + { + if (null == metaDataBuffer) throw new ArgumentNullException(nameof(metaDataBuffer)); + if (null == valuesBuffer) throw new ArgumentNullException(nameof(valuesBuffer)); + ValidateCounterId(metaDataBuffer, counterId); + + valuesBuffer.PutLongRelease(CounterOffset(counterId) + REFERENCE_ID_OFFSET, referenceId); + } + private static void ValidateCounterId(IAtomicBuffer metaDataBuffer, int counterId) { if (counterId < 0) diff --git a/src/Adaptive.Aeron/ChannelUri.cs b/src/Adaptive.Aeron/ChannelUri.cs index 667742d9..8d93139d 100644 --- a/src/Adaptive.Aeron/ChannelUri.cs +++ b/src/Adaptive.Aeron/ChannelUri.cs @@ -575,6 +575,61 @@ public void ForEachParameter(Action consumer) _params.ForEach(consumer); } + /// + /// Compute a map of differences between this and another. Each entry's value + /// is a "this != that" string describing the divergence. + /// + /// to compare against. + /// a map of differences, empty if the two URIs are equivalent. + public IDictionary Diff(ChannelUri that) + { + var differingValues = new Dictionary(); + + if (!string.Equals(_prefix, that._prefix)) + { + differingValues["prefix"] = _prefix + " != " + that._prefix; + } + + if (!string.Equals(_media, that._media)) + { + differingValues["media"] = _media + " != " + that._media; + } + + _params.ForEach((key, value) => + { + string thatValue = that._params.Get(key); + if (!string.Equals(value, thatValue)) + { + differingValues[key] = value + " != " + thatValue; + } + }); + + if (!TagsEqual(_tags, that._tags)) + { + differingValues[TAGS_PARAM_NAME] = TagsToString(_tags) + " != " + TagsToString(that._tags); + } + + return differingValues; + } + + private static bool TagsEqual(string[] a, string[] b) + { + if (ReferenceEquals(a, b)) return true; + if (a == null || b == null) return false; + if (a.Length != b.Length) return false; + for (int i = 0; i < a.Length; i++) + { + if (!string.Equals(a[i], b[i])) return false; + } + return true; + } + + private static string TagsToString(string[] tags) + { + if (tags == null) return "null"; + return "[" + string.Join(", ", tags) + "]"; + } + /// /// Determines if this channel has specified control-mode=response. /// diff --git a/src/Adaptive.Aeron/ChannelUriStringBuilder.cs b/src/Adaptive.Aeron/ChannelUriStringBuilder.cs index a29a1e3b..f4c3a3a3 100644 --- a/src/Adaptive.Aeron/ChannelUriStringBuilder.cs +++ b/src/Adaptive.Aeron/ChannelUriStringBuilder.cs @@ -927,7 +927,7 @@ public ChannelUriStringBuilder Linger(long? lingerNs) { if (null != lingerNs && lingerNs < 0) { - throw new ArgumentException("linger value cannot be negative: " + lingerNs); + throw new ArgumentException("`linger` value cannot be negative: " + lingerNs); } _linger = lingerNs; @@ -1962,7 +1962,7 @@ public ChannelUriStringBuilder ResponseCorrelationId(ChannelUri channelUri) /// public ChannelUriStringBuilder NakDelay(string nakDelay) { - if (null != this.nakDelay) + if (null != nakDelay) { this.nakDelay = SystemUtil.ParseDuration(NAK_DELAY_PARAM_NAME, nakDelay); } @@ -1974,6 +1974,22 @@ public ChannelUriStringBuilder NakDelay(string nakDelay) return this; } + /// + /// The delay in nanoseconds to apply before sending a NAK in response to a gap being detected by the receiver. + /// + /// in nanoseconds; must be non-negative; null clears. + /// this for a fluent API. + /// + public ChannelUriStringBuilder NakDelay(long? nakDelayNs) + { + if (null != nakDelayNs && nakDelayNs.Value < 0) + { + throw new ArgumentException("`" + NAK_DELAY_PARAM_NAME + "` value cannot be negative: " + nakDelayNs); + } + this.nakDelay = nakDelayNs; + return this; + } + /// /// The delay to apply before sending a NAK in response to a gap being detected by the receiver. /// @@ -2331,8 +2347,8 @@ public string Build() AppendParameter(_sb, INTERFACE_PARAM_NAME, _networkInterface); AppendParameter(_sb, MDC_CONTROL_PARAM_NAME, _controlEndpoint); AppendParameter(_sb, MDC_CONTROL_MODE_PARAM_NAME, _controlMode); - AppendParameter(_sb, MTU_LENGTH_PARAM_NAME, _mtu); - AppendParameter(_sb, TERM_LENGTH_PARAM_NAME, _termLength); + AppendSize(_sb, MTU_LENGTH_PARAM_NAME, _mtu); + AppendSize(_sb, TERM_LENGTH_PARAM_NAME, _termLength); AppendParameter(_sb, INITIAL_TERM_ID_PARAM_NAME, _initialTermId); AppendParameter(_sb, TERM_ID_PARAM_NAME, _termId); AppendParameter(_sb, TERM_OFFSET_PARAM_NAME, _termOffset); @@ -2344,7 +2360,7 @@ public string Build() AppendParameter(_sb, TTL_PARAM_NAME, _ttl); AppendParameter(_sb, RELIABLE_STREAM_PARAM_NAME, _reliable); - AppendParameter(_sb, LINGER_PARAM_NAME, _linger); + AppendDuration(_sb, LINGER_PARAM_NAME, _linger); AppendParameter(_sb, ALIAS_PARAM_NAME, _alias); AppendParameter(_sb, CONGESTION_CONTROL_PARAM_NAME, _cc); AppendParameter(_sb, FLOW_CONTROL_PARAM_NAME, _fc); @@ -2355,22 +2371,22 @@ public string Build() AppendParameter(_sb, GROUP_PARAM_NAME, _group); AppendParameter(_sb, REJOIN_PARAM_NAME, _rejoin); AppendParameter(_sb, SPIES_SIMULATE_CONNECTION_PARAM_NAME, _ssc); - AppendParameter(_sb, SOCKET_SNDBUF_PARAM_NAME, _socketSndbufLength); - AppendParameter(_sb, SOCKET_RCVBUF_PARAM_NAME, _socketRcvbufLength); - AppendParameter(_sb, RECEIVER_WINDOW_LENGTH_PARAM_NAME, _receiverWindowLength); + AppendSize(_sb, SOCKET_SNDBUF_PARAM_NAME, _socketSndbufLength); + AppendSize(_sb, SOCKET_RCVBUF_PARAM_NAME, _socketRcvbufLength); + AppendSize(_sb, RECEIVER_WINDOW_LENGTH_PARAM_NAME, _receiverWindowLength); AppendParameter(_sb, MEDIA_RCV_TIMESTAMP_OFFSET_PARAM_NAME, _mediaReceiveTimestampOffset); AppendParameter(_sb, CHANNEL_RECEIVE_TIMESTAMP_OFFSET_PARAM_NAME, _channelReceiveTimestampOffset); AppendParameter(_sb, CHANNEL_SEND_TIMESTAMP_OFFSET_PARAM_NAME, _channelSendTimestampOffset); AppendParameter(_sb, RESPONSE_ENDPOINT_PARAM_NAME, _responseEndpoint); AppendParameter(_sb, RESPONSE_CORRELATION_ID_PARAM_NAME, _responseCorrelationId); - AppendParameter(_sb, NAK_DELAY_PARAM_NAME, nakDelay); - AppendParameter(_sb, UNTETHERED_WINDOW_LIMIT_TIMEOUT_PARAM_NAME, _untetheredWindowLimitTimeoutNs); - AppendParameter(_sb, UNTETHERED_LINGER_TIMEOUT_PARAM_NAME, _untetheredLingerTimeoutNs); - AppendParameter(_sb, UNTETHERED_RESTING_TIMEOUT_PARAM_NAME, untetheredRestingTimeoutNs); + AppendDuration(_sb, NAK_DELAY_PARAM_NAME, nakDelay); + AppendDuration(_sb, UNTETHERED_WINDOW_LIMIT_TIMEOUT_PARAM_NAME, _untetheredWindowLimitTimeoutNs); + AppendDuration(_sb, UNTETHERED_LINGER_TIMEOUT_PARAM_NAME, _untetheredLingerTimeoutNs); + AppendDuration(_sb, UNTETHERED_RESTING_TIMEOUT_PARAM_NAME, untetheredRestingTimeoutNs); AppendParameter(_sb, MAX_RESEND_PARAM_NAME, _maxResend); AppendParameter(_sb, STREAM_ID_PARAM_NAME, _streamId); - AppendParameter(_sb, PUBLICATION_WINDOW_LENGTH_PARAM_NAME, _publicationWindowLength); + AppendSize(_sb, PUBLICATION_WINDOW_LENGTH_PARAM_NAME, _publicationWindowLength); char lastChar = _sb[_sb.Length - 1]; @@ -2386,7 +2402,29 @@ private static void AppendParameter(StringBuilder sb, String paramName, object p { if (null != paramValue) { - sb.Append(paramName).Append('=').Append(paramValue).Append('|'); + sb.Append(paramName).Append('=').Append(FormatValue(paramValue)).Append('|'); + } + } + + private static string FormatValue(object value) + { + if (value is bool b) return b ? "true" : "false"; + return value.ToString(); + } + + private static void AppendSize(StringBuilder sb, string paramName, int? value) + { + if (null != value) + { + sb.Append(paramName).Append('=').Append(SystemUtil.FormatSize(value.Value)).Append('|'); + } + } + + private static void AppendDuration(StringBuilder sb, string paramName, long? valueNs) + { + if (null != valueNs) + { + sb.Append(paramName).Append('=').Append(SystemUtil.FormatDuration(valueNs.Value)).Append('|'); } } diff --git a/src/Adaptive.Aeron/Command/CounterMessageFlyweight.cs b/src/Adaptive.Aeron/Command/CounterMessageFlyweight.cs index 0acc6267..684308ca 100644 --- a/src/Adaptive.Aeron/Command/CounterMessageFlyweight.cs +++ b/src/Adaptive.Aeron/Command/CounterMessageFlyweight.cs @@ -54,7 +54,7 @@ public class CounterMessageFlyweight : CorrelatedMessageFlyweight { private static readonly int COUNTER_TYPE_ID_FIELD_OFFSET = CORRELATION_ID_FIELD_OFFSET + BitUtil.SIZE_OF_LONG; private static readonly int KEY_LENGTH_OFFSET = COUNTER_TYPE_ID_FIELD_OFFSET + BitUtil.SIZE_OF_INT; - static readonly int KEY_BUFFER_OFFSET = KEY_LENGTH_OFFSET + BitUtil.SIZE_OF_INT; + internal static readonly int KEY_BUFFER_OFFSET = KEY_LENGTH_OFFSET + BitUtil.SIZE_OF_INT; private static readonly int MINIMUM_LENGTH = KEY_BUFFER_OFFSET + BitUtil.SIZE_OF_INT; /// diff --git a/src/Adaptive.Aeron/Command/TerminateDriverFlyweight.cs b/src/Adaptive.Aeron/Command/TerminateDriverFlyweight.cs index 3f25a3f1..021704d7 100644 --- a/src/Adaptive.Aeron/Command/TerminateDriverFlyweight.cs +++ b/src/Adaptive.Aeron/Command/TerminateDriverFlyweight.cs @@ -27,7 +27,7 @@ namespace Adaptive.Aeron.Command public class TerminateDriverFlyweight : CorrelatedMessageFlyweight { private static readonly int TOKEN_LENGTH_OFFSET = CORRELATION_ID_FIELD_OFFSET + BitUtil.SIZE_OF_LONG; - static readonly int TOKEN_BUFFER_OFFSET = TOKEN_LENGTH_OFFSET + BitUtil.SIZE_OF_INT; + internal static readonly int TOKEN_BUFFER_OFFSET = TOKEN_LENGTH_OFFSET + BitUtil.SIZE_OF_INT; private static readonly int MINIMUM_LENGTH = TOKEN_LENGTH_OFFSET + BitUtil.SIZE_OF_INT; /// diff --git a/src/Adaptive.Aeron/Exceptions/StorageSpaceException.cs b/src/Adaptive.Aeron/Exceptions/StorageSpaceException.cs index a70592d3..ef8e2ded 100644 --- a/src/Adaptive.Aeron/Exceptions/StorageSpaceException.cs +++ b/src/Adaptive.Aeron/Exceptions/StorageSpaceException.cs @@ -28,9 +28,22 @@ public static bool IsStorageSpaceError(Exception error) Exception cause = error; while (null != cause) { + if (cause is StorageSpaceException) + { + return true; + } + if (cause is IOException) { - if ((uint)error.HResult == 0x80070070) + string message = cause.Message; + if (null != message && + (message.Contains("No space left on device") || + message.Contains("There is not enough space on the disk"))) + { + return true; + } + + if ((uint)cause.HResult == 0x80070070) { return true; } diff --git a/src/Adaptive.Aeron/LogBuffer/LogBufferDescriptor.cs b/src/Adaptive.Aeron/LogBuffer/LogBufferDescriptor.cs index 1e27596c..924c43b7 100644 --- a/src/Adaptive.Aeron/LogBuffer/LogBufferDescriptor.cs +++ b/src/Adaptive.Aeron/LogBuffer/LogBufferDescriptor.cs @@ -431,7 +431,7 @@ public static void CheckTermLength(int termLength) if (termLength > TERM_MAX_LENGTH) ThrowHelper.ThrowInvalidOperationException( - $"Term length more than max length of {TERM_MAX_LENGTH:D}: length = {termLength:D}"); + $"Term length more than max length of {TERM_MAX_LENGTH:D}: length={termLength:D}"); if (!IsPowerOfTwo(termLength)) ThrowHelper.ThrowInvalidOperationException("Term length not a power of 2: length=" + termLength); diff --git a/src/Adaptive.Agrona/Concurrent/Status/CountersReader.cs b/src/Adaptive.Agrona/Concurrent/Status/CountersReader.cs index 97c0a944..e05b9f8f 100644 --- a/src/Adaptive.Agrona/Concurrent/Status/CountersReader.cs +++ b/src/Adaptive.Agrona/Concurrent/Status/CountersReader.cs @@ -139,6 +139,12 @@ public class CountersReader /// public static readonly int OWNER_ID_OFFSET = REGISTRATION_ID_OFFSET + BitUtil.SIZE_OF_LONG; + /// + /// Offset in the record at which the reference id field is stored. The reference id is an abstract concept + /// which can be used to associate counters with another resource lifecycle for reference accounting. + /// + public static readonly int REFERENCE_ID_OFFSET = OWNER_ID_OFFSET + BitUtil.SIZE_OF_LONG; + /// /// Offset in the record at which the type id field is stored. /// @@ -445,7 +451,7 @@ public string GetCounterLabel(int counterId) return LabelValue(MetaDataOffset(counterId)); } - private void ValidateCounterId(int counterId) + protected void ValidateCounterId(int counterId) { if (counterId < 0 || counterId > MaxCounterId) { diff --git a/src/Adaptive.Agrona/SystemUtil.cs b/src/Adaptive.Agrona/SystemUtil.cs index bbd8c98e..2c9b5a36 100644 --- a/src/Adaptive.Agrona/SystemUtil.cs +++ b/src/Adaptive.Agrona/SystemUtil.cs @@ -120,5 +120,97 @@ public static long ParseDuration(string propertyName, string propertyValue) } } } + + private const long ONE_KILOBYTE = 1024; + private const long ONE_MEGABYTE = 1024 * 1024; + private const long ONE_GIGABYTE = 1024 * 1024 * 1024; + + /// + /// Format size value as the shortest possible string with a 'k', 'm', or 'g' suffix when the value is + /// an exact multiple of the corresponding power-of-two. Returns the bare integer otherwise. + /// + /// to format. Must be non-negative. + /// formatted value. + /// if is negative. + public static string FormatSize(long size) + { + if (size < 0) + { + throw new ArgumentException("size must be positive: " + size); + } + + if (size >= ONE_GIGABYTE) + { + long value = size / ONE_GIGABYTE; + if (size == value * ONE_GIGABYTE) + { + return value + "g"; + } + } + + if (size >= ONE_MEGABYTE) + { + long value = size / ONE_MEGABYTE; + if (size == value * ONE_MEGABYTE) + { + return value + "m"; + } + } + + if (size >= ONE_KILOBYTE) + { + long value = size / ONE_KILOBYTE; + if (size == value * ONE_KILOBYTE) + { + return value + "k"; + } + } + + return size.ToString(); + } + + /// + /// Format duration value as the shortest possible string with a 'ns', 'us', 'ms', or 's' suffix. + /// Returns the bare integer with 'ns' suffix otherwise. + /// + /// value in nanoseconds. Must be non-negative. + /// formatted value. + /// if is negative. + public static string FormatDuration(long durationNs) + { + if (durationNs < 0) + { + throw new ArgumentException("duration must be positive: " + durationNs); + } + + if (durationNs >= SECONDS_TO_NANOS) + { + long value = durationNs / SECONDS_TO_NANOS; + if (durationNs == value * SECONDS_TO_NANOS) + { + return value + "s"; + } + } + + if (durationNs >= MILLS_TO_NANOS) + { + long value = durationNs / MILLS_TO_NANOS; + if (durationNs == value * MILLS_TO_NANOS) + { + return value + "ms"; + } + } + + if (durationNs >= MICROS_TO_NANOS) + { + long value = durationNs / MICROS_TO_NANOS; + if (durationNs == value * MICROS_TO_NANOS) + { + return value + "us"; + } + } + + return durationNs + "ns"; + } } } \ No newline at end of file