diff --git a/src/Adaptive.Aeron.sln b/src/Adaptive.Aeron.sln
index 85e19e3..0dd1121 100644
--- a/src/Adaptive.Aeron.sln
+++ b/src/Adaptive.Aeron.sln
@@ -51,92 +51,300 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Adaptive.Aeron.Samples.Clus
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Adaptive.Aeron.Samples.ClusterClient", "Samples\Adaptive.Aeron.Samples.ClusterClient\Adaptive.Aeron.Samples.ClusterClient.csproj", "{DB22F44D-0ABF-428F-BAA1-E0D89696ABD3}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Adaptive.Archiver.Tests", "Adaptive.Archiver.Tests\Adaptive.Archiver.Tests.csproj", "{F1B693B4-F0C6-4467-BC09-2D1B9FB73D4E}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Adaptive.Cluster.Tests", "Adaptive.Cluster.Tests\Adaptive.Cluster.Tests.csproj", "{572CA144-CECC-43AC-AF10-5BEA6007C2DE}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Weavers", "Weavers", "{E2872BA3-3393-4325-66A6-EE7620D75132}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Unsealer.Fody", "Weavers\Unsealer.Fody\Unsealer.Fody.csproj", "{E28646AF-556E-423E-B54F-ADFBD75F2611}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
+ Debug|x64 = Debug|x64
+ Debug|x86 = Debug|x86
Release|Any CPU = Release|Any CPU
+ Release|x64 = Release|x64
+ Release|x86 = Release|x86
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{3E3FA5C6-CFBA-4604-B10E-C7EC97D226CF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{3E3FA5C6-CFBA-4604-B10E-C7EC97D226CF}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {3E3FA5C6-CFBA-4604-B10E-C7EC97D226CF}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {3E3FA5C6-CFBA-4604-B10E-C7EC97D226CF}.Debug|x64.Build.0 = Debug|Any CPU
+ {3E3FA5C6-CFBA-4604-B10E-C7EC97D226CF}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {3E3FA5C6-CFBA-4604-B10E-C7EC97D226CF}.Debug|x86.Build.0 = Debug|Any CPU
{3E3FA5C6-CFBA-4604-B10E-C7EC97D226CF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3E3FA5C6-CFBA-4604-B10E-C7EC97D226CF}.Release|Any CPU.Build.0 = Release|Any CPU
+ {3E3FA5C6-CFBA-4604-B10E-C7EC97D226CF}.Release|x64.ActiveCfg = Release|Any CPU
+ {3E3FA5C6-CFBA-4604-B10E-C7EC97D226CF}.Release|x64.Build.0 = Release|Any CPU
+ {3E3FA5C6-CFBA-4604-B10E-C7EC97D226CF}.Release|x86.ActiveCfg = Release|Any CPU
+ {3E3FA5C6-CFBA-4604-B10E-C7EC97D226CF}.Release|x86.Build.0 = Release|Any CPU
{E6F84F8E-68C6-43C8-A20E-E1F91DF99ABC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E6F84F8E-68C6-43C8-A20E-E1F91DF99ABC}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {E6F84F8E-68C6-43C8-A20E-E1F91DF99ABC}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {E6F84F8E-68C6-43C8-A20E-E1F91DF99ABC}.Debug|x64.Build.0 = Debug|Any CPU
+ {E6F84F8E-68C6-43C8-A20E-E1F91DF99ABC}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {E6F84F8E-68C6-43C8-A20E-E1F91DF99ABC}.Debug|x86.Build.0 = Debug|Any CPU
{E6F84F8E-68C6-43C8-A20E-E1F91DF99ABC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E6F84F8E-68C6-43C8-A20E-E1F91DF99ABC}.Release|Any CPU.Build.0 = Release|Any CPU
+ {E6F84F8E-68C6-43C8-A20E-E1F91DF99ABC}.Release|x64.ActiveCfg = Release|Any CPU
+ {E6F84F8E-68C6-43C8-A20E-E1F91DF99ABC}.Release|x64.Build.0 = Release|Any CPU
+ {E6F84F8E-68C6-43C8-A20E-E1F91DF99ABC}.Release|x86.ActiveCfg = Release|Any CPU
+ {E6F84F8E-68C6-43C8-A20E-E1F91DF99ABC}.Release|x86.Build.0 = Release|Any CPU
{46A1DE6D-55B0-4D64-959F-D19B8D378842}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{46A1DE6D-55B0-4D64-959F-D19B8D378842}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {46A1DE6D-55B0-4D64-959F-D19B8D378842}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {46A1DE6D-55B0-4D64-959F-D19B8D378842}.Debug|x64.Build.0 = Debug|Any CPU
+ {46A1DE6D-55B0-4D64-959F-D19B8D378842}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {46A1DE6D-55B0-4D64-959F-D19B8D378842}.Debug|x86.Build.0 = Debug|Any CPU
{46A1DE6D-55B0-4D64-959F-D19B8D378842}.Release|Any CPU.ActiveCfg = Release|Any CPU
{46A1DE6D-55B0-4D64-959F-D19B8D378842}.Release|Any CPU.Build.0 = Release|Any CPU
+ {46A1DE6D-55B0-4D64-959F-D19B8D378842}.Release|x64.ActiveCfg = Release|Any CPU
+ {46A1DE6D-55B0-4D64-959F-D19B8D378842}.Release|x64.Build.0 = Release|Any CPU
+ {46A1DE6D-55B0-4D64-959F-D19B8D378842}.Release|x86.ActiveCfg = Release|Any CPU
+ {46A1DE6D-55B0-4D64-959F-D19B8D378842}.Release|x86.Build.0 = Release|Any CPU
{A1ABAA99-CAAE-41B2-B3F9-9BF2C23DFA8B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A1ABAA99-CAAE-41B2-B3F9-9BF2C23DFA8B}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {A1ABAA99-CAAE-41B2-B3F9-9BF2C23DFA8B}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {A1ABAA99-CAAE-41B2-B3F9-9BF2C23DFA8B}.Debug|x64.Build.0 = Debug|Any CPU
+ {A1ABAA99-CAAE-41B2-B3F9-9BF2C23DFA8B}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {A1ABAA99-CAAE-41B2-B3F9-9BF2C23DFA8B}.Debug|x86.Build.0 = Debug|Any CPU
{A1ABAA99-CAAE-41B2-B3F9-9BF2C23DFA8B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A1ABAA99-CAAE-41B2-B3F9-9BF2C23DFA8B}.Release|Any CPU.Build.0 = Release|Any CPU
+ {A1ABAA99-CAAE-41B2-B3F9-9BF2C23DFA8B}.Release|x64.ActiveCfg = Release|Any CPU
+ {A1ABAA99-CAAE-41B2-B3F9-9BF2C23DFA8B}.Release|x64.Build.0 = Release|Any CPU
+ {A1ABAA99-CAAE-41B2-B3F9-9BF2C23DFA8B}.Release|x86.ActiveCfg = Release|Any CPU
+ {A1ABAA99-CAAE-41B2-B3F9-9BF2C23DFA8B}.Release|x86.Build.0 = Release|Any CPU
{29386401-8D9B-4F56-8B8D-A8242D4E295D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{29386401-8D9B-4F56-8B8D-A8242D4E295D}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {29386401-8D9B-4F56-8B8D-A8242D4E295D}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {29386401-8D9B-4F56-8B8D-A8242D4E295D}.Debug|x64.Build.0 = Debug|Any CPU
+ {29386401-8D9B-4F56-8B8D-A8242D4E295D}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {29386401-8D9B-4F56-8B8D-A8242D4E295D}.Debug|x86.Build.0 = Debug|Any CPU
{29386401-8D9B-4F56-8B8D-A8242D4E295D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{29386401-8D9B-4F56-8B8D-A8242D4E295D}.Release|Any CPU.Build.0 = Release|Any CPU
+ {29386401-8D9B-4F56-8B8D-A8242D4E295D}.Release|x64.ActiveCfg = Release|Any CPU
+ {29386401-8D9B-4F56-8B8D-A8242D4E295D}.Release|x64.Build.0 = Release|Any CPU
+ {29386401-8D9B-4F56-8B8D-A8242D4E295D}.Release|x86.ActiveCfg = Release|Any CPU
+ {29386401-8D9B-4F56-8B8D-A8242D4E295D}.Release|x86.Build.0 = Release|Any CPU
{BB3FCE67-1719-4A1F-98BF-B56509C08794}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{BB3FCE67-1719-4A1F-98BF-B56509C08794}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {BB3FCE67-1719-4A1F-98BF-B56509C08794}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {BB3FCE67-1719-4A1F-98BF-B56509C08794}.Debug|x64.Build.0 = Debug|Any CPU
+ {BB3FCE67-1719-4A1F-98BF-B56509C08794}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {BB3FCE67-1719-4A1F-98BF-B56509C08794}.Debug|x86.Build.0 = Debug|Any CPU
{BB3FCE67-1719-4A1F-98BF-B56509C08794}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BB3FCE67-1719-4A1F-98BF-B56509C08794}.Release|Any CPU.Build.0 = Release|Any CPU
+ {BB3FCE67-1719-4A1F-98BF-B56509C08794}.Release|x64.ActiveCfg = Release|Any CPU
+ {BB3FCE67-1719-4A1F-98BF-B56509C08794}.Release|x64.Build.0 = Release|Any CPU
+ {BB3FCE67-1719-4A1F-98BF-B56509C08794}.Release|x86.ActiveCfg = Release|Any CPU
+ {BB3FCE67-1719-4A1F-98BF-B56509C08794}.Release|x86.Build.0 = Release|Any CPU
{16B25C6A-67C5-46E9-95BE-DB3B17D70DF8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{16B25C6A-67C5-46E9-95BE-DB3B17D70DF8}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {16B25C6A-67C5-46E9-95BE-DB3B17D70DF8}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {16B25C6A-67C5-46E9-95BE-DB3B17D70DF8}.Debug|x64.Build.0 = Debug|Any CPU
+ {16B25C6A-67C5-46E9-95BE-DB3B17D70DF8}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {16B25C6A-67C5-46E9-95BE-DB3B17D70DF8}.Debug|x86.Build.0 = Debug|Any CPU
{16B25C6A-67C5-46E9-95BE-DB3B17D70DF8}.Release|Any CPU.ActiveCfg = Release|Any CPU
{16B25C6A-67C5-46E9-95BE-DB3B17D70DF8}.Release|Any CPU.Build.0 = Release|Any CPU
+ {16B25C6A-67C5-46E9-95BE-DB3B17D70DF8}.Release|x64.ActiveCfg = Release|Any CPU
+ {16B25C6A-67C5-46E9-95BE-DB3B17D70DF8}.Release|x64.Build.0 = Release|Any CPU
+ {16B25C6A-67C5-46E9-95BE-DB3B17D70DF8}.Release|x86.ActiveCfg = Release|Any CPU
+ {16B25C6A-67C5-46E9-95BE-DB3B17D70DF8}.Release|x86.Build.0 = Release|Any CPU
{B8324314-0695-4919-91AE-9683972D4125}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B8324314-0695-4919-91AE-9683972D4125}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {B8324314-0695-4919-91AE-9683972D4125}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {B8324314-0695-4919-91AE-9683972D4125}.Debug|x64.Build.0 = Debug|Any CPU
+ {B8324314-0695-4919-91AE-9683972D4125}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {B8324314-0695-4919-91AE-9683972D4125}.Debug|x86.Build.0 = Debug|Any CPU
{B8324314-0695-4919-91AE-9683972D4125}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B8324314-0695-4919-91AE-9683972D4125}.Release|Any CPU.Build.0 = Release|Any CPU
+ {B8324314-0695-4919-91AE-9683972D4125}.Release|x64.ActiveCfg = Release|Any CPU
+ {B8324314-0695-4919-91AE-9683972D4125}.Release|x64.Build.0 = Release|Any CPU
+ {B8324314-0695-4919-91AE-9683972D4125}.Release|x86.ActiveCfg = Release|Any CPU
+ {B8324314-0695-4919-91AE-9683972D4125}.Release|x86.Build.0 = Release|Any CPU
{A15D0AF3-F1D9-4AD4-9542-EB021EA31EA6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A15D0AF3-F1D9-4AD4-9542-EB021EA31EA6}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {A15D0AF3-F1D9-4AD4-9542-EB021EA31EA6}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {A15D0AF3-F1D9-4AD4-9542-EB021EA31EA6}.Debug|x64.Build.0 = Debug|Any CPU
+ {A15D0AF3-F1D9-4AD4-9542-EB021EA31EA6}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {A15D0AF3-F1D9-4AD4-9542-EB021EA31EA6}.Debug|x86.Build.0 = Debug|Any CPU
{A15D0AF3-F1D9-4AD4-9542-EB021EA31EA6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A15D0AF3-F1D9-4AD4-9542-EB021EA31EA6}.Release|Any CPU.Build.0 = Release|Any CPU
+ {A15D0AF3-F1D9-4AD4-9542-EB021EA31EA6}.Release|x64.ActiveCfg = Release|Any CPU
+ {A15D0AF3-F1D9-4AD4-9542-EB021EA31EA6}.Release|x64.Build.0 = Release|Any CPU
+ {A15D0AF3-F1D9-4AD4-9542-EB021EA31EA6}.Release|x86.ActiveCfg = Release|Any CPU
+ {A15D0AF3-F1D9-4AD4-9542-EB021EA31EA6}.Release|x86.Build.0 = Release|Any CPU
{183BDA16-AF0C-43CD-BF4E-73DC3284A5C7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{183BDA16-AF0C-43CD-BF4E-73DC3284A5C7}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {183BDA16-AF0C-43CD-BF4E-73DC3284A5C7}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {183BDA16-AF0C-43CD-BF4E-73DC3284A5C7}.Debug|x64.Build.0 = Debug|Any CPU
+ {183BDA16-AF0C-43CD-BF4E-73DC3284A5C7}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {183BDA16-AF0C-43CD-BF4E-73DC3284A5C7}.Debug|x86.Build.0 = Debug|Any CPU
{183BDA16-AF0C-43CD-BF4E-73DC3284A5C7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{183BDA16-AF0C-43CD-BF4E-73DC3284A5C7}.Release|Any CPU.Build.0 = Release|Any CPU
+ {183BDA16-AF0C-43CD-BF4E-73DC3284A5C7}.Release|x64.ActiveCfg = Release|Any CPU
+ {183BDA16-AF0C-43CD-BF4E-73DC3284A5C7}.Release|x64.Build.0 = Release|Any CPU
+ {183BDA16-AF0C-43CD-BF4E-73DC3284A5C7}.Release|x86.ActiveCfg = Release|Any CPU
+ {183BDA16-AF0C-43CD-BF4E-73DC3284A5C7}.Release|x86.Build.0 = Release|Any CPU
{00DF060E-573A-4236-B462-89CE45698191}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{00DF060E-573A-4236-B462-89CE45698191}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {00DF060E-573A-4236-B462-89CE45698191}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {00DF060E-573A-4236-B462-89CE45698191}.Debug|x64.Build.0 = Debug|Any CPU
+ {00DF060E-573A-4236-B462-89CE45698191}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {00DF060E-573A-4236-B462-89CE45698191}.Debug|x86.Build.0 = Debug|Any CPU
{00DF060E-573A-4236-B462-89CE45698191}.Release|Any CPU.ActiveCfg = Release|Any CPU
{00DF060E-573A-4236-B462-89CE45698191}.Release|Any CPU.Build.0 = Release|Any CPU
+ {00DF060E-573A-4236-B462-89CE45698191}.Release|x64.ActiveCfg = Release|Any CPU
+ {00DF060E-573A-4236-B462-89CE45698191}.Release|x64.Build.0 = Release|Any CPU
+ {00DF060E-573A-4236-B462-89CE45698191}.Release|x86.ActiveCfg = Release|Any CPU
+ {00DF060E-573A-4236-B462-89CE45698191}.Release|x86.Build.0 = Release|Any CPU
{DCB69FB3-9E28-4A0F-9E25-DAE0D083E555}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DCB69FB3-9E28-4A0F-9E25-DAE0D083E555}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {DCB69FB3-9E28-4A0F-9E25-DAE0D083E555}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {DCB69FB3-9E28-4A0F-9E25-DAE0D083E555}.Debug|x64.Build.0 = Debug|Any CPU
+ {DCB69FB3-9E28-4A0F-9E25-DAE0D083E555}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {DCB69FB3-9E28-4A0F-9E25-DAE0D083E555}.Debug|x86.Build.0 = Debug|Any CPU
{DCB69FB3-9E28-4A0F-9E25-DAE0D083E555}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DCB69FB3-9E28-4A0F-9E25-DAE0D083E555}.Release|Any CPU.Build.0 = Release|Any CPU
+ {DCB69FB3-9E28-4A0F-9E25-DAE0D083E555}.Release|x64.ActiveCfg = Release|Any CPU
+ {DCB69FB3-9E28-4A0F-9E25-DAE0D083E555}.Release|x64.Build.0 = Release|Any CPU
+ {DCB69FB3-9E28-4A0F-9E25-DAE0D083E555}.Release|x86.ActiveCfg = Release|Any CPU
+ {DCB69FB3-9E28-4A0F-9E25-DAE0D083E555}.Release|x86.Build.0 = Release|Any CPU
{E2506D08-AF46-4B53-817A-A56B43C7C1EA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E2506D08-AF46-4B53-817A-A56B43C7C1EA}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {E2506D08-AF46-4B53-817A-A56B43C7C1EA}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {E2506D08-AF46-4B53-817A-A56B43C7C1EA}.Debug|x64.Build.0 = Debug|Any CPU
+ {E2506D08-AF46-4B53-817A-A56B43C7C1EA}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {E2506D08-AF46-4B53-817A-A56B43C7C1EA}.Debug|x86.Build.0 = Debug|Any CPU
{E2506D08-AF46-4B53-817A-A56B43C7C1EA}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E2506D08-AF46-4B53-817A-A56B43C7C1EA}.Release|Any CPU.Build.0 = Release|Any CPU
+ {E2506D08-AF46-4B53-817A-A56B43C7C1EA}.Release|x64.ActiveCfg = Release|Any CPU
+ {E2506D08-AF46-4B53-817A-A56B43C7C1EA}.Release|x64.Build.0 = Release|Any CPU
+ {E2506D08-AF46-4B53-817A-A56B43C7C1EA}.Release|x86.ActiveCfg = Release|Any CPU
+ {E2506D08-AF46-4B53-817A-A56B43C7C1EA}.Release|x86.Build.0 = Release|Any CPU
{2D449099-F238-4105-97B8-697F93FA07CF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{2D449099-F238-4105-97B8-697F93FA07CF}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {2D449099-F238-4105-97B8-697F93FA07CF}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {2D449099-F238-4105-97B8-697F93FA07CF}.Debug|x64.Build.0 = Debug|Any CPU
+ {2D449099-F238-4105-97B8-697F93FA07CF}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {2D449099-F238-4105-97B8-697F93FA07CF}.Debug|x86.Build.0 = Debug|Any CPU
{2D449099-F238-4105-97B8-697F93FA07CF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2D449099-F238-4105-97B8-697F93FA07CF}.Release|Any CPU.Build.0 = Release|Any CPU
+ {2D449099-F238-4105-97B8-697F93FA07CF}.Release|x64.ActiveCfg = Release|Any CPU
+ {2D449099-F238-4105-97B8-697F93FA07CF}.Release|x64.Build.0 = Release|Any CPU
+ {2D449099-F238-4105-97B8-697F93FA07CF}.Release|x86.ActiveCfg = Release|Any CPU
+ {2D449099-F238-4105-97B8-697F93FA07CF}.Release|x86.Build.0 = Release|Any CPU
{8038E62C-1D44-48AF-9E2B-C02228D4156F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{8038E62C-1D44-48AF-9E2B-C02228D4156F}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {8038E62C-1D44-48AF-9E2B-C02228D4156F}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {8038E62C-1D44-48AF-9E2B-C02228D4156F}.Debug|x64.Build.0 = Debug|Any CPU
+ {8038E62C-1D44-48AF-9E2B-C02228D4156F}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {8038E62C-1D44-48AF-9E2B-C02228D4156F}.Debug|x86.Build.0 = Debug|Any CPU
{8038E62C-1D44-48AF-9E2B-C02228D4156F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8038E62C-1D44-48AF-9E2B-C02228D4156F}.Release|Any CPU.Build.0 = Release|Any CPU
+ {8038E62C-1D44-48AF-9E2B-C02228D4156F}.Release|x64.ActiveCfg = Release|Any CPU
+ {8038E62C-1D44-48AF-9E2B-C02228D4156F}.Release|x64.Build.0 = Release|Any CPU
+ {8038E62C-1D44-48AF-9E2B-C02228D4156F}.Release|x86.ActiveCfg = Release|Any CPU
+ {8038E62C-1D44-48AF-9E2B-C02228D4156F}.Release|x86.Build.0 = Release|Any CPU
{9C2B605E-5C8C-4BDC-9977-96FADDBA1967}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9C2B605E-5C8C-4BDC-9977-96FADDBA1967}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {9C2B605E-5C8C-4BDC-9977-96FADDBA1967}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {9C2B605E-5C8C-4BDC-9977-96FADDBA1967}.Debug|x64.Build.0 = Debug|Any CPU
+ {9C2B605E-5C8C-4BDC-9977-96FADDBA1967}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {9C2B605E-5C8C-4BDC-9977-96FADDBA1967}.Debug|x86.Build.0 = Debug|Any CPU
{9C2B605E-5C8C-4BDC-9977-96FADDBA1967}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9C2B605E-5C8C-4BDC-9977-96FADDBA1967}.Release|Any CPU.Build.0 = Release|Any CPU
+ {9C2B605E-5C8C-4BDC-9977-96FADDBA1967}.Release|x64.ActiveCfg = Release|Any CPU
+ {9C2B605E-5C8C-4BDC-9977-96FADDBA1967}.Release|x64.Build.0 = Release|Any CPU
+ {9C2B605E-5C8C-4BDC-9977-96FADDBA1967}.Release|x86.ActiveCfg = Release|Any CPU
+ {9C2B605E-5C8C-4BDC-9977-96FADDBA1967}.Release|x86.Build.0 = Release|Any CPU
{FD430CC0-2053-4C2D-845E-8BC421D6F114}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{FD430CC0-2053-4C2D-845E-8BC421D6F114}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {FD430CC0-2053-4C2D-845E-8BC421D6F114}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {FD430CC0-2053-4C2D-845E-8BC421D6F114}.Debug|x64.Build.0 = Debug|Any CPU
+ {FD430CC0-2053-4C2D-845E-8BC421D6F114}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {FD430CC0-2053-4C2D-845E-8BC421D6F114}.Debug|x86.Build.0 = Debug|Any CPU
{FD430CC0-2053-4C2D-845E-8BC421D6F114}.Release|Any CPU.ActiveCfg = Release|Any CPU
{FD430CC0-2053-4C2D-845E-8BC421D6F114}.Release|Any CPU.Build.0 = Release|Any CPU
+ {FD430CC0-2053-4C2D-845E-8BC421D6F114}.Release|x64.ActiveCfg = Release|Any CPU
+ {FD430CC0-2053-4C2D-845E-8BC421D6F114}.Release|x64.Build.0 = Release|Any CPU
+ {FD430CC0-2053-4C2D-845E-8BC421D6F114}.Release|x86.ActiveCfg = Release|Any CPU
+ {FD430CC0-2053-4C2D-845E-8BC421D6F114}.Release|x86.Build.0 = Release|Any CPU
{5500E5A6-B22F-4FA3-B933-8C844A3CEC1A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5500E5A6-B22F-4FA3-B933-8C844A3CEC1A}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {5500E5A6-B22F-4FA3-B933-8C844A3CEC1A}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {5500E5A6-B22F-4FA3-B933-8C844A3CEC1A}.Debug|x64.Build.0 = Debug|Any CPU
+ {5500E5A6-B22F-4FA3-B933-8C844A3CEC1A}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {5500E5A6-B22F-4FA3-B933-8C844A3CEC1A}.Debug|x86.Build.0 = Debug|Any CPU
{5500E5A6-B22F-4FA3-B933-8C844A3CEC1A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5500E5A6-B22F-4FA3-B933-8C844A3CEC1A}.Release|Any CPU.Build.0 = Release|Any CPU
+ {5500E5A6-B22F-4FA3-B933-8C844A3CEC1A}.Release|x64.ActiveCfg = Release|Any CPU
+ {5500E5A6-B22F-4FA3-B933-8C844A3CEC1A}.Release|x64.Build.0 = Release|Any CPU
+ {5500E5A6-B22F-4FA3-B933-8C844A3CEC1A}.Release|x86.ActiveCfg = Release|Any CPU
+ {5500E5A6-B22F-4FA3-B933-8C844A3CEC1A}.Release|x86.Build.0 = Release|Any CPU
{61C62DB8-CB67-4E3F-A2EA-30547183C64F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{61C62DB8-CB67-4E3F-A2EA-30547183C64F}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {61C62DB8-CB67-4E3F-A2EA-30547183C64F}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {61C62DB8-CB67-4E3F-A2EA-30547183C64F}.Debug|x64.Build.0 = Debug|Any CPU
+ {61C62DB8-CB67-4E3F-A2EA-30547183C64F}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {61C62DB8-CB67-4E3F-A2EA-30547183C64F}.Debug|x86.Build.0 = Debug|Any CPU
{61C62DB8-CB67-4E3F-A2EA-30547183C64F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{61C62DB8-CB67-4E3F-A2EA-30547183C64F}.Release|Any CPU.Build.0 = Release|Any CPU
+ {61C62DB8-CB67-4E3F-A2EA-30547183C64F}.Release|x64.ActiveCfg = Release|Any CPU
+ {61C62DB8-CB67-4E3F-A2EA-30547183C64F}.Release|x64.Build.0 = Release|Any CPU
+ {61C62DB8-CB67-4E3F-A2EA-30547183C64F}.Release|x86.ActiveCfg = Release|Any CPU
+ {61C62DB8-CB67-4E3F-A2EA-30547183C64F}.Release|x86.Build.0 = Release|Any CPU
{DB22F44D-0ABF-428F-BAA1-E0D89696ABD3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DB22F44D-0ABF-428F-BAA1-E0D89696ABD3}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {DB22F44D-0ABF-428F-BAA1-E0D89696ABD3}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {DB22F44D-0ABF-428F-BAA1-E0D89696ABD3}.Debug|x64.Build.0 = Debug|Any CPU
+ {DB22F44D-0ABF-428F-BAA1-E0D89696ABD3}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {DB22F44D-0ABF-428F-BAA1-E0D89696ABD3}.Debug|x86.Build.0 = Debug|Any CPU
{DB22F44D-0ABF-428F-BAA1-E0D89696ABD3}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DB22F44D-0ABF-428F-BAA1-E0D89696ABD3}.Release|Any CPU.Build.0 = Release|Any CPU
+ {DB22F44D-0ABF-428F-BAA1-E0D89696ABD3}.Release|x64.ActiveCfg = Release|Any CPU
+ {DB22F44D-0ABF-428F-BAA1-E0D89696ABD3}.Release|x64.Build.0 = Release|Any CPU
+ {DB22F44D-0ABF-428F-BAA1-E0D89696ABD3}.Release|x86.ActiveCfg = Release|Any CPU
+ {DB22F44D-0ABF-428F-BAA1-E0D89696ABD3}.Release|x86.Build.0 = Release|Any CPU
+ {F1B693B4-F0C6-4467-BC09-2D1B9FB73D4E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {F1B693B4-F0C6-4467-BC09-2D1B9FB73D4E}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {F1B693B4-F0C6-4467-BC09-2D1B9FB73D4E}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {F1B693B4-F0C6-4467-BC09-2D1B9FB73D4E}.Debug|x64.Build.0 = Debug|Any CPU
+ {F1B693B4-F0C6-4467-BC09-2D1B9FB73D4E}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {F1B693B4-F0C6-4467-BC09-2D1B9FB73D4E}.Debug|x86.Build.0 = Debug|Any CPU
+ {F1B693B4-F0C6-4467-BC09-2D1B9FB73D4E}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {F1B693B4-F0C6-4467-BC09-2D1B9FB73D4E}.Release|Any CPU.Build.0 = Release|Any CPU
+ {F1B693B4-F0C6-4467-BC09-2D1B9FB73D4E}.Release|x64.ActiveCfg = Release|Any CPU
+ {F1B693B4-F0C6-4467-BC09-2D1B9FB73D4E}.Release|x64.Build.0 = Release|Any CPU
+ {F1B693B4-F0C6-4467-BC09-2D1B9FB73D4E}.Release|x86.ActiveCfg = Release|Any CPU
+ {F1B693B4-F0C6-4467-BC09-2D1B9FB73D4E}.Release|x86.Build.0 = Release|Any CPU
+ {572CA144-CECC-43AC-AF10-5BEA6007C2DE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {572CA144-CECC-43AC-AF10-5BEA6007C2DE}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {572CA144-CECC-43AC-AF10-5BEA6007C2DE}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {572CA144-CECC-43AC-AF10-5BEA6007C2DE}.Debug|x64.Build.0 = Debug|Any CPU
+ {572CA144-CECC-43AC-AF10-5BEA6007C2DE}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {572CA144-CECC-43AC-AF10-5BEA6007C2DE}.Debug|x86.Build.0 = Debug|Any CPU
+ {572CA144-CECC-43AC-AF10-5BEA6007C2DE}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {572CA144-CECC-43AC-AF10-5BEA6007C2DE}.Release|Any CPU.Build.0 = Release|Any CPU
+ {572CA144-CECC-43AC-AF10-5BEA6007C2DE}.Release|x64.ActiveCfg = Release|Any CPU
+ {572CA144-CECC-43AC-AF10-5BEA6007C2DE}.Release|x64.Build.0 = Release|Any CPU
+ {572CA144-CECC-43AC-AF10-5BEA6007C2DE}.Release|x86.ActiveCfg = Release|Any CPU
+ {572CA144-CECC-43AC-AF10-5BEA6007C2DE}.Release|x86.Build.0 = Release|Any CPU
+ {E28646AF-556E-423E-B54F-ADFBD75F2611}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {E28646AF-556E-423E-B54F-ADFBD75F2611}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {E28646AF-556E-423E-B54F-ADFBD75F2611}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {E28646AF-556E-423E-B54F-ADFBD75F2611}.Debug|x64.Build.0 = Debug|Any CPU
+ {E28646AF-556E-423E-B54F-ADFBD75F2611}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {E28646AF-556E-423E-B54F-ADFBD75F2611}.Debug|x86.Build.0 = Debug|Any CPU
+ {E28646AF-556E-423E-B54F-ADFBD75F2611}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {E28646AF-556E-423E-B54F-ADFBD75F2611}.Release|Any CPU.Build.0 = Release|Any CPU
+ {E28646AF-556E-423E-B54F-ADFBD75F2611}.Release|x64.ActiveCfg = Release|Any CPU
+ {E28646AF-556E-423E-B54F-ADFBD75F2611}.Release|x64.Build.0 = Release|Any CPU
+ {E28646AF-556E-423E-B54F-ADFBD75F2611}.Release|x86.ActiveCfg = Release|Any CPU
+ {E28646AF-556E-423E-B54F-ADFBD75F2611}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -162,6 +370,7 @@ Global
{5500E5A6-B22F-4FA3-B933-8C844A3CEC1A} = {9EAD0078-CA03-486D-BF2E-2C891955CE16}
{61C62DB8-CB67-4E3F-A2EA-30547183C64F} = {1807629C-08BB-487B-821E-8E0B72F5C4AC}
{DB22F44D-0ABF-428F-BAA1-E0D89696ABD3} = {1807629C-08BB-487B-821E-8E0B72F5C4AC}
+ {E28646AF-556E-423E-B54F-ADFBD75F2611} = {E2872BA3-3393-4325-66A6-EE7620D75132}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {374403EA-85FA-4ACA-8E38-FB938FFC7016}
diff --git a/src/Adaptive.Aeron/Adaptive.Aeron.csproj b/src/Adaptive.Aeron/Adaptive.Aeron.csproj
index db0b5f6..25a7395 100644
--- a/src/Adaptive.Aeron/Adaptive.Aeron.csproj
+++ b/src/Adaptive.Aeron/Adaptive.Aeron.csproj
@@ -24,6 +24,11 @@
all
+
+ all
+ false
+
+
diff --git a/src/Adaptive.Aeron/FodyWeavers.xml b/src/Adaptive.Aeron/FodyWeavers.xml
index 42637c5..7adcd68 100644
--- a/src/Adaptive.Aeron/FodyWeavers.xml
+++ b/src/Adaptive.Aeron/FodyWeavers.xml
@@ -1,4 +1,5 @@
+
\ No newline at end of file
diff --git a/src/Adaptive.Aeron/FodyWeavers.xsd b/src/Adaptive.Aeron/FodyWeavers.xsd
index 0eeb31c..42ece42 100644
--- a/src/Adaptive.Aeron/FodyWeavers.xsd
+++ b/src/Adaptive.Aeron/FodyWeavers.xsd
@@ -5,6 +5,7 @@
+
diff --git a/src/Adaptive.Agrona/SuppressedExceptions.cs b/src/Adaptive.Agrona/SuppressedExceptions.cs
new file mode 100644
index 0000000..9907c3e
--- /dev/null
+++ b/src/Adaptive.Agrona/SuppressedExceptions.cs
@@ -0,0 +1,40 @@
+using System;
+using System.Collections.Generic;
+
+namespace Adaptive.Agrona
+{
+ ///
+ /// Extension methods that emulate Java's Throwable.addSuppressed / getSuppressed
+ /// on .NET's type via the dictionary.
+ ///
+ public static class SuppressedExceptions
+ {
+ private const string SuppressedKey = "Adaptive.Agrona.SuppressedExceptions";
+
+ public static void AddSuppressed(this Exception primary, Exception suppressed)
+ {
+ if (primary == null || suppressed == null || ReferenceEquals(primary, suppressed))
+ {
+ return;
+ }
+
+ if (!(primary.Data[SuppressedKey] is List list))
+ {
+ list = new List();
+ primary.Data[SuppressedKey] = list;
+ }
+
+ list.Add(suppressed);
+ }
+
+ public static IReadOnlyList GetSuppressed(this Exception primary)
+ {
+ if (primary?.Data[SuppressedKey] is List list)
+ {
+ return list;
+ }
+
+ return Array.Empty();
+ }
+ }
+}
diff --git a/src/Adaptive.Archiver.Tests/Adaptive.Archiver.Tests.csproj b/src/Adaptive.Archiver.Tests/Adaptive.Archiver.Tests.csproj
new file mode 100644
index 0000000..6353c47
--- /dev/null
+++ b/src/Adaptive.Archiver.Tests/Adaptive.Archiver.Tests.csproj
@@ -0,0 +1,20 @@
+
+
+
+ net8.0
+ false
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Adaptive.Archiver.Tests/AeronArchiveTest.cs b/src/Adaptive.Archiver.Tests/AeronArchiveTest.cs
new file mode 100644
index 0000000..3130f27
--- /dev/null
+++ b/src/Adaptive.Archiver.Tests/AeronArchiveTest.cs
@@ -0,0 +1,433 @@
+using System;
+using System.Threading;
+using Adaptive.Aeron;
+using Adaptive.Aeron.Exceptions;
+using Adaptive.Agrona;
+using Adaptive.Agrona.Concurrent;
+using FakeItEasy;
+using NUnit.Framework;
+using AeronType = Adaptive.Aeron.Aeron;
+using Context = Adaptive.Archiver.AeronArchive.Context;
+
+namespace Adaptive.Archiver.Tests
+{
+ public class AeronArchiveTest
+ {
+ private const string SESSION_ID_PARAM_NAME = "session-id";
+ private const string MTU_LENGTH_PARAM_NAME = "mtu";
+ private const string TERM_LENGTH_PARAM_NAME = "term-length";
+ private const string SPARSE_PARAM_NAME = "sparse";
+
+ private AeronType aeron;
+ private ControlResponsePoller controlResponsePoller;
+ private ArchiveProxy archiveProxy;
+ private IErrorHandler errorHandler;
+
+ [SetUp]
+ public void SetUp()
+ {
+ aeron = A.Fake();
+ controlResponsePoller = A.Fake();
+ archiveProxy = A.Fake();
+ errorHandler = A.Fake();
+ }
+
+ [Test]
+ public void AsyncConnectShouldConcludeContext()
+ {
+ var ctx = A.Fake();
+ var expectedException = new InvalidOperationException("test");
+ A.CallTo(() => ctx.Conclude()).Throws(expectedException);
+
+ var actualException = Assert.Throws(() => AeronArchive.ConnectAsync(ctx));
+ Assert.AreSame(expectedException, actualException);
+
+ A.CallTo(() => ctx.Conclude()).MustHaveHappened();
+ A.CallTo(ctx).MustHaveHappenedOnceExactly();
+ }
+
+ [Test]
+ public void AsyncConnectShouldCloseContext()
+ {
+ const string responseChannel = "aeron:udp?endpoint=localhost:1234";
+ const int responseStreamId = 49;
+ var ctx = A.Fake();
+ A.CallTo(() => ctx.AeronClient()).Returns(aeron);
+ A.CallTo(() => ctx.ControlResponseChannel()).Returns(responseChannel);
+ A.CallTo(() => ctx.ControlResponseStreamId()).Returns(responseStreamId);
+ var error = new InvalidOperationException("subscription");
+ A.CallTo(() => aeron.AsyncAddSubscription(
+ responseChannel,
+ responseStreamId,
+ A._,
+ A._))
+ .Throws(error);
+
+ var actualException = Assert.Throws(() => AeronArchive.ConnectAsync(ctx));
+ Assert.AreSame(error, actualException);
+
+ A.CallTo(() => ctx.Conclude()).MustHaveHappened().Then(
+ A.CallTo(() => ctx.AeronClient()).MustHaveHappened()).Then(
+ A.CallTo(() => ctx.ControlResponseChannel()).MustHaveHappened()).Then(
+ A.CallTo(() => ctx.ControlResponseStreamId()).MustHaveHappened()).Then(
+ A.CallTo(() => aeron.AsyncAddSubscription(
+ responseChannel, responseStreamId, A._, A._))
+ .MustHaveHappened()).Then(
+ A.CallTo(() => ctx.Dispose()).MustHaveHappened());
+ }
+
+ [Test]
+ public void AsyncConnectShouldCloseResourceInCaseOfExceptionUponStartup()
+ {
+ const string responseChannel = "aeron:udp?endpoint=localhost:0";
+ const int responseStreamId = 49;
+ const string requestChannel = "aeron:udp?endpoint=localhost:1234";
+ const int requestStreamId = -15;
+ const long subscriptionId = -3275938475934759L;
+
+ var ctx = A.Fake();
+ A.CallTo(() => ctx.AeronClient()).Returns(aeron);
+ A.CallTo(() => ctx.ControlResponseChannel()).Returns(responseChannel);
+ A.CallTo(() => ctx.ControlResponseStreamId()).Returns(responseStreamId);
+ A.CallTo(() => ctx.ControlRequestChannel()).Returns(requestChannel);
+ A.CallTo(() => ctx.ControlRequestStreamId()).Returns(requestStreamId);
+ A.CallTo(() => aeron.AsyncAddSubscription(
+ responseChannel,
+ responseStreamId,
+ A._,
+ A._))
+ .Returns(subscriptionId);
+ var error = new IndexOutOfRangeException("exception");
+ A.CallTo(() => aeron.Ctx).Throws(error);
+
+ var actualException = Assert.Throws(() => AeronArchive.ConnectAsync(ctx));
+ Assert.AreSame(error, actualException);
+
+ A.CallTo(() => ctx.Conclude()).MustHaveHappened().Then(
+ A.CallTo(() => ctx.AeronClient()).MustHaveHappened()).Then(
+ A.CallTo(() => ctx.ControlResponseChannel()).MustHaveHappened()).Then(
+ A.CallTo(() => ctx.ControlResponseStreamId()).MustHaveHappened()).Then(
+ A.CallTo(() => aeron.AsyncAddSubscription(
+ responseChannel, responseStreamId, A._, A._))
+ .MustHaveHappened()).Then(
+ A.CallTo(() => aeron.AsyncRemoveSubscription(subscriptionId)).MustHaveHappened()).Then(
+ A.CallTo(() => ctx.Dispose()).MustHaveHappened());
+ }
+
+ [Test]
+ public void ShouldQuietClose()
+ {
+ var previousException = new Exception();
+ var thrownException = new Exception();
+
+ var throwingCloseable = A.Fake();
+ var nonThrowingCloseable = A.Fake();
+ A.CallTo(() => throwingCloseable.Dispose()).Throws(thrownException);
+
+ Assert.IsNull(AeronArchive.QuietClose(null, nonThrowingCloseable));
+ Assert.AreSame(previousException, AeronArchive.QuietClose(previousException, nonThrowingCloseable));
+ var ex = AeronArchive.QuietClose(previousException, throwingCloseable);
+ Assert.AreSame(previousException, ex);
+ Assert.AreSame(thrownException, ex.GetSuppressed()[0]);
+ Assert.AreSame(thrownException, AeronArchive.QuietClose(null, throwingCloseable));
+ }
+
+ [TestCase(AeronType.NULL_VALUE)]
+ [TestCase(long.MaxValue)]
+ [TestCase(long.MinValue)]
+ [TestCase(0L)]
+ [TestCase(4468236482L)]
+ public void ShouldReturnAssignedArchiveId(long archiveId)
+ {
+ const long controlSessionId = -3924293L;
+ A.CallTo(() => aeron.Ctx).Returns(new AeronType.Context());
+ var context = new Context()
+ .AeronClient(aeron)
+ .IdleStrategy(new NoOpIdleStrategy())
+ .MessageTimeoutNs(100)
+ .Lock(new NoOpLock())
+ .ErrorHandler(errorHandler)
+ .OwnsAeronClient(true);
+
+ var aeronArchive = new AeronArchive(context, controlResponsePoller, archiveProxy, controlSessionId, archiveId);
+
+ Assert.AreEqual(archiveId, aeronArchive.ArchiveId());
+ }
+
+ [TestCase(
+ "aeron:udp?endpoint=localhost:3388|mtu=2048",
+ "aeron:udp?session-id=5|endpoint=localhost:0|sparse=true|mtu=1024")]
+ [TestCase(
+ "aeron:udp?endpoint=localhost:3388",
+ "aeron:udp?control=localhost:10000|control-mode=dynamic")]
+ [TestCase(
+ "aeron:udp?endpoint=localhost:3388",
+ "aeron:udp?control-mode=manual")]
+ [TestCase(
+ "aeron:ipc?alias=request|ssc=false|linger=0|session-id=42|sparse=false",
+ "aeron:ipc?term-length=64k|alias=response")]
+ public void ShouldAddAUniqueSessionIdParameterToBothRequestAndResponseChannels(
+ string requestChannel, string responseChannel)
+ {
+ const int requestStreamId = 42;
+ const int responseStreamId = -19;
+ int sessionId = BitUtil.GenerateRandomisedId();
+ A.CallTo(() => aeron.NextSessionId(requestStreamId)).Returns(sessionId);
+
+ var context = new Context()
+ .AeronClient(aeron)
+ .OwnsAeronClient(false)
+ .ErrorHandler(errorHandler)
+ .ControlRequestChannel(requestChannel)
+ .ControlRequestStreamId(requestStreamId)
+ .ControlResponseChannel(responseChannel)
+ .ControlResponseStreamId(responseStreamId)
+ .ControlTermBufferSparse(false)
+ .ControlTermBufferLength(128 * 1024)
+ .ControlMtuLength(4096);
+
+ Assert.AreEqual(requestChannel, context.ControlRequestChannel());
+ Assert.AreEqual(requestStreamId, context.ControlRequestStreamId());
+ Assert.AreEqual(responseChannel, context.ControlResponseChannel());
+ Assert.AreEqual(responseStreamId, context.ControlResponseStreamId());
+
+ context.Conclude();
+
+ A.CallTo(() => aeron.NextSessionId(requestStreamId)).MustHaveHappened();
+ Assert.AreEqual(requestStreamId, context.ControlRequestStreamId());
+ Assert.AreEqual(responseStreamId, context.ControlResponseStreamId());
+
+ var actualRequestChannel = ChannelUri.Parse(context.ControlRequestChannel());
+ var actualResponseChannel = ChannelUri.Parse(context.ControlResponseChannel());
+ Assert.IsTrue(actualRequestChannel.ContainsKey(SESSION_ID_PARAM_NAME), "session-id was not added");
+ Assert.AreEqual(sessionId.ToString(), actualRequestChannel.Get(SESSION_ID_PARAM_NAME));
+ Assert.AreEqual(sessionId.ToString(), actualResponseChannel.Get(SESSION_ID_PARAM_NAME));
+
+ ChannelUri.Parse(requestChannel).ForEachParameter((key, value) =>
+ {
+ if (!SESSION_ID_PARAM_NAME.Equals(key))
+ {
+ Assert.AreEqual(value, actualRequestChannel.Get(key));
+ }
+ });
+
+ ChannelUri.Parse(responseChannel).ForEachParameter((key, value) =>
+ {
+ if (!SESSION_ID_PARAM_NAME.Equals(key))
+ {
+ Assert.AreEqual(value, actualResponseChannel.Get(key));
+ }
+ });
+ }
+
+ [Test]
+ public void ShouldNotAddASessionIdIfControlModeResponseIsSpecifiedOnTheResponseChannel()
+ {
+ const int requestStreamId = 100;
+ const int responseStreamId = 200;
+ const string requestChannel = "aeron:udp?endpoint=localhost:8080";
+ const string responseChannel = "aeron:udp?control-mode=response|control=localhost:10002";
+ var context = new Context()
+ .AeronClient(aeron)
+ .OwnsAeronClient(false)
+ .ErrorHandler(errorHandler)
+ .ControlRequestChannel(requestChannel)
+ .ControlRequestStreamId(requestStreamId)
+ .ControlResponseChannel(responseChannel)
+ .ControlResponseStreamId(responseStreamId);
+
+ Assert.AreEqual(requestChannel, context.ControlRequestChannel());
+ Assert.AreEqual(requestStreamId, context.ControlRequestStreamId());
+ Assert.AreEqual(responseChannel, context.ControlResponseChannel());
+ Assert.AreEqual(responseStreamId, context.ControlResponseStreamId());
+
+ context.Conclude();
+
+ Assert.AreEqual(requestStreamId, context.ControlRequestStreamId());
+ Assert.AreEqual(responseStreamId, context.ControlResponseStreamId());
+
+ var actualRequestChannel = ChannelUri.Parse(context.ControlRequestChannel());
+ var actualResponseChannel = ChannelUri.Parse(context.ControlResponseChannel());
+ Assert.IsNull(actualRequestChannel.Get(SESSION_ID_PARAM_NAME), "unexpected session-id on request channel");
+ Assert.IsNull(actualResponseChannel.Get(SESSION_ID_PARAM_NAME), "unexpected session-id on response channel");
+
+ ChannelUri.Parse(requestChannel).ForEachParameter(
+ (key, value) => Assert.AreEqual(value, actualRequestChannel.Get(key)));
+
+ ChannelUri.Parse(responseChannel).ForEachParameter(
+ (key, value) => Assert.AreEqual(value, actualResponseChannel.Get(key)));
+ }
+
+ [Test]
+ public void ShouldAddDefaultUriParametersIfNotSpecified()
+ {
+ const int requestStreamId = 10;
+ const int responseStreamId = 20;
+ const string requestChannel = "aeron:udp?endpoint=localhost:8080";
+ const string responseChannel = "aeron:udp?endpoint=localhost:0";
+ var context = new Context()
+ .AeronClient(aeron)
+ .OwnsAeronClient(false)
+ .ErrorHandler(errorHandler)
+ .ControlRequestChannel(requestChannel)
+ .ControlRequestStreamId(requestStreamId)
+ .ControlResponseChannel(responseChannel)
+ .ControlResponseStreamId(responseStreamId)
+ .ControlMtuLength(2048)
+ .ControlTermBufferLength(256 * 1024)
+ .ControlTermBufferSparse(true);
+
+ Assert.AreEqual(requestChannel, context.ControlRequestChannel());
+ Assert.AreEqual(requestStreamId, context.ControlRequestStreamId());
+ Assert.AreEqual(responseChannel, context.ControlResponseChannel());
+ Assert.AreEqual(responseStreamId, context.ControlResponseStreamId());
+
+ context.Conclude();
+
+ Assert.AreEqual(requestStreamId, context.ControlRequestStreamId());
+ Assert.AreEqual(responseStreamId, context.ControlResponseStreamId());
+
+ var actualRequestChannel = ChannelUri.Parse(context.ControlRequestChannel());
+ var actualResponseChannel = ChannelUri.Parse(context.ControlResponseChannel());
+ Assert.AreEqual(context.ControlMtuLength().ToString(), actualRequestChannel.Get(MTU_LENGTH_PARAM_NAME));
+ Assert.AreEqual(context.ControlMtuLength().ToString(), actualResponseChannel.Get(MTU_LENGTH_PARAM_NAME));
+ Assert.AreEqual(
+ context.ControlTermBufferLength().ToString(),
+ actualRequestChannel.Get(TERM_LENGTH_PARAM_NAME));
+ Assert.AreEqual(
+ context.ControlTermBufferLength().ToString(),
+ actualResponseChannel.Get(TERM_LENGTH_PARAM_NAME));
+ string sparseStr = context.ControlTermBufferSparse() ? "true" : "false";
+ Assert.AreEqual(sparseStr, actualRequestChannel.Get(SPARSE_PARAM_NAME));
+ Assert.AreEqual(sparseStr, actualResponseChannel.Get(SPARSE_PARAM_NAME));
+ }
+
+ [TestCase(int.MinValue)]
+ [TestCase(-1)]
+ [TestCase(0)]
+ public void ShouldRejectInvalidRetryAttempts(int retryAttempts)
+ {
+ var context = new Context()
+ .AeronClient(aeron)
+ .ControlRequestChannel("aeron:udp")
+ .ControlResponseChannel("aeron:udp")
+ .MessageRetryAttempts(retryAttempts);
+ Assert.AreEqual(retryAttempts, context.MessageRetryAttempts());
+
+ var exception = Assert.Throws(() => context.Conclude());
+ Assert.AreEqual(
+ "AeronArchive.Context.messageRetryAttempts must be > 0, got: " + retryAttempts,
+ exception.Message);
+ }
+
+ [Test]
+ public void MaxRetryAttemptsDefaultValue()
+ {
+ var context = new Context();
+ Assert.AreEqual(AeronArchive.Configuration.MESSAGE_RETRY_ATTEMPTS_DEFAULT, context.MessageRetryAttempts());
+ }
+
+ [Test]
+ public void MaxRetryAttemptsSystemProperty()
+ {
+ Config.Params[AeronArchive.Configuration.MESSAGE_RETRY_ATTEMPTS_PROP_NAME] = "111";
+ try
+ {
+ var context = new Context();
+ Assert.AreEqual(111, context.MessageRetryAttempts());
+ }
+ finally
+ {
+ Config.Params.Remove(AeronArchive.Configuration.MESSAGE_RETRY_ATTEMPTS_PROP_NAME);
+ }
+ }
+
+ [Test]
+ public void CloseNotOwningAeronClient()
+ {
+ const long controlSessionId = 42L;
+ const long archiveId = -190L;
+
+ var aeronContext = A.Fake();
+ A.CallTo(() => aeronContext.NanoClock()).Returns(SystemNanoClock.INSTANCE);
+ A.CallTo(() => aeron.Ctx).Returns(aeronContext);
+ var aeronException = new SynchronizationLockException("aeron closed");
+ A.CallTo(() => aeron.Dispose()).Throws(aeronException);
+
+ var publication = A.Fake();
+ A.CallTo(() => publication.IsConnected).Returns(true);
+ var publicationException = new InvalidOperationException("publication is closed");
+ A.CallTo(() => publication.Dispose()).Throws(publicationException);
+
+ var subscription = A.Fake();
+ A.CallTo(() => controlResponsePoller.Subscription()).Returns(subscription);
+ var subscriptionException = new IndexOutOfRangeException("subscription");
+ A.CallTo(() => subscription.Dispose()).Throws(subscriptionException);
+
+ A.CallTo(() => archiveProxy.Pub()).Returns(publication);
+ var closeSessionException = new IndexOutOfRangeException();
+ A.CallTo(() => archiveProxy.CloseSession(controlSessionId)).Throws(closeSessionException);
+
+ var context = new Context()
+ .AeronClient(aeron)
+ .IdleStrategy(new NoOpIdleStrategy())
+ .MessageTimeoutNs(100)
+ .Lock(new NoOpLock())
+ .ErrorHandler(errorHandler)
+ .OwnsAeronClient(false);
+ var aeronArchive = new AeronArchive(context, controlResponsePoller, archiveProxy, controlSessionId, archiveId);
+
+ aeronArchive.Dispose();
+
+ A.CallTo(() => errorHandler.OnError(A.That.Matches(ex =>
+ ReferenceEquals(closeSessionException, ex) &&
+ ex.GetSuppressed().Count >= 2 &&
+ ReferenceEquals(publicationException, ex.GetSuppressed()[0]) &&
+ ReferenceEquals(subscriptionException, ex.GetSuppressed()[1])))).MustHaveHappened();
+ A.CallTo(errorHandler).MustHaveHappenedOnceExactly();
+ A.CallTo(() => publication.Dispose()).MustHaveHappened();
+ A.CallTo(() => subscription.Dispose()).MustHaveHappened();
+ }
+
+ [Test]
+ public void CloseOwningAeronClient()
+ {
+ const long controlSessionId = 42L;
+ const long archiveId = 555L;
+
+ var aeronContext = A.Fake();
+ A.CallTo(() => aeronContext.NanoClock()).Returns(SystemNanoClock.INSTANCE);
+ A.CallTo(() => aeron.Ctx).Returns(aeronContext);
+ var aeronException = new SynchronizationLockException("aeron closed");
+ A.CallTo(() => aeron.Dispose()).Throws(aeronException);
+
+ var publication = A.Fake();
+ A.CallTo(() => publication.IsConnected).Returns(true);
+ A.CallTo(() => publication.Dispose()).Throws(new InvalidOperationException("publication is closed"));
+
+ var subscription = A.Fake();
+ A.CallTo(() => controlResponsePoller.Subscription()).Returns(subscription);
+ A.CallTo(() => subscription.Dispose()).Throws(new IndexOutOfRangeException("subscription"));
+
+ A.CallTo(() => archiveProxy.Pub()).Returns(publication);
+ var closeSessionException = new IndexOutOfRangeException();
+ A.CallTo(() => archiveProxy.CloseSession(controlSessionId)).Throws(closeSessionException);
+
+ var context = new Context()
+ .AeronClient(aeron)
+ .IdleStrategy(new NoOpIdleStrategy())
+ .MessageTimeoutNs(100)
+ .Lock(new NoOpLock())
+ .ErrorHandler(errorHandler)
+ .OwnsAeronClient(true);
+ var aeronArchive = new AeronArchive(context, controlResponsePoller, archiveProxy, controlSessionId, archiveId);
+
+ var ex = Assert.Throws(() => aeronArchive.Dispose());
+
+ Assert.AreSame(closeSessionException, ex);
+ A.CallTo(() => errorHandler.OnError(closeSessionException)).MustHaveHappened();
+ A.CallTo(errorHandler).MustHaveHappenedOnceExactly();
+ Assert.AreEqual(aeronException, ex.GetSuppressed()[0]);
+ }
+ }
+}
diff --git a/src/Adaptive.Archiver.Tests/ArchiveExceptionTest.cs b/src/Adaptive.Archiver.Tests/ArchiveExceptionTest.cs
new file mode 100644
index 0000000..5747249
--- /dev/null
+++ b/src/Adaptive.Archiver.Tests/ArchiveExceptionTest.cs
@@ -0,0 +1,35 @@
+using NUnit.Framework;
+
+namespace Adaptive.Archiver.Tests
+{
+ public class ArchiveExceptionTest
+ {
+ [TestCase(0, "GENERIC")]
+ [TestCase(1, "ACTIVE_LISTING")]
+ [TestCase(2, "ACTIVE_RECORDING")]
+ [TestCase(3, "ACTIVE_SUBSCRIPTION")]
+ [TestCase(4, "UNKNOWN_SUBSCRIPTION")]
+ [TestCase(5, "UNKNOWN_RECORDING")]
+ [TestCase(6, "UNKNOWN_REPLAY")]
+ [TestCase(7, "MAX_REPLAYS")]
+ [TestCase(8, "MAX_RECORDINGS")]
+ [TestCase(9, "INVALID_EXTENSION")]
+ [TestCase(10, "AUTHENTICATION_REJECTED")]
+ [TestCase(11, "STORAGE_SPACE")]
+ [TestCase(12, "UNKNOWN_REPLICATION")]
+ [TestCase(13, "UNAUTHORISED_ACTION")]
+ [TestCase(14, "REPLICATION_CONNECTION_FAILURE")]
+ public void ErrorCodeAsString(int errorCode, string expected)
+ {
+ Assert.AreEqual(expected, ArchiveException.ErrorCodeAsString(errorCode));
+ }
+
+ [TestCase(-1)]
+ [TestCase(1111111)]
+ [TestCase(54)]
+ public void ShouldHandleUnknownErrorCodes(int errorCode)
+ {
+ Assert.AreEqual("unknown error code: " + errorCode, ArchiveException.ErrorCodeAsString(errorCode));
+ }
+ }
+}
diff --git a/src/Adaptive.Archiver/Adaptive.Archiver.csproj b/src/Adaptive.Archiver/Adaptive.Archiver.csproj
index 81176e3..e5c80eb 100644
--- a/src/Adaptive.Archiver/Adaptive.Archiver.csproj
+++ b/src/Adaptive.Archiver/Adaptive.Archiver.csproj
@@ -23,4 +23,24 @@
+
+
+ all
+ runtime; build; native; contentfiles; analyzers
+
+
+ all
+
+
+ all
+ false
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/Adaptive.Archiver/AeronArchive.cs b/src/Adaptive.Archiver/AeronArchive.cs
index cf6f4d7..b532657 100644
--- a/src/Adaptive.Archiver/AeronArchive.cs
+++ b/src/Adaptive.Archiver/AeronArchive.cs
@@ -186,7 +186,7 @@ public void Dispose()
rethrow = true;
if (null != resultEx)
{
- //resultEx.AddSuppressed(ex);
+ resultEx.AddSuppressed(ex);
}
else
{
@@ -2492,6 +2492,16 @@ public class Configuration
///
public static readonly long MESSAGE_TIMEOUT_DEFAULT_NS = 10_000_000_000;
+ ///
+ /// The number of retry attempts to be made when offering messages to the archive.
+ ///
+ public const string MESSAGE_RETRY_ATTEMPTS_PROP_NAME = "aeron.archive.message.retry.attempts";
+
+ ///
+ /// Default number of retry attempts to be made when offering messages to the archive.
+ ///
+ public const int MESSAGE_RETRY_ATTEMPTS_DEFAULT = 3;
+
///
/// Channel for sending control messages to an archive.
///
@@ -2641,6 +2651,16 @@ public static long MessageTimeoutNs()
return Config.GetDurationInNanos(MESSAGE_TIMEOUT_PROP_NAME, MESSAGE_TIMEOUT_DEFAULT_NS);
}
+ ///
+ /// The number of retry attempts to be made when offering messages to the archive.
+ ///
+ /// the number of retry attempts.
+ ///
+ public static int MessageRetryAttempts()
+ {
+ return Config.GetInteger(MESSAGE_RETRY_ATTEMPTS_PROP_NAME, MESSAGE_RETRY_ATTEMPTS_DEFAULT);
+ }
+
///
/// Should term buffer files be sparse for control request and response streams.
///
@@ -2790,6 +2810,7 @@ public class Context
private int _isConcluded = 0;
internal long messageTimeoutNs = Configuration.MessageTimeoutNs();
+ internal int messageRetryAttempts = Configuration.MessageRetryAttempts();
internal String clientName = AeronArchive.Configuration.ClientName();
internal string recordingEventsChannel = Configuration.RecordingEventsChannel();
internal int recordingEventsStreamId = Configuration.RecordingEventsStreamId();
@@ -2841,6 +2862,12 @@ public void Conclude()
throw new ConfigurationException("AeronArchive.Context.clientName length must be <= " + Aeron.Aeron.Configuration.MAX_CLIENT_NAME_LENGTH);
}
+ if (messageRetryAttempts <= 0)
+ {
+ throw new ConfigurationException(
+ "AeronArchive.Context.messageRetryAttempts must be > 0, got: " + messageRetryAttempts);
+ }
+
if (null == aeron)
{
aeron = Aeron.Aeron.Connect(
@@ -2912,6 +2939,28 @@ public long MessageTimeoutNs()
return messageTimeoutNs;
}
+ ///
+ /// Set the number of retry attempts to be made when offering messages to the archive.
+ ///
+ /// the number of retry attempts.
+ /// this for a fluent API.
+ ///
+ public Context MessageRetryAttempts(int messageRetryAttempts)
+ {
+ this.messageRetryAttempts = messageRetryAttempts;
+ return this;
+ }
+
+ ///
+ /// The number of retry attempts to be made when offering messages to the archive.
+ ///
+ /// the number of retry attempts.
+ ///
+ public int MessageRetryAttempts()
+ {
+ return messageRetryAttempts;
+ }
+
///
/// Get the channel URI on which the recording events publication will publish.
///
@@ -3392,7 +3441,7 @@ private ChannelUri ApplyDefaultParams(string channel)
if (!channelUri.ContainsKey(SPARSE_PARAM_NAME))
{
- channelUri.Put(SPARSE_PARAM_NAME, Convert.ToString(controlTermBufferSparse));
+ channelUri.Put(SPARSE_PARAM_NAME, controlTermBufferSparse ? "true" : "false");
}
return channelUri;
@@ -3410,44 +3459,44 @@ public class AsyncConnect : IDisposable
public enum AsyncConnectState
{
///
- /// Initial state of adding a publication for control request channel.
+ /// Initial state of awaiting an asynchronous subscription for the control response channel.
///
- ADD_PUBLICATION = 0,
+ AWAIT_SUBSCRIPTION = 0,
+
+ ///
+ /// Add publication for control request channel.
+ ///
+ ADD_PUBLICATION = 1,
///
/// Await publication being added.
///
- AWAIT_PUBLICATION_CONNECTED = 1,
+ AWAIT_PUBLICATION_CONNECTED = 2,
///
/// Sending connect request to the Archive.
///
- SEND_CONNECT_REQUEST = 2,
+ SEND_CONNECT_REQUEST = 3,
///
/// Await response subscription connected.
///
- AWAIT_SUBSCRIPTION_CONNECTED = 3,
+ AWAIT_SUBSCRIPTION_CONNECTED = 4,
///
/// Await connect response.
///
- AWAIT_CONNECT_RESPONSE = 4,
+ AWAIT_CONNECT_RESPONSE = 5,
///
/// Send archive-id request.
///
- SEND_ARCHIVE_ID_REQUEST = 5,
+ SEND_ARCHIVE_ID_REQUEST = 6,
///
/// Await response for the archive-id request.
///
- AWAIT_ARCHIVE_ID_RESPONSE = 6,
-
- ///
- /// Archive connection established.
- ///
- DONE = 7,
+ AWAIT_ARCHIVE_ID_RESPONSE = 7,
///
/// Sending a challenge response.
@@ -3457,18 +3506,24 @@ public enum AsyncConnectState
///
/// Await challenge response.
///
- AWAIT_CHALLENGE_RESPONSE = 9
+ AWAIT_CHALLENGE_RESPONSE = 9,
+
+ ///
+ /// Archive connection established.
+ ///
+ DONE = 10
}
internal static readonly int PROTOCOL_VERSION_WITH_ARCHIVE_ID = SemanticVersion.Compose(1, 11, 0);
private readonly Context ctx;
- private readonly ControlResponsePoller controlResponsePoller;
private readonly long deadlineNs;
+ private ControlResponsePoller controlResponsePoller;
+ private long subscriptionRegistrationId = Aeron.Aeron.NULL_VALUE;
private long publicationRegistrationId = Aeron.Aeron.NULL_VALUE;
private long correlationId = Aeron.Aeron.NULL_VALUE;
private long controlSessionId = Aeron.Aeron.NULL_VALUE;
private byte[] encodedCredentialsFromChallenge = null;
- private AsyncConnectState state = AsyncConnectState.ADD_PUBLICATION;
+ private AsyncConnectState state = AsyncConnectState.AWAIT_SUBSCRIPTION;
private ArchiveProxy archiveProxy;
private AeronArchive aeronArchive;
@@ -3480,26 +3535,25 @@ internal AsyncConnect(Context ctx)
Aeron.Aeron aeron = ctx.AeronClient();
- controlResponsePoller = new ControlResponsePoller(aeron.AddSubscription(
- ctx.ControlResponseChannel(), ctx.ControlResponseStreamId(), null, (image) =>
+ subscriptionRegistrationId = aeron.AsyncAddSubscription(
+ ctx.ControlResponseChannel(),
+ ctx.ControlResponseStreamId(),
+ null,
+ (image) =>
{
AeronArchive client = aeronArchive;
if (null != client)
{
client.State(ArchiveState.DISCONNECTED);
}
- }));
+ });
- CheckAndSetupResponseChannel(ctx, controlResponsePoller.Subscription());
-
- publicationRegistrationId =
- aeron.AsyncAddExclusivePublication(ctx.ControlRequestChannel(), ctx.ControlRequestStreamId());
deadlineNs = aeron.Ctx.NanoClock().NanoTime() + ctx.MessageTimeoutNs();
}
- catch (Exception ex)
+ catch (Exception)
{
Dispose();
- throw ex;
+ throw;
}
}
@@ -3525,8 +3579,11 @@ public void Dispose()
{
if (null != controlResponsePoller)
{
- IErrorHandler errorHandler = ctx.ErrorHandler();
- CloseHelper.Dispose(errorHandler, controlResponsePoller.Subscription());
+ CloseHelper.Dispose(ctx.ErrorHandler(), controlResponsePoller.Subscription());
+ }
+ else if (Aeron.Aeron.NULL_VALUE != subscriptionRegistrationId)
+ {
+ ctx.AeronClient().AsyncRemoveSubscription(subscriptionRegistrationId);
}
if (null != archiveProxy)
@@ -3578,23 +3635,34 @@ public AeronArchive Poll()
{
CheckDeadline();
+ if (AsyncConnectState.AWAIT_SUBSCRIPTION == state)
+ {
+ AwaitSubscription();
+ }
+
if (AsyncConnectState.ADD_PUBLICATION == state)
{
- ExclusivePublication publication =
- ctx.AeronClient().GetExclusivePublication(publicationRegistrationId);
+ Aeron.Aeron aeron = ctx.AeronClient();
+ if (Aeron.Aeron.NULL_VALUE == publicationRegistrationId)
+ {
+ publicationRegistrationId = aeron.AsyncAddExclusivePublication(
+ ctx.ControlRequestChannel(), ctx.ControlRequestStreamId());
+ }
+
+ ExclusivePublication publication = aeron.GetExclusivePublication(publicationRegistrationId);
if (null != publication)
{
string clientInfo = "name=" + ctx.ClientName();
// + " " + AeronCounters.formatVersionInfo(AeronArchiveVersion.VERSION, AeronArchiveVersion.GIT_SHA);
- publicationRegistrationId = Aeron.Aeron.NULL_VALUE;
archiveProxy = new ArchiveProxy(
publication,
ctx.IdleStrategy(),
- ctx.AeronClient().Ctx.NanoClock(),
+ aeron.Ctx.NanoClock(),
ctx.MessageTimeoutNs(),
- ArchiveProxy.DEFAULT_RETRY_ATTEMPTS,
+ ctx.MessageRetryAttempts(),
ctx.CredentialsSupplier(),
clientInfo);
+ publicationRegistrationId = Aeron.Aeron.NULL_VALUE;
State(AsyncConnectState.AWAIT_PUBLICATION_CONNECTED);
}
@@ -3731,14 +3799,29 @@ private void State(AsyncConnectState newState)
state = newState;
}
+ private void AwaitSubscription()
+ {
+ Subscription subscription = ctx.AeronClient().GetSubscription(subscriptionRegistrationId);
+ if (null != subscription)
+ {
+ CheckAndSetupResponseChannel(ctx, subscription);
+ controlResponsePoller = new ControlResponsePoller(subscription);
+ subscriptionRegistrationId = Aeron.Aeron.NULL_VALUE;
+ State(AsyncConnectState.ADD_PUBLICATION);
+ }
+ }
+
private void CheckDeadline()
{
if (deadlineNs - ctx.AeronClient().Ctx.NanoClock().NanoTime() < 0)
{
+ object publication = null != archiveProxy ? (object)archiveProxy.Pub() : ctx.ControlRequestChannel();
+ object subscription = null != controlResponsePoller
+ ? (object)controlResponsePoller.Subscription()
+ : ctx.ControlResponseChannel();
throw new TimeoutException("Archive connect timeout: step=" + state +
- ((int)state < 3
- ? " publication.uri=" + ctx.ControlRequestChannel()
- : " subscription.uri=" + ctx.ControlResponseChannel()));
+ " publication=" + publication +
+ " subscription=" + subscription);
}
try
@@ -3767,31 +3850,28 @@ private AeronArchive TransitionToDone(long archiveId)
}
}
- static Exception QuietClose(Exception previousException, IDisposable disposable)
+ public static Exception QuietClose(Exception previousException, IDisposable disposable)
{
- Exception resultException = previousException;
+ if (disposable == null)
+ {
+ return previousException;
+ }
- if (disposable != null)
+ try
{
- try
- {
- disposable.Dispose();
- }
- catch (Exception ex)
+ disposable.Dispose();
+ return previousException;
+ }
+ catch (Exception ex)
+ {
+ if (previousException == null)
{
- if (resultException != null)
- {
- // No built-in suppression — so wrap both exceptions
- resultException = new AggregateException(resultException, ex);
- }
- else
- {
- resultException = ex;
- }
+ return ex;
}
- }
- return resultException;
+ previousException.AddSuppressed(ex);
+ return previousException;
+ }
}
private static void CheckAndSetupResponseChannel(Context ctx, Subscription subscription)
diff --git a/src/Adaptive.Archiver/ArchiveException.cs b/src/Adaptive.Archiver/ArchiveException.cs
index 10e12b4..6a5a033 100644
--- a/src/Adaptive.Archiver/ArchiveException.cs
+++ b/src/Adaptive.Archiver/ArchiveException.cs
@@ -170,5 +170,33 @@ public ArchiveException(string message, int errorCode, long correlationId, Categ
ErrorCode = errorCode;
CorrelationId = correlationId;
}
+
+ ///
+ /// Get the human-readable name for an error code.
+ ///
+ /// to lookup.
+ /// the name of the error code, or "unknown error code: {errorCode}" if unrecognised.
+ public static string ErrorCodeAsString(int errorCode)
+ {
+ switch (errorCode)
+ {
+ case GENERIC: return "GENERIC";
+ case ACTIVE_LISTING: return "ACTIVE_LISTING";
+ case ACTIVE_RECORDING: return "ACTIVE_RECORDING";
+ case ACTIVE_SUBSCRIPTION: return "ACTIVE_SUBSCRIPTION";
+ case UNKNOWN_SUBSCRIPTION: return "UNKNOWN_SUBSCRIPTION";
+ case UNKNOWN_RECORDING: return "UNKNOWN_RECORDING";
+ case UNKNOWN_REPLAY: return "UNKNOWN_REPLAY";
+ case MAX_REPLAYS: return "MAX_REPLAYS";
+ case MAX_RECORDINGS: return "MAX_RECORDINGS";
+ case INVALID_EXTENSION: return "INVALID_EXTENSION";
+ case AUTHENTICATION_REJECTED: return "AUTHENTICATION_REJECTED";
+ case STORAGE_SPACE: return "STORAGE_SPACE";
+ case UNKNOWN_REPLICATION: return "UNKNOWN_REPLICATION";
+ case UNAUTHORISED_ACTION: return "UNAUTHORISED_ACTION";
+ case REPLICATION_CONNECTION_FAILURE: return "REPLICATION_CONNECTION_FAILURE";
+ default: return "unknown error code: " + errorCode;
+ }
+ }
}
}
\ No newline at end of file
diff --git a/src/Adaptive.Archiver/FodyWeavers.xml b/src/Adaptive.Archiver/FodyWeavers.xml
new file mode 100644
index 0000000..5415e27
--- /dev/null
+++ b/src/Adaptive.Archiver/FodyWeavers.xml
@@ -0,0 +1,5 @@
+
+
+
+
+
diff --git a/src/Adaptive.Archiver/FodyWeavers.xsd b/src/Adaptive.Archiver/FodyWeavers.xsd
new file mode 100644
index 0000000..42ece42
--- /dev/null
+++ b/src/Adaptive.Archiver/FodyWeavers.xsd
@@ -0,0 +1,27 @@
+
+
+
+
+
+
+
+
+
+
+
+ 'true' to run assembly verification (PEVerify) on the target assembly after all weavers have been executed.
+
+
+
+
+ A comma-separated list of error codes that can be safely ignored in assembly verification.
+
+
+
+
+ 'false' to turn off automatic generation of the XML Schema file.
+
+
+
+
+
\ No newline at end of file
diff --git a/src/Adaptive.Cluster.Tests/Adaptive.Cluster.Tests.csproj b/src/Adaptive.Cluster.Tests/Adaptive.Cluster.Tests.csproj
new file mode 100644
index 0000000..d3dad11
--- /dev/null
+++ b/src/Adaptive.Cluster.Tests/Adaptive.Cluster.Tests.csproj
@@ -0,0 +1,21 @@
+
+
+
+ net8.0
+ false
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Adaptive.Cluster.Tests/Client/AeronClusterAsyncConnectTest.cs b/src/Adaptive.Cluster.Tests/Client/AeronClusterAsyncConnectTest.cs
new file mode 100644
index 0000000..f2b2ae1
--- /dev/null
+++ b/src/Adaptive.Cluster.Tests/Client/AeronClusterAsyncConnectTest.cs
@@ -0,0 +1,361 @@
+using System;
+using Adaptive.Aeron;
+using Adaptive.Aeron.LogBuffer;
+using Adaptive.Aeron.Protocol;
+using Adaptive.Aeron.Security;
+using Adaptive.Agrona;
+using Adaptive.Agrona.Concurrent;
+using Adaptive.Cluster.Client;
+using Adaptive.Cluster.Codecs;
+using FakeItEasy;
+using NUnit.Framework;
+using AeronType = Adaptive.Aeron.Aeron;
+using AsyncConnectState = Adaptive.Cluster.Client.AeronCluster.AsyncConnect.AsyncConnectState;
+
+namespace Adaptive.Cluster.Tests.Client
+{
+ public class AeronClusterAsyncConnectTest
+ {
+ private const long ONE_HOUR_IN_NANOS = 3_600_000_000_000L;
+
+ private class TrackingContext : AeronCluster.Context
+ {
+ public int DisposeCount { get; private set; }
+
+ public override void Dispose()
+ {
+ DisposeCount++;
+ base.Dispose();
+ }
+ }
+
+ private AeronType aeron;
+ private AeronType.Context aeronContext;
+ private TrackingContext context;
+
+ [SetUp]
+ public void Before()
+ {
+ aeron = A.Fake();
+
+ aeronContext = new AeronType.Context().NanoClock(SystemNanoClock.INSTANCE);
+
+ A.CallTo(() => aeron.Ctx).Returns(aeronContext);
+
+ context = new TrackingContext();
+ context
+ .AeronClient(aeron)
+ .OwnsAeronClient(false)
+ .EgressChannel("aeron:udp?endpoint=localhost:0")
+ .EgressStreamId(42)
+ .IngressChannel("aeron:udp?endpoint=replace-me:5555")
+ .IngressStreamId(-19)
+ .CredentialsSupplier(new NullCredentialsSupplier())
+ .IdleStrategy(new NoOpIdleStrategy());
+ }
+
+ [Test]
+ public void InitialState()
+ {
+ var asyncConnect = new AeronCluster.AsyncConnect(context, 1L);
+ Assert.AreEqual(AsyncConnectState.CREATE_EGRESS_SUBSCRIPTION, asyncConnect.State());
+ Assert.AreEqual((int)AsyncConnectState.CREATE_EGRESS_SUBSCRIPTION, asyncConnect.Step());
+ }
+
+ [Test]
+ public void ShouldCloseAsyncSubscription()
+ {
+ const long subscriptionId = 999L;
+ A.CallTo(() => aeron.AsyncAddSubscription(context.EgressChannel(), context.EgressStreamId()))
+ .Returns(subscriptionId);
+ A.CallTo(() => aeron.GetSubscription(subscriptionId)).Returns((Subscription)null);
+
+ var asyncConnect = new AeronCluster.AsyncConnect(
+ context, aeronContext.NanoClock().NanoTime() + ONE_HOUR_IN_NANOS);
+
+ Assert.IsNull(asyncConnect.Poll());
+ Assert.AreEqual(AsyncConnectState.CREATE_EGRESS_SUBSCRIPTION, asyncConnect.State());
+
+ asyncConnect.Dispose();
+
+ A.CallTo(() => aeron.Ctx).MustHaveHappened().Then(
+ A.CallTo(() => aeron.AsyncAddSubscription(context.EgressChannel(), context.EgressStreamId()))
+ .MustHaveHappened()).Then(
+ A.CallTo(() => aeron.GetSubscription(subscriptionId)).MustHaveHappened()).Then(
+ A.CallTo(() => aeron.AsyncRemoveSubscription(subscriptionId)).MustHaveHappened());
+ Assert.AreEqual(1, context.DisposeCount);
+ }
+
+ [Test]
+ public void ShouldCloseEgressSubscription()
+ {
+ const long subscriptionId = -4343L;
+ A.CallTo(() => aeron.AsyncAddSubscription(context.EgressChannel(), context.EgressStreamId()))
+ .Returns(subscriptionId);
+ var subscription = A.Fake();
+ A.CallTo(() => aeron.GetSubscription(subscriptionId)).Returns(subscription);
+
+ var asyncConnect = new AeronCluster.AsyncConnect(
+ context, aeronContext.NanoClock().NanoTime() + ONE_HOUR_IN_NANOS);
+
+ Assert.IsNull(asyncConnect.Poll());
+ Assert.AreEqual(AsyncConnectState.CREATE_INGRESS_PUBLICATIONS, asyncConnect.State());
+
+ asyncConnect.Dispose();
+ A.CallTo(() => subscription.Dispose()).MustHaveHappened();
+ Assert.AreEqual(1, context.DisposeCount);
+ A.CallTo(() => aeron.AsyncRemoveSubscription(subscriptionId)).MustNotHaveHappened();
+ }
+
+ [Test]
+ public void ShouldCloseAsyncPublication()
+ {
+ const long subscriptionId = 87L;
+ A.CallTo(() => aeron.AsyncAddSubscription(context.EgressChannel(), context.EgressStreamId()))
+ .Returns(subscriptionId);
+ var subscription = A.Fake();
+ A.CallTo(() => aeron.GetSubscription(subscriptionId)).Returns(subscription);
+
+ context.IsIngressExclusive(true);
+ const long publicationId = long.MaxValue;
+ A.CallTo(() => aeron.AsyncAddExclusivePublication(context.IngressChannel(), context.IngressStreamId()))
+ .Returns(publicationId);
+ A.CallTo(() => aeron.GetExclusivePublication(publicationId)).Returns((ExclusivePublication)null);
+
+ var asyncConnect = new AeronCluster.AsyncConnect(
+ context, aeronContext.NanoClock().NanoTime() + ONE_HOUR_IN_NANOS);
+
+ Assert.IsNull(asyncConnect.Poll());
+ Assert.AreEqual(AsyncConnectState.CREATE_INGRESS_PUBLICATIONS, asyncConnect.State());
+
+ Assert.IsNull(asyncConnect.Poll());
+ Assert.AreEqual(AsyncConnectState.CREATE_INGRESS_PUBLICATIONS, asyncConnect.State());
+
+ asyncConnect.Dispose();
+
+ A.CallTo(() => aeron.Ctx).MustHaveHappened().Then(
+ A.CallTo(() => aeron.AsyncAddSubscription(context.EgressChannel(), context.EgressStreamId()))
+ .MustHaveHappened()).Then(
+ A.CallTo(() => aeron.GetSubscription(subscriptionId)).MustHaveHappened()).Then(
+ A.CallTo(() => aeron.AsyncAddExclusivePublication(context.IngressChannel(), context.IngressStreamId()))
+ .MustHaveHappened()).Then(
+ A.CallTo(() => aeron.GetExclusivePublication(publicationId)).MustHaveHappened()).Then(
+ A.CallTo(() => aeron.AsyncRemovePublication(publicationId)).MustHaveHappened()).Then(
+ A.CallTo(() => subscription.Dispose()).MustHaveHappened());
+ Assert.AreEqual(1, context.DisposeCount);
+ }
+
+ [Test]
+ public void ShouldCloseIngressPublicationsOnMembers()
+ {
+ const long subscriptionId = 42L;
+ A.CallTo(() => aeron.AsyncAddSubscription(context.EgressChannel(), context.EgressStreamId()))
+ .Returns(subscriptionId);
+ var subscription = A.Fake();
+ A.CallTo(() => aeron.GetSubscription(subscriptionId)).Returns(subscription);
+
+ const int ingressStreamId = 878;
+ context
+ .IsIngressExclusive(true)
+ .IngressEndpoints("0=localhost:20000,1=localhost:20001,2=localhost:20002")
+ .IngressStreamId(ingressStreamId);
+ const long publicationId1 = -6342756432L;
+ var publication1 = A.Fake();
+ A.CallTo(() => aeron.AsyncAddExclusivePublication("aeron:udp?endpoint=localhost:20000", ingressStreamId))
+ .Returns(publicationId1);
+ A.CallTo(() => aeron.GetExclusivePublication(publicationId1))
+ .ReturnsNextFromSequence((ExclusivePublication)null, publication1);
+ const long publicationId2 = AeronType.NULL_VALUE;
+ A.CallTo(() => aeron.AsyncAddExclusivePublication("aeron:udp?endpoint=localhost:20001", ingressStreamId))
+ .Returns(publicationId2);
+ A.CallTo(() => aeron.GetExclusivePublication(publicationId2)).Returns((ExclusivePublication)null);
+ const long publicationId3 = 573495L;
+ A.CallTo(() => aeron.AsyncAddExclusivePublication("aeron:udp?endpoint=localhost:20002", ingressStreamId))
+ .Returns(publicationId3);
+ A.CallTo(() => aeron.GetExclusivePublication(publicationId3)).Returns((ExclusivePublication)null);
+
+ var asyncConnect = new AeronCluster.AsyncConnect(
+ context, aeronContext.NanoClock().NanoTime() + ONE_HOUR_IN_NANOS);
+
+ Assert.IsNull(asyncConnect.Poll());
+ Assert.AreEqual(AsyncConnectState.CREATE_INGRESS_PUBLICATIONS, asyncConnect.State());
+
+ const int iterations = 10;
+ for (int i = 0; i < iterations; i++)
+ {
+ Assert.IsNull(asyncConnect.Poll());
+ Assert.AreEqual(AsyncConnectState.CREATE_INGRESS_PUBLICATIONS, asyncConnect.State());
+ }
+
+ A.CallTo(() => aeron.AsyncAddExclusivePublication("aeron:udp?endpoint=localhost:20000", ingressStreamId))
+ .MustHaveHappenedANumberOfTimesMatching(n => n <= 1);
+ A.CallTo(() => aeron.AsyncAddExclusivePublication("aeron:udp?endpoint=localhost:20001", ingressStreamId))
+ .MustHaveHappened(iterations, Times.Exactly);
+ A.CallTo(() => aeron.AsyncAddExclusivePublication("aeron:udp?endpoint=localhost:20002", ingressStreamId))
+ .MustHaveHappenedANumberOfTimesMatching(n => n <= 1);
+ A.CallTo(() => aeron.GetExclusivePublication(publicationId1))
+ .MustHaveHappened(2, Times.Exactly);
+ A.CallTo(() => aeron.GetExclusivePublication(publicationId2))
+ .MustHaveHappened(iterations, Times.Exactly);
+ A.CallTo(() => aeron.GetExclusivePublication(publicationId3))
+ .MustHaveHappened(iterations, Times.Exactly);
+
+ asyncConnect.Dispose();
+
+ A.CallTo(() => subscription.Dispose()).MustHaveHappened();
+ A.CallTo(subscription).MustHaveHappenedOnceExactly();
+ A.CallTo(() => publication1.Dispose()).MustHaveHappened();
+ Assert.AreEqual(1, context.DisposeCount);
+ A.CallTo(() => aeron.AsyncRemovePublication(publicationId3))
+ .MustHaveHappenedANumberOfTimesMatching(n => n <= 1);
+ A.CallTo(() => aeron.AsyncRemoveSubscription(subscriptionId)).MustNotHaveHappened();
+ A.CallTo(() => aeron.AsyncRemovePublication(publicationId1)).MustNotHaveHappened();
+ A.CallTo(() => aeron.AsyncRemovePublication(publicationId2)).MustNotHaveHappened();
+ }
+
+ [Test]
+ public void ShouldCloseIngressPublication()
+ {
+ const long subscriptionId = 42L;
+ A.CallTo(() => aeron.AsyncAddSubscription(context.EgressChannel(), context.EgressStreamId()))
+ .Returns(subscriptionId);
+ var subscription = A.Fake();
+ A.CallTo(() => aeron.GetSubscription(subscriptionId)).Returns(subscription);
+
+ context.IsIngressExclusive(false);
+ const long publicationId = -6342756432L;
+ A.CallTo(() => aeron.AsyncAddPublication(context.IngressChannel(), context.IngressStreamId()))
+ .Returns(publicationId);
+ var publication = A.Fake();
+ A.CallTo(() => aeron.GetPublication(publicationId)).Returns(publication);
+
+ var asyncConnect = new AeronCluster.AsyncConnect(
+ context, aeronContext.NanoClock().NanoTime() + ONE_HOUR_IN_NANOS);
+
+ Assert.IsNull(asyncConnect.Poll());
+ Assert.AreEqual(AsyncConnectState.CREATE_INGRESS_PUBLICATIONS, asyncConnect.State());
+
+ Assert.IsNull(asyncConnect.Poll());
+ Assert.AreEqual(AsyncConnectState.CREATE_INGRESS_PUBLICATIONS, asyncConnect.State());
+
+ Assert.IsNull(asyncConnect.Poll());
+ Assert.AreEqual(AsyncConnectState.AWAIT_PUBLICATION_CONNECTED, asyncConnect.State());
+
+ asyncConnect.Dispose();
+ A.CallTo(() => publication.Dispose()).MustHaveHappened();
+ A.CallTo(() => subscription.Dispose()).MustHaveHappened();
+ Assert.AreEqual(1, context.DisposeCount);
+ A.CallTo(() => aeron.AsyncRemovePublication(publicationId)).MustNotHaveHappened();
+ A.CallTo(() => aeron.AsyncRemoveSubscription(subscriptionId)).MustNotHaveHappened();
+ }
+
+ [Test]
+ public void ShouldConnectViaIngressChannel()
+ {
+ const long subscriptionId = 42L;
+ long correlationIdSeq = 0;
+ A.CallTo(() => aeron.NextCorrelationId()).ReturnsLazily(() => ++correlationIdSeq);
+ A.CallTo(() => aeron.AsyncAddSubscription(context.EgressChannel(), context.EgressStreamId()))
+ .Returns(subscriptionId);
+ var subscription = A.Fake();
+ A.CallTo(() => aeron.GetSubscription(subscriptionId)).Returns(subscription);
+
+ context.IsIngressExclusive(false);
+ const long publicationId = -19L;
+ A.CallTo(() => aeron.AsyncAddPublication(context.IngressChannel(), context.IngressStreamId()))
+ .Returns(publicationId);
+ var publication = A.Fake();
+ A.CallTo(() => aeron.GetPublication(publicationId)).Returns(publication);
+
+ var asyncConnect = new AeronCluster.AsyncConnect(
+ context, aeronContext.NanoClock().NanoTime() + ONE_HOUR_IN_NANOS);
+
+ Assert.IsNull(asyncConnect.Poll());
+ Assert.AreEqual(AsyncConnectState.CREATE_INGRESS_PUBLICATIONS, asyncConnect.State());
+
+ Assert.IsNull(asyncConnect.Poll());
+ Assert.AreEqual(AsyncConnectState.CREATE_INGRESS_PUBLICATIONS, asyncConnect.State());
+
+ Assert.IsNull(asyncConnect.Poll());
+ Assert.AreEqual(AsyncConnectState.AWAIT_PUBLICATION_CONNECTED, asyncConnect.State());
+
+ const string responseChannel = "aeron:udp?endpoint=localhost:8888";
+ A.CallTo(() => subscription.TryResolveChannelEndpointPort()).Returns(responseChannel);
+ A.CallTo(() => publication.IsConnected).Returns(true);
+
+ Assert.IsNull(asyncConnect.Poll());
+ Assert.AreEqual(AsyncConnectState.SEND_MESSAGE, asyncConnect.State());
+ long sendMessageCorrelationId = correlationIdSeq;
+
+ A.CallTo(() => publication.Offer(A._, 0, A._, A._)).Returns(8L);
+
+ Assert.IsNull(asyncConnect.Poll());
+ Assert.AreEqual(AsyncConnectState.POLL_RESPONSE, asyncConnect.State());
+
+ var responseBuffer = new UnsafeBuffer(new byte[256]);
+ var headerEncoder = new MessageHeaderEncoder();
+ var sessionEventEncoder = new SessionEventEncoder();
+ const long clusterSessionId = 888L;
+ const long leadershipTermId = 5L;
+ const int leaderMemberId = 2;
+ sessionEventEncoder
+ .WrapAndApplyHeader(responseBuffer, DataHeaderFlyweight.HEADER_LENGTH, headerEncoder)
+ .ClusterSessionId(clusterSessionId)
+ .CorrelationId(sendMessageCorrelationId)
+ .LeadershipTermId(leadershipTermId)
+ .LeaderMemberId(leaderMemberId)
+ .Code(EventCode.OK)
+ .Version(AeronCluster.Configuration.PROTOCOL_SEMANTIC_VERSION)
+ .LeaderHeartbeatTimeoutNs(SessionEventEncoder.LeaderHeartbeatTimeoutNsNullValue())
+ .Detail("you are now connected");
+ var egressImage = A.Fake();
+ var header = new Header(1, 16, egressImage);
+ header.Buffer = responseBuffer;
+ header.Offset = 0;
+ var headerFlyweight = new DataHeaderFlyweight();
+ headerFlyweight.Wrap(responseBuffer, 0, DataHeaderFlyweight.HEADER_LENGTH);
+ headerFlyweight.Flags(DataHeaderFlyweight.BEGIN_AND_END_FLAGS);
+ A.CallTo(() => subscription.ControlledPoll(A._, A._))
+ .ReturnsLazily(call =>
+ {
+ var assembler = call.GetArgument(0);
+ assembler.OnFragment(
+ responseBuffer,
+ DataHeaderFlyweight.HEADER_LENGTH,
+ sessionEventEncoder.EncodedLength(),
+ header);
+ return 1;
+ });
+
+ Assert.IsNull(asyncConnect.Poll());
+ Assert.AreEqual(AsyncConnectState.CONCLUDE_CONNECT, asyncConnect.State());
+
+ var aeronCluster = asyncConnect.Poll();
+ Assert.IsNotNull(aeronCluster);
+ Assert.AreEqual(leadershipTermId, aeronCluster.LeadershipTermId);
+ Assert.AreEqual(leaderMemberId, aeronCluster.LeaderMemberId);
+ Assert.AreEqual(clusterSessionId, aeronCluster.ClusterSessionId);
+ Assert.AreEqual(
+ 2 * AeronCluster.Configuration.LEADER_HEARTBEAT_TIMEOUT_DEFAULT_NS,
+ context.NewLeaderTimeoutNs());
+
+ asyncConnect.Dispose();
+ A.CallTo(() => publication.Dispose()).MustNotHaveHappened();
+ A.CallTo(() => subscription.Dispose()).MustNotHaveHappened();
+
+ A.CallTo(() => publication.TryClaim(A._, A._)).ReturnsLazily(call =>
+ {
+ int length = call.GetArgument(0);
+ var bufferClaim = call.GetArgument(1);
+ bufferClaim.Wrap(responseBuffer, 0, BitUtil.Align(DataHeaderFlyweight.HEADER_LENGTH + length, DataHeaderFlyweight.HEADER_LENGTH));
+ return 42L;
+ });
+ aeronCluster.Dispose();
+ Assert.IsTrue(aeronCluster.Closed);
+ A.CallTo(() => publication.TryClaim(A._, A._)).MustHaveHappened().Then(
+ A.CallTo(() => subscription.Dispose()).MustHaveHappened()).Then(
+ A.CallTo(() => publication.Dispose()).MustHaveHappened());
+ Assert.AreEqual(1, context.DisposeCount);
+ }
+ }
+}
diff --git a/src/Adaptive.Cluster.Tests/Client/AeronClusterContextTest.cs b/src/Adaptive.Cluster.Tests/Client/AeronClusterContextTest.cs
new file mode 100644
index 0000000..21477ef
--- /dev/null
+++ b/src/Adaptive.Cluster.Tests/Client/AeronClusterContextTest.cs
@@ -0,0 +1,101 @@
+using Adaptive.Aeron;
+using Adaptive.Aeron.Exceptions;
+using Adaptive.Cluster.Client;
+using FakeItEasy;
+using NUnit.Framework;
+using AeronType = Adaptive.Aeron.Aeron;
+
+namespace Adaptive.Cluster.Tests.Client
+{
+ public class AeronClusterContextTest
+ {
+ private AeronType aeron;
+ private AeronCluster.Context context;
+
+ [SetUp]
+ public void Before()
+ {
+ aeron = A.Fake();
+
+ context = new AeronCluster.Context()
+ .AeronClient(aeron)
+ .IngressChannel("aeron:udp")
+ .EgressChannel("aeron:udp?endpoint=localhost:0");
+ }
+
+ [TestCase(null)]
+ [TestCase("")]
+ public void ConcludeThrowsConfigurationExceptionIfIngressChannelIsNotSet(string ingressChannel)
+ {
+ context.IngressChannel(ingressChannel);
+
+ var exception = Assert.Throws(() => context.Conclude());
+ Assert.AreEqual("ingressChannel must be specified", exception.Message);
+ }
+
+ [Test]
+ public void ConcludeThrowsConfigurationExceptionIfIngressChannelIsSetToIpcAndIngressEndpointsSpecified()
+ {
+ context
+ .IngressChannel("aeron:ipc")
+ .IngressEndpoints("0,localhost:1234");
+
+ var exception = Assert.Throws(() => context.Conclude());
+ Assert.AreEqual(
+ "AeronCluster.Context ingressEndpoints must be null when using IPC ingress",
+ exception.Message);
+ }
+
+ [TestCase(null)]
+ [TestCase("")]
+ public void ConcludeThrowsConfigurationExceptionIfEgressChannelIsNotSet(string egressChannel)
+ {
+ context.EgressChannel(egressChannel);
+
+ var exception = Assert.Throws(() => context.Conclude());
+ Assert.AreEqual("egressChannel must be specified", exception.Message);
+ }
+
+ [TestCase(null)]
+ [TestCase("")]
+ public void ClientNameShouldHandleEmptyValue(string clientName)
+ {
+ context.ClientName(clientName);
+ Assert.AreEqual("", context.ClientName());
+ }
+
+ [TestCase("test")]
+ [TestCase("Some other name")]
+ public void ClientNameShouldReturnAssignedValue(string clientName)
+ {
+ context.ClientName(clientName);
+ Assert.AreEqual(clientName, context.ClientName());
+ }
+
+ [TestCase("some")]
+ [TestCase("42")]
+ public void ClientNameCanBeSetViaSystemProperty(string clientName)
+ {
+ Config.Params[AeronCluster.Configuration.CLIENT_NAME_PROP_NAME] = clientName;
+ try
+ {
+ Assert.AreEqual(clientName, new AeronCluster.Context().ClientName());
+ }
+ finally
+ {
+ Config.Params.Remove(AeronCluster.Configuration.CLIENT_NAME_PROP_NAME);
+ }
+ }
+
+ [Test]
+ public void ClientNameMustNotExceedMaxLength()
+ {
+ context.ClientName("test" + new string('x', AeronType.Configuration.MAX_CLIENT_NAME_LENGTH));
+
+ var exception = Assert.Throws(() => context.Conclude());
+ Assert.AreEqual(
+ "AeronCluster.Context.clientName length must be <= " + AeronType.Configuration.MAX_CLIENT_NAME_LENGTH,
+ exception.Message);
+ }
+ }
+}
diff --git a/src/Adaptive.Cluster.Tests/Client/AeronClusterTest.cs b/src/Adaptive.Cluster.Tests/Client/AeronClusterTest.cs
new file mode 100644
index 0000000..00c1032
--- /dev/null
+++ b/src/Adaptive.Cluster.Tests/Client/AeronClusterTest.cs
@@ -0,0 +1,293 @@
+using System;
+using System.Collections.Generic;
+using Adaptive.Aeron;
+using Adaptive.Aeron.LogBuffer;
+using Adaptive.Aeron.Protocol;
+using Adaptive.Agrona;
+using Adaptive.Agrona.Collections;
+using Adaptive.Agrona.Concurrent;
+using Adaptive.Cluster.Client;
+using Adaptive.Cluster.Codecs;
+using FakeItEasy;
+using NUnit.Framework;
+using AeronType = Adaptive.Aeron.Aeron;
+
+namespace Adaptive.Cluster.Tests.Client
+{
+ public class AeronClusterTest
+ {
+ private const string INGRESS_ENDPOINTS = "foo:1000,bar:1000,baz:1000";
+ private const int CLUSTER_SESSION_ID = 123;
+
+ private UnsafeBuffer buffer;
+ private UnsafeBuffer appMessage;
+ private IEgressListener egressListener;
+ private AeronType aeron;
+ private AeronType.Context aeronContext;
+ private AeronCluster.Context context;
+ private ExclusivePublication ingressPublication;
+ private Subscription egressSubscription;
+ private Image egressImage;
+ private AeronCluster aeronCluster;
+ private long nanoTime;
+ private int leadershipTermId;
+ private int leaderMemberId;
+ private bool newLeaderEventPending;
+
+ [SetUp]
+ public void SetUp()
+ {
+ nanoTime = 0;
+ leadershipTermId = 2;
+ leaderMemberId = 1;
+ newLeaderEventPending = false;
+
+ buffer = new UnsafeBuffer(new byte[1024]);
+ appMessage = new UnsafeBuffer(new byte[8]);
+ egressListener = A.Fake();
+ aeron = A.Fake();
+
+ var nanoClock = A.Fake();
+ A.CallTo(() => nanoClock.NanoTime()).ReturnsLazily(() => nanoTime);
+
+ aeronContext = new AeronType.Context();
+ aeronContext.NanoClock(nanoClock);
+ aeronContext.SubscriberErrorHandler(RethrowingErrorHandler.INSTANCE);
+
+ A.CallTo(() => aeron.Ctx).Returns(aeronContext);
+
+ context = new AeronCluster.Context()
+ .AeronClient(aeron)
+ .OwnsAeronClient(false)
+ .EgressChannel("aeron:udp?endpoint=localhost:0")
+ .IngressChannel("aeron:udp")
+ .IdleStrategy(new NoOpIdleStrategy())
+ .EgressListener(egressListener)
+ .NewLeaderTimeoutNs(TimeSpan.FromSeconds(1).Ticks * 100); // 1 second in nanos
+
+ ingressPublication = A.Fake();
+ egressSubscription = A.Fake();
+ egressImage = A.Fake();
+
+ const long ingressPublicationRegistrationId = 42L;
+ A.CallTo(() => ingressPublication.RegistrationId).Returns(ingressPublicationRegistrationId);
+ A.CallTo(() => aeron.AsyncAddExclusivePublication(context.IngressChannel(), context.IngressStreamId()))
+ .Returns(ingressPublicationRegistrationId);
+ A.CallTo(() => aeron.GetExclusivePublication(ingressPublicationRegistrationId))
+ .Returns(ingressPublication);
+
+ context.Conclude();
+
+ A.CallTo(() => egressSubscription.Poll(A._, A._)).ReturnsLazily(call =>
+ {
+ if (newLeaderEventPending)
+ {
+ newLeaderEventPending = false;
+
+ int offset = DataHeaderFlyweight.HEADER_LENGTH;
+ FrameDescriptor.FrameFlags(buffer, 0, FrameDescriptor.UNFRAGMENTED);
+
+ var newLeaderEventEncoder = new NewLeaderEventEncoder();
+ newLeaderEventEncoder.WrapAndApplyHeader(buffer, offset, new MessageHeaderEncoder());
+ newLeaderEventEncoder.ClusterSessionId(CLUSTER_SESSION_ID);
+ newLeaderEventEncoder.LeadershipTermId(++leadershipTermId);
+ newLeaderEventEncoder.LeaderMemberId(++leaderMemberId);
+ newLeaderEventEncoder.IngressEndpoints(INGRESS_ENDPOINTS);
+
+ int length = MessageHeaderEncoder.ENCODED_LENGTH + newLeaderEventEncoder.EncodedLength();
+
+ var header = new Header(0, 0, egressImage);
+ header.Buffer = buffer;
+
+ var handler = call.GetArgument(0);
+ handler.OnFragment(buffer, offset, length, header);
+
+ return 1;
+ }
+
+ return 0;
+ });
+
+ aeronCluster = new AeronCluster(
+ context,
+ new MessageHeaderEncoder(),
+ ingressPublication,
+ egressSubscription,
+ egressImage,
+ new Map(),
+ CLUSTER_SESSION_ID,
+ leadershipTermId,
+ leaderMemberId);
+ }
+
+ [TestCase(false)]
+ [TestCase(true)]
+ public void ShouldCloseItselfWhenDisconnectedForLongerThanNewLeaderTimeout(bool withAppMessages)
+ {
+ MakeIngressPublicationReturn(Publication.NOT_CONNECTED);
+ if (withAppMessages)
+ {
+ Assert.AreEqual(Publication.NOT_CONNECTED, aeronCluster.Offer(appMessage, 0, 8));
+ }
+ else
+ {
+ Assert.IsFalse(aeronCluster.SendKeepAlive());
+ }
+
+ nanoTime += context.NewLeaderTimeoutNs() - 1;
+
+ Assert.AreEqual(0, aeronCluster.PollEgress());
+ Assert.IsFalse(aeronCluster.Closed);
+
+ nanoTime += 1;
+
+ Assert.AreEqual(1, aeronCluster.PollEgress());
+ Assert.IsTrue(aeronCluster.Closed);
+ }
+
+ [Test]
+ public void ShouldCloseItselfAfterReachingMaxPositionOnTheIngressPublication()
+ {
+ MakeIngressPublicationReturn(Publication.MAX_POSITION_EXCEEDED);
+ Assert.AreEqual(Publication.MAX_POSITION_EXCEEDED, aeronCluster.Offer(appMessage, 0, 8));
+ A.CallTo(() => ingressPublication.Dispose()).MustHaveHappened();
+ Assert.AreEqual(1, aeronCluster.PollStateChanges());
+ Assert.IsTrue(aeronCluster.Closed);
+ }
+
+ public static IEnumerable ShouldStayConnectedAfterSuccessfulFailoverCases()
+ {
+ yield return new TestCaseData(false, false);
+ yield return new TestCaseData(false, true);
+ yield return new TestCaseData(true, false);
+ yield return new TestCaseData(true, true);
+ }
+
+ [TestCaseSource(nameof(ShouldStayConnectedAfterSuccessfulFailoverCases))]
+ public void ShouldStayConnectedAfterSuccessfulFailover(bool withIngressDisconnect, bool withAppMessages)
+ {
+ long initialResult = withIngressDisconnect ? Publication.NOT_CONNECTED : 128L;
+ MakeIngressPublicationReturn(initialResult);
+ if (withAppMessages)
+ {
+ Assert.AreEqual(initialResult, aeronCluster.Offer(appMessage, 0, 8));
+ }
+ else
+ {
+ Assert.AreEqual(!withIngressDisconnect, aeronCluster.SendKeepAlive());
+ }
+
+ nanoTime += context.NewLeaderTimeoutNs() - 1;
+
+ MakeEgressSubscriptionDeliverNewLeaderEvent();
+ Assert.AreEqual(1, aeronCluster.PollEgress());
+ A.CallTo(() => egressListener.OnNewLeader(CLUSTER_SESSION_ID, leadershipTermId, leaderMemberId, INGRESS_ENDPOINTS))
+ .MustHaveHappened();
+ Assert.AreEqual(0, aeronCluster.PollEgress());
+
+ nanoTime += context.MessageTimeoutNs() - 1;
+
+ MakeIngressPublicationReturn(256L);
+ if (withAppMessages)
+ {
+ Assert.AreEqual(256L, aeronCluster.Offer(appMessage, 0, 8));
+ }
+ else
+ {
+ Assert.IsTrue(aeronCluster.SendKeepAlive());
+ }
+
+ nanoTime += 1;
+
+ Assert.AreEqual(0, aeronCluster.PollEgress());
+ Assert.IsFalse(aeronCluster.Closed);
+ }
+
+ [TestCase(false)]
+ [TestCase(true)]
+ public void ShouldCloseItselfWhenUnableToSendMessageForLongerThanNewLeaderConnectionTimeout(bool withAppMessages)
+ {
+ MakeIngressPublicationReturn(Publication.NOT_CONNECTED);
+ if (withAppMessages)
+ {
+ Assert.AreEqual(Publication.NOT_CONNECTED, aeronCluster.Offer(appMessage, 0, 8));
+ }
+ else
+ {
+ Assert.IsFalse(aeronCluster.SendKeepAlive());
+ }
+
+ nanoTime += context.NewLeaderTimeoutNs() / 2;
+
+ MakeEgressSubscriptionDeliverNewLeaderEvent();
+
+ Assert.AreEqual(1, aeronCluster.PollEgress());
+ Assert.IsFalse(aeronCluster.Closed);
+
+ nanoTime += context.MessageTimeoutNs() - 1;
+ if (withAppMessages)
+ {
+ Assert.AreEqual(Publication.NOT_CONNECTED, aeronCluster.Offer(appMessage, 0, 8));
+ }
+ else
+ {
+ Assert.IsFalse(aeronCluster.SendKeepAlive());
+ }
+
+ nanoTime += 1;
+
+ Assert.AreEqual(1, aeronCluster.PollEgress());
+ Assert.IsTrue(aeronCluster.Closed);
+ }
+
+ [Test]
+ public void ShouldCloseIngressPublicationWhenEgressImageCloses()
+ {
+ // in CONNECTED state
+ A.CallTo(() => egressImage.Closed).Returns(true);
+ Assert.AreEqual(1, aeronCluster.PollEgress());
+ A.CallTo(() => ingressPublication.Dispose()).MustHaveHappened();
+
+ A.CallTo(() => egressImage.Closed).Returns(false);
+ MakeEgressSubscriptionDeliverNewLeaderEvent();
+ Assert.AreEqual(1, aeronCluster.PollEgress());
+ A.CallTo(() => ingressPublication.Dispose()).MustHaveHappenedTwiceOrMore();
+
+ // and in AWAIT_NEW_LEADER_CONNECTION state too
+ A.CallTo(() => egressImage.Closed).Returns(true);
+ Assert.AreEqual(1, aeronCluster.PollEgress());
+ A.CallTo(() => ingressPublication.Dispose())
+ .MustHaveHappened(3, Times.OrMore);
+ }
+
+ private void MakeIngressPublicationReturn(long result)
+ {
+ if (result > 0)
+ {
+ A.CallTo(() => ingressPublication.TryClaim(A._, A._))
+ .ReturnsLazily(call =>
+ {
+ int length = call.GetArgument(0);
+ length = BitUtil.Align(DataHeaderFlyweight.HEADER_LENGTH + length, FrameDescriptor.FRAME_ALIGNMENT);
+ var bufferClaim = call.GetArgument(1);
+ bufferClaim.Wrap(buffer, 0, length);
+ return result;
+ });
+ }
+ else
+ {
+ A.CallTo(() => ingressPublication.TryClaim(A._, A._)).Returns(result);
+ }
+
+ A.CallTo(() => ingressPublication.Offer(
+ A._, A._, A._,
+ A._, A._, A._,
+ A._)).Returns(result);
+ }
+
+ private void MakeEgressSubscriptionDeliverNewLeaderEvent()
+ {
+ newLeaderEventPending = true;
+ }
+ }
+}
diff --git a/src/Adaptive.Cluster.Tests/Client/EgressAdapterTest.cs b/src/Adaptive.Cluster.Tests/Client/EgressAdapterTest.cs
new file mode 100644
index 0000000..7b7a862
--- /dev/null
+++ b/src/Adaptive.Cluster.Tests/Client/EgressAdapterTest.cs
@@ -0,0 +1,303 @@
+using Adaptive.Aeron;
+using Adaptive.Aeron.LogBuffer;
+using Adaptive.Agrona;
+using Adaptive.Agrona.Concurrent;
+using Adaptive.Cluster.Client;
+using Adaptive.Cluster.Codecs;
+using FakeItEasy;
+using NUnit.Framework;
+
+namespace Adaptive.Cluster.Tests.Client
+{
+ public class EgressAdapterTest
+ {
+ private UnsafeBuffer buffer;
+ private MessageHeaderEncoder messageHeaderEncoder;
+ private SessionMessageHeaderEncoder sessionMessageHeaderEncoder;
+ private SessionEventEncoder sessionEventEncoder;
+ private NewLeaderEventEncoder newLeaderEventEncoder;
+ private AdminResponseEncoder adminResponseEncoder;
+
+ [SetUp]
+ public void SetUp()
+ {
+ buffer = new UnsafeBuffer(new byte[512]);
+ messageHeaderEncoder = new MessageHeaderEncoder();
+ sessionMessageHeaderEncoder = new SessionMessageHeaderEncoder();
+ sessionEventEncoder = new SessionEventEncoder();
+ newLeaderEventEncoder = new NewLeaderEventEncoder();
+ adminResponseEncoder = new AdminResponseEncoder();
+ }
+
+ [Test]
+ public void OnFragmentShouldDelegateToEgressListenerOnUnknownSchemaId()
+ {
+ const ushort schemaId = 17;
+ const ushort templateId = 19;
+ messageHeaderEncoder
+ .Wrap(buffer, 0)
+ .SchemaId(schemaId)
+ .TemplateId(templateId);
+
+ var listenerExtension = A.Fake();
+ var header = new Header(0, 0);
+ var adapter = new EgressAdapter(
+ A.Fake(), listenerExtension, 0, A.Fake(), 3);
+
+ adapter.OnFragment(buffer, 0, MessageHeaderDecoder.ENCODED_LENGTH * 2, header);
+
+ A.CallTo(() => listenerExtension.OnExtensionMessage(
+ A._,
+ templateId,
+ schemaId,
+ 0,
+ buffer,
+ MessageHeaderDecoder.ENCODED_LENGTH,
+ MessageHeaderDecoder.ENCODED_LENGTH))
+ .MustHaveHappenedOnceExactly();
+ A.CallTo(listenerExtension).MustHaveHappenedOnceExactly();
+ }
+
+ [Test]
+ public void DefaultEgressListenerBehaviourShouldThrowClusterExceptionOnUnknownSchemaId()
+ {
+ var listener = A.Fake();
+ var adapter = new EgressAdapter(listener, 42, A.Fake(), 5);
+ var exception = Assert.Throws(
+ () => adapter.OnFragment(buffer, 0, 64, new Header(0, 0)));
+ Assert.AreEqual(
+ "expected schemaId=" + MessageHeaderDecoder.SCHEMA_ID + ", actual=0",
+ exception.Message);
+ }
+
+ [Test]
+ public void OnFragmentShouldInvokeOnMessageCallbackIfSessionIdMatches()
+ {
+ const int offset = 4;
+ const long sessionId = 2973438724L;
+ const long timestamp = -46328746238764832L;
+ sessionMessageHeaderEncoder
+ .WrapAndApplyHeader(buffer, offset, messageHeaderEncoder)
+ .ClusterSessionId(sessionId)
+ .Timestamp(timestamp);
+
+ var egressListener = A.Fake();
+ var header = new Header(0, 0);
+ var adapter = new EgressAdapter(egressListener, sessionId, A.Fake(), 3);
+
+ adapter.OnFragment(buffer, offset, sessionMessageHeaderEncoder.EncodedLength(), header);
+
+ A.CallTo(() => egressListener.OnMessage(
+ sessionId,
+ timestamp,
+ buffer,
+ offset + AeronCluster.SESSION_HEADER_LENGTH,
+ sessionMessageHeaderEncoder.EncodedLength() - AeronCluster.SESSION_HEADER_LENGTH,
+ header))
+ .MustHaveHappenedOnceExactly();
+ A.CallTo(egressListener).MustHaveHappenedOnceExactly();
+ }
+
+ [Test]
+ public void OnFragmentIsANoOpIfSessionIdDoesNotMatchOnSessionMessage()
+ {
+ const int offset = 18;
+ const long sessionId = 21;
+ const long timestamp = 1000;
+ sessionMessageHeaderEncoder
+ .WrapAndApplyHeader(buffer, offset, messageHeaderEncoder)
+ .ClusterSessionId(sessionId)
+ .Timestamp(timestamp);
+
+ var egressListener = A.Fake();
+ var header = new Header(0, 0);
+ var adapter = new EgressAdapter(egressListener, -19, A.Fake(), 3);
+
+ adapter.OnFragment(buffer, offset, sessionMessageHeaderEncoder.EncodedLength(), header);
+
+ A.CallTo(egressListener).MustNotHaveHappened();
+ }
+
+ [Test]
+ public void OnFragmentShouldInvokeOnSessionEventCallbackIfSessionIdMatches()
+ {
+ const int offset = 8;
+ const long clusterSessionId = 42;
+ const long correlationId = 777;
+ const long leadershipTermId = 6;
+ const int leaderMemberId = 3;
+ const EventCode eventCode = EventCode.REDIRECT;
+ const int version = 18;
+ const string eventDetail = "Event details";
+ sessionEventEncoder
+ .WrapAndApplyHeader(buffer, offset, messageHeaderEncoder)
+ .ClusterSessionId(clusterSessionId)
+ .CorrelationId(correlationId)
+ .LeadershipTermId(leadershipTermId)
+ .LeaderMemberId(leaderMemberId)
+ .Code(eventCode)
+ .Version(version)
+ .Detail(eventDetail);
+
+ var egressListener = A.Fake();
+ var header = new Header(1, 3);
+ var adapter = new EgressAdapter(egressListener, clusterSessionId, A.Fake(), 10);
+
+ adapter.OnFragment(buffer, offset, sessionEventEncoder.EncodedLength(), header);
+
+ A.CallTo(() => egressListener.OnSessionEvent(
+ correlationId, clusterSessionId, leadershipTermId, leaderMemberId, eventCode, eventDetail))
+ .MustHaveHappenedOnceExactly();
+ A.CallTo(egressListener).MustHaveHappenedOnceExactly();
+ }
+
+ [Test]
+ public void OnFragmentIsANoOpIfSessionIdDoesNotMatchOnSessionEvent()
+ {
+ const int offset = 8;
+ const long clusterSessionId = 42;
+ const long correlationId = 777;
+ const long leadershipTermId = 6;
+ const int leaderMemberId = 3;
+ const EventCode eventCode = EventCode.REDIRECT;
+ const int version = 18;
+ const string eventDetail = "Event details";
+ sessionEventEncoder
+ .WrapAndApplyHeader(buffer, offset, messageHeaderEncoder)
+ .ClusterSessionId(clusterSessionId)
+ .CorrelationId(correlationId)
+ .LeadershipTermId(leadershipTermId)
+ .LeaderMemberId(leaderMemberId)
+ .Code(eventCode)
+ .Version(version)
+ .Detail(eventDetail);
+
+ var egressListener = A.Fake();
+ var header = new Header(0, 0);
+ var adapter = new EgressAdapter(
+ egressListener, clusterSessionId + 1, A.Fake(), 3);
+
+ adapter.OnFragment(buffer, offset, sessionEventEncoder.EncodedLength(), header);
+
+ A.CallTo(egressListener).MustNotHaveHappened();
+ }
+
+ [Test]
+ public void OnFragmentShouldInvokeOnNewLeaderCallbackIfSessionIdMatches()
+ {
+ const int offset = 0;
+ const long clusterSessionId = 0;
+ const long leadershipTermId = 6;
+ const int leaderMemberId = 9999;
+ const string ingressEndpoints = "ingress endpoints ...";
+ newLeaderEventEncoder
+ .WrapAndApplyHeader(buffer, offset, messageHeaderEncoder)
+ .LeadershipTermId(leadershipTermId)
+ .ClusterSessionId(clusterSessionId)
+ .LeaderMemberId(leaderMemberId)
+ .IngressEndpoints(ingressEndpoints);
+
+ var egressListener = A.Fake();
+ var header = new Header(1, 3);
+ var adapter = new EgressAdapter(egressListener, clusterSessionId, A.Fake(), 10);
+
+ adapter.OnFragment(buffer, offset, newLeaderEventEncoder.EncodedLength(), header);
+
+ A.CallTo(() => egressListener.OnNewLeader(
+ clusterSessionId, leadershipTermId, leaderMemberId, ingressEndpoints))
+ .MustHaveHappenedOnceExactly();
+ A.CallTo(egressListener).MustHaveHappenedOnceExactly();
+ }
+
+ [Test]
+ public void OnFragmentIsANoOpIfSessionIdDoesNotMatchOnNewLeader()
+ {
+ const int offset = 0;
+ const long clusterSessionId = -100;
+ const long leadershipTermId = 6;
+ const int leaderMemberId = 9999;
+ const string ingressEndpoints = "ingress endpoints ...";
+ newLeaderEventEncoder
+ .WrapAndApplyHeader(buffer, offset, messageHeaderEncoder)
+ .LeadershipTermId(leadershipTermId)
+ .ClusterSessionId(clusterSessionId)
+ .LeaderMemberId(leaderMemberId)
+ .IngressEndpoints(ingressEndpoints);
+
+ var egressListener = A.Fake();
+ var header = new Header(1, 3);
+ var adapter = new EgressAdapter(egressListener, 0, A.Fake(), 10);
+
+ adapter.OnFragment(buffer, offset, newLeaderEventEncoder.EncodedLength(), header);
+
+ A.CallTo(egressListener).MustNotHaveHappened();
+ }
+
+ [Test]
+ public void OnFragmentShouldInvokeOnAdminResponseCallbackIfSessionIdMatches()
+ {
+ const int offset = 24;
+ const long clusterSessionId = 18;
+ const long correlationId = 3274239749237498239L;
+ const AdminRequestType type = AdminRequestType.SNAPSHOT;
+ const AdminResponseCode responseCode = AdminResponseCode.UNAUTHORISED_ACCESS;
+ const string message = "Unauthorised access detected!";
+ byte[] payload = { 0x1, 0x2, 0x3 };
+ adminResponseEncoder
+ .WrapAndApplyHeader(buffer, offset, messageHeaderEncoder)
+ .ClusterSessionId(clusterSessionId)
+ .CorrelationId(correlationId)
+ .RequestType(type)
+ .ResponseCode(responseCode)
+ .Message(message);
+ adminResponseEncoder.PutPayload(payload, 0, payload.Length);
+
+ var egressListener = A.Fake();
+ var header = new Header(1, 3);
+ var adapter = new EgressAdapter(egressListener, clusterSessionId, A.Fake(), 10);
+
+ adapter.OnFragment(buffer, offset, adminResponseEncoder.EncodedLength(), header);
+
+ A.CallTo(() => egressListener.OnAdminResponse(
+ clusterSessionId,
+ correlationId,
+ type,
+ responseCode,
+ message,
+ buffer,
+ offset + MessageHeaderEncoder.ENCODED_LENGTH + adminResponseEncoder.EncodedLength() - payload.Length,
+ payload.Length))
+ .MustHaveHappenedOnceExactly();
+ A.CallTo(egressListener).MustHaveHappenedOnceExactly();
+ }
+
+ [Test]
+ public void OnFragmentIsANoOpIfSessionIdDoesNotMatchOnAdminResponse()
+ {
+ const int offset = 24;
+ const long clusterSessionId = 18;
+ const long correlationId = 3274239749237498239L;
+ const AdminRequestType type = AdminRequestType.SNAPSHOT;
+ const AdminResponseCode responseCode = AdminResponseCode.OK;
+ const string message = "Unauthorised access detected!";
+ byte[] payload = { 0x1, 0x2, 0x3 };
+ adminResponseEncoder
+ .WrapAndApplyHeader(buffer, offset, messageHeaderEncoder)
+ .ClusterSessionId(clusterSessionId)
+ .CorrelationId(correlationId)
+ .RequestType(type)
+ .ResponseCode(responseCode)
+ .Message(message);
+ adminResponseEncoder.PutPayload(payload, 0, payload.Length);
+
+ var egressListener = A.Fake();
+ var header = new Header(1, 3);
+ var adapter = new EgressAdapter(
+ egressListener, -clusterSessionId, A.Fake(), 10);
+
+ adapter.OnFragment(buffer, offset, adminResponseEncoder.EncodedLength(), header);
+
+ A.CallTo(egressListener).MustNotHaveHappened();
+ }
+ }
+}
diff --git a/src/Adaptive.Cluster.Tests/Client/EgressPollerTest.cs b/src/Adaptive.Cluster.Tests/Client/EgressPollerTest.cs
new file mode 100644
index 0000000..4ab0b0f
--- /dev/null
+++ b/src/Adaptive.Cluster.Tests/Client/EgressPollerTest.cs
@@ -0,0 +1,82 @@
+using Adaptive.Aeron;
+using Adaptive.Aeron.LogBuffer;
+using Adaptive.Agrona.Concurrent;
+using Adaptive.Archiver.Codecs;
+using Adaptive.Cluster.Client;
+using Adaptive.Cluster.Codecs;
+using FakeItEasy;
+using NUnit.Framework;
+using ArchiveMessageHeaderEncoder = Adaptive.Archiver.Codecs.MessageHeaderEncoder;
+using ClusterMessageHeaderEncoder = Adaptive.Cluster.Codecs.MessageHeaderEncoder;
+
+namespace Adaptive.Cluster.Tests.Client
+{
+ public class EgressPollerTest
+ {
+ private readonly UnsafeBuffer buffer = new UnsafeBuffer(new byte[1024]);
+ private readonly Header header = new Header(42, 16);
+ private readonly Subscription subscription = A.Fake();
+ private EgressPoller egressPoller;
+
+ [SetUp]
+ public void SetUp()
+ {
+ egressPoller = new EgressPoller(subscription, 10);
+ }
+
+ [Test]
+ public void ShouldIgnoreUnknownMessageSchema()
+ {
+ const int offset = 64;
+ var controlResponseEncoder = new ControlResponseEncoder();
+ var messageHeaderEncoder = new ArchiveMessageHeaderEncoder();
+ controlResponseEncoder
+ .WrapAndApplyHeader(buffer, offset, messageHeaderEncoder)
+ .CorrelationId(42)
+ .Code(ControlResponseCode.ERROR)
+ .ErrorMessage("test");
+
+ Assert.AreEqual(
+ ControlledFragmentHandlerAction.CONTINUE,
+ egressPoller.OnFragment(
+ buffer,
+ offset,
+ messageHeaderEncoder.EncodedLength() + controlResponseEncoder.EncodedLength(),
+ header));
+ Assert.IsFalse(egressPoller.IsPollComplete());
+ }
+
+ [Test]
+ public void ShouldHandleSessionMessage()
+ {
+ const int offset = 16;
+ var encoder = new SessionMessageHeaderEncoder();
+ var messageHeaderEncoder = new ClusterMessageHeaderEncoder();
+ const long clusterSessionId = 7777L;
+ const long leadershipTermId = 5L;
+ encoder
+ .WrapAndApplyHeader(buffer, offset, messageHeaderEncoder)
+ .ClusterSessionId(clusterSessionId)
+ .LeadershipTermId(leadershipTermId);
+
+ Assert.AreEqual(
+ ControlledFragmentHandlerAction.BREAK,
+ egressPoller.OnFragment(
+ buffer,
+ offset,
+ messageHeaderEncoder.EncodedLength() + encoder.EncodedLength(),
+ header));
+ Assert.IsTrue(egressPoller.IsPollComplete());
+ Assert.AreEqual(clusterSessionId, egressPoller.ClusterSessionId());
+ Assert.AreEqual(leadershipTermId, egressPoller.LeadershipTermId());
+
+ Assert.AreEqual(
+ ControlledFragmentHandlerAction.ABORT,
+ egressPoller.OnFragment(
+ buffer,
+ offset,
+ messageHeaderEncoder.EncodedLength() + encoder.EncodedLength(),
+ header));
+ }
+ }
+}
diff --git a/src/Adaptive.Cluster/Adaptive.Cluster.csproj b/src/Adaptive.Cluster/Adaptive.Cluster.csproj
index f2a3a1f..a3f686f 100644
--- a/src/Adaptive.Cluster/Adaptive.Cluster.csproj
+++ b/src/Adaptive.Cluster/Adaptive.Cluster.csproj
@@ -21,4 +21,24 @@
+
+
+ all
+ runtime; build; native; contentfiles; analyzers
+
+
+ all
+
+
+ all
+ false
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/Adaptive.Cluster/Client/AeronCluster.cs b/src/Adaptive.Cluster/Client/AeronCluster.cs
index 5b06ccd..e07abf6 100644
--- a/src/Adaptive.Cluster/Client/AeronCluster.cs
+++ b/src/Adaptive.Cluster/Client/AeronCluster.cs
@@ -956,7 +956,7 @@ public void OnNewLeader(long clusterSessionId, long leadershipTermId, int leader
}
else
{
- _publication = AddIngressPublication(_ctx, _ctx.IngressChannel(), _ctx.IngressStreamId());
+ _publication = AddNewLeaderIngressPublication(_ctx, _ctx.IngressChannel(), _ctx.IngressStreamId());
}
_fragmentAssembler.Clear();
@@ -1029,6 +1029,27 @@ internal static Publication AddIngressPublication(Context ctx, string channel, i
}
}
+ private Publication AddNewLeaderIngressPublication(Context ctx, string channel, int streamId)
+ {
+ long registrationId = AsyncAddIngressPublication(ctx, channel, streamId);
+ long deadlineNs = nanoClock.NanoTime() + ctx.MessageTimeoutNs();
+ do
+ {
+ Publication publication = GetIngressPublication(ctx, registrationId);
+ if (null != publication)
+ {
+ return publication;
+ }
+ _idleStrategy.Idle(ctx.RunAgentInvokers());
+ }
+ while (nanoClock.NanoTime() < deadlineNs);
+
+ throw new AeronTimeoutException(
+ "failed to add new leader ingress publication (leaderMemberId=" + _leaderMemberId +
+ ", leadershipTermId=" + _leadershipTermId + ", channel=" + channel + ", streamId=" + streamId +
+ ") within " + ctx.MessageTimeoutNs() + "ns");
+ }
+
internal static long AsyncAddIngressPublication(Context ctx, string channel, int streamId)
{
if (ctx.IsIngressExclusive())
@@ -1969,6 +1990,27 @@ public AgentInvoker AgentInvoker()
return agentInvoker;
}
+ internal int RunAgentInvokers()
+ {
+ int workDone = 0;
+
+ if (null != _aeron)
+ {
+ AgentInvoker conductorAgentInvoker = _aeron.ConductorAgentInvoker;
+ if (null != conductorAgentInvoker)
+ {
+ workDone += conductorAgentInvoker.Invoke();
+ }
+ }
+
+ if (null != agentInvoker)
+ {
+ workDone += agentInvoker.Invoke();
+ }
+
+ return workDone;
+ }
+
///
/// Close the context and free applicable resources.
///
@@ -2257,18 +2299,25 @@ private void CreateIngressPublications()
{
if (null == ctx.IngressEndpoints())
{
- if (NULL_VALUE == ingressRegistrationId)
- {
- ingressRegistrationId =
- AsyncAddIngressPublication(ctx, ctx.IngressChannel(), ctx.IngressStreamId());
- }
-
if (null == ingressPublication)
{
- ingressPublication = GetIngressPublication(ctx, ingressRegistrationId);
- }
+ if (NULL_VALUE == ingressRegistrationId)
+ {
+ ingressRegistrationId =
+ AsyncAddIngressPublication(ctx, ctx.IngressChannel(), ctx.IngressStreamId());
+ }
- if (null != ingressPublication)
+ try
+ {
+ ingressPublication = GetIngressPublication(ctx, ingressRegistrationId);
+ }
+ catch (RegistrationException)
+ {
+ ingressRegistrationId = NULL_VALUE;
+ throw;
+ }
+ }
+ else
{
ingressRegistrationId = NULL_VALUE;
State(AWAIT_PUBLICATION_CONNECTED);
diff --git a/src/Adaptive.Cluster/Client/EgressAdapter.cs b/src/Adaptive.Cluster/Client/EgressAdapter.cs
index e751fc8..779bb52 100644
--- a/src/Adaptive.Cluster/Client/EgressAdapter.cs
+++ b/src/Adaptive.Cluster/Client/EgressAdapter.cs
@@ -158,7 +158,7 @@ public void OnFragment(IDirectBuffer buffer, int offset, int length, Header head
{
_listener.OnNewLeader(
sessionId,
- _sessionEventDecoder.LeadershipTermId(),
+ _newLeaderEventDecoder.LeadershipTermId(),
_newLeaderEventDecoder.LeaderMemberId(),
_newLeaderEventDecoder.IngressEndpoints());
}
diff --git a/src/Adaptive.Cluster/Client/EgressPoller.cs b/src/Adaptive.Cluster/Client/EgressPoller.cs
index 35981df..e547c94 100644
--- a/src/Adaptive.Cluster/Client/EgressPoller.cs
+++ b/src/Adaptive.Cluster/Client/EgressPoller.cs
@@ -211,8 +211,7 @@ public ControlledFragmentHandlerAction OnFragment(IDirectBuffer buffer, int offs
int schemaId = messageHeaderDecoder.SchemaId();
if (schemaId != MessageHeaderDecoder.SCHEMA_ID)
{
- throw new ClusterException("expected schemaId=" + MessageHeaderDecoder.SCHEMA_ID + ", actual=" +
- schemaId);
+ return CONTINUE;
}
templateId = messageHeaderDecoder.TemplateId();
diff --git a/src/Adaptive.Cluster/FodyWeavers.xml b/src/Adaptive.Cluster/FodyWeavers.xml
new file mode 100644
index 0000000..5415e27
--- /dev/null
+++ b/src/Adaptive.Cluster/FodyWeavers.xml
@@ -0,0 +1,5 @@
+
+
+
+
+
diff --git a/src/Adaptive.Cluster/FodyWeavers.xsd b/src/Adaptive.Cluster/FodyWeavers.xsd
new file mode 100644
index 0000000..42ece42
--- /dev/null
+++ b/src/Adaptive.Cluster/FodyWeavers.xsd
@@ -0,0 +1,27 @@
+
+
+
+
+
+
+
+
+
+
+
+ 'true' to run assembly verification (PEVerify) on the target assembly after all weavers have been executed.
+
+
+
+
+ A comma-separated list of error codes that can be safely ignored in assembly verification.
+
+
+
+
+ 'false' to turn off automatic generation of the XML Schema file.
+
+
+
+
+
\ No newline at end of file
diff --git a/src/Weavers/Unsealer.Fody/ModuleWeaver.cs b/src/Weavers/Unsealer.Fody/ModuleWeaver.cs
new file mode 100644
index 0000000..0207f8c
--- /dev/null
+++ b/src/Weavers/Unsealer.Fody/ModuleWeaver.cs
@@ -0,0 +1,80 @@
+using System.Collections.Generic;
+using System.Linq;
+using Fody;
+using Mono.Cecil;
+
+namespace Unsealer.Fody
+{
+ public class ModuleWeaver : BaseModuleWeaver
+ {
+ public override void Execute()
+ {
+ int unsealed = 0;
+
+ foreach (var type in AllTypes(ModuleDefinition.Types))
+ {
+ // Class reference types only: skip structs (IsValueType), interfaces,
+ // enums, and the implicit `` type.
+ if (!type.IsClass) continue;
+ if (type.IsValueType) continue;
+ if (type.IsInterface) continue;
+ if (!type.IsSealed) continue;
+
+ // Skip compiler-generated types (lambda closures, anonymous types,
+ // async state machines, etc.) — these are sealed for legitimate
+ // reasons and shouldn't be subclassed by tests anyway.
+ if (HasCompilerGeneratedAttribute(type)) continue;
+ if (type.Name.Contains('<')) continue; // safety net for nested compiler-gen names
+
+ // Delegate types MUST remain sealed — the CLR enforces this and
+ // assembly-loading throws TypeLoadException otherwise.
+ if (IsDelegate(type)) continue;
+
+ // C# static classes are emitted as `abstract sealed` in IL.
+ // Removing `sealed` would change their meaning (no longer static).
+ if (type.IsAbstract) continue;
+
+ type.IsSealed = false;
+ unsealed++;
+ }
+
+ WriteMessage($"Unsealer.Fody: unsealed {unsealed} class types.", MessageImportance.Low);
+ }
+
+ private static bool HasCompilerGeneratedAttribute(TypeDefinition type)
+ {
+ return type.HasCustomAttributes &&
+ type.CustomAttributes.Any(a => a.AttributeType.FullName ==
+ "System.Runtime.CompilerServices.CompilerGeneratedAttribute");
+ }
+
+ private static bool IsDelegate(TypeDefinition type)
+ {
+ var baseTypeName = type.BaseType?.FullName;
+ return baseTypeName == "System.MulticastDelegate" || baseTypeName == "System.Delegate";
+ }
+
+ public override IEnumerable GetAssembliesForScanning()
+ {
+ yield return "netstandard";
+ yield return "mscorlib";
+ }
+
+ private static IEnumerable AllTypes(IEnumerable types)
+ {
+ foreach (var type in types)
+ {
+ yield return type;
+ if (type.HasNestedTypes)
+ {
+ foreach (var nested in AllTypes(type.NestedTypes))
+ {
+ yield return nested;
+ }
+ }
+ }
+ }
+
+ public override bool ShouldCleanReference => true;
+ }
+}
diff --git a/src/Weavers/Unsealer.Fody/Unsealer.Fody.csproj b/src/Weavers/Unsealer.Fody/Unsealer.Fody.csproj
new file mode 100644
index 0000000..bfe52c5
--- /dev/null
+++ b/src/Weavers/Unsealer.Fody/Unsealer.Fody.csproj
@@ -0,0 +1,16 @@
+
+
+ netstandard2.0
+ Unsealer.Fody
+ Unsealer.Fody
+ false
+ true
+ Fody addin (Debug-only) that strips the `sealed` modifier from class types
+in the assembly being woven, so test mocking frameworks (FakeItEasy, Moq, NSubstitute)
+that rely on Castle DynamicProxy can subclass them. Mirrors what Virtuosity.Fody does
+for `virtual`. Skips value types and compiler-generated types.
+
+
+
+
+