Skip to content

Commit 2a64787

Browse files
committed
remove forwarder channel, repurpose forwarder goroutine
1 parent 17386f5 commit 2a64787

File tree

2 files changed

+47
-45
lines changed

2 files changed

+47
-45
lines changed

Diff for: frontend/preambler/preambler.go

+7-3
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
func Launch(
1212
closed chan struct{},
1313
closeFrontend func(),
14-
sendToBackend func(message pgproto3.BackendMessage),
14+
sendToBackend func(message pgproto3.BackendMessage) bool,
1515
preamble []pgproto3.BackendMessage,
1616
) chan (chan<- struct{}) {
1717

@@ -32,7 +32,7 @@ func preambler(
3232
reqPreamble chan (chan<- struct{}),
3333
closed chan struct{},
3434
closeFrontend func(),
35-
sendToBackend func(message pgproto3.BackendMessage),
35+
sendToBackend func(message pgproto3.BackendMessage) bool,
3636
preamble []pgproto3.BackendMessage,
3737
) {
3838
defer misc.Recover()
@@ -44,7 +44,11 @@ func preambler(
4444
return
4545
case req := <-reqPreamble:
4646
for _, msg := range preamble {
47-
sendToBackend(msg)
47+
closed := sendToBackend(msg)
48+
if closed {
49+
close(req)
50+
return
51+
}
4852
}
4953
close(req)
5054
}

Diff for: frontend/receiver/receiver.go

+40-42
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,14 @@ type Receiver struct {
3030
NewOut chan<- NewOutChans
3131

3232
newOut <-chan NewOutChans
33+
outLock *sync.RWMutex
3334
out chan pgproto3.BackendMessage
3435
outSync *misc.Cond
3536
detaching chan struct{}
3637

3738
closed chan struct{}
3839
closeFrontend func()
3940
receive func() (pgproto3.BackendMessage, error)
40-
forwarderChan chan pgproto3.BackendMessage
4141

4242
dropRFQ bool
4343
dropRFQChan chan struct{}
@@ -55,49 +55,39 @@ func Launch(
5555
r := &Receiver{
5656
NewOut: newOut,
5757
newOut: newOut,
58-
forwarderChan: make(chan pgproto3.BackendMessage, 1),
58+
outLock: &sync.RWMutex{},
5959
dropRFQChan: unblockedChannel,
6060
dropRFQLock: new(sync.Mutex),
6161
closeFrontend: closeFrontend,
6262
closed: closed,
6363
receive: receive,
6464
}
6565

66-
go r.forwarder()
66+
go r.outManager()
6767
go r.receiver()
6868

6969
return r
7070
}
7171

72-
func (r *Receiver) forwarder() {
72+
func (r *Receiver) outManager() {
7373
defer misc.Recover()
7474
defer r.closeFrontend()
7575
defer func() {
76+
r.outLock.Lock()
77+
defer r.outLock.Unlock()
7678
if r.out != nil {
7779
close(r.out)
7880
}
7981
r.outSync.SignalLocked()
82+
r.out = nil
83+
r.outSync = nil
84+
r.detaching = nil
8085
}()
8186

8287
for {
8388
select {
84-
case msg, ok := <-r.forwarderChan:
85-
if !ok {
86-
return
87-
}
88-
89-
if r.out == nil {
90-
log.Println("client detached unexpectedly. cannot send message to client!", misc.Marshal(msg))
91-
return
92-
}
93-
94-
select {
95-
case r.out <- msg:
96-
case <-r.detaching:
97-
goto detaching
98-
}
99-
10089
case newChan := <-r.newOut:
90+
r.outLock.Lock()
10191
if r.out != nil {
10292
close(r.out)
10393
}
@@ -106,31 +96,28 @@ func (r *Receiver) forwarder() {
10696
r.detaching = newChan.Detaching
10797

10898
r.outSync.SignalLocked()
99+
r.outLock.Unlock()
109100

110101
case <-r.detaching:
111-
goto detaching
102+
r.outLock.Lock()
103+
if r.out != nil {
104+
close(r.out)
105+
}
106+
r.outSync.SignalLocked()
107+
r.out = nil
108+
r.outSync = nil
109+
r.detaching = nil
110+
r.outLock.Unlock()
112111

113112
case <-r.closed:
114113
return
115114
}
116-
117-
continue
118-
119-
detaching:
120-
if r.out != nil {
121-
close(r.out)
122-
}
123-
r.outSync.SignalLocked()
124-
r.out = nil
125-
r.outSync = nil
126-
r.detaching = nil
127115
}
128116
}
129117

130118
func (r *Receiver) receiver() {
131119
defer misc.Recover()
132120
defer r.closeFrontend()
133-
defer close(r.forwarderChan)
134121

135122
for {
136123
bmsg, err := r.receive()
@@ -148,21 +135,32 @@ func (r *Receiver) receiver() {
148135
continue
149136
}
150137

151-
// log.Println("f-msg-1", misc.Marshal(bmsg))
152-
r.outSync.L.Lock()
153-
select {
154-
case r.forwarderChan <- bmsg:
155-
r.outSync.WaitAndUnlock()
156-
case <-r.closed:
138+
closed := r.Send(bmsg)
139+
if closed {
157140
return
158141
}
159142
}
160143
}
161144

162-
func (r *Receiver) Send(bmsg pgproto3.BackendMessage) {
145+
func (r *Receiver) Send(bmsg pgproto3.BackendMessage) bool {
146+
r.outLock.RLock()
147+
defer r.outLock.RUnlock()
148+
149+
if r.out == nil {
150+
log.Println("client detached unexpectedly. cannot send message to client!", misc.Marshal(bmsg))
151+
return true
152+
}
153+
154+
// log.Println("f-msg-1", misc.Marshal(bmsg))
163155
r.outSync.L.Lock()
164-
r.forwarderChan <- bmsg
165-
r.outSync.WaitAndUnlock()
156+
select {
157+
case r.out <- bmsg:
158+
r.outSync.WaitAndUnlock()
159+
case <-r.closed:
160+
return true
161+
}
162+
163+
return false
166164
}
167165

168166
// when proxying application_name, we will get an RFQ that the

0 commit comments

Comments
 (0)