@@ -32,7 +32,9 @@ package trg
3232import (
3333 "context"
3434 "encoding/json"
35+ "errors"
3536 "fmt"
37+ "io"
3638 "net/url"
3739 "strconv"
3840 "strings"
@@ -65,25 +67,37 @@ const (
6567 TOPIC = topic .IntegratedService + topic .Separator + "trg"
6668)
6769
70+ var ctpStatusTopic topic.Topic = "ctp.status"
71+
6872type Plugin struct {
6973 trgHost string
7074 trgPort int
7175
72- trgClient * RpcClient
76+ trgClient * RpcClient
77+ trgClientCancel context.CancelFunc
78+
79+ kafkaReader * CtpStatusReader
80+ kafkaReaderCtx context.Context
81+ kafkaReaderCancel context.CancelFunc
7382
7483 pendingRunStops map [string ] /*envId*/ int64
7584 pendingRunUnloads map [string ] /*envId*/ int64
7685
77- cachedStatus * TrgStatus
78- cachedStatusMu sync.RWMutex
79- cachedStatusCancelFunc context.CancelFunc
86+ cachedStatus * TrgStatus
87+ cachedStatusMu sync.RWMutex
8088}
8189
8290type TrgStatus struct {
91+ // Fields from TRG RunList queries
8392 RunCount int `json:"runCount,omitempty"`
8493 Lines []string `json:"lines,omitempty"`
8594 Structured Runs `json:"structured,omitempty"`
8695 EnvMap map [uid.ID ]Run `json:"envMap,omitempty"`
96+
97+ // Fields from CTP status Kafka messages
98+ Clock string `json:"clock,omitempty"`
99+ DetectorsInHoldover []string `json:"detectorsInHoldover,omitempty"`
100+ ClockTransitionTimeExpected int64 `json:"clockTransitionTimeExpected,omitempty"` // nanoseconds since epoch, 0 if not expected
87101}
88102
89103func NewPlugin (endpoint string ) integration.Plugin {
@@ -103,6 +117,7 @@ func NewPlugin(endpoint string) integration.Plugin {
103117 trgClient : nil ,
104118 pendingRunStops : make (map [string ]int64 ),
105119 pendingRunUnloads : make (map [string ]int64 ),
120+ cachedStatus : & TrgStatus {},
106121 }
107122}
108123
@@ -190,15 +205,11 @@ func (p *Plugin) queryRunList() {
190205 }
191206 }
192207
193- out := & TrgStatus {
194- RunCount : int (runReply .Rc ),
195- Lines : strings .Split (runReply .Msg , "\n " ),
196- Structured : structured ,
197- EnvMap : envMap ,
198- }
199-
200208 p .cachedStatusMu .Lock ()
201- p .cachedStatus = out
209+ p .cachedStatus .RunCount = int (runReply .Rc )
210+ p .cachedStatus .Lines = strings .Split (runReply .Msg , "\n " )
211+ p .cachedStatus .Structured = structured
212+ p .cachedStatus .EnvMap = envMap
202213 p .cachedStatusMu .Unlock ()
203214}
204215
@@ -232,10 +243,6 @@ func (p *Plugin) GetEnvironmentsData(envIds []uid.ID) map[uid.ID]string {
232243 defer p .cachedStatusMu .RUnlock ()
233244
234245 out := make (map [uid.ID ]string )
235-
236- if p .cachedStatus == nil {
237- return nil
238- }
239246 envMap := p .cachedStatus .EnvMap
240247 for _ , envId := range envIds {
241248 if run , ok := envMap [envId ]; ! ok {
@@ -268,7 +275,7 @@ func (p *Plugin) Init(instanceId string) error {
268275 }
269276
270277 var ctx context.Context
271- ctx , p .cachedStatusCancelFunc = context .WithCancel (context .Background ())
278+ ctx , p .trgClientCancel = context .WithCancel (context .Background ())
272279
273280 trgPollingInterval := viper .GetDuration ("trgPollingInterval" )
274281
@@ -284,6 +291,21 @@ func (p *Plugin) Init(instanceId string) error {
284291 }
285292 }
286293 }()
294+
295+ // Initialize CTP status reader
296+ p .kafkaReaderCtx , p .kafkaReaderCancel = context .WithCancel (context .Background ())
297+ p .kafkaReader = NewCtpStatusReader (string (ctpStatusTopic ), "o2-aliecs-core.trg" )
298+ if p .kafkaReader != nil {
299+ log .WithField ("level" , infologger .IL_Devel ).Info ("TRG plugin: draining CTP status backlog" )
300+ p .drainCtpStatusBacklog (2 * time .Second )
301+
302+ // Start reading CTP status updates
303+ go p .readCtpStatusUpdates ()
304+ } else {
305+ log .WithField ("level" , infologger .IL_Support ).
306+ Warn ("could not create CTP status reader, CTP status monitoring disabled" )
307+ }
308+
287309 return nil
288310}
289311
@@ -1435,7 +1457,104 @@ func (p *Plugin) parseDetectors(ctsDetectorsParam string) (detectors string, err
14351457 return
14361458}
14371459
1460+ func (p * Plugin ) drainCtpStatusBacklog (timeout time.Duration ) {
1461+ drainCtx , cancel := context .WithTimeout (p .kafkaReaderCtx , timeout )
1462+ defer cancel ()
1463+ for {
1464+ ctpStatus , err := p .kafkaReader .Next (drainCtx )
1465+ if err != nil {
1466+ if errors .Is (err , context .DeadlineExceeded ) || errors .Is (err , io .EOF ) || errors .Is (err , context .Canceled ) {
1467+ break
1468+ }
1469+ // transient error: small sleep and continue until timeout
1470+ time .Sleep (50 * time .Millisecond )
1471+ continue
1472+ }
1473+ if ctpStatus == nil {
1474+ continue
1475+ }
1476+
1477+ detectorsInHoldover := extractDetectorsInHoldover (ctpStatus )
1478+ clockTransitionTimeExpected := extractClockTransitionExpected (ctpStatus )
1479+ currentClock := ctpStatus .Clock .String ()
1480+
1481+ p .cachedStatusMu .Lock ()
1482+ p .cachedStatus .Clock = currentClock
1483+ p .cachedStatus .DetectorsInHoldover = detectorsInHoldover
1484+ p .cachedStatus .ClockTransitionTimeExpected = clockTransitionTimeExpected
1485+ p .cachedStatusMu .Unlock ()
1486+ }
1487+ }
1488+
1489+ func extractClockTransitionExpected (ctpStatus * trgpb.Status ) int64 {
1490+ var clockTransitionTimeExpected int64
1491+ if ctpStatus .ClockTransitionExpected > 0 {
1492+ clockTransitionTimeExpected = int64 (ctpStatus .TimestampNano ) + int64 (ctpStatus .ClockTransitionExpected )* 1e9
1493+ }
1494+ return clockTransitionTimeExpected
1495+ }
1496+
1497+ func extractDetectorsInHoldover (ctpStatus * trgpb.Status ) []string {
1498+ var detectorsInHoldover []string
1499+ for _ , detInfo := range ctpStatus .DetectorInfo {
1500+ if detInfo .HoldoverOngoing {
1501+ detectorsInHoldover = append (detectorsInHoldover , detInfo .Detector .String ())
1502+ }
1503+ }
1504+ return detectorsInHoldover
1505+ }
1506+
1507+ func (p * Plugin ) readCtpStatusUpdates () {
1508+ for {
1509+ ctpStatus , err := p .kafkaReader .Next (p .kafkaReaderCtx )
1510+ if errors .Is (err , io .EOF ) {
1511+ log .WithField (infologger .Level , infologger .IL_Support ).
1512+ Debug ("received EOF from CTP status reader, likely reading was cancelled, exiting" )
1513+ break
1514+ }
1515+ if err != nil {
1516+ log .WithField (infologger .Level , infologger .IL_Support ).
1517+ WithError (err ).
1518+ Error ("error while reading CTP status from Kafka" )
1519+ time .Sleep (time .Second * 1 ) // throttle to avoid spamming infologger
1520+ continue
1521+ }
1522+ if ctpStatus == nil {
1523+ continue
1524+ }
1525+
1526+ detectorsInHoldover := extractDetectorsInHoldover (ctpStatus )
1527+ clockTransitionTimeExpected := extractClockTransitionExpected (ctpStatus )
1528+ currentClock := ctpStatus .Clock .String ()
1529+
1530+ p .cachedStatusMu .Lock ()
1531+ previousClockTransitionTime := p .cachedStatus .ClockTransitionTimeExpected
1532+ p .cachedStatus .Clock = currentClock
1533+ p .cachedStatus .DetectorsInHoldover = detectorsInHoldover
1534+ p .cachedStatus .ClockTransitionTimeExpected = clockTransitionTimeExpected
1535+ p .cachedStatusMu .Unlock ()
1536+
1537+ if ctpStatus .ClockTransitionExpected > 0 && len (detectorsInHoldover ) > 0 {
1538+ detectorsStr := strings .Join (detectorsInHoldover , ", " )
1539+ log .WithField (infologger .Level , infologger .IL_Ops ).
1540+ Warnf ("CTP clock transition expected in less than %ds for the detectors: %s. Starting a triggered run with these detectors may cause instabilities" ,
1541+ ctpStatus .ClockTransitionExpected , detectorsStr )
1542+ } else if ctpStatus .ClockTransitionExpected == 0 && previousClockTransitionTime > 0 {
1543+ log .WithField (infologger .Level , infologger .IL_Ops ).
1544+ Info ("CTP clock transition has just been performed" )
1545+ }
1546+ }
1547+ }
1548+
14381549func (p * Plugin ) Destroy () error {
1439- p .cachedStatusCancelFunc ()
1550+ p .trgClientCancel ()
1551+
1552+ if p .kafkaReaderCancel != nil {
1553+ p .kafkaReaderCancel ()
1554+ }
1555+ if p .kafkaReader != nil {
1556+ _ = p .kafkaReader .Close ()
1557+ }
1558+
14401559 return p .trgClient .Close ()
14411560}
0 commit comments