@@ -18,6 +18,16 @@ enum {
1818 IDLE_FROM_RUNNING = 10 + static_cast <int >(NodeStatus::RUNNING)
1919};
2020
21+ struct Transition
22+ {
23+ // when serializing, we will remove the initial time and serialize only
24+ // 6 bytes, instead of 8
25+ uint64_t timestamp_usec;
26+ // if you have more than 64.000 nodes, you are doing something wrong :)
27+ uint16_t node_uid;
28+ // enough bits to contain NodeStatus
29+ uint8_t status;
30+ };
2131
2232std::array<char ,16 > CreateRandomUUID ()
2333{
@@ -82,6 +92,9 @@ struct Groot2Publisher::PImpl
8292 std::chrono::system_clock::time_point last_heartbeat;
8393 std::chrono::milliseconds max_heartbeat_delay = std::chrono::milliseconds(5000 );
8494
95+ std::atomic_bool recording = false ;
96+ std::deque<Transition> transitions_buffer;
97+
8598 std::thread heartbeat_thread;
8699
87100 zmq::context_t context;
@@ -183,7 +196,7 @@ Groot2Publisher::~Groot2Publisher()
183196 }
184197}
185198
186- void Groot2Publisher::callback (Duration, const TreeNode& node,
199+ void Groot2Publisher::callback (Duration ts , const TreeNode& node,
187200 NodeStatus prev_status, NodeStatus new_status)
188201{
189202 std::unique_lock<std::mutex> lk (_p->status_mutex );
@@ -193,6 +206,18 @@ void Groot2Publisher::callback(Duration, const TreeNode& node,
193206 status = 10 + static_cast <char >(prev_status);
194207 }
195208 *(_p->status_buffermap .at (node.UID ())) = status;
209+ if (_p->recording )
210+ {
211+ Transition trans;
212+ trans.node_uid = node.UID ();
213+ trans.status = static_cast <uint8_t >(new_status);
214+ trans.timestamp_usec =
215+ std::chrono::duration_cast<std::chrono::microseconds>(ts).count ();
216+ _p->transitions_buffer .push_back (trans);
217+ while (_p->transitions_buffer .size () > 1000 ) {
218+ _p->transitions_buffer .pop_front ();
219+ }
220+ }
196221}
197222
198223void Groot2Publisher::flush ()
@@ -383,6 +408,43 @@ void Groot2Publisher::serverLoop()
383408 }
384409 reply_msg.addstr ( json_out.dump () );
385410 } break ;
411+
412+ case Monitor::RequestType::TOGGLE_RECORDING:
413+ {
414+ if (requestMsg.size () != 2 ) {
415+ sendErrorReply (" must be 2 parts message" );
416+ continue ;
417+ }
418+
419+ auto const cmd = (requestMsg[1 ].to_string ());
420+ if (cmd == " start" )
421+ {
422+ _p->recording = true ;
423+ std::unique_lock<std::mutex> lk (_p->status_mutex );
424+ _p->transitions_buffer .clear ();
425+ }
426+ else if (cmd == " stop" )
427+ {
428+ _p->recording = false ;
429+ }
430+ } break ;
431+
432+ case Monitor::RequestType::GET_TRANSITIONS:
433+ {
434+ thread_local std::string trans_buffer;
435+ const size_t N = sizeof (Transition);
436+ trans_buffer.resize (N * _p->transitions_buffer .size ());
437+
438+ std::unique_lock<std::mutex> lk (_p->status_mutex );
439+ size_t offset = 0 ;
440+ for (const auto & trans: _p->transitions_buffer )
441+ {
442+ std::memcpy (&trans_buffer[offset], &trans, N);
443+ offset += N;
444+ }
445+ _p->transitions_buffer .clear ();
446+ } break ;
447+
386448 default : {
387449 sendErrorReply (" Request not recognized" );
388450 continue ;
0 commit comments