Skip to content

Commit

Permalink
Merge pull request #45 from jefft0/fix/processMessageLoop-remove-proc…
Browse files Browse the repository at this point in the history
…ess-goroutine
  • Loading branch information
gfanton authored Apr 12, 2023
2 parents 711e228 + 7bab7b7 commit f167823
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 36 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ require (
go.uber.org/zap v1.24.0
golang.org/x/crypto v0.7.0
golang.org/x/net v0.8.0
golang.org/x/sync v0.1.0
golang.org/x/text v0.8.0
golang.org/x/tools v0.7.0
golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f
Expand Down Expand Up @@ -309,6 +308,7 @@ require (
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/mod v0.9.0 // indirect
golang.org/x/oauth2 v0.1.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.6.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd // indirect
Expand Down
57 changes: 22 additions & 35 deletions store_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"go.uber.org/zap"
"golang.org/x/sync/semaphore"

ipfslog "berty.tech/go-ipfs-log"
"berty.tech/go-ipfs-log/identityprovider"
Expand Down Expand Up @@ -155,8 +154,6 @@ func (m *MessageStore) processMessage(ctx context.Context, message *messageItem)
}

func (m *MessageStore) processMessageLoop(ctx context.Context) {
semProcess := semaphore.NewWeighted(32)

for {
var message *messageItem
select {
Expand Down Expand Up @@ -194,43 +191,33 @@ func (m *MessageStore) processMessageLoop(ctx context.Context) {
continue
}

// start process routine
go func() {
// wait for a process slot
if err := semProcess.Acquire(ctx, 1); err != nil {
m.logger.Error("unable to acquire lock", zap.Error(err))
return
}

// defer release a process slot
defer semProcess.Release(1)

// process the message
evt, err := m.processMessage(ctx, message)
if err != nil {
if errcode.Is(err, errcode.ErrCryptoDecryptPayload) {
// @FIXME(gfanton): this should not happen
m.logger.Warn("unable to open envelope, adding envelope to cache for later process", zap.Error(err))

// if failed to decrypt add to queue, for later process
device.queue.Add(message)
_ = m.emitters.groupCacheMessage.Emit(*message)
} else {
m.logger.Error("unable to process message", zap.Error(err))
}
// process the message
evt, err := m.processMessage(ctx, message)
if err != nil {
if errcode.Is(err, errcode.ErrCryptoDecryptPayload) {
// @FIXME(gfanton): this should not happen
m.logger.Warn("unable to open envelope, adding envelope to cache for later process", zap.Error(err))

return
// if failed to decrypt add to queue, for later process
device.queue.Add(message)
_ = m.emitters.groupCacheMessage.Emit(*message)
} else {
m.logger.Error("unable to process message", zap.Error(err))
}

// emit new message event
if err := m.emitters.groupMessage.Emit(*evt); err != nil {
m.logger.Warn("unable to emit group message event", zap.Error(err))
}
continue
}

if next := device.queue.Next(); next != nil {
// emit new message event
if err := m.emitters.groupMessage.Emit(*evt); err != nil {
m.logger.Warn("unable to emit group message event", zap.Error(err))
}

if next := device.queue.Next(); next != nil {
go func() {
m.cmessage <- next
}
}()
}()
}

m.muDeviceCaches.Unlock()
}
Expand Down

0 comments on commit f167823

Please sign in to comment.