@@ -69,7 +69,8 @@ type Monitor interface {
6969// it must be created using the NewServer function.
7070type Server struct {
7171 impl Monitor
72- outputChan chan * message
72+ out io.Writer
73+ outMutex sync.Mutex
7374 userAgent string
7475 reqProtocolVersion int
7576 initialized bool
@@ -82,8 +83,7 @@ type Server struct {
8283// use the Run method.
8384func NewServer (impl Monitor ) * Server {
8485 return & Server {
85- impl : impl ,
86- outputChan : make (chan * message ),
86+ impl : impl ,
8787 }
8888}
8989
@@ -93,21 +93,20 @@ func NewServer(impl Monitor) *Server {
9393// the input stream is closed. In case of IO error the error is
9494// returned.
9595func (d * Server ) Run (in io.Reader , out io.Writer ) error {
96- go d .outputProcessor (out )
97- defer close (d .outputChan )
96+ d .out = out
9897 reader := bufio .NewReader (in )
9998 for {
10099 fullCmd , err := reader .ReadString ('\n' )
101100 if err != nil {
102- d .outputChan <- messageError ("command_error" , err .Error ())
101+ d .outputMessage ( messageError ("command_error" , err .Error () ))
103102 return err
104103 }
105104 fullCmd = strings .TrimSpace (fullCmd )
106105 split := strings .Split (fullCmd , " " )
107106 cmd := strings .ToUpper (split [0 ])
108107
109108 if ! d .initialized && cmd != "HELLO" && cmd != "QUIT" {
110- d .outputChan <- messageError ("command_error" , fmt .Sprintf ("First command must be HELLO, but got '%s'" , cmd ))
109+ d .outputMessage ( messageError ("command_error" , fmt .Sprintf ("First command must be HELLO, but got '%s'" , cmd ) ))
111110 continue
112111 }
113112
@@ -124,105 +123,105 @@ func (d *Server) Run(in io.Reader, out io.Writer) error {
124123 d .close ("" )
125124 case "QUIT" :
126125 d .impl .Quit ()
127- d .outputChan <- messageOk ("quit" )
126+ d .outputMessage ( messageOk ("quit" ) )
128127 return nil
129128 default :
130- d .outputChan <- messageError ("command_error" , fmt .Sprintf ("Command %s not supported" , cmd ))
129+ d .outputMessage ( messageError ("command_error" , fmt .Sprintf ("Command %s not supported" , cmd ) ))
131130 }
132131 }
133132}
134133
135134func (d * Server ) hello (cmd string ) {
136135 if d .initialized {
137- d .outputChan <- messageError ("hello" , "HELLO already called" )
136+ d .outputMessage ( messageError ("hello" , "HELLO already called" ) )
138137 return
139138 }
140139 re := regexp .MustCompile (`^(\d+) "([^"]+)"$` )
141140 matches := re .FindStringSubmatch (cmd )
142141 if len (matches ) != 3 {
143- d .outputChan <- messageError ("hello" , "Invalid HELLO command" )
142+ d .outputMessage ( messageError ("hello" , "Invalid HELLO command" ) )
144143 return
145144 }
146145 d .userAgent = matches [2 ]
147146 v , err := strconv .ParseInt (matches [1 ], 10 , 64 )
148147 if err != nil {
149- d .outputChan <- messageError ("hello" , "Invalid protocol version: " + matches [2 ])
148+ d .outputMessage ( messageError ("hello" , "Invalid protocol version: " + matches [2 ]) )
150149 return
151150 }
152151 d .reqProtocolVersion = int (v )
153152 if err := d .impl .Hello (d .userAgent , 1 ); err != nil {
154- d .outputChan <- messageError ("hello" , err .Error ())
153+ d .outputMessage ( messageError ("hello" , err .Error () ))
155154 return
156155 }
157- d .outputChan <- & message {
156+ d .outputMessage ( & message {
158157 EventType : "hello" ,
159158 ProtocolVersion : 1 , // Protocol version 1 is the only supported for now...
160159 Message : "OK" ,
161- }
160+ })
162161 d .initialized = true
163162}
164163
165164func (d * Server ) describe () {
166165 if ! d .initialized {
167- d .outputChan <- messageError ("describe" , "Monitor not initialized" )
166+ d .outputMessage ( messageError ("describe" , "Monitor not initialized" ) )
168167 return
169168 }
170169 portDescription , err := d .impl .Describe ()
171170 if err != nil {
172- d .outputChan <- messageError ("describe" , err .Error ())
171+ d .outputMessage ( messageError ("describe" , err .Error () ))
173172 return
174173 }
175- d .outputChan <- & message {
174+ d .outputMessage ( & message {
176175 EventType : "describe" ,
177176 Message : "OK" ,
178177 PortDescription : portDescription ,
179- }
178+ })
180179}
181180
182181func (d * Server ) configure (cmd string ) {
183182 if ! d .initialized {
184- d .outputChan <- messageError ("configure" , "Monitor not initialized" )
183+ d .outputMessage ( messageError ("configure" , "Monitor not initialized" ) )
185184 return
186185 }
187186 re := regexp .MustCompile (`^([\w.-]+) (.+)$` )
188187 matches := re .FindStringSubmatch (cmd )
189188 if len (matches ) != 3 {
190- d .outputChan <- messageError ("configure" , "Invalid CONFIGURE command" )
189+ d .outputMessage ( messageError ("configure" , "Invalid CONFIGURE command" ) )
191190 return
192191 }
193192 parameterName := matches [1 ]
194193 value := matches [2 ]
195194 if err := d .impl .Configure (parameterName , value ); err != nil {
196- d .outputChan <- messageError ("configure" , err .Error ())
195+ d .outputMessage ( messageError ("configure" , err .Error () ))
197196 return
198197 }
199- d .outputChan <- & message {
198+ d .outputMessage ( & message {
200199 EventType : "configure" ,
201200 Message : "OK" ,
202- }
201+ })
203202}
204203
205204func (d * Server ) open (cmd string ) {
206205 if ! d .initialized {
207- d .outputChan <- messageError ("open" , "Monitor not initialized" )
206+ d .outputMessage ( messageError ("open" , "Monitor not initialized" ) )
208207 return
209208 }
210209 parameters := strings .SplitN (cmd , " " , 2 )
211210 if len (parameters ) != 2 {
212- d .outputChan <- messageError ("open" , "Invalid OPEN command" )
211+ d .outputMessage ( messageError ("open" , "Invalid OPEN command" ) )
213212 return
214213 }
215214 address := parameters [0 ]
216215 portName := parameters [1 ]
217216 port , err := d .impl .Open (portName )
218217 if err != nil {
219- d .outputChan <- messageError ("open" , err .Error ())
218+ d .outputMessage ( messageError ("open" , err .Error () ))
220219 return
221220 }
222221 d .clientConn , err = net .Dial ("tcp" , address )
223222 if err != nil {
224223 d .impl .Close ()
225- d .outputChan <- messageError ("open" , err .Error ())
224+ d .outputMessage ( messageError ("open" , err .Error () ))
226225 return
227226 }
228227 // io.Copy is used to bridge the Client's TCP connection to the port one and vice versa
@@ -242,51 +241,47 @@ func (d *Server) open(cmd string) {
242241 d .close ("lost connection with the port" )
243242 }
244243 }()
245- d .outputChan <- & message {
244+ d .outputMessage ( & message {
246245 EventType : "open" ,
247246 Message : "OK" ,
248- }
247+ })
249248}
250249
251250func (d * Server ) close (messageErr string ) {
252251 d .closeFuncMutex .Lock ()
253252 defer d .closeFuncMutex .Unlock ()
254253 if d .clientConn == nil {
255254 if messageErr == "" {
256- d .outputChan <- messageError ("close" , "port already closed" )
255+ d .outputMessage ( messageError ("close" , "port already closed" ) )
257256 }
258257 return
259258 }
260259 connErr := d .clientConn .Close ()
261260 portErr := d .impl .Close ()
262261 d .clientConn = nil
263262 if messageErr != "" {
264- d .outputChan <- messageError ("port_closed" , messageErr )
263+ d .outputMessage ( messageError ("port_closed" , messageErr ) )
265264 return
266265 }
267266 if connErr != nil || portErr != nil {
268267 var errs * multierror.Error
269268 errs = multierror .Append (errs , connErr , portErr )
270- d .outputChan <- messageError ("close" , errs .Error ())
269+ d .outputMessage ( messageError ("close" , errs .Error () ))
271270 return
272271 }
273- d .outputChan <- & message {
272+ d .outputMessage ( & message {
274273 EventType : "close" ,
275274 Message : "OK" ,
276- }
275+ })
277276}
278277
279- func (d * Server ) outputProcessor (outWriter io.Writer ) {
280- // Start go routine to serialize messages printing
281- go func () {
282- for msg := range d .outputChan {
283- data , err := json .MarshalIndent (msg , "" , " " )
284- if err != nil {
285- // We are certain that this will be marshalled correctly
286- // so we don't handle the error
287- data , _ = json .MarshalIndent (messageError ("command_error" , err .Error ()), "" , " " )
288- }
289- fmt .Fprintln (outWriter , string (data ))
290- }
291- }()
278+ func (d * Server ) outputMessage (msg * message ) {
279+ data , err := json .MarshalIndent (msg , "" , " " )
280+ if err != nil {
281+ // We are certain that this will be marshalled correctly so we don't handle the error
282+ data , _ = json .MarshalIndent (messageError ("command_error" , err .Error ()), "" , " " )
283+ }
284+ d .outMutex .Lock ()
285+ fmt .Fprintln (d .out , string (data ))
286+ d .outMutex .Unlock ()
292287}
0 commit comments