diff --git a/go.mod b/go.mod index 9b2175d6..bcecda23 100644 --- a/go.mod +++ b/go.mod @@ -49,7 +49,6 @@ require ( go.uber.org/zap v1.24.0 golang.org/x/crypto v0.7.0 golang.org/x/net v0.8.0 - golang.org/x/sync v0.1.0 golang.org/x/text v0.8.0 golang.org/x/tools v0.7.0 golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f @@ -309,6 +308,7 @@ require ( golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect golang.org/x/mod v0.9.0 // indirect golang.org/x/oauth2 v0.1.0 // indirect + golang.org/x/sync v0.1.0 // indirect golang.org/x/sys v0.6.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20221202195650-67e5cbc046fd // indirect diff --git a/store_message.go b/store_message.go index 1d8935b5..7cf7fb97 100644 --- a/store_message.go +++ b/store_message.go @@ -13,7 +13,6 @@ import ( "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/p2p/host/eventbus" "go.uber.org/zap" - "golang.org/x/sync/semaphore" ipfslog "berty.tech/go-ipfs-log" "berty.tech/go-ipfs-log/identityprovider" @@ -155,8 +154,6 @@ func (m *MessageStore) processMessage(ctx context.Context, message *messageItem) } func (m *MessageStore) processMessageLoop(ctx context.Context) { - semProcess := semaphore.NewWeighted(32) - for { var message *messageItem select { @@ -194,43 +191,33 @@ func (m *MessageStore) processMessageLoop(ctx context.Context) { continue } - // start process routine - go func() { - // wait for a process slot - if err := semProcess.Acquire(ctx, 1); err != nil { - m.logger.Error("unable to acquire lock", zap.Error(err)) - return - } - - // defer release a process slot - defer semProcess.Release(1) - - // process the message - evt, err := m.processMessage(ctx, message) - if err != nil { - if errcode.Is(err, errcode.ErrCryptoDecryptPayload) { - // @FIXME(gfanton): this should not happen - m.logger.Warn("unable to open envelope, adding envelope to cache for later process", zap.Error(err)) - - // if failed to decrypt add to queue, for later process - device.queue.Add(message) - _ = m.emitters.groupCacheMessage.Emit(*message) - } else { - m.logger.Error("unable to process message", zap.Error(err)) - } + // process the message + evt, err := m.processMessage(ctx, message) + if err != nil { + if errcode.Is(err, errcode.ErrCryptoDecryptPayload) { + // @FIXME(gfanton): this should not happen + m.logger.Warn("unable to open envelope, adding envelope to cache for later process", zap.Error(err)) - return + // if failed to decrypt add to queue, for later process + device.queue.Add(message) + _ = m.emitters.groupCacheMessage.Emit(*message) + } else { + m.logger.Error("unable to process message", zap.Error(err)) } - // emit new message event - if err := m.emitters.groupMessage.Emit(*evt); err != nil { - m.logger.Warn("unable to emit group message event", zap.Error(err)) - } + continue + } - if next := device.queue.Next(); next != nil { + // emit new message event + if err := m.emitters.groupMessage.Emit(*evt); err != nil { + m.logger.Warn("unable to emit group message event", zap.Error(err)) + } + + if next := device.queue.Next(); next != nil { + go func() { m.cmessage <- next - } - }() + }() + } m.muDeviceCaches.Unlock() }