Skip to content

Commit 05e2f45

Browse files
committed
Enhance how message output is handled internally
1 parent ad0c722 commit 05e2f45

File tree

2 files changed

+92
-89
lines changed

2 files changed

+92
-89
lines changed

discovery_server.go

+50-89
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ import (
3434
"regexp"
3535
"strconv"
3636
"strings"
37-
"sync"
3837

3938
"github.com/arduino/go-properties-orderedmap"
4039
)
@@ -86,14 +85,13 @@ type ErrorCallback func(err string)
8685
// it must be created using the NewDiscoveryServer function.
8786
type DiscoveryServer struct {
8887
impl Discovery
89-
out io.Writer
90-
outMutex sync.Mutex
88+
outputChan chan *message
9189
userAgent string
9290
reqProtocolVersion int
9391
initialized bool
9492
started bool
9593
syncStarted bool
96-
syncChannel chan interface{}
94+
syncChannel chan *message
9795
cachedPorts map[string]*Port
9896
cachedErr string
9997
}
@@ -103,7 +101,8 @@ type DiscoveryServer struct {
103101
// use the Run method.
104102
func NewDiscoveryServer(impl Discovery) *DiscoveryServer {
105103
return &DiscoveryServer{
106-
impl: impl,
104+
impl: impl,
105+
outputChan: make(chan *message),
107106
}
108107
}
109108

@@ -113,20 +112,21 @@ func NewDiscoveryServer(impl Discovery) *DiscoveryServer {
113112
// the input stream is closed. In case of IO error the error is
114113
// returned.
115114
func (d *DiscoveryServer) Run(in io.Reader, out io.Writer) error {
116-
d.out = out
115+
go d.outputProcessor(out)
116+
defer close(d.outputChan)
117117
reader := bufio.NewReader(in)
118118
for {
119119
fullCmd, err := reader.ReadString('\n')
120120
if err != nil {
121-
d.outputError("command_error", err.Error())
121+
d.outputChan <- messageError("command_error", err.Error())
122122
return err
123123
}
124124
fullCmd = strings.TrimSpace(fullCmd)
125125
split := strings.Split(fullCmd, " ")
126126
cmd := strings.ToUpper(split[0])
127127

128128
if !d.initialized && cmd != "HELLO" && cmd != "QUIT" {
129-
d.outputError("command_error", fmt.Sprintf("First command must be HELLO, but got '%s'", cmd))
129+
d.outputChan <- messageError("command_error", fmt.Sprintf("First command must be HELLO, but got '%s'", cmd))
130130
continue
131131
}
132132

@@ -143,61 +143,61 @@ func (d *DiscoveryServer) Run(in io.Reader, out io.Writer) error {
143143
d.stop()
144144
case "QUIT":
145145
d.impl.Quit()
146-
d.outputOk("quit")
146+
d.outputChan <- messageOk("quit")
147147
return nil
148148
default:
149-
d.outputError("command_error", fmt.Sprintf("Command %s not supported", cmd))
149+
d.outputChan <- messageError("command_error", fmt.Sprintf("Command %s not supported", cmd))
150150
}
151151
}
152152
}
153153

154154
func (d *DiscoveryServer) hello(cmd string) {
155155
if d.initialized {
156-
d.outputError("hello", "HELLO already called")
156+
d.outputChan <- messageError("hello", "HELLO already called")
157157
return
158158
}
159159
re := regexp.MustCompile(`^(\d+) "([^"]+)"$`)
160160
matches := re.FindStringSubmatch(cmd)
161161
if len(matches) != 3 {
162-
d.outputError("hello", "Invalid HELLO command")
162+
d.outputChan <- messageError("hello", "Invalid HELLO command")
163163
return
164164
}
165165
d.userAgent = matches[2]
166166
if v, err := strconv.ParseInt(matches[1], 10, 64); err != nil {
167-
d.outputError("hello", "Invalid protocol version: "+matches[2])
167+
d.outputChan <- messageError("hello", "Invalid protocol version: "+matches[2])
168168
return
169169
} else {
170170
d.reqProtocolVersion = int(v)
171171
}
172172
if err := d.impl.Hello(d.userAgent, 1); err != nil {
173-
d.outputError("hello", err.Error())
173+
d.outputChan <- messageError("hello", err.Error())
174174
return
175175
}
176-
d.output(&genericMessageJSON{
176+
d.outputChan <- &message{
177177
EventType: "hello",
178178
ProtocolVersion: 1, // Protocol version 1 is the only supported for now...
179179
Message: "OK",
180-
})
180+
}
181181
d.initialized = true
182182
}
183183

184184
func (d *DiscoveryServer) start() {
185185
if d.started {
186-
d.outputError("start", "Discovery already STARTed")
186+
d.outputChan <- messageError("start", "Discovery already STARTed")
187187
return
188188
}
189189
if d.syncStarted {
190-
d.outputError("start", "Discovery already START_SYNCed, cannot START")
190+
d.outputChan <- messageError("start", "Discovery already START_SYNCed, cannot START")
191191
return
192192
}
193193
d.cachedPorts = map[string]*Port{}
194194
d.cachedErr = ""
195195
if err := d.impl.StartSync(d.eventCallback, d.errorCallback); err != nil {
196-
d.outputError("start", "Cannot START: "+err.Error())
196+
d.outputChan <- messageError("start", "Cannot START: "+err.Error())
197197
return
198198
}
199199
d.started = true
200-
d.outputOk("start")
200+
d.outputChan <- messageOk("start")
201201
}
202202

203203
func (d *DiscoveryServer) eventCallback(event string, port *Port) {
@@ -216,65 +216,61 @@ func (d *DiscoveryServer) errorCallback(msg string) {
216216

217217
func (d *DiscoveryServer) list() {
218218
if !d.started {
219-
d.outputError("list", "Discovery not STARTed")
219+
d.outputChan <- messageError("list", "Discovery not STARTed")
220220
return
221221
}
222222
if d.syncStarted {
223-
d.outputError("list", "discovery already START_SYNCed, LIST not allowed")
223+
d.outputChan <- messageError("list", "discovery already START_SYNCed, LIST not allowed")
224224
return
225225
}
226226
if d.cachedErr != "" {
227-
d.outputError("list", d.cachedErr)
227+
d.outputChan <- messageError("list", d.cachedErr)
228228
return
229229
}
230230
ports := []*Port{}
231231
for _, port := range d.cachedPorts {
232232
ports = append(ports, port)
233233
}
234-
type listOutputJSON struct {
235-
EventType string `json:"eventType"`
236-
Ports []*Port `json:"ports"`
237-
}
238-
d.output(&listOutputJSON{
234+
d.outputChan <- &message{
239235
EventType: "list",
240-
Ports: ports,
241-
})
236+
Ports: &ports,
237+
}
242238
}
243239

244240
func (d *DiscoveryServer) startSync() {
245241
if d.syncStarted {
246-
d.outputError("start_sync", "Discovery already START_SYNCed")
242+
d.outputChan <- messageError("start_sync", "Discovery already START_SYNCed")
247243
return
248244
}
249245
if d.started {
250-
d.outputError("start_sync", "Discovery already STARTed, cannot START_SYNC")
246+
d.outputChan <- messageError("start_sync", "Discovery already STARTed, cannot START_SYNC")
251247
return
252248
}
253-
c := make(chan interface{}, 10) // buffer up to 10 events
249+
c := make(chan *message, 10) // buffer up to 10 events
254250
d.syncChannel = c
255251
if err := d.impl.StartSync(d.syncEvent, d.errorEvent); err != nil {
256-
d.outputError("start_sync", "Cannot START_SYNC: "+err.Error())
252+
d.outputChan <- messageError("start_sync", "Cannot START_SYNC: "+err.Error())
257253
close(d.syncChannel) // do not leak channel...
258254
d.syncChannel = nil
259255
return
260256
}
261257
d.syncStarted = true
262-
d.outputOk("start_sync")
258+
d.outputChan <- messageOk("start_sync")
263259

264260
go func() {
265261
for e := range c {
266-
d.output(e)
262+
d.outputChan <- e
267263
}
268264
}()
269265
}
270266

271267
func (d *DiscoveryServer) stop() {
272268
if !d.syncStarted && !d.started {
273-
d.outputError("stop", "Discovery already STOPped")
269+
d.outputChan <- messageError("stop", "Discovery already STOPped")
274270
return
275271
}
276272
if err := d.impl.Stop(); err != nil {
277-
d.outputError("stop", "Cannot STOP: "+err.Error())
273+
d.outputChan <- messageError("stop", "Cannot STOP: "+err.Error())
278274
return
279275
}
280276
d.started = false
@@ -283,66 +279,31 @@ func (d *DiscoveryServer) stop() {
283279
d.syncChannel = nil
284280
d.syncStarted = false
285281
}
286-
d.outputOk("stop")
282+
d.outputChan <- messageOk("stop")
287283
}
288284

289285
func (d *DiscoveryServer) syncEvent(event string, port *Port) {
290-
type syncOutputJSON struct {
291-
EventType string `json:"eventType"`
292-
Port *Port `json:"port"`
293-
}
294-
d.syncChannel <- &syncOutputJSON{
286+
d.syncChannel <- &message{
295287
EventType: event,
296288
Port: port,
297289
}
298290
}
299291

300292
func (d *DiscoveryServer) errorEvent(msg string) {
301-
type syncOutputJSON struct {
302-
EventType string `json:"eventType"`
303-
Error bool `json:"error"`
304-
Message string `json:"message"`
305-
}
306-
d.syncChannel <- &syncOutputJSON{
307-
EventType: "start_sync",
308-
Error: true,
309-
Message: msg,
310-
}
311-
}
312-
313-
type genericMessageJSON struct {
314-
EventType string `json:"eventType"`
315-
Message string `json:"message"`
316-
Error bool `json:"error,omitempty"`
317-
ProtocolVersion int `json:"protocolVersion,omitempty"`
293+
d.syncChannel <- messageError("start_sync", msg)
318294
}
319295

320-
func (d *DiscoveryServer) outputOk(event string) {
321-
d.output(&genericMessageJSON{
322-
EventType: event,
323-
Message: "OK",
324-
})
325-
}
326-
327-
func (d *DiscoveryServer) outputError(event, msg string) {
328-
d.output(&genericMessageJSON{
329-
EventType: event,
330-
Error: true,
331-
Message: msg,
332-
})
333-
}
334-
335-
func (d *DiscoveryServer) output(msg interface{}) {
336-
data, err := json.MarshalIndent(msg, "", " ")
337-
if err != nil {
338-
d.output(&genericMessageJSON{
339-
EventType: "command_error",
340-
Error: true,
341-
Message: err.Error(),
342-
})
343-
} else {
344-
d.outMutex.Lock()
345-
fmt.Fprintln(d.out, string(data))
346-
d.outMutex.Unlock()
347-
}
296+
func (d *DiscoveryServer) outputProcessor(outWriter io.Writer) {
297+
// Start go routine to serialize messages printing
298+
go func() {
299+
for msg := range d.outputChan {
300+
data, err := json.MarshalIndent(msg, "", " ")
301+
if err != nil {
302+
// We are certain that this will be marshalled correctly
303+
// so we don't handle the error
304+
data, _ = json.MarshalIndent(messageError("command_error", err.Error()), "", " ")
305+
}
306+
fmt.Fprintln(outWriter, string(data))
307+
}
308+
}()
348309
}

message.go

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
//
2+
// This file is part of pluggable-discovery-protocol-handler.
3+
//
4+
// Copyright 2021 ARDUINO SA (http://www.arduino.cc/)
5+
//
6+
// This software is released under the GNU General Public License version 3,
7+
// which covers the main part of arduino-cli.
8+
// The terms of this license can be found at:
9+
// https://www.gnu.org/licenses/gpl-3.0.en.html
10+
//
11+
// You can be released from the requirements of the above licenses by purchasing
12+
// a commercial license. Buying such a license is mandatory if you want to modify or
13+
// otherwise use the software for commercial activities involving the Arduino
14+
// software without disclosing the source code of your own applications. To purchase
15+
// a commercial license, send an email to [email protected].
16+
//
17+
18+
package discovery
19+
20+
type message struct {
21+
EventType string `json:"eventType"`
22+
Message string `json:"message,omitempty"`
23+
Error bool `json:"error,omitempty"`
24+
ProtocolVersion int `json:"protocolVersion,omitempty"`
25+
Port *Port `json:"port,omitempty"`
26+
Ports *[]*Port `json:"ports,omitempty"`
27+
}
28+
29+
func messageOk(event string) *message {
30+
return &message{
31+
EventType: event,
32+
Message: "OK",
33+
}
34+
}
35+
36+
func messageError(event, msg string) *message {
37+
return &message{
38+
EventType: event,
39+
Error: true,
40+
Message: msg,
41+
}
42+
}

0 commit comments

Comments
 (0)