From 6cdc787fe75d5ad2696a91f411c5005a02b428b1 Mon Sep 17 00:00:00 2001 From: 00raq00 <00raq00@gmail.com> Date: Wed, 19 Mar 2025 09:49:43 +0100 Subject: [PATCH 1/5] .net8.0 --- Demos/FeedDemo/FeedDemo.csproj | 2 +- .../QueryResponseDemo.csproj | 2 +- .../SerializationDemo.csproj | 2 +- Demos/SubscriberDemo/SubscriberDemo.csproj | 2 +- kx.Benchmark.Test/kx.Benchmark.Test.csproj | 2 +- kx.Test/TestUtils/TestSerialisationHelper.cs | 60 +++++++++---------- kx.Test/kx.Test.csproj | 3 +- kx/kx.csproj | 2 +- 8 files changed, 38 insertions(+), 37 deletions(-) diff --git a/Demos/FeedDemo/FeedDemo.csproj b/Demos/FeedDemo/FeedDemo.csproj index c9f7094..691f166 100644 --- a/Demos/FeedDemo/FeedDemo.csproj +++ b/Demos/FeedDemo/FeedDemo.csproj @@ -2,7 +2,7 @@ Exe - netcoreapp3.1 + net8.0 diff --git a/Demos/QueryResponseDemo/QueryResponseDemo.csproj b/Demos/QueryResponseDemo/QueryResponseDemo.csproj index c1388b1..158e112 100644 --- a/Demos/QueryResponseDemo/QueryResponseDemo.csproj +++ b/Demos/QueryResponseDemo/QueryResponseDemo.csproj @@ -2,7 +2,7 @@ Exe - netcoreapp3.1 + net8.0 diff --git a/Demos/SerializationDemo/SerializationDemo.csproj b/Demos/SerializationDemo/SerializationDemo.csproj index c1388b1..158e112 100644 --- a/Demos/SerializationDemo/SerializationDemo.csproj +++ b/Demos/SerializationDemo/SerializationDemo.csproj @@ -2,7 +2,7 @@ Exe - netcoreapp3.1 + net8.0 diff --git a/Demos/SubscriberDemo/SubscriberDemo.csproj b/Demos/SubscriberDemo/SubscriberDemo.csproj index c1388b1..158e112 100644 --- a/Demos/SubscriberDemo/SubscriberDemo.csproj +++ b/Demos/SubscriberDemo/SubscriberDemo.csproj @@ -2,7 +2,7 @@ Exe - netcoreapp3.1 + net8.0 diff --git a/kx.Benchmark.Test/kx.Benchmark.Test.csproj b/kx.Benchmark.Test/kx.Benchmark.Test.csproj index 8a8c45d..d7ae79c 100644 --- a/kx.Benchmark.Test/kx.Benchmark.Test.csproj +++ b/kx.Benchmark.Test/kx.Benchmark.Test.csproj @@ -2,7 +2,7 @@ Exe - netcoreapp3.1 + net8 kx.Benchmark.Test.Program true kx.Benchmark.Test.snk diff --git a/kx.Test/TestUtils/TestSerialisationHelper.cs b/kx.Test/TestUtils/TestSerialisationHelper.cs index 0d3f039..b8708ed 100644 --- a/kx.Test/TestUtils/TestSerialisationHelper.cs +++ b/kx.Test/TestUtils/TestSerialisationHelper.cs @@ -1,42 +1,42 @@ -using System; +using MessagePack; +using System; using System.IO; using System.Runtime.Serialization.Formatters.Binary; +using System.Text.Json; namespace kx.Test.TestUtils { + /// + /// A helper class for testing serialisation and de-serialisation logic + /// in unit-tests + /// + internal static class TestSerialisationHelper + { /// - /// A helper class for testing serialisation and de-serialisation logic - /// in unit-tests + /// Performs binary serialisation and de-serialisation on a specified exception. /// - internal static class TestSerialisationHelper + /// The type of exception being tested. + /// The exception to be serialised and de-serialised. + /// + /// A de-serialised instance of the exception passed. All serialisable members should match the original. + /// + /// + /// This is primarily intended to confirm custom exceptions within the DeltaApiCore + /// library comply to ISerialization pattern. + /// + /// See https://stackoverflow.com/questions/94488/what-is-the-correct-way-to-make-a-custom-net-exception-serializable + /// + public static T SerialiseAndDeserialiseException(T exception) + where T : Exception { - /// - /// Performs binary serialisation and de-serialisation on a specified exception. - /// - /// The type of exception being tested. - /// The exception to be serialised and de-serialised. - /// - /// A de-serialised instance of the exception passed. All serialisable members should match the original. - /// - /// - /// This is primarily intended to confirm custom exceptions within the DeltaApiCore - /// library comply to ISerialization pattern. - /// - /// See https://stackoverflow.com/questions/94488/what-is-the-correct-way-to-make-a-custom-net-exception-serializable - /// - public static T SerialiseAndDeserialiseException(T exception) - where T : Exception - { - BinaryFormatter binaryFormatter = new BinaryFormatter(); + using (var stream = new MemoryStream()) + { + MessagePackSerializer.Serialize(stream, exception, MessagePack.Resolvers.ContractlessStandardResolver.Options); - using (var stream = new MemoryStream()) - { - binaryFormatter.Serialize(stream, exception); + stream.Seek(0, 0); - stream.Seek(0, 0); - - return (T)binaryFormatter.Deserialize(stream); - } - } + return (T)MessagePackSerializer.Deserialize(stream, MessagePack.Resolvers.ContractlessStandardResolver.Options); + } } + } } diff --git a/kx.Test/kx.Test.csproj b/kx.Test/kx.Test.csproj index b42f6ab..bb0f0ad 100644 --- a/kx.Test/kx.Test.csproj +++ b/kx.Test/kx.Test.csproj @@ -1,7 +1,7 @@  - netcoreapp3.1 + net8 false @@ -25,6 +25,7 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive + diff --git a/kx/kx.csproj b/kx/kx.csproj index c4b2ca7..e39bf11 100644 --- a/kx/kx.csproj +++ b/kx/kx.csproj @@ -1,7 +1,7 @@ - netstandard2.0 + net8.0 CSharpKDB CSharpKDB Provides functionality for .NET applications to interface with a KDB+ process. From c38a4a69ee6cbce0a09e443986f73e08d24a3a0e Mon Sep 17 00:00:00 2001 From: 00raq00 <00raq00@gmail.com> Date: Wed, 19 Mar 2025 10:32:43 +0100 Subject: [PATCH 2/5] Added ParallelInsertRows --- Demos/FeedDemo/Program.cs | 184 +++++++++++++++++++++++--------------- 1 file changed, 113 insertions(+), 71 deletions(-) diff --git a/Demos/FeedDemo/Program.cs b/Demos/FeedDemo/Program.cs index 3fbbd52..2066261 100644 --- a/Demos/FeedDemo/Program.cs +++ b/Demos/FeedDemo/Program.cs @@ -1,101 +1,143 @@ using System; using System.Security.Cryptography; +using System.Threading; +using System.Threading.Tasks; using kx; using NLog; namespace FeedDemo { - static class Program - { - private static readonly ILogger Logger = LogManager.GetCurrentClassLogger(); + static class Program + { + private static readonly ILogger Logger = LogManager.GetCurrentClassLogger(); - private const string QFunc = ".u.upd"; - private const string TableName = "mytable"; + private const string QFunc = ".u.upd"; + private const string TableName = "mytable"; - static void Main() + static void Main() + { + string host = "localhost"; + int port = 5001; + string usernamePassword = $"{Environment.UserName}:mypassword"; + + c connection = null; + try + { + connection = new c(host, port, usernamePassword); + + //Example of 10 single row inserts to a table + InsertRows(connection); + + //Parallel example of 100 single row inserts to a table + ParallelInsertRows(host, port, usernamePassword, 100, 10, 4); + + //Parallel example of 1000 single row inserts to a table + ParallelInsertRows(host, port, usernamePassword, 1000, 100, 300); + + //Parallel example of 1000 single row inserts to a table + ParallelInsertRows(host, port, usernamePassword, 1000, 1000, 1000); + + //Example of bulk inserts to a table to improve throughput + BulkInsertRows(connection); + + } + catch (Exception ex) + { + Logger.Error($"Error occurred running Feed-Demo. \r\n{ex}"); + } + finally + { + if (connection != null) { - string host = "localhost"; - int port = 5001; - string usernamePassword = $"{Environment.UserName}:mypassword"; - - c connection = null; - try - { - connection = new c(host, port, usernamePassword); - - //Example of 10 single row inserts to a table - InsertRows(connection); - - //Example of bulk inserts to a table to improve throughput - BulkInsertRows(connection); - - } - catch (Exception ex) - { - Logger.Error($"Error occurred running Feed-Demo. \r\n{ex}"); - } - finally - { - if (connection != null) - { - connection.Close(); - } - } + connection.Close(); } + } + } - private static void InsertRows(c connection) + private static void InsertRows(c connection) + { + // Single row insert - not as efficient as bulk insert + Logger.Info("Populating '{0}' table on kdb server with 10 rows...", TableName); + + for (int i = 0; i < 10; i++) + { + // Assumes a remote schema of mytable:([]time:`timespan$();sym:`symbol$();price:`float$();size:`long$()) + object[] row = new object[] { - // Single row insert - not as efficient as bulk insert - Logger.Info("Populating '{0}' table on kdb server with 10 rows...", TableName); - - for(int i = 0; i < 10; i++) - { - // Assumes a remote schema of mytable:([]time:`timespan$();sym:`symbol$();price:`float$();size:`long$()) - object[] row = new object[] - { new c.KTimespan(100), "SYMBOL", 93.5, 300L - }; + }; - connection.ks(QFunc, TableName, row); - } + connection.ks(QFunc, TableName, row); + } - Logger.Info("Successfully inserted 10 rows to {0}", TableName); - } + Logger.Info("Successfully inserted 10 rows to {0}", TableName); + } + private static void ParallelInsertRows(string host, int port, string usernamePassword, int rowCount, int minThreads, int maxDegreeOfParallelism) + { + // Single row insert - not as efficient as bulk insert + Logger.Info("Populating '{0}' table on kdb server with {1} rows...", TableName, rowCount); + ThreadPool.SetMinThreads(minThreads, minThreads); + + var parallelOptions = new ParallelOptions + { + MaxDegreeOfParallelism = maxDegreeOfParallelism + }; - private static void BulkInsertRows(c connection) + Parallel.For(0, rowCount, parallelOptions, i => + { { - // Bulk row insert - more efficient - string[] syms = new[] { "ABC", "DEF", "GHI", "JKL" }; + // Assumes a remote schema of mytable:([]time:`timespan$();sym:`symbol$();price:`float$();size:`long$()) + object[] row = new object[] + { + new c.KTimespan(i), + "SYMBOL", + i, + 300L + }; + var c = new c(host, port, usernamePassword); + c.ks(QFunc, TableName, row); - c.KTimespan[] times = CreateTestArray(i => new c.KTimespan(i), 10); - string[] symbols = CreateTestArray(i => syms[RandomNumberGenerator.GetInt32(syms.Length)], 10); - double[] prices = CreateTestArray(i => i * 1.1, 10); - long[] sizes = CreateTestArray(i => (long)(i * 100), 10); + Logger.Info("Successfully inserted {1} row to {0}", TableName, i); + } + }); - Logger.Info("Bulk populating '{0}' table on kdb server without using column names", TableName); + Logger.Info("Successfully inserted {1} rows to {0}", TableName, rowCount); + } - connection.ks(QFunc, TableName, new object[] { times, symbols, prices, sizes }); + private static void BulkInsertRows(c connection) + { + // Bulk row insert - more efficient + string[] syms = new[] { "ABC", "DEF", "GHI", "JKL" }; - Logger.Info("Bulk populating '{0}' table on kdb server using column names", TableName); + c.KTimespan[] times = CreateTestArray(i => new c.KTimespan(i), 10); + string[] symbols = CreateTestArray(i => syms[RandomNumberGenerator.GetInt32(syms.Length)], 10); + double[] prices = CreateTestArray(i => i * 1.1, 10); + long[] sizes = CreateTestArray(i => (long)(i * 100), 10); - connection.ks(QFunc, TableName, new c.Flip(new c.Dict(new string[] { "time", "sym", "price", "size" }, new object[] { times, symbols, prices, sizes }))); + Logger.Info("Bulk populating '{0}' table on kdb server without using column names", TableName); - //block until all messages are processed - connection.k(string.Empty); - } + connection.ks(QFunc, TableName, new object[] { times, symbols, prices, sizes }); - private static T[] CreateTestArray(Func elementBuilder, int arraySize) - { - T[] array = new T[arraySize]; + Logger.Info("Bulk populating '{0}' table on kdb server using column names", TableName); - for (int i = 0; i < arraySize; i++) - { - array[i] = elementBuilder(i); - } - return array; - } + connection.ks(QFunc, TableName, new c.Flip(new c.Dict(new string[] { "time", "sym", "price", "size" }, new object[] { times, symbols, prices, sizes }))); + + //block until all messages are processed + connection.k(string.Empty); + } + + private static T[] CreateTestArray(Func elementBuilder, int arraySize) + { + T[] array = new T[arraySize]; + + for (int i = 0; i < arraySize; i++) + { + array[i] = elementBuilder(i); + } + return array; } -} + } +} \ No newline at end of file From acd352914ee7fd1bdf3e42c6dff08e79539c6e41 Mon Sep 17 00:00:00 2001 From: 00raq00 <00raq00@gmail.com> Date: Tue, 25 Mar 2025 08:38:45 +0100 Subject: [PATCH 3/5] fix for type error --- Demos/FeedDemo/Program.cs | 34 ++++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/Demos/FeedDemo/Program.cs b/Demos/FeedDemo/Program.cs index 2066261..9617f9f 100644 --- a/Demos/FeedDemo/Program.cs +++ b/Demos/FeedDemo/Program.cs @@ -29,13 +29,13 @@ static void Main() InsertRows(connection); //Parallel example of 100 single row inserts to a table - ParallelInsertRows(host, port, usernamePassword, 100, 10, 4); + ParallelInsertRows(host, port, usernamePassword, 100, 10, 4, false); //Parallel example of 1000 single row inserts to a table - ParallelInsertRows(host, port, usernamePassword, 1000, 100, 300); + ParallelInsertRows(host, port, usernamePassword, 1000, 100, 300, false); //Parallel example of 1000 single row inserts to a table - ParallelInsertRows(host, port, usernamePassword, 1000, 1000, 1000); + ParallelInsertRows(host, port, usernamePassword, 1000, 1000, 1000, false); //Example of bulk inserts to a table to improve throughput BulkInsertRows(connection); @@ -56,6 +56,7 @@ static void Main() private static void InsertRows(c connection) { + DateTime dt = DateTime.Now; // Single row insert - not as efficient as bulk insert Logger.Info("Populating '{0}' table on kdb server with 10 rows...", TableName); @@ -73,10 +74,11 @@ private static void InsertRows(c connection) connection.ks(QFunc, TableName, row); } - Logger.Info("Successfully inserted 10 rows to {0}", TableName); + Logger.Info("Successfully inserted 10 rows to {0}, {1} ms", TableName, (DateTime.Now - dt).TotalMilliseconds); } - private static void ParallelInsertRows(string host, int port, string usernamePassword, int rowCount, int minThreads, int maxDegreeOfParallelism) + private static void ParallelInsertRows(string host, int port, string usernamePassword, int rowCount, int minThreads, int maxDegreeOfParallelism, bool debugLog) { + DateTime dt = DateTime.Now; // Single row insert - not as efficient as bulk insert Logger.Info("Populating '{0}' table on kdb server with {1} rows...", TableName, rowCount); ThreadPool.SetMinThreads(minThreads, minThreads); @@ -94,17 +96,18 @@ private static void ParallelInsertRows(string host, int port, string usernamePas { new c.KTimespan(i), "SYMBOL", - i, + (Double)i, 300L }; + var c = new c(host, port, usernamePassword); c.ks(QFunc, TableName, row); - Logger.Info("Successfully inserted {1} row to {0}", TableName, i); + if (debugLog) Logger.Info("Successfully inserted {1} row to {0}", TableName, i); } }); - Logger.Info("Successfully inserted {1} rows to {0}", TableName, rowCount); + Logger.Info("Successfully inserted {1} rows to {0}, {2} ms", TableName, rowCount, (DateTime.Now - dt).TotalMilliseconds); } private static void BulkInsertRows(c connection) @@ -112,19 +115,26 @@ private static void BulkInsertRows(c connection) // Bulk row insert - more efficient string[] syms = new[] { "ABC", "DEF", "GHI", "JKL" }; - c.KTimespan[] times = CreateTestArray(i => new c.KTimespan(i), 10); - string[] symbols = CreateTestArray(i => syms[RandomNumberGenerator.GetInt32(syms.Length)], 10); - double[] prices = CreateTestArray(i => i * 1.1, 10); - long[] sizes = CreateTestArray(i => (long)(i * 100), 10); + c.KTimespan[] times = CreateTestArray(i => new c.KTimespan(i), 1000); + string[] symbols = CreateTestArray(i => syms[RandomNumberGenerator.GetInt32(syms.Length)], 1000); + double[] prices = CreateTestArray(i => i * 1.1, 1000); + long[] sizes = CreateTestArray(i => (long)(i * 100), 1000); + DateTime dt = DateTime.Now; Logger.Info("Bulk populating '{0}' table on kdb server without using column names", TableName); connection.ks(QFunc, TableName, new object[] { times, symbols, prices, sizes }); + + Logger.Info("Successfully bull inserted {1} rows to {0} without using column names, {2} ms", TableName, 1000, (DateTime.Now - dt).TotalMilliseconds); + + dt = DateTime.Now; Logger.Info("Bulk populating '{0}' table on kdb server using column names", TableName); connection.ks(QFunc, TableName, new c.Flip(new c.Dict(new string[] { "time", "sym", "price", "size" }, new object[] { times, symbols, prices, sizes }))); + Logger.Info("Successfully bull inserted {1} rows to {0} with using column names, {2} ms", TableName, 1000, (DateTime.Now - dt).TotalMilliseconds); + //block until all messages are processed connection.k(string.Empty); } From fee18d382719cb813990daf62a648211036a19b2 Mon Sep 17 00:00:00 2001 From: 00raq00 <00raq00@gmail.com> Date: Tue, 6 May 2025 11:25:17 +0200 Subject: [PATCH 4/5] small changes in feed and subscriber demos --- Demos/FeedDemo/Program.cs | 12 ++-- Demos/SubscriberDemo/Program.cs | 104 ++++++++++++++++++-------------- 2 files changed, 66 insertions(+), 50 deletions(-) diff --git a/Demos/FeedDemo/Program.cs b/Demos/FeedDemo/Program.cs index 9617f9f..dafc084 100644 --- a/Demos/FeedDemo/Program.cs +++ b/Demos/FeedDemo/Program.cs @@ -63,13 +63,13 @@ private static void InsertRows(c connection) for (int i = 0; i < 10; i++) { // Assumes a remote schema of mytable:([]time:`timespan$();sym:`symbol$();price:`float$();size:`long$()) - object[] row = new object[] - { + object[] row = + [ new c.KTimespan(100), - "SYMBOL", + "SYMBOL1", 93.5, 300L - }; + ]; connection.ks(QFunc, TableName, row); } @@ -124,9 +124,9 @@ private static void BulkInsertRows(c connection) Logger.Info("Bulk populating '{0}' table on kdb server without using column names", TableName); connection.ks(QFunc, TableName, new object[] { times, symbols, prices, sizes }); - + Logger.Info("Successfully bull inserted {1} rows to {0} without using column names, {2} ms", TableName, 1000, (DateTime.Now - dt).TotalMilliseconds); - + dt = DateTime.Now; Logger.Info("Bulk populating '{0}' table on kdb server using column names", TableName); diff --git a/Demos/SubscriberDemo/Program.cs b/Demos/SubscriberDemo/Program.cs index 42c1240..3870654 100644 --- a/Demos/SubscriberDemo/Program.cs +++ b/Demos/SubscriberDemo/Program.cs @@ -1,63 +1,79 @@ using System; +using System.Collections; using System.Threading.Tasks; using kx; using NLog; namespace SubscriberDemo { - static class Program + static class Program + { + private static readonly ILogger Logger = LogManager.GetCurrentClassLogger(); + + static void Main() { - private static readonly ILogger Logger = LogManager.GetCurrentClassLogger(); + string host = "localhost"; + int port = 5001; + string usernamePassword = $"{Environment.UserName}:mypassword"; - static void Main() - { - string host = "localhost"; - int port = 5001; - string usernamePassword = $"{Environment.UserName}:mypassword"; + c connection = null; + try + { + connection = new c(host, port, usernamePassword); + connection.k("upd:{[t;x].[t;();,;show x]};"); + //open subscription + connection.ks(".u.sub", "mytable", "SYMBOL"); - c connection = null; - try - { - connection = new c(host, port, usernamePassword); + bool subscribing = true; - //open subscription - connection.ks(".u.sub", "mytable", "MSFT"); + //start processing subscriptions until user exit or error + Task.Factory.StartNew(() => + { + Logger.Info("Processing subscription results. Press any key to exit"); + while (subscribing) + { + try + { + dynamic result = connection.k(); - bool subscribing = true; + Logger.Info($"Received subscription result:{result}"); + if (result == null || result.Length < 2) + continue; - //start processing subscriptions until user exit or error - Task.Factory.StartNew(() => - { - Logger.Info("Processing subscription results. Press any key to exit"); - while (subscribing) + kx.c.Flip flip = result[2]; + string columnValue = ""; + for (int i = 0; i < flip.x.Length; i++) + { + IEnumerator enumerator = ((IEnumerable)flip.y[i]).GetEnumerator(); + enumerator.MoveNext(); + columnValue += $"{flip.x[i]}:{enumerator.Current} "; + } + + Logger.Info($"Received subscription result:{result[0]} {result[1]} {columnValue}"); + } + catch (Exception ex) { - try - { - Logger.Info($"Received subscription result:{connection.k()}"); - } - catch (Exception) - { - Logger.Error("Error occurred processing Subscription. Exiting Subscription-Demo"); - subscribing = false; - } + Logger.Error($"Error occurred processing Subscription. Exiting Subscription-Demo {ex}"); + subscribing = false; } + } }); - Console.ReadLine(); - subscribing = false; - - } - catch (Exception ex) - { - Logger.Error($"Error occurred running Subscription-Demo. \r\n{ex}"); - } - finally - { - if (connection != null) - { - connection.Close(); - } - } + Console.ReadLine(); + subscribing = false; + + } + catch (Exception ex) + { + Logger.Error($"Error occurred running Subscription-Demo. \r\n{ex}"); + } + finally + { + if (connection != null) + { + connection.Close(); } + } } -} + } +} \ No newline at end of file From d6e5a53dbacccb01e6f1f6092941197f8eb4e2ac Mon Sep 17 00:00:00 2001 From: 00raq00 <00raq00@gmail.com> Date: Wed, 7 May 2025 14:04:54 +0200 Subject: [PATCH 5/5] revert in demos --- Demos/FeedDemo/Program.cs | 198 ++++++++++++-------------------- Demos/SubscriberDemo/Program.cs | 104 +++++++---------- 2 files changed, 117 insertions(+), 185 deletions(-) diff --git a/Demos/FeedDemo/Program.cs b/Demos/FeedDemo/Program.cs index dafc084..3fbbd52 100644 --- a/Demos/FeedDemo/Program.cs +++ b/Demos/FeedDemo/Program.cs @@ -1,153 +1,101 @@ using System; using System.Security.Cryptography; -using System.Threading; -using System.Threading.Tasks; using kx; using NLog; namespace FeedDemo { - static class Program - { - private static readonly ILogger Logger = LogManager.GetCurrentClassLogger(); + static class Program + { + private static readonly ILogger Logger = LogManager.GetCurrentClassLogger(); - private const string QFunc = ".u.upd"; - private const string TableName = "mytable"; + private const string QFunc = ".u.upd"; + private const string TableName = "mytable"; - static void Main() - { - string host = "localhost"; - int port = 5001; - string usernamePassword = $"{Environment.UserName}:mypassword"; - - c connection = null; - try - { - connection = new c(host, port, usernamePassword); - - //Example of 10 single row inserts to a table - InsertRows(connection); - - //Parallel example of 100 single row inserts to a table - ParallelInsertRows(host, port, usernamePassword, 100, 10, 4, false); - - //Parallel example of 1000 single row inserts to a table - ParallelInsertRows(host, port, usernamePassword, 1000, 100, 300, false); - - //Parallel example of 1000 single row inserts to a table - ParallelInsertRows(host, port, usernamePassword, 1000, 1000, 1000, false); - - //Example of bulk inserts to a table to improve throughput - BulkInsertRows(connection); - - } - catch (Exception ex) - { - Logger.Error($"Error occurred running Feed-Demo. \r\n{ex}"); - } - finally - { - if (connection != null) + static void Main() { - connection.Close(); + string host = "localhost"; + int port = 5001; + string usernamePassword = $"{Environment.UserName}:mypassword"; + + c connection = null; + try + { + connection = new c(host, port, usernamePassword); + + //Example of 10 single row inserts to a table + InsertRows(connection); + + //Example of bulk inserts to a table to improve throughput + BulkInsertRows(connection); + + } + catch (Exception ex) + { + Logger.Error($"Error occurred running Feed-Demo. \r\n{ex}"); + } + finally + { + if (connection != null) + { + connection.Close(); + } + } } - } - } - private static void InsertRows(c connection) - { - DateTime dt = DateTime.Now; - // Single row insert - not as efficient as bulk insert - Logger.Info("Populating '{0}' table on kdb server with 10 rows...", TableName); - - for (int i = 0; i < 10; i++) - { - // Assumes a remote schema of mytable:([]time:`timespan$();sym:`symbol$();price:`float$();size:`long$()) - object[] row = - [ - new c.KTimespan(100), - "SYMBOL1", - 93.5, - 300L - ]; - - connection.ks(QFunc, TableName, row); - } - - Logger.Info("Successfully inserted 10 rows to {0}, {1} ms", TableName, (DateTime.Now - dt).TotalMilliseconds); - } - private static void ParallelInsertRows(string host, int port, string usernamePassword, int rowCount, int minThreads, int maxDegreeOfParallelism, bool debugLog) - { - DateTime dt = DateTime.Now; - // Single row insert - not as efficient as bulk insert - Logger.Info("Populating '{0}' table on kdb server with {1} rows...", TableName, rowCount); - ThreadPool.SetMinThreads(minThreads, minThreads); - - var parallelOptions = new ParallelOptions - { - MaxDegreeOfParallelism = maxDegreeOfParallelism - }; - - Parallel.For(0, rowCount, parallelOptions, i => - { + private static void InsertRows(c connection) { - // Assumes a remote schema of mytable:([]time:`timespan$();sym:`symbol$();price:`float$();size:`long$()) - object[] row = new object[] - { - new c.KTimespan(i), + // Single row insert - not as efficient as bulk insert + Logger.Info("Populating '{0}' table on kdb server with 10 rows...", TableName); + + for(int i = 0; i < 10; i++) + { + // Assumes a remote schema of mytable:([]time:`timespan$();sym:`symbol$();price:`float$();size:`long$()) + object[] row = new object[] + { + new c.KTimespan(100), "SYMBOL", - (Double)i, + 93.5, 300L - }; + }; - var c = new c(host, port, usernamePassword); - c.ks(QFunc, TableName, row); + connection.ks(QFunc, TableName, row); + } - if (debugLog) Logger.Info("Successfully inserted {1} row to {0}", TableName, i); + Logger.Info("Successfully inserted 10 rows to {0}", TableName); } - }); - Logger.Info("Successfully inserted {1} rows to {0}, {2} ms", TableName, rowCount, (DateTime.Now - dt).TotalMilliseconds); - } - - private static void BulkInsertRows(c connection) - { - // Bulk row insert - more efficient - string[] syms = new[] { "ABC", "DEF", "GHI", "JKL" }; - - c.KTimespan[] times = CreateTestArray(i => new c.KTimespan(i), 1000); - string[] symbols = CreateTestArray(i => syms[RandomNumberGenerator.GetInt32(syms.Length)], 1000); - double[] prices = CreateTestArray(i => i * 1.1, 1000); - long[] sizes = CreateTestArray(i => (long)(i * 100), 1000); - - DateTime dt = DateTime.Now; - Logger.Info("Bulk populating '{0}' table on kdb server without using column names", TableName); - - connection.ks(QFunc, TableName, new object[] { times, symbols, prices, sizes }); + private static void BulkInsertRows(c connection) + { + // Bulk row insert - more efficient + string[] syms = new[] { "ABC", "DEF", "GHI", "JKL" }; - Logger.Info("Successfully bull inserted {1} rows to {0} without using column names, {2} ms", TableName, 1000, (DateTime.Now - dt).TotalMilliseconds); + c.KTimespan[] times = CreateTestArray(i => new c.KTimespan(i), 10); + string[] symbols = CreateTestArray(i => syms[RandomNumberGenerator.GetInt32(syms.Length)], 10); + double[] prices = CreateTestArray(i => i * 1.1, 10); + long[] sizes = CreateTestArray(i => (long)(i * 100), 10); - dt = DateTime.Now; + Logger.Info("Bulk populating '{0}' table on kdb server without using column names", TableName); - Logger.Info("Bulk populating '{0}' table on kdb server using column names", TableName); + connection.ks(QFunc, TableName, new object[] { times, symbols, prices, sizes }); - connection.ks(QFunc, TableName, new c.Flip(new c.Dict(new string[] { "time", "sym", "price", "size" }, new object[] { times, symbols, prices, sizes }))); + Logger.Info("Bulk populating '{0}' table on kdb server using column names", TableName); - Logger.Info("Successfully bull inserted {1} rows to {0} with using column names, {2} ms", TableName, 1000, (DateTime.Now - dt).TotalMilliseconds); + connection.ks(QFunc, TableName, new c.Flip(new c.Dict(new string[] { "time", "sym", "price", "size" }, new object[] { times, symbols, prices, sizes }))); - //block until all messages are processed - connection.k(string.Empty); - } + //block until all messages are processed + connection.k(string.Empty); + } - private static T[] CreateTestArray(Func elementBuilder, int arraySize) - { - T[] array = new T[arraySize]; + private static T[] CreateTestArray(Func elementBuilder, int arraySize) + { + T[] array = new T[arraySize]; - for (int i = 0; i < arraySize; i++) - { - array[i] = elementBuilder(i); - } - return array; + for (int i = 0; i < arraySize; i++) + { + array[i] = elementBuilder(i); + } + return array; + } } - } -} \ No newline at end of file +} diff --git a/Demos/SubscriberDemo/Program.cs b/Demos/SubscriberDemo/Program.cs index 3870654..42c1240 100644 --- a/Demos/SubscriberDemo/Program.cs +++ b/Demos/SubscriberDemo/Program.cs @@ -1,79 +1,63 @@ using System; -using System.Collections; using System.Threading.Tasks; using kx; using NLog; namespace SubscriberDemo { - static class Program - { - private static readonly ILogger Logger = LogManager.GetCurrentClassLogger(); - - static void Main() + static class Program { - string host = "localhost"; - int port = 5001; - string usernamePassword = $"{Environment.UserName}:mypassword"; - - c connection = null; - try - { - connection = new c(host, port, usernamePassword); - connection.k("upd:{[t;x].[t;();,;show x]};"); - //open subscription - connection.ks(".u.sub", "mytable", "SYMBOL"); + private static readonly ILogger Logger = LogManager.GetCurrentClassLogger(); - bool subscribing = true; + static void Main() + { + string host = "localhost"; + int port = 5001; + string usernamePassword = $"{Environment.UserName}:mypassword"; - //start processing subscriptions until user exit or error - Task.Factory.StartNew(() => - { - Logger.Info("Processing subscription results. Press any key to exit"); - while (subscribing) - { - try - { - dynamic result = connection.k(); + c connection = null; + try + { + connection = new c(host, port, usernamePassword); - Logger.Info($"Received subscription result:{result}"); - if (result == null || result.Length < 2) - continue; + //open subscription + connection.ks(".u.sub", "mytable", "MSFT"); - kx.c.Flip flip = result[2]; - string columnValue = ""; - for (int i = 0; i < flip.x.Length; i++) - { - IEnumerator enumerator = ((IEnumerable)flip.y[i]).GetEnumerator(); - enumerator.MoveNext(); - columnValue += $"{flip.x[i]}:{enumerator.Current} "; - } + bool subscribing = true; - Logger.Info($"Received subscription result:{result[0]} {result[1]} {columnValue}"); - } - catch (Exception ex) + //start processing subscriptions until user exit or error + Task.Factory.StartNew(() => + { + Logger.Info("Processing subscription results. Press any key to exit"); + while (subscribing) { - Logger.Error($"Error occurred processing Subscription. Exiting Subscription-Demo {ex}"); - subscribing = false; + try + { + Logger.Info($"Received subscription result:{connection.k()}"); + } + catch (Exception) + { + Logger.Error("Error occurred processing Subscription. Exiting Subscription-Demo"); + subscribing = false; + } } - } }); - Console.ReadLine(); - subscribing = false; - - } - catch (Exception ex) - { - Logger.Error($"Error occurred running Subscription-Demo. \r\n{ex}"); - } - finally - { - if (connection != null) - { - connection.Close(); + Console.ReadLine(); + subscribing = false; + + } + catch (Exception ex) + { + Logger.Error($"Error occurred running Subscription-Demo. \r\n{ex}"); + } + finally + { + if (connection != null) + { + connection.Close(); + } + } } - } } - } -} \ No newline at end of file +}