Skip to content

Commit

Permalink
Misc refactorings extracted from dapr#5170 (dapr#5609)
Browse files Browse the repository at this point in the history
  • Loading branch information
ItalyPaleAle authored Dec 15, 2022
1 parent 374a582 commit 7190a73
Show file tree
Hide file tree
Showing 29 changed files with 775 additions and 612 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ github.com/
google

test_report*
coverage.txt

# Go Workspaces (introduced in Go 1.18+)
go.work
20 changes: 12 additions & 8 deletions pkg/actors/actors.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,10 @@ type lookupActorRes struct {
}

func (a *actorsRuntime) Call(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
a.placement.WaitUntilPlacementTableIsReady()
err := a.placement.WaitUntilPlacementTableIsReady(ctx)
if err != nil {
return nil, fmt.Errorf("failed to wait for placement table readiness: %w", err)
}

actor := req.Actor()
// Retry here to allow placement table dissemination/rebalancing to happen.
Expand Down Expand Up @@ -1031,13 +1034,14 @@ func (a *actorsRuntime) executeReminder(reminder *Reminder) error {
}

func (a *actorsRuntime) reminderRequiresUpdate(req *CreateReminderRequest, reminder *Reminder) bool {
if reminder.ActorID == req.ActorID && reminder.ActorType == req.ActorType && reminder.Name == req.Name &&
(!reflect.DeepEqual(reminder.Data, req.Data) || reminder.DueTime != req.DueTime || reminder.Period != req.Period ||
len(req.TTL) != 0 || (len(reminder.ExpirationTime) != 0 && len(req.TTL) == 0)) {
return true
}

return false
return reminder.ActorID == req.ActorID &&
reminder.ActorType == req.ActorType &&
reminder.Name == req.Name &&
(!reflect.DeepEqual(reminder.Data, req.Data) ||
reminder.DueTime != req.DueTime ||
reminder.Period != req.Period ||
len(req.TTL) != 0 ||
(len(reminder.ExpirationTime) != 0 && len(req.TTL) == 0))
}

func (a *actorsRuntime) getReminder(reminderName string, actorType string, actorID string) (*Reminder, bool) {
Expand Down
14 changes: 12 additions & 2 deletions pkg/actors/actors_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 10 additions & 3 deletions pkg/actors/internal/placement.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package internal

import (
"context"
"net"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -224,9 +225,15 @@ func (p *ActorPlacement) Stop() {
}

// WaitUntilPlacementTableIsReady waits until placement table is until table lock is unlocked.
func (p *ActorPlacement) WaitUntilPlacementTableIsReady() {
if p.tableIsBlocked.Load() {
<-p.unblockSignal
func (p *ActorPlacement) WaitUntilPlacementTableIsReady(ctx context.Context) error {
if !p.tableIsBlocked.Load() {
return nil
}
select {
case <-p.unblockSignal:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

Expand Down
75 changes: 61 additions & 14 deletions pkg/actors/internal/placement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package internal

import (
"context"
"fmt"
"io"
"net"
Expand All @@ -24,6 +25,7 @@ import (

"github.com/phayes/freeport"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -201,24 +203,69 @@ func TestWaitUntilPlacementTableIsReady(t *testing.T) {
[]string{"actorOne", "actorTwo"},
appHealthFunc, tableUpdateFunc)

testPlacement.onPlacementOrder(&placementv1pb.PlacementOrder{Operation: "lock"})
t.Run("already unlocked", func(t *testing.T) {
require.False(t, testPlacement.tableIsBlocked.Load())

asserted := atomic.Bool{}
asserted.Store(false)
go func() {
testPlacement.WaitUntilPlacementTableIsReady()
asserted.Store(true)
}()
err := testPlacement.WaitUntilPlacementTableIsReady(context.Background())
assert.NoError(t, err)
})

t.Run("wait until ready", func(t *testing.T) {
testPlacement.onPlacementOrder(&placementv1pb.PlacementOrder{Operation: "lock"})

testSuccessCh := make(chan struct{})
go func() {
err := testPlacement.WaitUntilPlacementTableIsReady(context.Background())
if assert.NoError(t, err) {
testSuccessCh <- struct{}{}
}
}()

time.Sleep(50 * time.Millisecond)
require.True(t, testPlacement.tableIsBlocked.Load())

// unlock
testPlacement.onPlacementOrder(&placementv1pb.PlacementOrder{Operation: "unlock"})

// ensure that it is unlocked
select {
case <-time.After(500 * time.Millisecond):
t.Fatal("placement table not unlocked in 500ms")
case <-testSuccessCh:
// all good
}

time.Sleep(50 * time.Millisecond)
assert.False(t, asserted.Load())
assert.False(t, testPlacement.tableIsBlocked.Load())
})

// unlock
testPlacement.onPlacementOrder(&placementv1pb.PlacementOrder{Operation: "unlock"})
t.Run("abort on context canceled", func(t *testing.T) {
testPlacement.onPlacementOrder(&placementv1pb.PlacementOrder{Operation: "lock"})

testSuccessCh := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
go func() {
err := testPlacement.WaitUntilPlacementTableIsReady(ctx)
if assert.ErrorIs(t, err, context.Canceled) {
testSuccessCh <- struct{}{}
}
}()

time.Sleep(50 * time.Millisecond)
require.True(t, testPlacement.tableIsBlocked.Load())

// cancel context
cancel()

// ensure that it is still locked
select {
case <-time.After(500 * time.Millisecond):
t.Fatal("did not return in 500ms")
case <-testSuccessCh:
// all good
}

// ensure that it is unlocked
time.Sleep(50 * time.Millisecond)
assert.True(t, asserted.Load())
assert.True(t, testPlacement.tableIsBlocked.Load())
})
}

func TestLookupActor(t *testing.T) {
Expand Down
16 changes: 8 additions & 8 deletions pkg/channel/grpc/grpc_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,12 @@ func (g *Channel) invokeMethodV1(ctx context.Context, req *invokev1.InvokeMethod

var header, trailer grpcMetadata.MD

var opts []grpc.CallOption
opts = append(
opts,
opts := []grpc.CallOption{
grpc.Header(&header),
grpc.Trailer(&trailer),
grpc.MaxCallSendMsgSize(g.maxRequestBodySizeMB<<20),
grpc.MaxCallRecvMsgSize(g.maxRequestBodySizeMB<<20),
)
grpc.MaxCallSendMsgSize(g.maxRequestBodySizeMB << 20),
grpc.MaxCallRecvMsgSize(g.maxRequestBodySizeMB << 20),
}

resp, err := g.appCallbackClient.OnInvoke(ctx, req.Message(), opts...)

Expand All @@ -139,9 +137,11 @@ func (g *Channel) invokeMethodV1(ctx context.Context, req *invokev1.InvokeMethod
rsp = invokev1.NewInvokeMethodResponse(int32(codes.OK), "", nil)
}

rsp.WithHeaders(header).WithTrailers(trailer)
rsp.WithHeaders(header).
WithTrailers(trailer).
WithMessage(resp)

return rsp.WithMessage(resp), nil
return rsp, nil
}

// HealthProbe performs a health probe.
Expand Down
12 changes: 6 additions & 6 deletions pkg/channel/http/http_channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ import (
"net/http/httptest"
"strconv"
"sync"
"sync/atomic"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"

"github.com/dapr/dapr/pkg/config"
invokev1 "github.com/dapr/dapr/pkg/messaging/v1"
Expand All @@ -41,13 +41,13 @@ type testConcurrencyHandler struct {
}

func (t *testConcurrencyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
cur := t.currentCalls.Inc()
cur := t.currentCalls.Add(1)

if cur > t.maxCalls {
t.testFailed = true
}

t.currentCalls.Dec()
t.currentCalls.Add(-1)
io.WriteString(w, r.URL.RawQuery)
}

Expand Down Expand Up @@ -365,7 +365,7 @@ func TestInvokeMethodMaxConcurrency(t *testing.T) {
t.Run("single concurrency", func(t *testing.T) {
handler := testConcurrencyHandler{
maxCalls: 1,
currentCalls: atomic.NewInt32(0),
currentCalls: &atomic.Int32{},
}
server := httptest.NewServer(&handler)
c := Channel{
Expand Down Expand Up @@ -398,7 +398,7 @@ func TestInvokeMethodMaxConcurrency(t *testing.T) {
t.Run("10 concurrent calls", func(t *testing.T) {
handler := testConcurrencyHandler{
maxCalls: 10,
currentCalls: atomic.NewInt32(0),
currentCalls: &atomic.Int32{},
}
server := httptest.NewServer(&handler)
c := Channel{
Expand Down Expand Up @@ -431,7 +431,7 @@ func TestInvokeMethodMaxConcurrency(t *testing.T) {
t.Run("introduce failures", func(t *testing.T) {
handler := testConcurrencyHandler{
maxCalls: 5,
currentCalls: atomic.NewInt32(0),
currentCalls: &atomic.Int32{},
}
server := httptest.NewServer(&handler)
c := Channel{
Expand Down
17 changes: 6 additions & 11 deletions pkg/components/bindings/input_pluggable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,20 @@ import (
"os"
"runtime"
"sync"
"sync/atomic"
"testing"

guuid "github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"

"github.com/dapr/components-contrib/bindings"
contribMetadata "github.com/dapr/components-contrib/metadata"

"github.com/dapr/dapr/pkg/components/pluggable"
proto "github.com/dapr/dapr/pkg/proto/components/v1"
testingGrpc "github.com/dapr/dapr/pkg/testing/grpc"

guuid "github.com/google/uuid"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.uber.org/atomic"

"github.com/dapr/kit/logger"

"google.golang.org/grpc"
)

type inputBindingServer struct {
Expand Down
Loading

0 comments on commit 7190a73

Please sign in to comment.