@@ -27,7 +27,7 @@ type Engine struct {
2727 timeProvider timer.TimeProvider
2828 lastTimeUpdate time.Time
2929 gcState * GcState
30- gcStateMutex sync.Mutex
30+ mutex sync.Mutex
3131 noProgressDetector NoProgressDetector
3232 ballPlacementCoordinator BallPlacementCoordinator
3333 tickChanProvider func () <- chan time.Time
@@ -68,29 +68,30 @@ func (e *Engine) Enqueue(change *statemachine.Change) {
6868 e .queue <- change
6969}
7070
71- // SetGeometry sets a new geometry
72- func (e * Engine ) SetGeometry (geometry config.Geometry ) {
73- e .stateMachine .Geometry = geometry
74- }
75-
76- // GetGeometry returns the current geometry
77- func (e * Engine ) GetGeometry () config.Geometry {
71+ // getGeometry returns the current geometry
72+ func (e * Engine ) getGeometry () config.Geometry {
7873 return e .stateMachine .Geometry
7974}
8075
8176// SetTimeProvider sets a new time provider for this engine
8277func (e * Engine ) SetTimeProvider (provider timer.TimeProvider ) {
78+ e .mutex .Lock ()
79+ defer e .mutex .Unlock ()
8380 e .timeProvider = provider
8481 e .lastTimeUpdate = e .timeProvider ()
8582}
8683
8784// SetTickChanProvider sets an alternative provider for the tick channel
8885func (e * Engine ) SetTickChanProvider (provider func () <- chan time.Time ) {
86+ e .mutex .Lock ()
87+ defer e .mutex .Unlock ()
8988 e .tickChanProvider = provider
9089}
9190
9291// Start loads the state store and runs a go routine that consumes the change queue
9392func (e * Engine ) Start () error {
93+ e .mutex .Lock ()
94+ defer e .mutex .Unlock ()
9495 if err := e .stateStore .Open (); err != nil {
9596 return errors .Wrap (err , "Could not open state store" )
9697 }
@@ -105,6 +106,8 @@ func (e *Engine) Start() error {
105106
106107// Stop stops the go routine that processes the change queue
107108func (e * Engine ) Stop () {
109+ e .mutex .Lock ()
110+ defer e .mutex .Unlock ()
108111 close (e .queue )
109112 if err := e .stateStore .Close (); err != nil {
110113 log .Printf ("Could not close store: %v" , err )
@@ -113,28 +116,30 @@ func (e *Engine) Stop() {
113116
114117// CurrentState returns a deep copy of the current state
115118func (e * Engine ) CurrentState () (s * state.State ) {
116- s = new (state. State )
117- proto . Merge ( s , e .currentState )
118- return
119+ e . mutex . Lock ( )
120+ defer e .mutex . Unlock ( )
121+ return e . currentState . Clone ()
119122}
120123
121124// CurrentGcState returns a deep copy of the current GC state
122125func (e * Engine ) CurrentGcState () (s * GcState ) {
123- e .gcStateMutex .Lock ()
124- defer e .gcStateMutex .Unlock ()
126+ e .mutex .Lock ()
127+ defer e .mutex .Unlock ()
125128 s = new (GcState )
126129 proto .Merge (s , e .gcState )
127130 return
128131}
129132
130133func (e * Engine ) UpdateGcState (fn func (gcState * GcState )) {
131- e .gcStateMutex .Lock ()
132- defer e .gcStateMutex .Unlock ()
134+ e .mutex .Lock ()
135+ defer e .mutex .Unlock ()
133136 fn (e .gcState )
134137}
135138
136139// LatestChangesUntil returns all changes with a id larger than the given id
137140func (e * Engine ) LatestChangesUntil (id int32 ) (changes []* statemachine.StateChange ) {
141+ e .mutex .Lock ()
142+ defer e .mutex .Unlock ()
138143 for _ , change := range e .stateStore .Entries () {
139144 if * change .Id > id {
140145 changes = append (changes , change )
@@ -145,6 +150,8 @@ func (e *Engine) LatestChangesUntil(id int32) (changes []*statemachine.StateChan
145150
146151// LatestChangeId returns the latest change id or -1, if there is no change
147152func (e * Engine ) LatestChangeId () int32 {
153+ e .mutex .Lock ()
154+ defer e .mutex .Unlock ()
148155 entries := e .stateStore .Entries ()
149156 if len (entries ) > 0 {
150157 return * entries [len (entries )- 1 ].Id
@@ -169,8 +176,8 @@ func (e *Engine) processChanges() {
169176
170177// ResetMatch creates a backup of the current state store, removes it and starts with a fresh state
171178func (e * Engine ) ResetMatch () {
172- e .gcStateMutex .Lock ()
173- defer e .gcStateMutex .Unlock ()
179+ e .mutex .Lock ()
180+ defer e .mutex .Unlock ()
174181 if err := e .stateStore .Reset (); err != nil {
175182 log .Printf ("Could not reset store: %v" , err )
176183 } else {
@@ -179,6 +186,8 @@ func (e *Engine) ResetMatch() {
179186}
180187
181188func (e * Engine ) processChange (change * statemachine.Change ) {
189+ e .mutex .Lock ()
190+ defer e .mutex .Unlock ()
182191
183192 var newChanges []* statemachine.Change
184193 entry := statemachine.StateChange {}
@@ -218,7 +227,7 @@ func (e *Engine) processChange(change *statemachine.Change) {
218227 entry .State , newChanges = e .stateMachine .Process (e .currentState , change )
219228 }
220229
221- e .currentState = entry .State
230+ e .currentState = entry .State . Clone ()
222231
223232 e .postProcessChange (entry )
224233
@@ -229,8 +238,12 @@ func (e *Engine) processChange(change *statemachine.Change) {
229238 if err := e .stateStore .Add (& entry ); err != nil {
230239 log .Println ("Could not add new state to store: " , err )
231240 }
241+ stateCopy := e .currentState .Clone ()
232242 for _ , hook := range e .hooks {
233- hook <- HookOut {Change : entry .Change , State : e .CurrentState ()}
243+ select {
244+ case hook <- HookOut {Change : entry .Change , State : stateCopy }:
245+ default :
246+ }
234247 }
235248}
236249
0 commit comments