From 7358ef3cba1122933a47f39fbc1ca265e3b1b82c Mon Sep 17 00:00:00 2001 From: Eric Bowden Date: Mon, 4 May 2026 15:30:13 -0500 Subject: [PATCH] Ported Cluster and Archive client unit tests from Java Added a new "Unsealer.Fody" Fody addin to allow tests to subclass sealed classes (Debug builds only). Changed EgressPoller.OnFragment to skip over unknown schemaIds and continue to match Java behavior Added ArchiveException.ErrorCodeAsString, made AeronArchive.QuietClose public, and added MessageRetryAttempts config option for parity with Java Changed serialization of booleans in URIs to be lowercase "true/false" rather than .NET's default "True/False" Changed AeronArchive.AsyncConnect to match Java's version and state machine Fixed wrong decoder used in EgressAdapter.OnFragment's new leader event case --- src/Adaptive.Aeron.sln | 209 +++++++++ src/Adaptive.Aeron/Adaptive.Aeron.csproj | 5 + src/Adaptive.Aeron/FodyWeavers.xml | 1 + src/Adaptive.Aeron/FodyWeavers.xsd | 1 + src/Adaptive.Agrona/SuppressedExceptions.cs | 40 ++ .../Adaptive.Archiver.Tests.csproj | 20 + .../AeronArchiveTest.cs | 433 ++++++++++++++++++ .../ArchiveExceptionTest.cs | 35 ++ .../Adaptive.Archiver.csproj | 20 + src/Adaptive.Archiver/AeronArchive.cs | 192 +++++--- src/Adaptive.Archiver/ArchiveException.cs | 28 ++ src/Adaptive.Archiver/FodyWeavers.xml | 5 + src/Adaptive.Archiver/FodyWeavers.xsd | 27 ++ .../Adaptive.Cluster.Tests.csproj | 21 + .../Client/AeronClusterAsyncConnectTest.cs | 361 +++++++++++++++ .../Client/AeronClusterContextTest.cs | 101 ++++ .../Client/AeronClusterTest.cs | 293 ++++++++++++ .../Client/EgressAdapterTest.cs | 303 ++++++++++++ .../Client/EgressPollerTest.cs | 82 ++++ src/Adaptive.Cluster/Adaptive.Cluster.csproj | 20 + src/Adaptive.Cluster/Client/AeronCluster.cs | 69 ++- src/Adaptive.Cluster/Client/EgressAdapter.cs | 2 +- src/Adaptive.Cluster/Client/EgressPoller.cs | 3 +- src/Adaptive.Cluster/FodyWeavers.xml | 5 + src/Adaptive.Cluster/FodyWeavers.xsd | 27 ++ src/Weavers/Unsealer.Fody/ModuleWeaver.cs | 80 ++++ .../Unsealer.Fody/Unsealer.Fody.csproj | 16 + 27 files changed, 2330 insertions(+), 69 deletions(-) create mode 100644 src/Adaptive.Agrona/SuppressedExceptions.cs create mode 100644 src/Adaptive.Archiver.Tests/Adaptive.Archiver.Tests.csproj create mode 100644 src/Adaptive.Archiver.Tests/AeronArchiveTest.cs create mode 100644 src/Adaptive.Archiver.Tests/ArchiveExceptionTest.cs create mode 100644 src/Adaptive.Archiver/FodyWeavers.xml create mode 100644 src/Adaptive.Archiver/FodyWeavers.xsd create mode 100644 src/Adaptive.Cluster.Tests/Adaptive.Cluster.Tests.csproj create mode 100644 src/Adaptive.Cluster.Tests/Client/AeronClusterAsyncConnectTest.cs create mode 100644 src/Adaptive.Cluster.Tests/Client/AeronClusterContextTest.cs create mode 100644 src/Adaptive.Cluster.Tests/Client/AeronClusterTest.cs create mode 100644 src/Adaptive.Cluster.Tests/Client/EgressAdapterTest.cs create mode 100644 src/Adaptive.Cluster.Tests/Client/EgressPollerTest.cs create mode 100644 src/Adaptive.Cluster/FodyWeavers.xml create mode 100644 src/Adaptive.Cluster/FodyWeavers.xsd create mode 100644 src/Weavers/Unsealer.Fody/ModuleWeaver.cs create mode 100644 src/Weavers/Unsealer.Fody/Unsealer.Fody.csproj diff --git a/src/Adaptive.Aeron.sln b/src/Adaptive.Aeron.sln index 85e19e37..0dd11214 100644 --- a/src/Adaptive.Aeron.sln +++ b/src/Adaptive.Aeron.sln @@ -51,92 +51,300 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Adaptive.Aeron.Samples.Clus EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Adaptive.Aeron.Samples.ClusterClient", "Samples\Adaptive.Aeron.Samples.ClusterClient\Adaptive.Aeron.Samples.ClusterClient.csproj", "{DB22F44D-0ABF-428F-BAA1-E0D89696ABD3}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Adaptive.Archiver.Tests", "Adaptive.Archiver.Tests\Adaptive.Archiver.Tests.csproj", "{F1B693B4-F0C6-4467-BC09-2D1B9FB73D4E}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Adaptive.Cluster.Tests", "Adaptive.Cluster.Tests\Adaptive.Cluster.Tests.csproj", "{572CA144-CECC-43AC-AF10-5BEA6007C2DE}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Weavers", "Weavers", "{E2872BA3-3393-4325-66A6-EE7620D75132}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Unsealer.Fody", "Weavers\Unsealer.Fody\Unsealer.Fody.csproj", "{E28646AF-556E-423E-B54F-ADFBD75F2611}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU + Debug|x64 = Debug|x64 + Debug|x86 = Debug|x86 Release|Any CPU = Release|Any CPU + Release|x64 = Release|x64 + Release|x86 = Release|x86 EndGlobalSection GlobalSection(ProjectConfigurationPlatforms) = postSolution {3E3FA5C6-CFBA-4604-B10E-C7EC97D226CF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {3E3FA5C6-CFBA-4604-B10E-C7EC97D226CF}.Debug|Any CPU.Build.0 = Debug|Any CPU + {3E3FA5C6-CFBA-4604-B10E-C7EC97D226CF}.Debug|x64.ActiveCfg = Debug|Any CPU + {3E3FA5C6-CFBA-4604-B10E-C7EC97D226CF}.Debug|x64.Build.0 = Debug|Any CPU + {3E3FA5C6-CFBA-4604-B10E-C7EC97D226CF}.Debug|x86.ActiveCfg = Debug|Any CPU + {3E3FA5C6-CFBA-4604-B10E-C7EC97D226CF}.Debug|x86.Build.0 = Debug|Any CPU {3E3FA5C6-CFBA-4604-B10E-C7EC97D226CF}.Release|Any CPU.ActiveCfg = Release|Any CPU {3E3FA5C6-CFBA-4604-B10E-C7EC97D226CF}.Release|Any CPU.Build.0 = Release|Any CPU + {3E3FA5C6-CFBA-4604-B10E-C7EC97D226CF}.Release|x64.ActiveCfg = Release|Any CPU + {3E3FA5C6-CFBA-4604-B10E-C7EC97D226CF}.Release|x64.Build.0 = Release|Any CPU + {3E3FA5C6-CFBA-4604-B10E-C7EC97D226CF}.Release|x86.ActiveCfg = Release|Any CPU + {3E3FA5C6-CFBA-4604-B10E-C7EC97D226CF}.Release|x86.Build.0 = Release|Any CPU {E6F84F8E-68C6-43C8-A20E-E1F91DF99ABC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {E6F84F8E-68C6-43C8-A20E-E1F91DF99ABC}.Debug|Any CPU.Build.0 = Debug|Any CPU + {E6F84F8E-68C6-43C8-A20E-E1F91DF99ABC}.Debug|x64.ActiveCfg = Debug|Any CPU + {E6F84F8E-68C6-43C8-A20E-E1F91DF99ABC}.Debug|x64.Build.0 = Debug|Any CPU + {E6F84F8E-68C6-43C8-A20E-E1F91DF99ABC}.Debug|x86.ActiveCfg = Debug|Any CPU + {E6F84F8E-68C6-43C8-A20E-E1F91DF99ABC}.Debug|x86.Build.0 = Debug|Any CPU {E6F84F8E-68C6-43C8-A20E-E1F91DF99ABC}.Release|Any CPU.ActiveCfg = Release|Any CPU {E6F84F8E-68C6-43C8-A20E-E1F91DF99ABC}.Release|Any CPU.Build.0 = Release|Any CPU + {E6F84F8E-68C6-43C8-A20E-E1F91DF99ABC}.Release|x64.ActiveCfg = Release|Any CPU + {E6F84F8E-68C6-43C8-A20E-E1F91DF99ABC}.Release|x64.Build.0 = Release|Any CPU + {E6F84F8E-68C6-43C8-A20E-E1F91DF99ABC}.Release|x86.ActiveCfg = Release|Any CPU + {E6F84F8E-68C6-43C8-A20E-E1F91DF99ABC}.Release|x86.Build.0 = Release|Any CPU {46A1DE6D-55B0-4D64-959F-D19B8D378842}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {46A1DE6D-55B0-4D64-959F-D19B8D378842}.Debug|Any CPU.Build.0 = Debug|Any CPU + {46A1DE6D-55B0-4D64-959F-D19B8D378842}.Debug|x64.ActiveCfg = Debug|Any CPU + {46A1DE6D-55B0-4D64-959F-D19B8D378842}.Debug|x64.Build.0 = Debug|Any CPU + {46A1DE6D-55B0-4D64-959F-D19B8D378842}.Debug|x86.ActiveCfg = Debug|Any CPU + {46A1DE6D-55B0-4D64-959F-D19B8D378842}.Debug|x86.Build.0 = Debug|Any CPU {46A1DE6D-55B0-4D64-959F-D19B8D378842}.Release|Any CPU.ActiveCfg = Release|Any CPU {46A1DE6D-55B0-4D64-959F-D19B8D378842}.Release|Any CPU.Build.0 = Release|Any CPU + {46A1DE6D-55B0-4D64-959F-D19B8D378842}.Release|x64.ActiveCfg = Release|Any CPU + {46A1DE6D-55B0-4D64-959F-D19B8D378842}.Release|x64.Build.0 = Release|Any CPU + {46A1DE6D-55B0-4D64-959F-D19B8D378842}.Release|x86.ActiveCfg = Release|Any CPU + {46A1DE6D-55B0-4D64-959F-D19B8D378842}.Release|x86.Build.0 = Release|Any CPU {A1ABAA99-CAAE-41B2-B3F9-9BF2C23DFA8B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {A1ABAA99-CAAE-41B2-B3F9-9BF2C23DFA8B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {A1ABAA99-CAAE-41B2-B3F9-9BF2C23DFA8B}.Debug|x64.ActiveCfg = Debug|Any CPU + {A1ABAA99-CAAE-41B2-B3F9-9BF2C23DFA8B}.Debug|x64.Build.0 = Debug|Any CPU + {A1ABAA99-CAAE-41B2-B3F9-9BF2C23DFA8B}.Debug|x86.ActiveCfg = Debug|Any CPU + {A1ABAA99-CAAE-41B2-B3F9-9BF2C23DFA8B}.Debug|x86.Build.0 = Debug|Any CPU {A1ABAA99-CAAE-41B2-B3F9-9BF2C23DFA8B}.Release|Any CPU.ActiveCfg = Release|Any CPU {A1ABAA99-CAAE-41B2-B3F9-9BF2C23DFA8B}.Release|Any CPU.Build.0 = Release|Any CPU + {A1ABAA99-CAAE-41B2-B3F9-9BF2C23DFA8B}.Release|x64.ActiveCfg = Release|Any CPU + {A1ABAA99-CAAE-41B2-B3F9-9BF2C23DFA8B}.Release|x64.Build.0 = Release|Any CPU + {A1ABAA99-CAAE-41B2-B3F9-9BF2C23DFA8B}.Release|x86.ActiveCfg = Release|Any CPU + {A1ABAA99-CAAE-41B2-B3F9-9BF2C23DFA8B}.Release|x86.Build.0 = Release|Any CPU {29386401-8D9B-4F56-8B8D-A8242D4E295D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {29386401-8D9B-4F56-8B8D-A8242D4E295D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {29386401-8D9B-4F56-8B8D-A8242D4E295D}.Debug|x64.ActiveCfg = Debug|Any CPU + {29386401-8D9B-4F56-8B8D-A8242D4E295D}.Debug|x64.Build.0 = Debug|Any CPU + {29386401-8D9B-4F56-8B8D-A8242D4E295D}.Debug|x86.ActiveCfg = Debug|Any CPU + {29386401-8D9B-4F56-8B8D-A8242D4E295D}.Debug|x86.Build.0 = Debug|Any CPU {29386401-8D9B-4F56-8B8D-A8242D4E295D}.Release|Any CPU.ActiveCfg = Release|Any CPU {29386401-8D9B-4F56-8B8D-A8242D4E295D}.Release|Any CPU.Build.0 = Release|Any CPU + {29386401-8D9B-4F56-8B8D-A8242D4E295D}.Release|x64.ActiveCfg = Release|Any CPU + {29386401-8D9B-4F56-8B8D-A8242D4E295D}.Release|x64.Build.0 = Release|Any CPU + {29386401-8D9B-4F56-8B8D-A8242D4E295D}.Release|x86.ActiveCfg = Release|Any CPU + {29386401-8D9B-4F56-8B8D-A8242D4E295D}.Release|x86.Build.0 = Release|Any CPU {BB3FCE67-1719-4A1F-98BF-B56509C08794}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {BB3FCE67-1719-4A1F-98BF-B56509C08794}.Debug|Any CPU.Build.0 = Debug|Any CPU + {BB3FCE67-1719-4A1F-98BF-B56509C08794}.Debug|x64.ActiveCfg = Debug|Any CPU + {BB3FCE67-1719-4A1F-98BF-B56509C08794}.Debug|x64.Build.0 = Debug|Any CPU + {BB3FCE67-1719-4A1F-98BF-B56509C08794}.Debug|x86.ActiveCfg = Debug|Any CPU + {BB3FCE67-1719-4A1F-98BF-B56509C08794}.Debug|x86.Build.0 = Debug|Any CPU {BB3FCE67-1719-4A1F-98BF-B56509C08794}.Release|Any CPU.ActiveCfg = Release|Any CPU {BB3FCE67-1719-4A1F-98BF-B56509C08794}.Release|Any CPU.Build.0 = Release|Any CPU + {BB3FCE67-1719-4A1F-98BF-B56509C08794}.Release|x64.ActiveCfg = Release|Any CPU + {BB3FCE67-1719-4A1F-98BF-B56509C08794}.Release|x64.Build.0 = Release|Any CPU + {BB3FCE67-1719-4A1F-98BF-B56509C08794}.Release|x86.ActiveCfg = Release|Any CPU + {BB3FCE67-1719-4A1F-98BF-B56509C08794}.Release|x86.Build.0 = Release|Any CPU {16B25C6A-67C5-46E9-95BE-DB3B17D70DF8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {16B25C6A-67C5-46E9-95BE-DB3B17D70DF8}.Debug|Any CPU.Build.0 = Debug|Any CPU + {16B25C6A-67C5-46E9-95BE-DB3B17D70DF8}.Debug|x64.ActiveCfg = Debug|Any CPU + {16B25C6A-67C5-46E9-95BE-DB3B17D70DF8}.Debug|x64.Build.0 = Debug|Any CPU + {16B25C6A-67C5-46E9-95BE-DB3B17D70DF8}.Debug|x86.ActiveCfg = Debug|Any CPU + {16B25C6A-67C5-46E9-95BE-DB3B17D70DF8}.Debug|x86.Build.0 = Debug|Any CPU {16B25C6A-67C5-46E9-95BE-DB3B17D70DF8}.Release|Any CPU.ActiveCfg = Release|Any CPU {16B25C6A-67C5-46E9-95BE-DB3B17D70DF8}.Release|Any CPU.Build.0 = Release|Any CPU + {16B25C6A-67C5-46E9-95BE-DB3B17D70DF8}.Release|x64.ActiveCfg = Release|Any CPU + {16B25C6A-67C5-46E9-95BE-DB3B17D70DF8}.Release|x64.Build.0 = Release|Any CPU + {16B25C6A-67C5-46E9-95BE-DB3B17D70DF8}.Release|x86.ActiveCfg = Release|Any CPU + {16B25C6A-67C5-46E9-95BE-DB3B17D70DF8}.Release|x86.Build.0 = Release|Any CPU {B8324314-0695-4919-91AE-9683972D4125}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {B8324314-0695-4919-91AE-9683972D4125}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B8324314-0695-4919-91AE-9683972D4125}.Debug|x64.ActiveCfg = Debug|Any CPU + {B8324314-0695-4919-91AE-9683972D4125}.Debug|x64.Build.0 = Debug|Any CPU + {B8324314-0695-4919-91AE-9683972D4125}.Debug|x86.ActiveCfg = Debug|Any CPU + {B8324314-0695-4919-91AE-9683972D4125}.Debug|x86.Build.0 = Debug|Any CPU {B8324314-0695-4919-91AE-9683972D4125}.Release|Any CPU.ActiveCfg = Release|Any CPU {B8324314-0695-4919-91AE-9683972D4125}.Release|Any CPU.Build.0 = Release|Any CPU + {B8324314-0695-4919-91AE-9683972D4125}.Release|x64.ActiveCfg = Release|Any CPU + {B8324314-0695-4919-91AE-9683972D4125}.Release|x64.Build.0 = Release|Any CPU + {B8324314-0695-4919-91AE-9683972D4125}.Release|x86.ActiveCfg = Release|Any CPU + {B8324314-0695-4919-91AE-9683972D4125}.Release|x86.Build.0 = Release|Any CPU {A15D0AF3-F1D9-4AD4-9542-EB021EA31EA6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {A15D0AF3-F1D9-4AD4-9542-EB021EA31EA6}.Debug|Any CPU.Build.0 = Debug|Any CPU + {A15D0AF3-F1D9-4AD4-9542-EB021EA31EA6}.Debug|x64.ActiveCfg = Debug|Any CPU + {A15D0AF3-F1D9-4AD4-9542-EB021EA31EA6}.Debug|x64.Build.0 = Debug|Any CPU + {A15D0AF3-F1D9-4AD4-9542-EB021EA31EA6}.Debug|x86.ActiveCfg = Debug|Any CPU + {A15D0AF3-F1D9-4AD4-9542-EB021EA31EA6}.Debug|x86.Build.0 = Debug|Any CPU {A15D0AF3-F1D9-4AD4-9542-EB021EA31EA6}.Release|Any CPU.ActiveCfg = Release|Any CPU {A15D0AF3-F1D9-4AD4-9542-EB021EA31EA6}.Release|Any CPU.Build.0 = Release|Any CPU + {A15D0AF3-F1D9-4AD4-9542-EB021EA31EA6}.Release|x64.ActiveCfg = Release|Any CPU + {A15D0AF3-F1D9-4AD4-9542-EB021EA31EA6}.Release|x64.Build.0 = Release|Any CPU + {A15D0AF3-F1D9-4AD4-9542-EB021EA31EA6}.Release|x86.ActiveCfg = Release|Any CPU + {A15D0AF3-F1D9-4AD4-9542-EB021EA31EA6}.Release|x86.Build.0 = Release|Any CPU {183BDA16-AF0C-43CD-BF4E-73DC3284A5C7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {183BDA16-AF0C-43CD-BF4E-73DC3284A5C7}.Debug|Any CPU.Build.0 = Debug|Any CPU + {183BDA16-AF0C-43CD-BF4E-73DC3284A5C7}.Debug|x64.ActiveCfg = Debug|Any CPU + {183BDA16-AF0C-43CD-BF4E-73DC3284A5C7}.Debug|x64.Build.0 = Debug|Any CPU + {183BDA16-AF0C-43CD-BF4E-73DC3284A5C7}.Debug|x86.ActiveCfg = Debug|Any CPU + {183BDA16-AF0C-43CD-BF4E-73DC3284A5C7}.Debug|x86.Build.0 = Debug|Any CPU {183BDA16-AF0C-43CD-BF4E-73DC3284A5C7}.Release|Any CPU.ActiveCfg = Release|Any CPU {183BDA16-AF0C-43CD-BF4E-73DC3284A5C7}.Release|Any CPU.Build.0 = Release|Any CPU + {183BDA16-AF0C-43CD-BF4E-73DC3284A5C7}.Release|x64.ActiveCfg = Release|Any CPU + {183BDA16-AF0C-43CD-BF4E-73DC3284A5C7}.Release|x64.Build.0 = Release|Any CPU + {183BDA16-AF0C-43CD-BF4E-73DC3284A5C7}.Release|x86.ActiveCfg = Release|Any CPU + {183BDA16-AF0C-43CD-BF4E-73DC3284A5C7}.Release|x86.Build.0 = Release|Any CPU {00DF060E-573A-4236-B462-89CE45698191}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {00DF060E-573A-4236-B462-89CE45698191}.Debug|Any CPU.Build.0 = Debug|Any CPU + {00DF060E-573A-4236-B462-89CE45698191}.Debug|x64.ActiveCfg = Debug|Any CPU + {00DF060E-573A-4236-B462-89CE45698191}.Debug|x64.Build.0 = Debug|Any CPU + {00DF060E-573A-4236-B462-89CE45698191}.Debug|x86.ActiveCfg = Debug|Any CPU + {00DF060E-573A-4236-B462-89CE45698191}.Debug|x86.Build.0 = Debug|Any CPU {00DF060E-573A-4236-B462-89CE45698191}.Release|Any CPU.ActiveCfg = Release|Any CPU {00DF060E-573A-4236-B462-89CE45698191}.Release|Any CPU.Build.0 = Release|Any CPU + {00DF060E-573A-4236-B462-89CE45698191}.Release|x64.ActiveCfg = Release|Any CPU + {00DF060E-573A-4236-B462-89CE45698191}.Release|x64.Build.0 = Release|Any CPU + {00DF060E-573A-4236-B462-89CE45698191}.Release|x86.ActiveCfg = Release|Any CPU + {00DF060E-573A-4236-B462-89CE45698191}.Release|x86.Build.0 = Release|Any CPU {DCB69FB3-9E28-4A0F-9E25-DAE0D083E555}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {DCB69FB3-9E28-4A0F-9E25-DAE0D083E555}.Debug|Any CPU.Build.0 = Debug|Any CPU + {DCB69FB3-9E28-4A0F-9E25-DAE0D083E555}.Debug|x64.ActiveCfg = Debug|Any CPU + {DCB69FB3-9E28-4A0F-9E25-DAE0D083E555}.Debug|x64.Build.0 = Debug|Any CPU + {DCB69FB3-9E28-4A0F-9E25-DAE0D083E555}.Debug|x86.ActiveCfg = Debug|Any CPU + {DCB69FB3-9E28-4A0F-9E25-DAE0D083E555}.Debug|x86.Build.0 = Debug|Any CPU {DCB69FB3-9E28-4A0F-9E25-DAE0D083E555}.Release|Any CPU.ActiveCfg = Release|Any CPU {DCB69FB3-9E28-4A0F-9E25-DAE0D083E555}.Release|Any CPU.Build.0 = Release|Any CPU + {DCB69FB3-9E28-4A0F-9E25-DAE0D083E555}.Release|x64.ActiveCfg = Release|Any CPU + {DCB69FB3-9E28-4A0F-9E25-DAE0D083E555}.Release|x64.Build.0 = Release|Any CPU + {DCB69FB3-9E28-4A0F-9E25-DAE0D083E555}.Release|x86.ActiveCfg = Release|Any CPU + {DCB69FB3-9E28-4A0F-9E25-DAE0D083E555}.Release|x86.Build.0 = Release|Any CPU {E2506D08-AF46-4B53-817A-A56B43C7C1EA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {E2506D08-AF46-4B53-817A-A56B43C7C1EA}.Debug|Any CPU.Build.0 = Debug|Any CPU + {E2506D08-AF46-4B53-817A-A56B43C7C1EA}.Debug|x64.ActiveCfg = Debug|Any CPU + {E2506D08-AF46-4B53-817A-A56B43C7C1EA}.Debug|x64.Build.0 = Debug|Any CPU + {E2506D08-AF46-4B53-817A-A56B43C7C1EA}.Debug|x86.ActiveCfg = Debug|Any CPU + {E2506D08-AF46-4B53-817A-A56B43C7C1EA}.Debug|x86.Build.0 = Debug|Any CPU {E2506D08-AF46-4B53-817A-A56B43C7C1EA}.Release|Any CPU.ActiveCfg = Release|Any CPU {E2506D08-AF46-4B53-817A-A56B43C7C1EA}.Release|Any CPU.Build.0 = Release|Any CPU + {E2506D08-AF46-4B53-817A-A56B43C7C1EA}.Release|x64.ActiveCfg = Release|Any CPU + {E2506D08-AF46-4B53-817A-A56B43C7C1EA}.Release|x64.Build.0 = Release|Any CPU + {E2506D08-AF46-4B53-817A-A56B43C7C1EA}.Release|x86.ActiveCfg = Release|Any CPU + {E2506D08-AF46-4B53-817A-A56B43C7C1EA}.Release|x86.Build.0 = Release|Any CPU {2D449099-F238-4105-97B8-697F93FA07CF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {2D449099-F238-4105-97B8-697F93FA07CF}.Debug|Any CPU.Build.0 = Debug|Any CPU + {2D449099-F238-4105-97B8-697F93FA07CF}.Debug|x64.ActiveCfg = Debug|Any CPU + {2D449099-F238-4105-97B8-697F93FA07CF}.Debug|x64.Build.0 = Debug|Any CPU + {2D449099-F238-4105-97B8-697F93FA07CF}.Debug|x86.ActiveCfg = Debug|Any CPU + {2D449099-F238-4105-97B8-697F93FA07CF}.Debug|x86.Build.0 = Debug|Any CPU {2D449099-F238-4105-97B8-697F93FA07CF}.Release|Any CPU.ActiveCfg = Release|Any CPU {2D449099-F238-4105-97B8-697F93FA07CF}.Release|Any CPU.Build.0 = Release|Any CPU + {2D449099-F238-4105-97B8-697F93FA07CF}.Release|x64.ActiveCfg = Release|Any CPU + {2D449099-F238-4105-97B8-697F93FA07CF}.Release|x64.Build.0 = Release|Any CPU + {2D449099-F238-4105-97B8-697F93FA07CF}.Release|x86.ActiveCfg = Release|Any CPU + {2D449099-F238-4105-97B8-697F93FA07CF}.Release|x86.Build.0 = Release|Any CPU {8038E62C-1D44-48AF-9E2B-C02228D4156F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {8038E62C-1D44-48AF-9E2B-C02228D4156F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {8038E62C-1D44-48AF-9E2B-C02228D4156F}.Debug|x64.ActiveCfg = Debug|Any CPU + {8038E62C-1D44-48AF-9E2B-C02228D4156F}.Debug|x64.Build.0 = Debug|Any CPU + {8038E62C-1D44-48AF-9E2B-C02228D4156F}.Debug|x86.ActiveCfg = Debug|Any CPU + {8038E62C-1D44-48AF-9E2B-C02228D4156F}.Debug|x86.Build.0 = Debug|Any CPU {8038E62C-1D44-48AF-9E2B-C02228D4156F}.Release|Any CPU.ActiveCfg = Release|Any CPU {8038E62C-1D44-48AF-9E2B-C02228D4156F}.Release|Any CPU.Build.0 = Release|Any CPU + {8038E62C-1D44-48AF-9E2B-C02228D4156F}.Release|x64.ActiveCfg = Release|Any CPU + {8038E62C-1D44-48AF-9E2B-C02228D4156F}.Release|x64.Build.0 = Release|Any CPU + {8038E62C-1D44-48AF-9E2B-C02228D4156F}.Release|x86.ActiveCfg = Release|Any CPU + {8038E62C-1D44-48AF-9E2B-C02228D4156F}.Release|x86.Build.0 = Release|Any CPU {9C2B605E-5C8C-4BDC-9977-96FADDBA1967}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {9C2B605E-5C8C-4BDC-9977-96FADDBA1967}.Debug|Any CPU.Build.0 = Debug|Any CPU + {9C2B605E-5C8C-4BDC-9977-96FADDBA1967}.Debug|x64.ActiveCfg = Debug|Any CPU + {9C2B605E-5C8C-4BDC-9977-96FADDBA1967}.Debug|x64.Build.0 = Debug|Any CPU + {9C2B605E-5C8C-4BDC-9977-96FADDBA1967}.Debug|x86.ActiveCfg = Debug|Any CPU + {9C2B605E-5C8C-4BDC-9977-96FADDBA1967}.Debug|x86.Build.0 = Debug|Any CPU {9C2B605E-5C8C-4BDC-9977-96FADDBA1967}.Release|Any CPU.ActiveCfg = Release|Any CPU {9C2B605E-5C8C-4BDC-9977-96FADDBA1967}.Release|Any CPU.Build.0 = Release|Any CPU + {9C2B605E-5C8C-4BDC-9977-96FADDBA1967}.Release|x64.ActiveCfg = Release|Any CPU + {9C2B605E-5C8C-4BDC-9977-96FADDBA1967}.Release|x64.Build.0 = Release|Any CPU + {9C2B605E-5C8C-4BDC-9977-96FADDBA1967}.Release|x86.ActiveCfg = Release|Any CPU + {9C2B605E-5C8C-4BDC-9977-96FADDBA1967}.Release|x86.Build.0 = Release|Any CPU {FD430CC0-2053-4C2D-845E-8BC421D6F114}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {FD430CC0-2053-4C2D-845E-8BC421D6F114}.Debug|Any CPU.Build.0 = Debug|Any CPU + {FD430CC0-2053-4C2D-845E-8BC421D6F114}.Debug|x64.ActiveCfg = Debug|Any CPU + {FD430CC0-2053-4C2D-845E-8BC421D6F114}.Debug|x64.Build.0 = Debug|Any CPU + {FD430CC0-2053-4C2D-845E-8BC421D6F114}.Debug|x86.ActiveCfg = Debug|Any CPU + {FD430CC0-2053-4C2D-845E-8BC421D6F114}.Debug|x86.Build.0 = Debug|Any CPU {FD430CC0-2053-4C2D-845E-8BC421D6F114}.Release|Any CPU.ActiveCfg = Release|Any CPU {FD430CC0-2053-4C2D-845E-8BC421D6F114}.Release|Any CPU.Build.0 = Release|Any CPU + {FD430CC0-2053-4C2D-845E-8BC421D6F114}.Release|x64.ActiveCfg = Release|Any CPU + {FD430CC0-2053-4C2D-845E-8BC421D6F114}.Release|x64.Build.0 = Release|Any CPU + {FD430CC0-2053-4C2D-845E-8BC421D6F114}.Release|x86.ActiveCfg = Release|Any CPU + {FD430CC0-2053-4C2D-845E-8BC421D6F114}.Release|x86.Build.0 = Release|Any CPU {5500E5A6-B22F-4FA3-B933-8C844A3CEC1A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {5500E5A6-B22F-4FA3-B933-8C844A3CEC1A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {5500E5A6-B22F-4FA3-B933-8C844A3CEC1A}.Debug|x64.ActiveCfg = Debug|Any CPU + {5500E5A6-B22F-4FA3-B933-8C844A3CEC1A}.Debug|x64.Build.0 = Debug|Any CPU + {5500E5A6-B22F-4FA3-B933-8C844A3CEC1A}.Debug|x86.ActiveCfg = Debug|Any CPU + {5500E5A6-B22F-4FA3-B933-8C844A3CEC1A}.Debug|x86.Build.0 = Debug|Any CPU {5500E5A6-B22F-4FA3-B933-8C844A3CEC1A}.Release|Any CPU.ActiveCfg = Release|Any CPU {5500E5A6-B22F-4FA3-B933-8C844A3CEC1A}.Release|Any CPU.Build.0 = Release|Any CPU + {5500E5A6-B22F-4FA3-B933-8C844A3CEC1A}.Release|x64.ActiveCfg = Release|Any CPU + {5500E5A6-B22F-4FA3-B933-8C844A3CEC1A}.Release|x64.Build.0 = Release|Any CPU + {5500E5A6-B22F-4FA3-B933-8C844A3CEC1A}.Release|x86.ActiveCfg = Release|Any CPU + {5500E5A6-B22F-4FA3-B933-8C844A3CEC1A}.Release|x86.Build.0 = Release|Any CPU {61C62DB8-CB67-4E3F-A2EA-30547183C64F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {61C62DB8-CB67-4E3F-A2EA-30547183C64F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {61C62DB8-CB67-4E3F-A2EA-30547183C64F}.Debug|x64.ActiveCfg = Debug|Any CPU + {61C62DB8-CB67-4E3F-A2EA-30547183C64F}.Debug|x64.Build.0 = Debug|Any CPU + {61C62DB8-CB67-4E3F-A2EA-30547183C64F}.Debug|x86.ActiveCfg = Debug|Any CPU + {61C62DB8-CB67-4E3F-A2EA-30547183C64F}.Debug|x86.Build.0 = Debug|Any CPU {61C62DB8-CB67-4E3F-A2EA-30547183C64F}.Release|Any CPU.ActiveCfg = Release|Any CPU {61C62DB8-CB67-4E3F-A2EA-30547183C64F}.Release|Any CPU.Build.0 = Release|Any CPU + {61C62DB8-CB67-4E3F-A2EA-30547183C64F}.Release|x64.ActiveCfg = Release|Any CPU + {61C62DB8-CB67-4E3F-A2EA-30547183C64F}.Release|x64.Build.0 = Release|Any CPU + {61C62DB8-CB67-4E3F-A2EA-30547183C64F}.Release|x86.ActiveCfg = Release|Any CPU + {61C62DB8-CB67-4E3F-A2EA-30547183C64F}.Release|x86.Build.0 = Release|Any CPU {DB22F44D-0ABF-428F-BAA1-E0D89696ABD3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {DB22F44D-0ABF-428F-BAA1-E0D89696ABD3}.Debug|Any CPU.Build.0 = Debug|Any CPU + {DB22F44D-0ABF-428F-BAA1-E0D89696ABD3}.Debug|x64.ActiveCfg = Debug|Any CPU + {DB22F44D-0ABF-428F-BAA1-E0D89696ABD3}.Debug|x64.Build.0 = Debug|Any CPU + {DB22F44D-0ABF-428F-BAA1-E0D89696ABD3}.Debug|x86.ActiveCfg = Debug|Any CPU + {DB22F44D-0ABF-428F-BAA1-E0D89696ABD3}.Debug|x86.Build.0 = Debug|Any CPU {DB22F44D-0ABF-428F-BAA1-E0D89696ABD3}.Release|Any CPU.ActiveCfg = Release|Any CPU {DB22F44D-0ABF-428F-BAA1-E0D89696ABD3}.Release|Any CPU.Build.0 = Release|Any CPU + {DB22F44D-0ABF-428F-BAA1-E0D89696ABD3}.Release|x64.ActiveCfg = Release|Any CPU + {DB22F44D-0ABF-428F-BAA1-E0D89696ABD3}.Release|x64.Build.0 = Release|Any CPU + {DB22F44D-0ABF-428F-BAA1-E0D89696ABD3}.Release|x86.ActiveCfg = Release|Any CPU + {DB22F44D-0ABF-428F-BAA1-E0D89696ABD3}.Release|x86.Build.0 = Release|Any CPU + {F1B693B4-F0C6-4467-BC09-2D1B9FB73D4E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {F1B693B4-F0C6-4467-BC09-2D1B9FB73D4E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {F1B693B4-F0C6-4467-BC09-2D1B9FB73D4E}.Debug|x64.ActiveCfg = Debug|Any CPU + {F1B693B4-F0C6-4467-BC09-2D1B9FB73D4E}.Debug|x64.Build.0 = Debug|Any CPU + {F1B693B4-F0C6-4467-BC09-2D1B9FB73D4E}.Debug|x86.ActiveCfg = Debug|Any CPU + {F1B693B4-F0C6-4467-BC09-2D1B9FB73D4E}.Debug|x86.Build.0 = Debug|Any CPU + {F1B693B4-F0C6-4467-BC09-2D1B9FB73D4E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {F1B693B4-F0C6-4467-BC09-2D1B9FB73D4E}.Release|Any CPU.Build.0 = Release|Any CPU + {F1B693B4-F0C6-4467-BC09-2D1B9FB73D4E}.Release|x64.ActiveCfg = Release|Any CPU + {F1B693B4-F0C6-4467-BC09-2D1B9FB73D4E}.Release|x64.Build.0 = Release|Any CPU + {F1B693B4-F0C6-4467-BC09-2D1B9FB73D4E}.Release|x86.ActiveCfg = Release|Any CPU + {F1B693B4-F0C6-4467-BC09-2D1B9FB73D4E}.Release|x86.Build.0 = Release|Any CPU + {572CA144-CECC-43AC-AF10-5BEA6007C2DE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {572CA144-CECC-43AC-AF10-5BEA6007C2DE}.Debug|Any CPU.Build.0 = Debug|Any CPU + {572CA144-CECC-43AC-AF10-5BEA6007C2DE}.Debug|x64.ActiveCfg = Debug|Any CPU + {572CA144-CECC-43AC-AF10-5BEA6007C2DE}.Debug|x64.Build.0 = Debug|Any CPU + {572CA144-CECC-43AC-AF10-5BEA6007C2DE}.Debug|x86.ActiveCfg = Debug|Any CPU + {572CA144-CECC-43AC-AF10-5BEA6007C2DE}.Debug|x86.Build.0 = Debug|Any CPU + {572CA144-CECC-43AC-AF10-5BEA6007C2DE}.Release|Any CPU.ActiveCfg = Release|Any CPU + {572CA144-CECC-43AC-AF10-5BEA6007C2DE}.Release|Any CPU.Build.0 = Release|Any CPU + {572CA144-CECC-43AC-AF10-5BEA6007C2DE}.Release|x64.ActiveCfg = Release|Any CPU + {572CA144-CECC-43AC-AF10-5BEA6007C2DE}.Release|x64.Build.0 = Release|Any CPU + {572CA144-CECC-43AC-AF10-5BEA6007C2DE}.Release|x86.ActiveCfg = Release|Any CPU + {572CA144-CECC-43AC-AF10-5BEA6007C2DE}.Release|x86.Build.0 = Release|Any CPU + {E28646AF-556E-423E-B54F-ADFBD75F2611}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {E28646AF-556E-423E-B54F-ADFBD75F2611}.Debug|Any CPU.Build.0 = Debug|Any CPU + {E28646AF-556E-423E-B54F-ADFBD75F2611}.Debug|x64.ActiveCfg = Debug|Any CPU + {E28646AF-556E-423E-B54F-ADFBD75F2611}.Debug|x64.Build.0 = Debug|Any CPU + {E28646AF-556E-423E-B54F-ADFBD75F2611}.Debug|x86.ActiveCfg = Debug|Any CPU + {E28646AF-556E-423E-B54F-ADFBD75F2611}.Debug|x86.Build.0 = Debug|Any CPU + {E28646AF-556E-423E-B54F-ADFBD75F2611}.Release|Any CPU.ActiveCfg = Release|Any CPU + {E28646AF-556E-423E-B54F-ADFBD75F2611}.Release|Any CPU.Build.0 = Release|Any CPU + {E28646AF-556E-423E-B54F-ADFBD75F2611}.Release|x64.ActiveCfg = Release|Any CPU + {E28646AF-556E-423E-B54F-ADFBD75F2611}.Release|x64.Build.0 = Release|Any CPU + {E28646AF-556E-423E-B54F-ADFBD75F2611}.Release|x86.ActiveCfg = Release|Any CPU + {E28646AF-556E-423E-B54F-ADFBD75F2611}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -162,6 +370,7 @@ Global {5500E5A6-B22F-4FA3-B933-8C844A3CEC1A} = {9EAD0078-CA03-486D-BF2E-2C891955CE16} {61C62DB8-CB67-4E3F-A2EA-30547183C64F} = {1807629C-08BB-487B-821E-8E0B72F5C4AC} {DB22F44D-0ABF-428F-BAA1-E0D89696ABD3} = {1807629C-08BB-487B-821E-8E0B72F5C4AC} + {E28646AF-556E-423E-B54F-ADFBD75F2611} = {E2872BA3-3393-4325-66A6-EE7620D75132} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {374403EA-85FA-4ACA-8E38-FB938FFC7016} diff --git a/src/Adaptive.Aeron/Adaptive.Aeron.csproj b/src/Adaptive.Aeron/Adaptive.Aeron.csproj index db0b5f68..25a7395f 100644 --- a/src/Adaptive.Aeron/Adaptive.Aeron.csproj +++ b/src/Adaptive.Aeron/Adaptive.Aeron.csproj @@ -24,6 +24,11 @@ all + + all + false + + diff --git a/src/Adaptive.Aeron/FodyWeavers.xml b/src/Adaptive.Aeron/FodyWeavers.xml index 42637c5c..7adcd688 100644 --- a/src/Adaptive.Aeron/FodyWeavers.xml +++ b/src/Adaptive.Aeron/FodyWeavers.xml @@ -1,4 +1,5 @@  + \ No newline at end of file diff --git a/src/Adaptive.Aeron/FodyWeavers.xsd b/src/Adaptive.Aeron/FodyWeavers.xsd index 0eeb31cf..42ece42f 100644 --- a/src/Adaptive.Aeron/FodyWeavers.xsd +++ b/src/Adaptive.Aeron/FodyWeavers.xsd @@ -5,6 +5,7 @@ + diff --git a/src/Adaptive.Agrona/SuppressedExceptions.cs b/src/Adaptive.Agrona/SuppressedExceptions.cs new file mode 100644 index 00000000..9907c3e5 --- /dev/null +++ b/src/Adaptive.Agrona/SuppressedExceptions.cs @@ -0,0 +1,40 @@ +using System; +using System.Collections.Generic; + +namespace Adaptive.Agrona +{ + /// + /// Extension methods that emulate Java's Throwable.addSuppressed / getSuppressed + /// on .NET's type via the dictionary. + /// + public static class SuppressedExceptions + { + private const string SuppressedKey = "Adaptive.Agrona.SuppressedExceptions"; + + public static void AddSuppressed(this Exception primary, Exception suppressed) + { + if (primary == null || suppressed == null || ReferenceEquals(primary, suppressed)) + { + return; + } + + if (!(primary.Data[SuppressedKey] is List list)) + { + list = new List(); + primary.Data[SuppressedKey] = list; + } + + list.Add(suppressed); + } + + public static IReadOnlyList GetSuppressed(this Exception primary) + { + if (primary?.Data[SuppressedKey] is List list) + { + return list; + } + + return Array.Empty(); + } + } +} diff --git a/src/Adaptive.Archiver.Tests/Adaptive.Archiver.Tests.csproj b/src/Adaptive.Archiver.Tests/Adaptive.Archiver.Tests.csproj new file mode 100644 index 00000000..6353c47c --- /dev/null +++ b/src/Adaptive.Archiver.Tests/Adaptive.Archiver.Tests.csproj @@ -0,0 +1,20 @@ + + + + net8.0 + false + + + + + + + + + + + + + + + diff --git a/src/Adaptive.Archiver.Tests/AeronArchiveTest.cs b/src/Adaptive.Archiver.Tests/AeronArchiveTest.cs new file mode 100644 index 00000000..3130f27c --- /dev/null +++ b/src/Adaptive.Archiver.Tests/AeronArchiveTest.cs @@ -0,0 +1,433 @@ +using System; +using System.Threading; +using Adaptive.Aeron; +using Adaptive.Aeron.Exceptions; +using Adaptive.Agrona; +using Adaptive.Agrona.Concurrent; +using FakeItEasy; +using NUnit.Framework; +using AeronType = Adaptive.Aeron.Aeron; +using Context = Adaptive.Archiver.AeronArchive.Context; + +namespace Adaptive.Archiver.Tests +{ + public class AeronArchiveTest + { + private const string SESSION_ID_PARAM_NAME = "session-id"; + private const string MTU_LENGTH_PARAM_NAME = "mtu"; + private const string TERM_LENGTH_PARAM_NAME = "term-length"; + private const string SPARSE_PARAM_NAME = "sparse"; + + private AeronType aeron; + private ControlResponsePoller controlResponsePoller; + private ArchiveProxy archiveProxy; + private IErrorHandler errorHandler; + + [SetUp] + public void SetUp() + { + aeron = A.Fake(); + controlResponsePoller = A.Fake(); + archiveProxy = A.Fake(); + errorHandler = A.Fake(); + } + + [Test] + public void AsyncConnectShouldConcludeContext() + { + var ctx = A.Fake(); + var expectedException = new InvalidOperationException("test"); + A.CallTo(() => ctx.Conclude()).Throws(expectedException); + + var actualException = Assert.Throws(() => AeronArchive.ConnectAsync(ctx)); + Assert.AreSame(expectedException, actualException); + + A.CallTo(() => ctx.Conclude()).MustHaveHappened(); + A.CallTo(ctx).MustHaveHappenedOnceExactly(); + } + + [Test] + public void AsyncConnectShouldCloseContext() + { + const string responseChannel = "aeron:udp?endpoint=localhost:1234"; + const int responseStreamId = 49; + var ctx = A.Fake(); + A.CallTo(() => ctx.AeronClient()).Returns(aeron); + A.CallTo(() => ctx.ControlResponseChannel()).Returns(responseChannel); + A.CallTo(() => ctx.ControlResponseStreamId()).Returns(responseStreamId); + var error = new InvalidOperationException("subscription"); + A.CallTo(() => aeron.AsyncAddSubscription( + responseChannel, + responseStreamId, + A._, + A._)) + .Throws(error); + + var actualException = Assert.Throws(() => AeronArchive.ConnectAsync(ctx)); + Assert.AreSame(error, actualException); + + A.CallTo(() => ctx.Conclude()).MustHaveHappened().Then( + A.CallTo(() => ctx.AeronClient()).MustHaveHappened()).Then( + A.CallTo(() => ctx.ControlResponseChannel()).MustHaveHappened()).Then( + A.CallTo(() => ctx.ControlResponseStreamId()).MustHaveHappened()).Then( + A.CallTo(() => aeron.AsyncAddSubscription( + responseChannel, responseStreamId, A._, A._)) + .MustHaveHappened()).Then( + A.CallTo(() => ctx.Dispose()).MustHaveHappened()); + } + + [Test] + public void AsyncConnectShouldCloseResourceInCaseOfExceptionUponStartup() + { + const string responseChannel = "aeron:udp?endpoint=localhost:0"; + const int responseStreamId = 49; + const string requestChannel = "aeron:udp?endpoint=localhost:1234"; + const int requestStreamId = -15; + const long subscriptionId = -3275938475934759L; + + var ctx = A.Fake(); + A.CallTo(() => ctx.AeronClient()).Returns(aeron); + A.CallTo(() => ctx.ControlResponseChannel()).Returns(responseChannel); + A.CallTo(() => ctx.ControlResponseStreamId()).Returns(responseStreamId); + A.CallTo(() => ctx.ControlRequestChannel()).Returns(requestChannel); + A.CallTo(() => ctx.ControlRequestStreamId()).Returns(requestStreamId); + A.CallTo(() => aeron.AsyncAddSubscription( + responseChannel, + responseStreamId, + A._, + A._)) + .Returns(subscriptionId); + var error = new IndexOutOfRangeException("exception"); + A.CallTo(() => aeron.Ctx).Throws(error); + + var actualException = Assert.Throws(() => AeronArchive.ConnectAsync(ctx)); + Assert.AreSame(error, actualException); + + A.CallTo(() => ctx.Conclude()).MustHaveHappened().Then( + A.CallTo(() => ctx.AeronClient()).MustHaveHappened()).Then( + A.CallTo(() => ctx.ControlResponseChannel()).MustHaveHappened()).Then( + A.CallTo(() => ctx.ControlResponseStreamId()).MustHaveHappened()).Then( + A.CallTo(() => aeron.AsyncAddSubscription( + responseChannel, responseStreamId, A._, A._)) + .MustHaveHappened()).Then( + A.CallTo(() => aeron.AsyncRemoveSubscription(subscriptionId)).MustHaveHappened()).Then( + A.CallTo(() => ctx.Dispose()).MustHaveHappened()); + } + + [Test] + public void ShouldQuietClose() + { + var previousException = new Exception(); + var thrownException = new Exception(); + + var throwingCloseable = A.Fake(); + var nonThrowingCloseable = A.Fake(); + A.CallTo(() => throwingCloseable.Dispose()).Throws(thrownException); + + Assert.IsNull(AeronArchive.QuietClose(null, nonThrowingCloseable)); + Assert.AreSame(previousException, AeronArchive.QuietClose(previousException, nonThrowingCloseable)); + var ex = AeronArchive.QuietClose(previousException, throwingCloseable); + Assert.AreSame(previousException, ex); + Assert.AreSame(thrownException, ex.GetSuppressed()[0]); + Assert.AreSame(thrownException, AeronArchive.QuietClose(null, throwingCloseable)); + } + + [TestCase(AeronType.NULL_VALUE)] + [TestCase(long.MaxValue)] + [TestCase(long.MinValue)] + [TestCase(0L)] + [TestCase(4468236482L)] + public void ShouldReturnAssignedArchiveId(long archiveId) + { + const long controlSessionId = -3924293L; + A.CallTo(() => aeron.Ctx).Returns(new AeronType.Context()); + var context = new Context() + .AeronClient(aeron) + .IdleStrategy(new NoOpIdleStrategy()) + .MessageTimeoutNs(100) + .Lock(new NoOpLock()) + .ErrorHandler(errorHandler) + .OwnsAeronClient(true); + + var aeronArchive = new AeronArchive(context, controlResponsePoller, archiveProxy, controlSessionId, archiveId); + + Assert.AreEqual(archiveId, aeronArchive.ArchiveId()); + } + + [TestCase( + "aeron:udp?endpoint=localhost:3388|mtu=2048", + "aeron:udp?session-id=5|endpoint=localhost:0|sparse=true|mtu=1024")] + [TestCase( + "aeron:udp?endpoint=localhost:3388", + "aeron:udp?control=localhost:10000|control-mode=dynamic")] + [TestCase( + "aeron:udp?endpoint=localhost:3388", + "aeron:udp?control-mode=manual")] + [TestCase( + "aeron:ipc?alias=request|ssc=false|linger=0|session-id=42|sparse=false", + "aeron:ipc?term-length=64k|alias=response")] + public void ShouldAddAUniqueSessionIdParameterToBothRequestAndResponseChannels( + string requestChannel, string responseChannel) + { + const int requestStreamId = 42; + const int responseStreamId = -19; + int sessionId = BitUtil.GenerateRandomisedId(); + A.CallTo(() => aeron.NextSessionId(requestStreamId)).Returns(sessionId); + + var context = new Context() + .AeronClient(aeron) + .OwnsAeronClient(false) + .ErrorHandler(errorHandler) + .ControlRequestChannel(requestChannel) + .ControlRequestStreamId(requestStreamId) + .ControlResponseChannel(responseChannel) + .ControlResponseStreamId(responseStreamId) + .ControlTermBufferSparse(false) + .ControlTermBufferLength(128 * 1024) + .ControlMtuLength(4096); + + Assert.AreEqual(requestChannel, context.ControlRequestChannel()); + Assert.AreEqual(requestStreamId, context.ControlRequestStreamId()); + Assert.AreEqual(responseChannel, context.ControlResponseChannel()); + Assert.AreEqual(responseStreamId, context.ControlResponseStreamId()); + + context.Conclude(); + + A.CallTo(() => aeron.NextSessionId(requestStreamId)).MustHaveHappened(); + Assert.AreEqual(requestStreamId, context.ControlRequestStreamId()); + Assert.AreEqual(responseStreamId, context.ControlResponseStreamId()); + + var actualRequestChannel = ChannelUri.Parse(context.ControlRequestChannel()); + var actualResponseChannel = ChannelUri.Parse(context.ControlResponseChannel()); + Assert.IsTrue(actualRequestChannel.ContainsKey(SESSION_ID_PARAM_NAME), "session-id was not added"); + Assert.AreEqual(sessionId.ToString(), actualRequestChannel.Get(SESSION_ID_PARAM_NAME)); + Assert.AreEqual(sessionId.ToString(), actualResponseChannel.Get(SESSION_ID_PARAM_NAME)); + + ChannelUri.Parse(requestChannel).ForEachParameter((key, value) => + { + if (!SESSION_ID_PARAM_NAME.Equals(key)) + { + Assert.AreEqual(value, actualRequestChannel.Get(key)); + } + }); + + ChannelUri.Parse(responseChannel).ForEachParameter((key, value) => + { + if (!SESSION_ID_PARAM_NAME.Equals(key)) + { + Assert.AreEqual(value, actualResponseChannel.Get(key)); + } + }); + } + + [Test] + public void ShouldNotAddASessionIdIfControlModeResponseIsSpecifiedOnTheResponseChannel() + { + const int requestStreamId = 100; + const int responseStreamId = 200; + const string requestChannel = "aeron:udp?endpoint=localhost:8080"; + const string responseChannel = "aeron:udp?control-mode=response|control=localhost:10002"; + var context = new Context() + .AeronClient(aeron) + .OwnsAeronClient(false) + .ErrorHandler(errorHandler) + .ControlRequestChannel(requestChannel) + .ControlRequestStreamId(requestStreamId) + .ControlResponseChannel(responseChannel) + .ControlResponseStreamId(responseStreamId); + + Assert.AreEqual(requestChannel, context.ControlRequestChannel()); + Assert.AreEqual(requestStreamId, context.ControlRequestStreamId()); + Assert.AreEqual(responseChannel, context.ControlResponseChannel()); + Assert.AreEqual(responseStreamId, context.ControlResponseStreamId()); + + context.Conclude(); + + Assert.AreEqual(requestStreamId, context.ControlRequestStreamId()); + Assert.AreEqual(responseStreamId, context.ControlResponseStreamId()); + + var actualRequestChannel = ChannelUri.Parse(context.ControlRequestChannel()); + var actualResponseChannel = ChannelUri.Parse(context.ControlResponseChannel()); + Assert.IsNull(actualRequestChannel.Get(SESSION_ID_PARAM_NAME), "unexpected session-id on request channel"); + Assert.IsNull(actualResponseChannel.Get(SESSION_ID_PARAM_NAME), "unexpected session-id on response channel"); + + ChannelUri.Parse(requestChannel).ForEachParameter( + (key, value) => Assert.AreEqual(value, actualRequestChannel.Get(key))); + + ChannelUri.Parse(responseChannel).ForEachParameter( + (key, value) => Assert.AreEqual(value, actualResponseChannel.Get(key))); + } + + [Test] + public void ShouldAddDefaultUriParametersIfNotSpecified() + { + const int requestStreamId = 10; + const int responseStreamId = 20; + const string requestChannel = "aeron:udp?endpoint=localhost:8080"; + const string responseChannel = "aeron:udp?endpoint=localhost:0"; + var context = new Context() + .AeronClient(aeron) + .OwnsAeronClient(false) + .ErrorHandler(errorHandler) + .ControlRequestChannel(requestChannel) + .ControlRequestStreamId(requestStreamId) + .ControlResponseChannel(responseChannel) + .ControlResponseStreamId(responseStreamId) + .ControlMtuLength(2048) + .ControlTermBufferLength(256 * 1024) + .ControlTermBufferSparse(true); + + Assert.AreEqual(requestChannel, context.ControlRequestChannel()); + Assert.AreEqual(requestStreamId, context.ControlRequestStreamId()); + Assert.AreEqual(responseChannel, context.ControlResponseChannel()); + Assert.AreEqual(responseStreamId, context.ControlResponseStreamId()); + + context.Conclude(); + + Assert.AreEqual(requestStreamId, context.ControlRequestStreamId()); + Assert.AreEqual(responseStreamId, context.ControlResponseStreamId()); + + var actualRequestChannel = ChannelUri.Parse(context.ControlRequestChannel()); + var actualResponseChannel = ChannelUri.Parse(context.ControlResponseChannel()); + Assert.AreEqual(context.ControlMtuLength().ToString(), actualRequestChannel.Get(MTU_LENGTH_PARAM_NAME)); + Assert.AreEqual(context.ControlMtuLength().ToString(), actualResponseChannel.Get(MTU_LENGTH_PARAM_NAME)); + Assert.AreEqual( + context.ControlTermBufferLength().ToString(), + actualRequestChannel.Get(TERM_LENGTH_PARAM_NAME)); + Assert.AreEqual( + context.ControlTermBufferLength().ToString(), + actualResponseChannel.Get(TERM_LENGTH_PARAM_NAME)); + string sparseStr = context.ControlTermBufferSparse() ? "true" : "false"; + Assert.AreEqual(sparseStr, actualRequestChannel.Get(SPARSE_PARAM_NAME)); + Assert.AreEqual(sparseStr, actualResponseChannel.Get(SPARSE_PARAM_NAME)); + } + + [TestCase(int.MinValue)] + [TestCase(-1)] + [TestCase(0)] + public void ShouldRejectInvalidRetryAttempts(int retryAttempts) + { + var context = new Context() + .AeronClient(aeron) + .ControlRequestChannel("aeron:udp") + .ControlResponseChannel("aeron:udp") + .MessageRetryAttempts(retryAttempts); + Assert.AreEqual(retryAttempts, context.MessageRetryAttempts()); + + var exception = Assert.Throws(() => context.Conclude()); + Assert.AreEqual( + "AeronArchive.Context.messageRetryAttempts must be > 0, got: " + retryAttempts, + exception.Message); + } + + [Test] + public void MaxRetryAttemptsDefaultValue() + { + var context = new Context(); + Assert.AreEqual(AeronArchive.Configuration.MESSAGE_RETRY_ATTEMPTS_DEFAULT, context.MessageRetryAttempts()); + } + + [Test] + public void MaxRetryAttemptsSystemProperty() + { + Config.Params[AeronArchive.Configuration.MESSAGE_RETRY_ATTEMPTS_PROP_NAME] = "111"; + try + { + var context = new Context(); + Assert.AreEqual(111, context.MessageRetryAttempts()); + } + finally + { + Config.Params.Remove(AeronArchive.Configuration.MESSAGE_RETRY_ATTEMPTS_PROP_NAME); + } + } + + [Test] + public void CloseNotOwningAeronClient() + { + const long controlSessionId = 42L; + const long archiveId = -190L; + + var aeronContext = A.Fake(); + A.CallTo(() => aeronContext.NanoClock()).Returns(SystemNanoClock.INSTANCE); + A.CallTo(() => aeron.Ctx).Returns(aeronContext); + var aeronException = new SynchronizationLockException("aeron closed"); + A.CallTo(() => aeron.Dispose()).Throws(aeronException); + + var publication = A.Fake(); + A.CallTo(() => publication.IsConnected).Returns(true); + var publicationException = new InvalidOperationException("publication is closed"); + A.CallTo(() => publication.Dispose()).Throws(publicationException); + + var subscription = A.Fake(); + A.CallTo(() => controlResponsePoller.Subscription()).Returns(subscription); + var subscriptionException = new IndexOutOfRangeException("subscription"); + A.CallTo(() => subscription.Dispose()).Throws(subscriptionException); + + A.CallTo(() => archiveProxy.Pub()).Returns(publication); + var closeSessionException = new IndexOutOfRangeException(); + A.CallTo(() => archiveProxy.CloseSession(controlSessionId)).Throws(closeSessionException); + + var context = new Context() + .AeronClient(aeron) + .IdleStrategy(new NoOpIdleStrategy()) + .MessageTimeoutNs(100) + .Lock(new NoOpLock()) + .ErrorHandler(errorHandler) + .OwnsAeronClient(false); + var aeronArchive = new AeronArchive(context, controlResponsePoller, archiveProxy, controlSessionId, archiveId); + + aeronArchive.Dispose(); + + A.CallTo(() => errorHandler.OnError(A.That.Matches(ex => + ReferenceEquals(closeSessionException, ex) && + ex.GetSuppressed().Count >= 2 && + ReferenceEquals(publicationException, ex.GetSuppressed()[0]) && + ReferenceEquals(subscriptionException, ex.GetSuppressed()[1])))).MustHaveHappened(); + A.CallTo(errorHandler).MustHaveHappenedOnceExactly(); + A.CallTo(() => publication.Dispose()).MustHaveHappened(); + A.CallTo(() => subscription.Dispose()).MustHaveHappened(); + } + + [Test] + public void CloseOwningAeronClient() + { + const long controlSessionId = 42L; + const long archiveId = 555L; + + var aeronContext = A.Fake(); + A.CallTo(() => aeronContext.NanoClock()).Returns(SystemNanoClock.INSTANCE); + A.CallTo(() => aeron.Ctx).Returns(aeronContext); + var aeronException = new SynchronizationLockException("aeron closed"); + A.CallTo(() => aeron.Dispose()).Throws(aeronException); + + var publication = A.Fake(); + A.CallTo(() => publication.IsConnected).Returns(true); + A.CallTo(() => publication.Dispose()).Throws(new InvalidOperationException("publication is closed")); + + var subscription = A.Fake(); + A.CallTo(() => controlResponsePoller.Subscription()).Returns(subscription); + A.CallTo(() => subscription.Dispose()).Throws(new IndexOutOfRangeException("subscription")); + + A.CallTo(() => archiveProxy.Pub()).Returns(publication); + var closeSessionException = new IndexOutOfRangeException(); + A.CallTo(() => archiveProxy.CloseSession(controlSessionId)).Throws(closeSessionException); + + var context = new Context() + .AeronClient(aeron) + .IdleStrategy(new NoOpIdleStrategy()) + .MessageTimeoutNs(100) + .Lock(new NoOpLock()) + .ErrorHandler(errorHandler) + .OwnsAeronClient(true); + var aeronArchive = new AeronArchive(context, controlResponsePoller, archiveProxy, controlSessionId, archiveId); + + var ex = Assert.Throws(() => aeronArchive.Dispose()); + + Assert.AreSame(closeSessionException, ex); + A.CallTo(() => errorHandler.OnError(closeSessionException)).MustHaveHappened(); + A.CallTo(errorHandler).MustHaveHappenedOnceExactly(); + Assert.AreEqual(aeronException, ex.GetSuppressed()[0]); + } + } +} diff --git a/src/Adaptive.Archiver.Tests/ArchiveExceptionTest.cs b/src/Adaptive.Archiver.Tests/ArchiveExceptionTest.cs new file mode 100644 index 00000000..5747249a --- /dev/null +++ b/src/Adaptive.Archiver.Tests/ArchiveExceptionTest.cs @@ -0,0 +1,35 @@ +using NUnit.Framework; + +namespace Adaptive.Archiver.Tests +{ + public class ArchiveExceptionTest + { + [TestCase(0, "GENERIC")] + [TestCase(1, "ACTIVE_LISTING")] + [TestCase(2, "ACTIVE_RECORDING")] + [TestCase(3, "ACTIVE_SUBSCRIPTION")] + [TestCase(4, "UNKNOWN_SUBSCRIPTION")] + [TestCase(5, "UNKNOWN_RECORDING")] + [TestCase(6, "UNKNOWN_REPLAY")] + [TestCase(7, "MAX_REPLAYS")] + [TestCase(8, "MAX_RECORDINGS")] + [TestCase(9, "INVALID_EXTENSION")] + [TestCase(10, "AUTHENTICATION_REJECTED")] + [TestCase(11, "STORAGE_SPACE")] + [TestCase(12, "UNKNOWN_REPLICATION")] + [TestCase(13, "UNAUTHORISED_ACTION")] + [TestCase(14, "REPLICATION_CONNECTION_FAILURE")] + public void ErrorCodeAsString(int errorCode, string expected) + { + Assert.AreEqual(expected, ArchiveException.ErrorCodeAsString(errorCode)); + } + + [TestCase(-1)] + [TestCase(1111111)] + [TestCase(54)] + public void ShouldHandleUnknownErrorCodes(int errorCode) + { + Assert.AreEqual("unknown error code: " + errorCode, ArchiveException.ErrorCodeAsString(errorCode)); + } + } +} diff --git a/src/Adaptive.Archiver/Adaptive.Archiver.csproj b/src/Adaptive.Archiver/Adaptive.Archiver.csproj index 81176e35..e5c80eb7 100644 --- a/src/Adaptive.Archiver/Adaptive.Archiver.csproj +++ b/src/Adaptive.Archiver/Adaptive.Archiver.csproj @@ -23,4 +23,24 @@ + + + all + runtime; build; native; contentfiles; analyzers + + + all + + + all + false + + + + + + + + + \ No newline at end of file diff --git a/src/Adaptive.Archiver/AeronArchive.cs b/src/Adaptive.Archiver/AeronArchive.cs index cf6f4d76..b532657b 100644 --- a/src/Adaptive.Archiver/AeronArchive.cs +++ b/src/Adaptive.Archiver/AeronArchive.cs @@ -186,7 +186,7 @@ public void Dispose() rethrow = true; if (null != resultEx) { - //resultEx.AddSuppressed(ex); + resultEx.AddSuppressed(ex); } else { @@ -2492,6 +2492,16 @@ public class Configuration /// public static readonly long MESSAGE_TIMEOUT_DEFAULT_NS = 10_000_000_000; + /// + /// The number of retry attempts to be made when offering messages to the archive. + /// + public const string MESSAGE_RETRY_ATTEMPTS_PROP_NAME = "aeron.archive.message.retry.attempts"; + + /// + /// Default number of retry attempts to be made when offering messages to the archive. + /// + public const int MESSAGE_RETRY_ATTEMPTS_DEFAULT = 3; + /// /// Channel for sending control messages to an archive. /// @@ -2641,6 +2651,16 @@ public static long MessageTimeoutNs() return Config.GetDurationInNanos(MESSAGE_TIMEOUT_PROP_NAME, MESSAGE_TIMEOUT_DEFAULT_NS); } + /// + /// The number of retry attempts to be made when offering messages to the archive. + /// + /// the number of retry attempts. + /// + public static int MessageRetryAttempts() + { + return Config.GetInteger(MESSAGE_RETRY_ATTEMPTS_PROP_NAME, MESSAGE_RETRY_ATTEMPTS_DEFAULT); + } + /// /// Should term buffer files be sparse for control request and response streams. /// @@ -2790,6 +2810,7 @@ public class Context private int _isConcluded = 0; internal long messageTimeoutNs = Configuration.MessageTimeoutNs(); + internal int messageRetryAttempts = Configuration.MessageRetryAttempts(); internal String clientName = AeronArchive.Configuration.ClientName(); internal string recordingEventsChannel = Configuration.RecordingEventsChannel(); internal int recordingEventsStreamId = Configuration.RecordingEventsStreamId(); @@ -2841,6 +2862,12 @@ public void Conclude() throw new ConfigurationException("AeronArchive.Context.clientName length must be <= " + Aeron.Aeron.Configuration.MAX_CLIENT_NAME_LENGTH); } + if (messageRetryAttempts <= 0) + { + throw new ConfigurationException( + "AeronArchive.Context.messageRetryAttempts must be > 0, got: " + messageRetryAttempts); + } + if (null == aeron) { aeron = Aeron.Aeron.Connect( @@ -2912,6 +2939,28 @@ public long MessageTimeoutNs() return messageTimeoutNs; } + /// + /// Set the number of retry attempts to be made when offering messages to the archive. + /// + /// the number of retry attempts. + /// this for a fluent API. + /// + public Context MessageRetryAttempts(int messageRetryAttempts) + { + this.messageRetryAttempts = messageRetryAttempts; + return this; + } + + /// + /// The number of retry attempts to be made when offering messages to the archive. + /// + /// the number of retry attempts. + /// + public int MessageRetryAttempts() + { + return messageRetryAttempts; + } + /// /// Get the channel URI on which the recording events publication will publish. /// @@ -3392,7 +3441,7 @@ private ChannelUri ApplyDefaultParams(string channel) if (!channelUri.ContainsKey(SPARSE_PARAM_NAME)) { - channelUri.Put(SPARSE_PARAM_NAME, Convert.ToString(controlTermBufferSparse)); + channelUri.Put(SPARSE_PARAM_NAME, controlTermBufferSparse ? "true" : "false"); } return channelUri; @@ -3410,44 +3459,44 @@ public class AsyncConnect : IDisposable public enum AsyncConnectState { /// - /// Initial state of adding a publication for control request channel. + /// Initial state of awaiting an asynchronous subscription for the control response channel. /// - ADD_PUBLICATION = 0, + AWAIT_SUBSCRIPTION = 0, + + /// + /// Add publication for control request channel. + /// + ADD_PUBLICATION = 1, /// /// Await publication being added. /// - AWAIT_PUBLICATION_CONNECTED = 1, + AWAIT_PUBLICATION_CONNECTED = 2, /// /// Sending connect request to the Archive. /// - SEND_CONNECT_REQUEST = 2, + SEND_CONNECT_REQUEST = 3, /// /// Await response subscription connected. /// - AWAIT_SUBSCRIPTION_CONNECTED = 3, + AWAIT_SUBSCRIPTION_CONNECTED = 4, /// /// Await connect response. /// - AWAIT_CONNECT_RESPONSE = 4, + AWAIT_CONNECT_RESPONSE = 5, /// /// Send archive-id request. /// - SEND_ARCHIVE_ID_REQUEST = 5, + SEND_ARCHIVE_ID_REQUEST = 6, /// /// Await response for the archive-id request. /// - AWAIT_ARCHIVE_ID_RESPONSE = 6, - - /// - /// Archive connection established. - /// - DONE = 7, + AWAIT_ARCHIVE_ID_RESPONSE = 7, /// /// Sending a challenge response. @@ -3457,18 +3506,24 @@ public enum AsyncConnectState /// /// Await challenge response. /// - AWAIT_CHALLENGE_RESPONSE = 9 + AWAIT_CHALLENGE_RESPONSE = 9, + + /// + /// Archive connection established. + /// + DONE = 10 } internal static readonly int PROTOCOL_VERSION_WITH_ARCHIVE_ID = SemanticVersion.Compose(1, 11, 0); private readonly Context ctx; - private readonly ControlResponsePoller controlResponsePoller; private readonly long deadlineNs; + private ControlResponsePoller controlResponsePoller; + private long subscriptionRegistrationId = Aeron.Aeron.NULL_VALUE; private long publicationRegistrationId = Aeron.Aeron.NULL_VALUE; private long correlationId = Aeron.Aeron.NULL_VALUE; private long controlSessionId = Aeron.Aeron.NULL_VALUE; private byte[] encodedCredentialsFromChallenge = null; - private AsyncConnectState state = AsyncConnectState.ADD_PUBLICATION; + private AsyncConnectState state = AsyncConnectState.AWAIT_SUBSCRIPTION; private ArchiveProxy archiveProxy; private AeronArchive aeronArchive; @@ -3480,26 +3535,25 @@ internal AsyncConnect(Context ctx) Aeron.Aeron aeron = ctx.AeronClient(); - controlResponsePoller = new ControlResponsePoller(aeron.AddSubscription( - ctx.ControlResponseChannel(), ctx.ControlResponseStreamId(), null, (image) => + subscriptionRegistrationId = aeron.AsyncAddSubscription( + ctx.ControlResponseChannel(), + ctx.ControlResponseStreamId(), + null, + (image) => { AeronArchive client = aeronArchive; if (null != client) { client.State(ArchiveState.DISCONNECTED); } - })); + }); - CheckAndSetupResponseChannel(ctx, controlResponsePoller.Subscription()); - - publicationRegistrationId = - aeron.AsyncAddExclusivePublication(ctx.ControlRequestChannel(), ctx.ControlRequestStreamId()); deadlineNs = aeron.Ctx.NanoClock().NanoTime() + ctx.MessageTimeoutNs(); } - catch (Exception ex) + catch (Exception) { Dispose(); - throw ex; + throw; } } @@ -3525,8 +3579,11 @@ public void Dispose() { if (null != controlResponsePoller) { - IErrorHandler errorHandler = ctx.ErrorHandler(); - CloseHelper.Dispose(errorHandler, controlResponsePoller.Subscription()); + CloseHelper.Dispose(ctx.ErrorHandler(), controlResponsePoller.Subscription()); + } + else if (Aeron.Aeron.NULL_VALUE != subscriptionRegistrationId) + { + ctx.AeronClient().AsyncRemoveSubscription(subscriptionRegistrationId); } if (null != archiveProxy) @@ -3578,23 +3635,34 @@ public AeronArchive Poll() { CheckDeadline(); + if (AsyncConnectState.AWAIT_SUBSCRIPTION == state) + { + AwaitSubscription(); + } + if (AsyncConnectState.ADD_PUBLICATION == state) { - ExclusivePublication publication = - ctx.AeronClient().GetExclusivePublication(publicationRegistrationId); + Aeron.Aeron aeron = ctx.AeronClient(); + if (Aeron.Aeron.NULL_VALUE == publicationRegistrationId) + { + publicationRegistrationId = aeron.AsyncAddExclusivePublication( + ctx.ControlRequestChannel(), ctx.ControlRequestStreamId()); + } + + ExclusivePublication publication = aeron.GetExclusivePublication(publicationRegistrationId); if (null != publication) { string clientInfo = "name=" + ctx.ClientName(); // + " " + AeronCounters.formatVersionInfo(AeronArchiveVersion.VERSION, AeronArchiveVersion.GIT_SHA); - publicationRegistrationId = Aeron.Aeron.NULL_VALUE; archiveProxy = new ArchiveProxy( publication, ctx.IdleStrategy(), - ctx.AeronClient().Ctx.NanoClock(), + aeron.Ctx.NanoClock(), ctx.MessageTimeoutNs(), - ArchiveProxy.DEFAULT_RETRY_ATTEMPTS, + ctx.MessageRetryAttempts(), ctx.CredentialsSupplier(), clientInfo); + publicationRegistrationId = Aeron.Aeron.NULL_VALUE; State(AsyncConnectState.AWAIT_PUBLICATION_CONNECTED); } @@ -3731,14 +3799,29 @@ private void State(AsyncConnectState newState) state = newState; } + private void AwaitSubscription() + { + Subscription subscription = ctx.AeronClient().GetSubscription(subscriptionRegistrationId); + if (null != subscription) + { + CheckAndSetupResponseChannel(ctx, subscription); + controlResponsePoller = new ControlResponsePoller(subscription); + subscriptionRegistrationId = Aeron.Aeron.NULL_VALUE; + State(AsyncConnectState.ADD_PUBLICATION); + } + } + private void CheckDeadline() { if (deadlineNs - ctx.AeronClient().Ctx.NanoClock().NanoTime() < 0) { + object publication = null != archiveProxy ? (object)archiveProxy.Pub() : ctx.ControlRequestChannel(); + object subscription = null != controlResponsePoller + ? (object)controlResponsePoller.Subscription() + : ctx.ControlResponseChannel(); throw new TimeoutException("Archive connect timeout: step=" + state + - ((int)state < 3 - ? " publication.uri=" + ctx.ControlRequestChannel() - : " subscription.uri=" + ctx.ControlResponseChannel())); + " publication=" + publication + + " subscription=" + subscription); } try @@ -3767,31 +3850,28 @@ private AeronArchive TransitionToDone(long archiveId) } } - static Exception QuietClose(Exception previousException, IDisposable disposable) + public static Exception QuietClose(Exception previousException, IDisposable disposable) { - Exception resultException = previousException; + if (disposable == null) + { + return previousException; + } - if (disposable != null) + try { - try - { - disposable.Dispose(); - } - catch (Exception ex) + disposable.Dispose(); + return previousException; + } + catch (Exception ex) + { + if (previousException == null) { - if (resultException != null) - { - // No built-in suppression — so wrap both exceptions - resultException = new AggregateException(resultException, ex); - } - else - { - resultException = ex; - } + return ex; } - } - return resultException; + previousException.AddSuppressed(ex); + return previousException; + } } private static void CheckAndSetupResponseChannel(Context ctx, Subscription subscription) diff --git a/src/Adaptive.Archiver/ArchiveException.cs b/src/Adaptive.Archiver/ArchiveException.cs index 10e12b42..6a5a033c 100644 --- a/src/Adaptive.Archiver/ArchiveException.cs +++ b/src/Adaptive.Archiver/ArchiveException.cs @@ -170,5 +170,33 @@ public ArchiveException(string message, int errorCode, long correlationId, Categ ErrorCode = errorCode; CorrelationId = correlationId; } + + /// + /// Get the human-readable name for an error code. + /// + /// to lookup. + /// the name of the error code, or "unknown error code: {errorCode}" if unrecognised. + public static string ErrorCodeAsString(int errorCode) + { + switch (errorCode) + { + case GENERIC: return "GENERIC"; + case ACTIVE_LISTING: return "ACTIVE_LISTING"; + case ACTIVE_RECORDING: return "ACTIVE_RECORDING"; + case ACTIVE_SUBSCRIPTION: return "ACTIVE_SUBSCRIPTION"; + case UNKNOWN_SUBSCRIPTION: return "UNKNOWN_SUBSCRIPTION"; + case UNKNOWN_RECORDING: return "UNKNOWN_RECORDING"; + case UNKNOWN_REPLAY: return "UNKNOWN_REPLAY"; + case MAX_REPLAYS: return "MAX_REPLAYS"; + case MAX_RECORDINGS: return "MAX_RECORDINGS"; + case INVALID_EXTENSION: return "INVALID_EXTENSION"; + case AUTHENTICATION_REJECTED: return "AUTHENTICATION_REJECTED"; + case STORAGE_SPACE: return "STORAGE_SPACE"; + case UNKNOWN_REPLICATION: return "UNKNOWN_REPLICATION"; + case UNAUTHORISED_ACTION: return "UNAUTHORISED_ACTION"; + case REPLICATION_CONNECTION_FAILURE: return "REPLICATION_CONNECTION_FAILURE"; + default: return "unknown error code: " + errorCode; + } + } } } \ No newline at end of file diff --git a/src/Adaptive.Archiver/FodyWeavers.xml b/src/Adaptive.Archiver/FodyWeavers.xml new file mode 100644 index 00000000..5415e271 --- /dev/null +++ b/src/Adaptive.Archiver/FodyWeavers.xml @@ -0,0 +1,5 @@ + + + + + diff --git a/src/Adaptive.Archiver/FodyWeavers.xsd b/src/Adaptive.Archiver/FodyWeavers.xsd new file mode 100644 index 00000000..42ece42f --- /dev/null +++ b/src/Adaptive.Archiver/FodyWeavers.xsd @@ -0,0 +1,27 @@ + + + + + + + + + + + + 'true' to run assembly verification (PEVerify) on the target assembly after all weavers have been executed. + + + + + A comma-separated list of error codes that can be safely ignored in assembly verification. + + + + + 'false' to turn off automatic generation of the XML Schema file. + + + + + \ No newline at end of file diff --git a/src/Adaptive.Cluster.Tests/Adaptive.Cluster.Tests.csproj b/src/Adaptive.Cluster.Tests/Adaptive.Cluster.Tests.csproj new file mode 100644 index 00000000..d3dad117 --- /dev/null +++ b/src/Adaptive.Cluster.Tests/Adaptive.Cluster.Tests.csproj @@ -0,0 +1,21 @@ + + + + net8.0 + false + + + + + + + + + + + + + + + + diff --git a/src/Adaptive.Cluster.Tests/Client/AeronClusterAsyncConnectTest.cs b/src/Adaptive.Cluster.Tests/Client/AeronClusterAsyncConnectTest.cs new file mode 100644 index 00000000..f2b2ae1c --- /dev/null +++ b/src/Adaptive.Cluster.Tests/Client/AeronClusterAsyncConnectTest.cs @@ -0,0 +1,361 @@ +using System; +using Adaptive.Aeron; +using Adaptive.Aeron.LogBuffer; +using Adaptive.Aeron.Protocol; +using Adaptive.Aeron.Security; +using Adaptive.Agrona; +using Adaptive.Agrona.Concurrent; +using Adaptive.Cluster.Client; +using Adaptive.Cluster.Codecs; +using FakeItEasy; +using NUnit.Framework; +using AeronType = Adaptive.Aeron.Aeron; +using AsyncConnectState = Adaptive.Cluster.Client.AeronCluster.AsyncConnect.AsyncConnectState; + +namespace Adaptive.Cluster.Tests.Client +{ + public class AeronClusterAsyncConnectTest + { + private const long ONE_HOUR_IN_NANOS = 3_600_000_000_000L; + + private class TrackingContext : AeronCluster.Context + { + public int DisposeCount { get; private set; } + + public override void Dispose() + { + DisposeCount++; + base.Dispose(); + } + } + + private AeronType aeron; + private AeronType.Context aeronContext; + private TrackingContext context; + + [SetUp] + public void Before() + { + aeron = A.Fake(); + + aeronContext = new AeronType.Context().NanoClock(SystemNanoClock.INSTANCE); + + A.CallTo(() => aeron.Ctx).Returns(aeronContext); + + context = new TrackingContext(); + context + .AeronClient(aeron) + .OwnsAeronClient(false) + .EgressChannel("aeron:udp?endpoint=localhost:0") + .EgressStreamId(42) + .IngressChannel("aeron:udp?endpoint=replace-me:5555") + .IngressStreamId(-19) + .CredentialsSupplier(new NullCredentialsSupplier()) + .IdleStrategy(new NoOpIdleStrategy()); + } + + [Test] + public void InitialState() + { + var asyncConnect = new AeronCluster.AsyncConnect(context, 1L); + Assert.AreEqual(AsyncConnectState.CREATE_EGRESS_SUBSCRIPTION, asyncConnect.State()); + Assert.AreEqual((int)AsyncConnectState.CREATE_EGRESS_SUBSCRIPTION, asyncConnect.Step()); + } + + [Test] + public void ShouldCloseAsyncSubscription() + { + const long subscriptionId = 999L; + A.CallTo(() => aeron.AsyncAddSubscription(context.EgressChannel(), context.EgressStreamId())) + .Returns(subscriptionId); + A.CallTo(() => aeron.GetSubscription(subscriptionId)).Returns((Subscription)null); + + var asyncConnect = new AeronCluster.AsyncConnect( + context, aeronContext.NanoClock().NanoTime() + ONE_HOUR_IN_NANOS); + + Assert.IsNull(asyncConnect.Poll()); + Assert.AreEqual(AsyncConnectState.CREATE_EGRESS_SUBSCRIPTION, asyncConnect.State()); + + asyncConnect.Dispose(); + + A.CallTo(() => aeron.Ctx).MustHaveHappened().Then( + A.CallTo(() => aeron.AsyncAddSubscription(context.EgressChannel(), context.EgressStreamId())) + .MustHaveHappened()).Then( + A.CallTo(() => aeron.GetSubscription(subscriptionId)).MustHaveHappened()).Then( + A.CallTo(() => aeron.AsyncRemoveSubscription(subscriptionId)).MustHaveHappened()); + Assert.AreEqual(1, context.DisposeCount); + } + + [Test] + public void ShouldCloseEgressSubscription() + { + const long subscriptionId = -4343L; + A.CallTo(() => aeron.AsyncAddSubscription(context.EgressChannel(), context.EgressStreamId())) + .Returns(subscriptionId); + var subscription = A.Fake(); + A.CallTo(() => aeron.GetSubscription(subscriptionId)).Returns(subscription); + + var asyncConnect = new AeronCluster.AsyncConnect( + context, aeronContext.NanoClock().NanoTime() + ONE_HOUR_IN_NANOS); + + Assert.IsNull(asyncConnect.Poll()); + Assert.AreEqual(AsyncConnectState.CREATE_INGRESS_PUBLICATIONS, asyncConnect.State()); + + asyncConnect.Dispose(); + A.CallTo(() => subscription.Dispose()).MustHaveHappened(); + Assert.AreEqual(1, context.DisposeCount); + A.CallTo(() => aeron.AsyncRemoveSubscription(subscriptionId)).MustNotHaveHappened(); + } + + [Test] + public void ShouldCloseAsyncPublication() + { + const long subscriptionId = 87L; + A.CallTo(() => aeron.AsyncAddSubscription(context.EgressChannel(), context.EgressStreamId())) + .Returns(subscriptionId); + var subscription = A.Fake(); + A.CallTo(() => aeron.GetSubscription(subscriptionId)).Returns(subscription); + + context.IsIngressExclusive(true); + const long publicationId = long.MaxValue; + A.CallTo(() => aeron.AsyncAddExclusivePublication(context.IngressChannel(), context.IngressStreamId())) + .Returns(publicationId); + A.CallTo(() => aeron.GetExclusivePublication(publicationId)).Returns((ExclusivePublication)null); + + var asyncConnect = new AeronCluster.AsyncConnect( + context, aeronContext.NanoClock().NanoTime() + ONE_HOUR_IN_NANOS); + + Assert.IsNull(asyncConnect.Poll()); + Assert.AreEqual(AsyncConnectState.CREATE_INGRESS_PUBLICATIONS, asyncConnect.State()); + + Assert.IsNull(asyncConnect.Poll()); + Assert.AreEqual(AsyncConnectState.CREATE_INGRESS_PUBLICATIONS, asyncConnect.State()); + + asyncConnect.Dispose(); + + A.CallTo(() => aeron.Ctx).MustHaveHappened().Then( + A.CallTo(() => aeron.AsyncAddSubscription(context.EgressChannel(), context.EgressStreamId())) + .MustHaveHappened()).Then( + A.CallTo(() => aeron.GetSubscription(subscriptionId)).MustHaveHappened()).Then( + A.CallTo(() => aeron.AsyncAddExclusivePublication(context.IngressChannel(), context.IngressStreamId())) + .MustHaveHappened()).Then( + A.CallTo(() => aeron.GetExclusivePublication(publicationId)).MustHaveHappened()).Then( + A.CallTo(() => aeron.AsyncRemovePublication(publicationId)).MustHaveHappened()).Then( + A.CallTo(() => subscription.Dispose()).MustHaveHappened()); + Assert.AreEqual(1, context.DisposeCount); + } + + [Test] + public void ShouldCloseIngressPublicationsOnMembers() + { + const long subscriptionId = 42L; + A.CallTo(() => aeron.AsyncAddSubscription(context.EgressChannel(), context.EgressStreamId())) + .Returns(subscriptionId); + var subscription = A.Fake(); + A.CallTo(() => aeron.GetSubscription(subscriptionId)).Returns(subscription); + + const int ingressStreamId = 878; + context + .IsIngressExclusive(true) + .IngressEndpoints("0=localhost:20000,1=localhost:20001,2=localhost:20002") + .IngressStreamId(ingressStreamId); + const long publicationId1 = -6342756432L; + var publication1 = A.Fake(); + A.CallTo(() => aeron.AsyncAddExclusivePublication("aeron:udp?endpoint=localhost:20000", ingressStreamId)) + .Returns(publicationId1); + A.CallTo(() => aeron.GetExclusivePublication(publicationId1)) + .ReturnsNextFromSequence((ExclusivePublication)null, publication1); + const long publicationId2 = AeronType.NULL_VALUE; + A.CallTo(() => aeron.AsyncAddExclusivePublication("aeron:udp?endpoint=localhost:20001", ingressStreamId)) + .Returns(publicationId2); + A.CallTo(() => aeron.GetExclusivePublication(publicationId2)).Returns((ExclusivePublication)null); + const long publicationId3 = 573495L; + A.CallTo(() => aeron.AsyncAddExclusivePublication("aeron:udp?endpoint=localhost:20002", ingressStreamId)) + .Returns(publicationId3); + A.CallTo(() => aeron.GetExclusivePublication(publicationId3)).Returns((ExclusivePublication)null); + + var asyncConnect = new AeronCluster.AsyncConnect( + context, aeronContext.NanoClock().NanoTime() + ONE_HOUR_IN_NANOS); + + Assert.IsNull(asyncConnect.Poll()); + Assert.AreEqual(AsyncConnectState.CREATE_INGRESS_PUBLICATIONS, asyncConnect.State()); + + const int iterations = 10; + for (int i = 0; i < iterations; i++) + { + Assert.IsNull(asyncConnect.Poll()); + Assert.AreEqual(AsyncConnectState.CREATE_INGRESS_PUBLICATIONS, asyncConnect.State()); + } + + A.CallTo(() => aeron.AsyncAddExclusivePublication("aeron:udp?endpoint=localhost:20000", ingressStreamId)) + .MustHaveHappenedANumberOfTimesMatching(n => n <= 1); + A.CallTo(() => aeron.AsyncAddExclusivePublication("aeron:udp?endpoint=localhost:20001", ingressStreamId)) + .MustHaveHappened(iterations, Times.Exactly); + A.CallTo(() => aeron.AsyncAddExclusivePublication("aeron:udp?endpoint=localhost:20002", ingressStreamId)) + .MustHaveHappenedANumberOfTimesMatching(n => n <= 1); + A.CallTo(() => aeron.GetExclusivePublication(publicationId1)) + .MustHaveHappened(2, Times.Exactly); + A.CallTo(() => aeron.GetExclusivePublication(publicationId2)) + .MustHaveHappened(iterations, Times.Exactly); + A.CallTo(() => aeron.GetExclusivePublication(publicationId3)) + .MustHaveHappened(iterations, Times.Exactly); + + asyncConnect.Dispose(); + + A.CallTo(() => subscription.Dispose()).MustHaveHappened(); + A.CallTo(subscription).MustHaveHappenedOnceExactly(); + A.CallTo(() => publication1.Dispose()).MustHaveHappened(); + Assert.AreEqual(1, context.DisposeCount); + A.CallTo(() => aeron.AsyncRemovePublication(publicationId3)) + .MustHaveHappenedANumberOfTimesMatching(n => n <= 1); + A.CallTo(() => aeron.AsyncRemoveSubscription(subscriptionId)).MustNotHaveHappened(); + A.CallTo(() => aeron.AsyncRemovePublication(publicationId1)).MustNotHaveHappened(); + A.CallTo(() => aeron.AsyncRemovePublication(publicationId2)).MustNotHaveHappened(); + } + + [Test] + public void ShouldCloseIngressPublication() + { + const long subscriptionId = 42L; + A.CallTo(() => aeron.AsyncAddSubscription(context.EgressChannel(), context.EgressStreamId())) + .Returns(subscriptionId); + var subscription = A.Fake(); + A.CallTo(() => aeron.GetSubscription(subscriptionId)).Returns(subscription); + + context.IsIngressExclusive(false); + const long publicationId = -6342756432L; + A.CallTo(() => aeron.AsyncAddPublication(context.IngressChannel(), context.IngressStreamId())) + .Returns(publicationId); + var publication = A.Fake(); + A.CallTo(() => aeron.GetPublication(publicationId)).Returns(publication); + + var asyncConnect = new AeronCluster.AsyncConnect( + context, aeronContext.NanoClock().NanoTime() + ONE_HOUR_IN_NANOS); + + Assert.IsNull(asyncConnect.Poll()); + Assert.AreEqual(AsyncConnectState.CREATE_INGRESS_PUBLICATIONS, asyncConnect.State()); + + Assert.IsNull(asyncConnect.Poll()); + Assert.AreEqual(AsyncConnectState.CREATE_INGRESS_PUBLICATIONS, asyncConnect.State()); + + Assert.IsNull(asyncConnect.Poll()); + Assert.AreEqual(AsyncConnectState.AWAIT_PUBLICATION_CONNECTED, asyncConnect.State()); + + asyncConnect.Dispose(); + A.CallTo(() => publication.Dispose()).MustHaveHappened(); + A.CallTo(() => subscription.Dispose()).MustHaveHappened(); + Assert.AreEqual(1, context.DisposeCount); + A.CallTo(() => aeron.AsyncRemovePublication(publicationId)).MustNotHaveHappened(); + A.CallTo(() => aeron.AsyncRemoveSubscription(subscriptionId)).MustNotHaveHappened(); + } + + [Test] + public void ShouldConnectViaIngressChannel() + { + const long subscriptionId = 42L; + long correlationIdSeq = 0; + A.CallTo(() => aeron.NextCorrelationId()).ReturnsLazily(() => ++correlationIdSeq); + A.CallTo(() => aeron.AsyncAddSubscription(context.EgressChannel(), context.EgressStreamId())) + .Returns(subscriptionId); + var subscription = A.Fake(); + A.CallTo(() => aeron.GetSubscription(subscriptionId)).Returns(subscription); + + context.IsIngressExclusive(false); + const long publicationId = -19L; + A.CallTo(() => aeron.AsyncAddPublication(context.IngressChannel(), context.IngressStreamId())) + .Returns(publicationId); + var publication = A.Fake(); + A.CallTo(() => aeron.GetPublication(publicationId)).Returns(publication); + + var asyncConnect = new AeronCluster.AsyncConnect( + context, aeronContext.NanoClock().NanoTime() + ONE_HOUR_IN_NANOS); + + Assert.IsNull(asyncConnect.Poll()); + Assert.AreEqual(AsyncConnectState.CREATE_INGRESS_PUBLICATIONS, asyncConnect.State()); + + Assert.IsNull(asyncConnect.Poll()); + Assert.AreEqual(AsyncConnectState.CREATE_INGRESS_PUBLICATIONS, asyncConnect.State()); + + Assert.IsNull(asyncConnect.Poll()); + Assert.AreEqual(AsyncConnectState.AWAIT_PUBLICATION_CONNECTED, asyncConnect.State()); + + const string responseChannel = "aeron:udp?endpoint=localhost:8888"; + A.CallTo(() => subscription.TryResolveChannelEndpointPort()).Returns(responseChannel); + A.CallTo(() => publication.IsConnected).Returns(true); + + Assert.IsNull(asyncConnect.Poll()); + Assert.AreEqual(AsyncConnectState.SEND_MESSAGE, asyncConnect.State()); + long sendMessageCorrelationId = correlationIdSeq; + + A.CallTo(() => publication.Offer(A._, 0, A._, A._)).Returns(8L); + + Assert.IsNull(asyncConnect.Poll()); + Assert.AreEqual(AsyncConnectState.POLL_RESPONSE, asyncConnect.State()); + + var responseBuffer = new UnsafeBuffer(new byte[256]); + var headerEncoder = new MessageHeaderEncoder(); + var sessionEventEncoder = new SessionEventEncoder(); + const long clusterSessionId = 888L; + const long leadershipTermId = 5L; + const int leaderMemberId = 2; + sessionEventEncoder + .WrapAndApplyHeader(responseBuffer, DataHeaderFlyweight.HEADER_LENGTH, headerEncoder) + .ClusterSessionId(clusterSessionId) + .CorrelationId(sendMessageCorrelationId) + .LeadershipTermId(leadershipTermId) + .LeaderMemberId(leaderMemberId) + .Code(EventCode.OK) + .Version(AeronCluster.Configuration.PROTOCOL_SEMANTIC_VERSION) + .LeaderHeartbeatTimeoutNs(SessionEventEncoder.LeaderHeartbeatTimeoutNsNullValue()) + .Detail("you are now connected"); + var egressImage = A.Fake(); + var header = new Header(1, 16, egressImage); + header.Buffer = responseBuffer; + header.Offset = 0; + var headerFlyweight = new DataHeaderFlyweight(); + headerFlyweight.Wrap(responseBuffer, 0, DataHeaderFlyweight.HEADER_LENGTH); + headerFlyweight.Flags(DataHeaderFlyweight.BEGIN_AND_END_FLAGS); + A.CallTo(() => subscription.ControlledPoll(A._, A._)) + .ReturnsLazily(call => + { + var assembler = call.GetArgument(0); + assembler.OnFragment( + responseBuffer, + DataHeaderFlyweight.HEADER_LENGTH, + sessionEventEncoder.EncodedLength(), + header); + return 1; + }); + + Assert.IsNull(asyncConnect.Poll()); + Assert.AreEqual(AsyncConnectState.CONCLUDE_CONNECT, asyncConnect.State()); + + var aeronCluster = asyncConnect.Poll(); + Assert.IsNotNull(aeronCluster); + Assert.AreEqual(leadershipTermId, aeronCluster.LeadershipTermId); + Assert.AreEqual(leaderMemberId, aeronCluster.LeaderMemberId); + Assert.AreEqual(clusterSessionId, aeronCluster.ClusterSessionId); + Assert.AreEqual( + 2 * AeronCluster.Configuration.LEADER_HEARTBEAT_TIMEOUT_DEFAULT_NS, + context.NewLeaderTimeoutNs()); + + asyncConnect.Dispose(); + A.CallTo(() => publication.Dispose()).MustNotHaveHappened(); + A.CallTo(() => subscription.Dispose()).MustNotHaveHappened(); + + A.CallTo(() => publication.TryClaim(A._, A._)).ReturnsLazily(call => + { + int length = call.GetArgument(0); + var bufferClaim = call.GetArgument(1); + bufferClaim.Wrap(responseBuffer, 0, BitUtil.Align(DataHeaderFlyweight.HEADER_LENGTH + length, DataHeaderFlyweight.HEADER_LENGTH)); + return 42L; + }); + aeronCluster.Dispose(); + Assert.IsTrue(aeronCluster.Closed); + A.CallTo(() => publication.TryClaim(A._, A._)).MustHaveHappened().Then( + A.CallTo(() => subscription.Dispose()).MustHaveHappened()).Then( + A.CallTo(() => publication.Dispose()).MustHaveHappened()); + Assert.AreEqual(1, context.DisposeCount); + } + } +} diff --git a/src/Adaptive.Cluster.Tests/Client/AeronClusterContextTest.cs b/src/Adaptive.Cluster.Tests/Client/AeronClusterContextTest.cs new file mode 100644 index 00000000..21477ef4 --- /dev/null +++ b/src/Adaptive.Cluster.Tests/Client/AeronClusterContextTest.cs @@ -0,0 +1,101 @@ +using Adaptive.Aeron; +using Adaptive.Aeron.Exceptions; +using Adaptive.Cluster.Client; +using FakeItEasy; +using NUnit.Framework; +using AeronType = Adaptive.Aeron.Aeron; + +namespace Adaptive.Cluster.Tests.Client +{ + public class AeronClusterContextTest + { + private AeronType aeron; + private AeronCluster.Context context; + + [SetUp] + public void Before() + { + aeron = A.Fake(); + + context = new AeronCluster.Context() + .AeronClient(aeron) + .IngressChannel("aeron:udp") + .EgressChannel("aeron:udp?endpoint=localhost:0"); + } + + [TestCase(null)] + [TestCase("")] + public void ConcludeThrowsConfigurationExceptionIfIngressChannelIsNotSet(string ingressChannel) + { + context.IngressChannel(ingressChannel); + + var exception = Assert.Throws(() => context.Conclude()); + Assert.AreEqual("ingressChannel must be specified", exception.Message); + } + + [Test] + public void ConcludeThrowsConfigurationExceptionIfIngressChannelIsSetToIpcAndIngressEndpointsSpecified() + { + context + .IngressChannel("aeron:ipc") + .IngressEndpoints("0,localhost:1234"); + + var exception = Assert.Throws(() => context.Conclude()); + Assert.AreEqual( + "AeronCluster.Context ingressEndpoints must be null when using IPC ingress", + exception.Message); + } + + [TestCase(null)] + [TestCase("")] + public void ConcludeThrowsConfigurationExceptionIfEgressChannelIsNotSet(string egressChannel) + { + context.EgressChannel(egressChannel); + + var exception = Assert.Throws(() => context.Conclude()); + Assert.AreEqual("egressChannel must be specified", exception.Message); + } + + [TestCase(null)] + [TestCase("")] + public void ClientNameShouldHandleEmptyValue(string clientName) + { + context.ClientName(clientName); + Assert.AreEqual("", context.ClientName()); + } + + [TestCase("test")] + [TestCase("Some other name")] + public void ClientNameShouldReturnAssignedValue(string clientName) + { + context.ClientName(clientName); + Assert.AreEqual(clientName, context.ClientName()); + } + + [TestCase("some")] + [TestCase("42")] + public void ClientNameCanBeSetViaSystemProperty(string clientName) + { + Config.Params[AeronCluster.Configuration.CLIENT_NAME_PROP_NAME] = clientName; + try + { + Assert.AreEqual(clientName, new AeronCluster.Context().ClientName()); + } + finally + { + Config.Params.Remove(AeronCluster.Configuration.CLIENT_NAME_PROP_NAME); + } + } + + [Test] + public void ClientNameMustNotExceedMaxLength() + { + context.ClientName("test" + new string('x', AeronType.Configuration.MAX_CLIENT_NAME_LENGTH)); + + var exception = Assert.Throws(() => context.Conclude()); + Assert.AreEqual( + "AeronCluster.Context.clientName length must be <= " + AeronType.Configuration.MAX_CLIENT_NAME_LENGTH, + exception.Message); + } + } +} diff --git a/src/Adaptive.Cluster.Tests/Client/AeronClusterTest.cs b/src/Adaptive.Cluster.Tests/Client/AeronClusterTest.cs new file mode 100644 index 00000000..00c10322 --- /dev/null +++ b/src/Adaptive.Cluster.Tests/Client/AeronClusterTest.cs @@ -0,0 +1,293 @@ +using System; +using System.Collections.Generic; +using Adaptive.Aeron; +using Adaptive.Aeron.LogBuffer; +using Adaptive.Aeron.Protocol; +using Adaptive.Agrona; +using Adaptive.Agrona.Collections; +using Adaptive.Agrona.Concurrent; +using Adaptive.Cluster.Client; +using Adaptive.Cluster.Codecs; +using FakeItEasy; +using NUnit.Framework; +using AeronType = Adaptive.Aeron.Aeron; + +namespace Adaptive.Cluster.Tests.Client +{ + public class AeronClusterTest + { + private const string INGRESS_ENDPOINTS = "foo:1000,bar:1000,baz:1000"; + private const int CLUSTER_SESSION_ID = 123; + + private UnsafeBuffer buffer; + private UnsafeBuffer appMessage; + private IEgressListener egressListener; + private AeronType aeron; + private AeronType.Context aeronContext; + private AeronCluster.Context context; + private ExclusivePublication ingressPublication; + private Subscription egressSubscription; + private Image egressImage; + private AeronCluster aeronCluster; + private long nanoTime; + private int leadershipTermId; + private int leaderMemberId; + private bool newLeaderEventPending; + + [SetUp] + public void SetUp() + { + nanoTime = 0; + leadershipTermId = 2; + leaderMemberId = 1; + newLeaderEventPending = false; + + buffer = new UnsafeBuffer(new byte[1024]); + appMessage = new UnsafeBuffer(new byte[8]); + egressListener = A.Fake(); + aeron = A.Fake(); + + var nanoClock = A.Fake(); + A.CallTo(() => nanoClock.NanoTime()).ReturnsLazily(() => nanoTime); + + aeronContext = new AeronType.Context(); + aeronContext.NanoClock(nanoClock); + aeronContext.SubscriberErrorHandler(RethrowingErrorHandler.INSTANCE); + + A.CallTo(() => aeron.Ctx).Returns(aeronContext); + + context = new AeronCluster.Context() + .AeronClient(aeron) + .OwnsAeronClient(false) + .EgressChannel("aeron:udp?endpoint=localhost:0") + .IngressChannel("aeron:udp") + .IdleStrategy(new NoOpIdleStrategy()) + .EgressListener(egressListener) + .NewLeaderTimeoutNs(TimeSpan.FromSeconds(1).Ticks * 100); // 1 second in nanos + + ingressPublication = A.Fake(); + egressSubscription = A.Fake(); + egressImage = A.Fake(); + + const long ingressPublicationRegistrationId = 42L; + A.CallTo(() => ingressPublication.RegistrationId).Returns(ingressPublicationRegistrationId); + A.CallTo(() => aeron.AsyncAddExclusivePublication(context.IngressChannel(), context.IngressStreamId())) + .Returns(ingressPublicationRegistrationId); + A.CallTo(() => aeron.GetExclusivePublication(ingressPublicationRegistrationId)) + .Returns(ingressPublication); + + context.Conclude(); + + A.CallTo(() => egressSubscription.Poll(A._, A._)).ReturnsLazily(call => + { + if (newLeaderEventPending) + { + newLeaderEventPending = false; + + int offset = DataHeaderFlyweight.HEADER_LENGTH; + FrameDescriptor.FrameFlags(buffer, 0, FrameDescriptor.UNFRAGMENTED); + + var newLeaderEventEncoder = new NewLeaderEventEncoder(); + newLeaderEventEncoder.WrapAndApplyHeader(buffer, offset, new MessageHeaderEncoder()); + newLeaderEventEncoder.ClusterSessionId(CLUSTER_SESSION_ID); + newLeaderEventEncoder.LeadershipTermId(++leadershipTermId); + newLeaderEventEncoder.LeaderMemberId(++leaderMemberId); + newLeaderEventEncoder.IngressEndpoints(INGRESS_ENDPOINTS); + + int length = MessageHeaderEncoder.ENCODED_LENGTH + newLeaderEventEncoder.EncodedLength(); + + var header = new Header(0, 0, egressImage); + header.Buffer = buffer; + + var handler = call.GetArgument(0); + handler.OnFragment(buffer, offset, length, header); + + return 1; + } + + return 0; + }); + + aeronCluster = new AeronCluster( + context, + new MessageHeaderEncoder(), + ingressPublication, + egressSubscription, + egressImage, + new Map(), + CLUSTER_SESSION_ID, + leadershipTermId, + leaderMemberId); + } + + [TestCase(false)] + [TestCase(true)] + public void ShouldCloseItselfWhenDisconnectedForLongerThanNewLeaderTimeout(bool withAppMessages) + { + MakeIngressPublicationReturn(Publication.NOT_CONNECTED); + if (withAppMessages) + { + Assert.AreEqual(Publication.NOT_CONNECTED, aeronCluster.Offer(appMessage, 0, 8)); + } + else + { + Assert.IsFalse(aeronCluster.SendKeepAlive()); + } + + nanoTime += context.NewLeaderTimeoutNs() - 1; + + Assert.AreEqual(0, aeronCluster.PollEgress()); + Assert.IsFalse(aeronCluster.Closed); + + nanoTime += 1; + + Assert.AreEqual(1, aeronCluster.PollEgress()); + Assert.IsTrue(aeronCluster.Closed); + } + + [Test] + public void ShouldCloseItselfAfterReachingMaxPositionOnTheIngressPublication() + { + MakeIngressPublicationReturn(Publication.MAX_POSITION_EXCEEDED); + Assert.AreEqual(Publication.MAX_POSITION_EXCEEDED, aeronCluster.Offer(appMessage, 0, 8)); + A.CallTo(() => ingressPublication.Dispose()).MustHaveHappened(); + Assert.AreEqual(1, aeronCluster.PollStateChanges()); + Assert.IsTrue(aeronCluster.Closed); + } + + public static IEnumerable ShouldStayConnectedAfterSuccessfulFailoverCases() + { + yield return new TestCaseData(false, false); + yield return new TestCaseData(false, true); + yield return new TestCaseData(true, false); + yield return new TestCaseData(true, true); + } + + [TestCaseSource(nameof(ShouldStayConnectedAfterSuccessfulFailoverCases))] + public void ShouldStayConnectedAfterSuccessfulFailover(bool withIngressDisconnect, bool withAppMessages) + { + long initialResult = withIngressDisconnect ? Publication.NOT_CONNECTED : 128L; + MakeIngressPublicationReturn(initialResult); + if (withAppMessages) + { + Assert.AreEqual(initialResult, aeronCluster.Offer(appMessage, 0, 8)); + } + else + { + Assert.AreEqual(!withIngressDisconnect, aeronCluster.SendKeepAlive()); + } + + nanoTime += context.NewLeaderTimeoutNs() - 1; + + MakeEgressSubscriptionDeliverNewLeaderEvent(); + Assert.AreEqual(1, aeronCluster.PollEgress()); + A.CallTo(() => egressListener.OnNewLeader(CLUSTER_SESSION_ID, leadershipTermId, leaderMemberId, INGRESS_ENDPOINTS)) + .MustHaveHappened(); + Assert.AreEqual(0, aeronCluster.PollEgress()); + + nanoTime += context.MessageTimeoutNs() - 1; + + MakeIngressPublicationReturn(256L); + if (withAppMessages) + { + Assert.AreEqual(256L, aeronCluster.Offer(appMessage, 0, 8)); + } + else + { + Assert.IsTrue(aeronCluster.SendKeepAlive()); + } + + nanoTime += 1; + + Assert.AreEqual(0, aeronCluster.PollEgress()); + Assert.IsFalse(aeronCluster.Closed); + } + + [TestCase(false)] + [TestCase(true)] + public void ShouldCloseItselfWhenUnableToSendMessageForLongerThanNewLeaderConnectionTimeout(bool withAppMessages) + { + MakeIngressPublicationReturn(Publication.NOT_CONNECTED); + if (withAppMessages) + { + Assert.AreEqual(Publication.NOT_CONNECTED, aeronCluster.Offer(appMessage, 0, 8)); + } + else + { + Assert.IsFalse(aeronCluster.SendKeepAlive()); + } + + nanoTime += context.NewLeaderTimeoutNs() / 2; + + MakeEgressSubscriptionDeliverNewLeaderEvent(); + + Assert.AreEqual(1, aeronCluster.PollEgress()); + Assert.IsFalse(aeronCluster.Closed); + + nanoTime += context.MessageTimeoutNs() - 1; + if (withAppMessages) + { + Assert.AreEqual(Publication.NOT_CONNECTED, aeronCluster.Offer(appMessage, 0, 8)); + } + else + { + Assert.IsFalse(aeronCluster.SendKeepAlive()); + } + + nanoTime += 1; + + Assert.AreEqual(1, aeronCluster.PollEgress()); + Assert.IsTrue(aeronCluster.Closed); + } + + [Test] + public void ShouldCloseIngressPublicationWhenEgressImageCloses() + { + // in CONNECTED state + A.CallTo(() => egressImage.Closed).Returns(true); + Assert.AreEqual(1, aeronCluster.PollEgress()); + A.CallTo(() => ingressPublication.Dispose()).MustHaveHappened(); + + A.CallTo(() => egressImage.Closed).Returns(false); + MakeEgressSubscriptionDeliverNewLeaderEvent(); + Assert.AreEqual(1, aeronCluster.PollEgress()); + A.CallTo(() => ingressPublication.Dispose()).MustHaveHappenedTwiceOrMore(); + + // and in AWAIT_NEW_LEADER_CONNECTION state too + A.CallTo(() => egressImage.Closed).Returns(true); + Assert.AreEqual(1, aeronCluster.PollEgress()); + A.CallTo(() => ingressPublication.Dispose()) + .MustHaveHappened(3, Times.OrMore); + } + + private void MakeIngressPublicationReturn(long result) + { + if (result > 0) + { + A.CallTo(() => ingressPublication.TryClaim(A._, A._)) + .ReturnsLazily(call => + { + int length = call.GetArgument(0); + length = BitUtil.Align(DataHeaderFlyweight.HEADER_LENGTH + length, FrameDescriptor.FRAME_ALIGNMENT); + var bufferClaim = call.GetArgument(1); + bufferClaim.Wrap(buffer, 0, length); + return result; + }); + } + else + { + A.CallTo(() => ingressPublication.TryClaim(A._, A._)).Returns(result); + } + + A.CallTo(() => ingressPublication.Offer( + A._, A._, A._, + A._, A._, A._, + A._)).Returns(result); + } + + private void MakeEgressSubscriptionDeliverNewLeaderEvent() + { + newLeaderEventPending = true; + } + } +} diff --git a/src/Adaptive.Cluster.Tests/Client/EgressAdapterTest.cs b/src/Adaptive.Cluster.Tests/Client/EgressAdapterTest.cs new file mode 100644 index 00000000..7b7a862c --- /dev/null +++ b/src/Adaptive.Cluster.Tests/Client/EgressAdapterTest.cs @@ -0,0 +1,303 @@ +using Adaptive.Aeron; +using Adaptive.Aeron.LogBuffer; +using Adaptive.Agrona; +using Adaptive.Agrona.Concurrent; +using Adaptive.Cluster.Client; +using Adaptive.Cluster.Codecs; +using FakeItEasy; +using NUnit.Framework; + +namespace Adaptive.Cluster.Tests.Client +{ + public class EgressAdapterTest + { + private UnsafeBuffer buffer; + private MessageHeaderEncoder messageHeaderEncoder; + private SessionMessageHeaderEncoder sessionMessageHeaderEncoder; + private SessionEventEncoder sessionEventEncoder; + private NewLeaderEventEncoder newLeaderEventEncoder; + private AdminResponseEncoder adminResponseEncoder; + + [SetUp] + public void SetUp() + { + buffer = new UnsafeBuffer(new byte[512]); + messageHeaderEncoder = new MessageHeaderEncoder(); + sessionMessageHeaderEncoder = new SessionMessageHeaderEncoder(); + sessionEventEncoder = new SessionEventEncoder(); + newLeaderEventEncoder = new NewLeaderEventEncoder(); + adminResponseEncoder = new AdminResponseEncoder(); + } + + [Test] + public void OnFragmentShouldDelegateToEgressListenerOnUnknownSchemaId() + { + const ushort schemaId = 17; + const ushort templateId = 19; + messageHeaderEncoder + .Wrap(buffer, 0) + .SchemaId(schemaId) + .TemplateId(templateId); + + var listenerExtension = A.Fake(); + var header = new Header(0, 0); + var adapter = new EgressAdapter( + A.Fake(), listenerExtension, 0, A.Fake(), 3); + + adapter.OnFragment(buffer, 0, MessageHeaderDecoder.ENCODED_LENGTH * 2, header); + + A.CallTo(() => listenerExtension.OnExtensionMessage( + A._, + templateId, + schemaId, + 0, + buffer, + MessageHeaderDecoder.ENCODED_LENGTH, + MessageHeaderDecoder.ENCODED_LENGTH)) + .MustHaveHappenedOnceExactly(); + A.CallTo(listenerExtension).MustHaveHappenedOnceExactly(); + } + + [Test] + public void DefaultEgressListenerBehaviourShouldThrowClusterExceptionOnUnknownSchemaId() + { + var listener = A.Fake(); + var adapter = new EgressAdapter(listener, 42, A.Fake(), 5); + var exception = Assert.Throws( + () => adapter.OnFragment(buffer, 0, 64, new Header(0, 0))); + Assert.AreEqual( + "expected schemaId=" + MessageHeaderDecoder.SCHEMA_ID + ", actual=0", + exception.Message); + } + + [Test] + public void OnFragmentShouldInvokeOnMessageCallbackIfSessionIdMatches() + { + const int offset = 4; + const long sessionId = 2973438724L; + const long timestamp = -46328746238764832L; + sessionMessageHeaderEncoder + .WrapAndApplyHeader(buffer, offset, messageHeaderEncoder) + .ClusterSessionId(sessionId) + .Timestamp(timestamp); + + var egressListener = A.Fake(); + var header = new Header(0, 0); + var adapter = new EgressAdapter(egressListener, sessionId, A.Fake(), 3); + + adapter.OnFragment(buffer, offset, sessionMessageHeaderEncoder.EncodedLength(), header); + + A.CallTo(() => egressListener.OnMessage( + sessionId, + timestamp, + buffer, + offset + AeronCluster.SESSION_HEADER_LENGTH, + sessionMessageHeaderEncoder.EncodedLength() - AeronCluster.SESSION_HEADER_LENGTH, + header)) + .MustHaveHappenedOnceExactly(); + A.CallTo(egressListener).MustHaveHappenedOnceExactly(); + } + + [Test] + public void OnFragmentIsANoOpIfSessionIdDoesNotMatchOnSessionMessage() + { + const int offset = 18; + const long sessionId = 21; + const long timestamp = 1000; + sessionMessageHeaderEncoder + .WrapAndApplyHeader(buffer, offset, messageHeaderEncoder) + .ClusterSessionId(sessionId) + .Timestamp(timestamp); + + var egressListener = A.Fake(); + var header = new Header(0, 0); + var adapter = new EgressAdapter(egressListener, -19, A.Fake(), 3); + + adapter.OnFragment(buffer, offset, sessionMessageHeaderEncoder.EncodedLength(), header); + + A.CallTo(egressListener).MustNotHaveHappened(); + } + + [Test] + public void OnFragmentShouldInvokeOnSessionEventCallbackIfSessionIdMatches() + { + const int offset = 8; + const long clusterSessionId = 42; + const long correlationId = 777; + const long leadershipTermId = 6; + const int leaderMemberId = 3; + const EventCode eventCode = EventCode.REDIRECT; + const int version = 18; + const string eventDetail = "Event details"; + sessionEventEncoder + .WrapAndApplyHeader(buffer, offset, messageHeaderEncoder) + .ClusterSessionId(clusterSessionId) + .CorrelationId(correlationId) + .LeadershipTermId(leadershipTermId) + .LeaderMemberId(leaderMemberId) + .Code(eventCode) + .Version(version) + .Detail(eventDetail); + + var egressListener = A.Fake(); + var header = new Header(1, 3); + var adapter = new EgressAdapter(egressListener, clusterSessionId, A.Fake(), 10); + + adapter.OnFragment(buffer, offset, sessionEventEncoder.EncodedLength(), header); + + A.CallTo(() => egressListener.OnSessionEvent( + correlationId, clusterSessionId, leadershipTermId, leaderMemberId, eventCode, eventDetail)) + .MustHaveHappenedOnceExactly(); + A.CallTo(egressListener).MustHaveHappenedOnceExactly(); + } + + [Test] + public void OnFragmentIsANoOpIfSessionIdDoesNotMatchOnSessionEvent() + { + const int offset = 8; + const long clusterSessionId = 42; + const long correlationId = 777; + const long leadershipTermId = 6; + const int leaderMemberId = 3; + const EventCode eventCode = EventCode.REDIRECT; + const int version = 18; + const string eventDetail = "Event details"; + sessionEventEncoder + .WrapAndApplyHeader(buffer, offset, messageHeaderEncoder) + .ClusterSessionId(clusterSessionId) + .CorrelationId(correlationId) + .LeadershipTermId(leadershipTermId) + .LeaderMemberId(leaderMemberId) + .Code(eventCode) + .Version(version) + .Detail(eventDetail); + + var egressListener = A.Fake(); + var header = new Header(0, 0); + var adapter = new EgressAdapter( + egressListener, clusterSessionId + 1, A.Fake(), 3); + + adapter.OnFragment(buffer, offset, sessionEventEncoder.EncodedLength(), header); + + A.CallTo(egressListener).MustNotHaveHappened(); + } + + [Test] + public void OnFragmentShouldInvokeOnNewLeaderCallbackIfSessionIdMatches() + { + const int offset = 0; + const long clusterSessionId = 0; + const long leadershipTermId = 6; + const int leaderMemberId = 9999; + const string ingressEndpoints = "ingress endpoints ..."; + newLeaderEventEncoder + .WrapAndApplyHeader(buffer, offset, messageHeaderEncoder) + .LeadershipTermId(leadershipTermId) + .ClusterSessionId(clusterSessionId) + .LeaderMemberId(leaderMemberId) + .IngressEndpoints(ingressEndpoints); + + var egressListener = A.Fake(); + var header = new Header(1, 3); + var adapter = new EgressAdapter(egressListener, clusterSessionId, A.Fake(), 10); + + adapter.OnFragment(buffer, offset, newLeaderEventEncoder.EncodedLength(), header); + + A.CallTo(() => egressListener.OnNewLeader( + clusterSessionId, leadershipTermId, leaderMemberId, ingressEndpoints)) + .MustHaveHappenedOnceExactly(); + A.CallTo(egressListener).MustHaveHappenedOnceExactly(); + } + + [Test] + public void OnFragmentIsANoOpIfSessionIdDoesNotMatchOnNewLeader() + { + const int offset = 0; + const long clusterSessionId = -100; + const long leadershipTermId = 6; + const int leaderMemberId = 9999; + const string ingressEndpoints = "ingress endpoints ..."; + newLeaderEventEncoder + .WrapAndApplyHeader(buffer, offset, messageHeaderEncoder) + .LeadershipTermId(leadershipTermId) + .ClusterSessionId(clusterSessionId) + .LeaderMemberId(leaderMemberId) + .IngressEndpoints(ingressEndpoints); + + var egressListener = A.Fake(); + var header = new Header(1, 3); + var adapter = new EgressAdapter(egressListener, 0, A.Fake(), 10); + + adapter.OnFragment(buffer, offset, newLeaderEventEncoder.EncodedLength(), header); + + A.CallTo(egressListener).MustNotHaveHappened(); + } + + [Test] + public void OnFragmentShouldInvokeOnAdminResponseCallbackIfSessionIdMatches() + { + const int offset = 24; + const long clusterSessionId = 18; + const long correlationId = 3274239749237498239L; + const AdminRequestType type = AdminRequestType.SNAPSHOT; + const AdminResponseCode responseCode = AdminResponseCode.UNAUTHORISED_ACCESS; + const string message = "Unauthorised access detected!"; + byte[] payload = { 0x1, 0x2, 0x3 }; + adminResponseEncoder + .WrapAndApplyHeader(buffer, offset, messageHeaderEncoder) + .ClusterSessionId(clusterSessionId) + .CorrelationId(correlationId) + .RequestType(type) + .ResponseCode(responseCode) + .Message(message); + adminResponseEncoder.PutPayload(payload, 0, payload.Length); + + var egressListener = A.Fake(); + var header = new Header(1, 3); + var adapter = new EgressAdapter(egressListener, clusterSessionId, A.Fake(), 10); + + adapter.OnFragment(buffer, offset, adminResponseEncoder.EncodedLength(), header); + + A.CallTo(() => egressListener.OnAdminResponse( + clusterSessionId, + correlationId, + type, + responseCode, + message, + buffer, + offset + MessageHeaderEncoder.ENCODED_LENGTH + adminResponseEncoder.EncodedLength() - payload.Length, + payload.Length)) + .MustHaveHappenedOnceExactly(); + A.CallTo(egressListener).MustHaveHappenedOnceExactly(); + } + + [Test] + public void OnFragmentIsANoOpIfSessionIdDoesNotMatchOnAdminResponse() + { + const int offset = 24; + const long clusterSessionId = 18; + const long correlationId = 3274239749237498239L; + const AdminRequestType type = AdminRequestType.SNAPSHOT; + const AdminResponseCode responseCode = AdminResponseCode.OK; + const string message = "Unauthorised access detected!"; + byte[] payload = { 0x1, 0x2, 0x3 }; + adminResponseEncoder + .WrapAndApplyHeader(buffer, offset, messageHeaderEncoder) + .ClusterSessionId(clusterSessionId) + .CorrelationId(correlationId) + .RequestType(type) + .ResponseCode(responseCode) + .Message(message); + adminResponseEncoder.PutPayload(payload, 0, payload.Length); + + var egressListener = A.Fake(); + var header = new Header(1, 3); + var adapter = new EgressAdapter( + egressListener, -clusterSessionId, A.Fake(), 10); + + adapter.OnFragment(buffer, offset, adminResponseEncoder.EncodedLength(), header); + + A.CallTo(egressListener).MustNotHaveHappened(); + } + } +} diff --git a/src/Adaptive.Cluster.Tests/Client/EgressPollerTest.cs b/src/Adaptive.Cluster.Tests/Client/EgressPollerTest.cs new file mode 100644 index 00000000..4ab0b0f4 --- /dev/null +++ b/src/Adaptive.Cluster.Tests/Client/EgressPollerTest.cs @@ -0,0 +1,82 @@ +using Adaptive.Aeron; +using Adaptive.Aeron.LogBuffer; +using Adaptive.Agrona.Concurrent; +using Adaptive.Archiver.Codecs; +using Adaptive.Cluster.Client; +using Adaptive.Cluster.Codecs; +using FakeItEasy; +using NUnit.Framework; +using ArchiveMessageHeaderEncoder = Adaptive.Archiver.Codecs.MessageHeaderEncoder; +using ClusterMessageHeaderEncoder = Adaptive.Cluster.Codecs.MessageHeaderEncoder; + +namespace Adaptive.Cluster.Tests.Client +{ + public class EgressPollerTest + { + private readonly UnsafeBuffer buffer = new UnsafeBuffer(new byte[1024]); + private readonly Header header = new Header(42, 16); + private readonly Subscription subscription = A.Fake(); + private EgressPoller egressPoller; + + [SetUp] + public void SetUp() + { + egressPoller = new EgressPoller(subscription, 10); + } + + [Test] + public void ShouldIgnoreUnknownMessageSchema() + { + const int offset = 64; + var controlResponseEncoder = new ControlResponseEncoder(); + var messageHeaderEncoder = new ArchiveMessageHeaderEncoder(); + controlResponseEncoder + .WrapAndApplyHeader(buffer, offset, messageHeaderEncoder) + .CorrelationId(42) + .Code(ControlResponseCode.ERROR) + .ErrorMessage("test"); + + Assert.AreEqual( + ControlledFragmentHandlerAction.CONTINUE, + egressPoller.OnFragment( + buffer, + offset, + messageHeaderEncoder.EncodedLength() + controlResponseEncoder.EncodedLength(), + header)); + Assert.IsFalse(egressPoller.IsPollComplete()); + } + + [Test] + public void ShouldHandleSessionMessage() + { + const int offset = 16; + var encoder = new SessionMessageHeaderEncoder(); + var messageHeaderEncoder = new ClusterMessageHeaderEncoder(); + const long clusterSessionId = 7777L; + const long leadershipTermId = 5L; + encoder + .WrapAndApplyHeader(buffer, offset, messageHeaderEncoder) + .ClusterSessionId(clusterSessionId) + .LeadershipTermId(leadershipTermId); + + Assert.AreEqual( + ControlledFragmentHandlerAction.BREAK, + egressPoller.OnFragment( + buffer, + offset, + messageHeaderEncoder.EncodedLength() + encoder.EncodedLength(), + header)); + Assert.IsTrue(egressPoller.IsPollComplete()); + Assert.AreEqual(clusterSessionId, egressPoller.ClusterSessionId()); + Assert.AreEqual(leadershipTermId, egressPoller.LeadershipTermId()); + + Assert.AreEqual( + ControlledFragmentHandlerAction.ABORT, + egressPoller.OnFragment( + buffer, + offset, + messageHeaderEncoder.EncodedLength() + encoder.EncodedLength(), + header)); + } + } +} diff --git a/src/Adaptive.Cluster/Adaptive.Cluster.csproj b/src/Adaptive.Cluster/Adaptive.Cluster.csproj index f2a3a1fa..a3f686fd 100644 --- a/src/Adaptive.Cluster/Adaptive.Cluster.csproj +++ b/src/Adaptive.Cluster/Adaptive.Cluster.csproj @@ -21,4 +21,24 @@ + + + all + runtime; build; native; contentfiles; analyzers + + + all + + + all + false + + + + + + + + + \ No newline at end of file diff --git a/src/Adaptive.Cluster/Client/AeronCluster.cs b/src/Adaptive.Cluster/Client/AeronCluster.cs index 5b06ccdd..e07abf60 100644 --- a/src/Adaptive.Cluster/Client/AeronCluster.cs +++ b/src/Adaptive.Cluster/Client/AeronCluster.cs @@ -956,7 +956,7 @@ public void OnNewLeader(long clusterSessionId, long leadershipTermId, int leader } else { - _publication = AddIngressPublication(_ctx, _ctx.IngressChannel(), _ctx.IngressStreamId()); + _publication = AddNewLeaderIngressPublication(_ctx, _ctx.IngressChannel(), _ctx.IngressStreamId()); } _fragmentAssembler.Clear(); @@ -1029,6 +1029,27 @@ internal static Publication AddIngressPublication(Context ctx, string channel, i } } + private Publication AddNewLeaderIngressPublication(Context ctx, string channel, int streamId) + { + long registrationId = AsyncAddIngressPublication(ctx, channel, streamId); + long deadlineNs = nanoClock.NanoTime() + ctx.MessageTimeoutNs(); + do + { + Publication publication = GetIngressPublication(ctx, registrationId); + if (null != publication) + { + return publication; + } + _idleStrategy.Idle(ctx.RunAgentInvokers()); + } + while (nanoClock.NanoTime() < deadlineNs); + + throw new AeronTimeoutException( + "failed to add new leader ingress publication (leaderMemberId=" + _leaderMemberId + + ", leadershipTermId=" + _leadershipTermId + ", channel=" + channel + ", streamId=" + streamId + + ") within " + ctx.MessageTimeoutNs() + "ns"); + } + internal static long AsyncAddIngressPublication(Context ctx, string channel, int streamId) { if (ctx.IsIngressExclusive()) @@ -1969,6 +1990,27 @@ public AgentInvoker AgentInvoker() return agentInvoker; } + internal int RunAgentInvokers() + { + int workDone = 0; + + if (null != _aeron) + { + AgentInvoker conductorAgentInvoker = _aeron.ConductorAgentInvoker; + if (null != conductorAgentInvoker) + { + workDone += conductorAgentInvoker.Invoke(); + } + } + + if (null != agentInvoker) + { + workDone += agentInvoker.Invoke(); + } + + return workDone; + } + /// /// Close the context and free applicable resources. /// @@ -2257,18 +2299,25 @@ private void CreateIngressPublications() { if (null == ctx.IngressEndpoints()) { - if (NULL_VALUE == ingressRegistrationId) - { - ingressRegistrationId = - AsyncAddIngressPublication(ctx, ctx.IngressChannel(), ctx.IngressStreamId()); - } - if (null == ingressPublication) { - ingressPublication = GetIngressPublication(ctx, ingressRegistrationId); - } + if (NULL_VALUE == ingressRegistrationId) + { + ingressRegistrationId = + AsyncAddIngressPublication(ctx, ctx.IngressChannel(), ctx.IngressStreamId()); + } - if (null != ingressPublication) + try + { + ingressPublication = GetIngressPublication(ctx, ingressRegistrationId); + } + catch (RegistrationException) + { + ingressRegistrationId = NULL_VALUE; + throw; + } + } + else { ingressRegistrationId = NULL_VALUE; State(AWAIT_PUBLICATION_CONNECTED); diff --git a/src/Adaptive.Cluster/Client/EgressAdapter.cs b/src/Adaptive.Cluster/Client/EgressAdapter.cs index e751fc81..779bb524 100644 --- a/src/Adaptive.Cluster/Client/EgressAdapter.cs +++ b/src/Adaptive.Cluster/Client/EgressAdapter.cs @@ -158,7 +158,7 @@ public void OnFragment(IDirectBuffer buffer, int offset, int length, Header head { _listener.OnNewLeader( sessionId, - _sessionEventDecoder.LeadershipTermId(), + _newLeaderEventDecoder.LeadershipTermId(), _newLeaderEventDecoder.LeaderMemberId(), _newLeaderEventDecoder.IngressEndpoints()); } diff --git a/src/Adaptive.Cluster/Client/EgressPoller.cs b/src/Adaptive.Cluster/Client/EgressPoller.cs index 35981df6..e547c941 100644 --- a/src/Adaptive.Cluster/Client/EgressPoller.cs +++ b/src/Adaptive.Cluster/Client/EgressPoller.cs @@ -211,8 +211,7 @@ public ControlledFragmentHandlerAction OnFragment(IDirectBuffer buffer, int offs int schemaId = messageHeaderDecoder.SchemaId(); if (schemaId != MessageHeaderDecoder.SCHEMA_ID) { - throw new ClusterException("expected schemaId=" + MessageHeaderDecoder.SCHEMA_ID + ", actual=" + - schemaId); + return CONTINUE; } templateId = messageHeaderDecoder.TemplateId(); diff --git a/src/Adaptive.Cluster/FodyWeavers.xml b/src/Adaptive.Cluster/FodyWeavers.xml new file mode 100644 index 00000000..5415e271 --- /dev/null +++ b/src/Adaptive.Cluster/FodyWeavers.xml @@ -0,0 +1,5 @@ + + + + + diff --git a/src/Adaptive.Cluster/FodyWeavers.xsd b/src/Adaptive.Cluster/FodyWeavers.xsd new file mode 100644 index 00000000..42ece42f --- /dev/null +++ b/src/Adaptive.Cluster/FodyWeavers.xsd @@ -0,0 +1,27 @@ + + + + + + + + + + + + 'true' to run assembly verification (PEVerify) on the target assembly after all weavers have been executed. + + + + + A comma-separated list of error codes that can be safely ignored in assembly verification. + + + + + 'false' to turn off automatic generation of the XML Schema file. + + + + + \ No newline at end of file diff --git a/src/Weavers/Unsealer.Fody/ModuleWeaver.cs b/src/Weavers/Unsealer.Fody/ModuleWeaver.cs new file mode 100644 index 00000000..0207f8c9 --- /dev/null +++ b/src/Weavers/Unsealer.Fody/ModuleWeaver.cs @@ -0,0 +1,80 @@ +using System.Collections.Generic; +using System.Linq; +using Fody; +using Mono.Cecil; + +namespace Unsealer.Fody +{ + public class ModuleWeaver : BaseModuleWeaver + { + public override void Execute() + { + int unsealed = 0; + + foreach (var type in AllTypes(ModuleDefinition.Types)) + { + // Class reference types only: skip structs (IsValueType), interfaces, + // enums, and the implicit `` type. + if (!type.IsClass) continue; + if (type.IsValueType) continue; + if (type.IsInterface) continue; + if (!type.IsSealed) continue; + + // Skip compiler-generated types (lambda closures, anonymous types, + // async state machines, etc.) — these are sealed for legitimate + // reasons and shouldn't be subclassed by tests anyway. + if (HasCompilerGeneratedAttribute(type)) continue; + if (type.Name.Contains('<')) continue; // safety net for nested compiler-gen names + + // Delegate types MUST remain sealed — the CLR enforces this and + // assembly-loading throws TypeLoadException otherwise. + if (IsDelegate(type)) continue; + + // C# static classes are emitted as `abstract sealed` in IL. + // Removing `sealed` would change their meaning (no longer static). + if (type.IsAbstract) continue; + + type.IsSealed = false; + unsealed++; + } + + WriteMessage($"Unsealer.Fody: unsealed {unsealed} class types.", MessageImportance.Low); + } + + private static bool HasCompilerGeneratedAttribute(TypeDefinition type) + { + return type.HasCustomAttributes && + type.CustomAttributes.Any(a => a.AttributeType.FullName == + "System.Runtime.CompilerServices.CompilerGeneratedAttribute"); + } + + private static bool IsDelegate(TypeDefinition type) + { + var baseTypeName = type.BaseType?.FullName; + return baseTypeName == "System.MulticastDelegate" || baseTypeName == "System.Delegate"; + } + + public override IEnumerable GetAssembliesForScanning() + { + yield return "netstandard"; + yield return "mscorlib"; + } + + private static IEnumerable AllTypes(IEnumerable types) + { + foreach (var type in types) + { + yield return type; + if (type.HasNestedTypes) + { + foreach (var nested in AllTypes(type.NestedTypes)) + { + yield return nested; + } + } + } + } + + public override bool ShouldCleanReference => true; + } +} diff --git a/src/Weavers/Unsealer.Fody/Unsealer.Fody.csproj b/src/Weavers/Unsealer.Fody/Unsealer.Fody.csproj new file mode 100644 index 00000000..bfe52c5e --- /dev/null +++ b/src/Weavers/Unsealer.Fody/Unsealer.Fody.csproj @@ -0,0 +1,16 @@ + + + netstandard2.0 + Unsealer.Fody + Unsealer.Fody + false + true + Fody addin (Debug-only) that strips the `sealed` modifier from class types +in the assembly being woven, so test mocking frameworks (FakeItEasy, Moq, NSubstitute) +that rely on Castle DynamicProxy can subclass them. Mirrors what Virtuosity.Fody does +for `virtual`. Skips value types and compiler-generated types. + + + + +