Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions code/common/backtest.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
\d .backtest

/ Params to be passed to .backtest.run to kick off backtest, edit to fit usecase
test:`name`version`tabs`sts`ets`replayinterval`timer`timerinterval`timerfunc!(`;1;`;0Np;0Np;0Nn;0b;0Nn;`);
initRan:0b;

/ TO BE DELETED, TESTING ONLY
test:`name`version`tabs`sts`ets`replayinterval`timer`timerinterval`timerfunc!(`vwappublisher;1;`trade;2026.01.22D00:00:00.00;2026.01.22D01:00:00.00;0Nn;1b;0D00:10:00.00;`.vwapsub.logvwap);

init:{[]
requiredProcs:`backtestdb`backtestpub;
.servers.registerfromdiscovery[requiredProcs;1b];
.backtest.rdbh:neg first exec w from .servers.SERVERS where procname=`backtestdb;
.backtest.pubh:neg first exec w from .servers.SERVERS where procname=`backtestpub;
`.u.pub set .backtest.pub;
.backtest.initRan:1b;
};

/ Receive full message from datareplay, extract details from msg before running msg func
extractmessage:{[msgs]
.dbg.msg:msgs;
msg:msgs`msg;
.backtest.simtime:msgs`time;
.backtest.name:first msg;
value msg
};

pub:{[t;d]
.dbg.pub:(t;d);
rdbh(`upd;`output;(.z.p;id;simtime;name;d));
};

/ To run backtest, optional where
run:{[params]
params:validaterun[params];
/ Random guid generated to match config to output
.backtest.id:first -1?0Ng;
/ Kick off backtest from backtestpub which will replay the data back through the process running backtest
pubh(`.backtest.datareplay;params;.backtest.id);
};

validaterun:{[params]
if[.proc.procname=`backtestpub;'"Backtest should be ran from the process you are backtesting not backtest instance itself"];
if[not initRan;'"Please run .backtest.init to override functions to backtest before running .backtest.run";];
if[not all (key[test]except `where) in key params;'"Please ensure all mandatory params have been populated";];
if[count where null `replayinterval _params;'"Not all mandatory keys have been populated"];
/ Remove optional where, when not required
if[`where in key params; if[not count params`where;params:`where _params]];
params
};

\d .
29 changes: 15 additions & 14 deletions code/common/datareplay.q
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@ getBuckets:{[s;e;p](s+p*til(ceiling 1+e%p)-(ceiling s%p))}
// params[`t] is table data
// params[`tc] is time column to cut on
// params[`tn] is table name
// params[`interval] is the time interval to bucket the messages into.
// params[`replayinterval] is the data time interval to bucket the messages into.
tableDataToDataStream:{[params]
.dbg.params:params;
// Sort table by time column.
params[`t]:params[`tc] xasc delete date from params[`t];

// get all times from table
t_times:params[`t][params[`tc]];

$[not null params[`interval];
$[not null params[`replayinterval];
[ // if there is an interval, bucket messages into this interval
// make bukets of ten second intervals
times:getBuckets[params[`sts];params[`ets];params[`interval]];
// make buckets of ten second intervals
times:getBuckets[params[`sts];params[`ets];params[`replayinterval]];

// put start time in fornt of t_times
t_times:params[`sts],t_times;
Expand All @@ -38,7 +39,7 @@ tableDataToDataStream:{[params]
// Return table of times and message chunks
-1_([]time:time;msg:{(`upd;x;y)}[params[`tn]] each msgs)
];
// if there is no intevral, cut by distinct time.
// if there is no interval, cut by distinct time.
([]
time:distinct t_times;
msg:{(`upd;x;$[1<count y;flip y;first y])}[params[`tn]] each
Expand All @@ -65,7 +66,7 @@ tableToDataStream:{[params]

// Have hdb evaluate select statement.
t:@[params[`h];
(eval;(?;params[`tn];enlist wherec;0b;()));
(eval;.backtest.query:(?;params[`tn];enlist wherec;0b;()));
{.lg.e[`dataloader;"Failed to evauluate query on hdb: ",x]}
];

Expand All @@ -74,10 +75,10 @@ tableToDataStream:{[params]

// params[`sts] is start of time window to get
// params[`ets] is end of time window to get
// params[`tp] is the inrement between times
// params[`tp] is the increment between times
// params[`timerfunc] is the timer function to use
getTimers:{[params]
times:getBuckets[params[`sts];params[`ets];params[`interval]];
times:getBuckets[params[`sts];params[`ets];params[`timerinterval]];
([]time:times;msg:params[`timerfunc],'times)
}

Expand All @@ -86,24 +87,24 @@ getTimers:{[params]
// params[`sts] is start of time window to get - Required
// params[`ets] is end of time window to get - Required
// params[`syms] is list of instruments to get - Default all syms
// params[`where] is an additional where clause in functional form - Not Reuqired
// params[`where] is an additional where clause in functional form - Not Required
// params[`timer] is whether or not to retrieve timer - Default 0b
// params[`h] is handle to hdb - Default 0 (self)
// params[`interval] is the time interval to bucket the messages into. - Not Required
// params[`timerinterval] is the time interval to bucket the timer messages into. - Not Required
// prarms[`tc] is the time column of the tables specified - Defualt `time
// params[`timerfunc] is the timer function to use in timer messages - Default `.z.ts
tablesToDataStream:{[params]
defaults:`timer`h`syms`interval`tc`timerfunc`where!(0b;0;`symbol$();`timespan$0n;`time;`.z.ts;());
defaults:`timer`h`syms`replayinterval`timerinterval`tc`timerfunc`where!(0b;0;`symbol$();`timespan$0n;`timespan$0n;`time;`.z.ts;());
params:defaults,params;

// check for default parameters `tabs`sts`ets
if[count missing:`tabs`sts`ets except key params;'"mising prameters: "," " sv string missing;];
if[count missing:`tabs`sts`ets except key params;'"missing parameters: "," " sv string missing;];
params[`tabs]:(),params[`tabs];

ds:raze {tableToDataStream x,(enlist `tn)!enlist y}[params] each params[`tabs];

$[params[`timer];
`time xasc ds,getTimers[params,enlist[`interval]! enlist $[null k:params[`interval];0D00:00:10.00;k]];
`time xasc ds,getTimers[params,enlist[`timerinterval]! enlist $[null k:params[`timerinterval];0D00:00:10.00;k]];
`time xasc ds]
};

Expand Down
21 changes: 21 additions & 0 deletions code/processes/backtestpub.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
\d .backtest

init:{[]
.servers.CONNECTIONS:`hdb`backtestdb;
.servers.startup[];
.backtest.hdb:first exec w from .servers.SERVERS where proctype=`hdb;
.backtest.rdb:neg first exec w from .servers.SERVERS where procname=`backtestdb;
};

datareplay:{[params;id]
.dbg.replay:(params;id;.z.w);
params[`h]:first exec w from .servers.SERVERS where proctype=`hdb;
msgs:.datareplay.tablesToDataStream `name`version _params;
/ Publishing configuration of backtest, needs to run after datareplay to pick up dataconfig
rdb(`upd;`config;(id;.z.p;;;;;enlist query;`h _params). params`name`version`sts`ets);
{neg[.z.w](`.backtest.extractmessage;x)}each msgs
};

\d .

.backtest.init[];