Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
* Display MySQL user defined error in API Key UI (#4590)

* Display MySQL user defined error in UI

Signed-off-by: Kenta Kozuka <[email protected]>

* Fix Unexpected empty arrow function

Signed-off-by: Kenta Kozuka <[email protected]>

* Add tests

Signed-off-by: Kenta Kozuka <[email protected]>

* Run subtests in parallel

Signed-off-by: Kenta Kozuka <[email protected]>

---------

Signed-off-by: Kenta Kozuka <[email protected]>

* Display MySQL user defined error of piped, command, and application (#4597)

Signed-off-by: Kenta Kozuka <[email protected]>

* Add service tags as takset task on create (#4598)

Signed-off-by: khanhtc1202 <[email protected]>

* [ECS] Fix remove all previous active tasksets on QuickSync (#4600)

* Remove previous ACTIVE tasksets if present on quicksync

Signed-off-by: khanhtc1202 <[email protected]>

* Remove GetPrimaryTaskSet interface

Signed-off-by: khanhtc1202 <[email protected]>

---------

Signed-off-by: khanhtc1202 <[email protected]>

* Fix unable to fetch ECS taskset tags (#4605)

Signed-off-by: khanhtc1202 <[email protected]>

* Support recreate for ECS tasks (#4608)

* Support singleton ECS task

Signed-off-by: khanhtc1202 <[email protected]>

* Rename singleton to recreate

Signed-off-by: khanhtc1202 <[email protected]>

---------

Signed-off-by: khanhtc1202 <[email protected]>

---------

Signed-off-by: Kenta Kozuka <[email protected]>
Signed-off-by: khanhtc1202 <[email protected]>
Co-authored-by: Kenta Kozuka <[email protected]>
  • Loading branch information
khanhtc1202 and kentakozuka authored Oct 11, 2023
1 parent 3b63fe5 commit b6f9d17
Show file tree
Hide file tree
Showing 16 changed files with 306 additions and 67 deletions.
3 changes: 2 additions & 1 deletion pkg/app/piped/executor/ecs/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ func (e *deployExecutor) ensureSync(ctx context.Context) model.StageStatus {
return model.StageStatus_STAGE_FAILURE
}

if !sync(ctx, &e.Input, e.platformProviderName, e.platformProviderCfg, taskDefinition, servicedefinition, primary) {
recreate := e.appCfg.QuickSync.Recreate
if !sync(ctx, &e.Input, e.platformProviderName, e.platformProviderCfg, recreate, taskDefinition, servicedefinition, primary) {
return model.StageStatus_STAGE_FAILURE
}

Expand Down
51 changes: 38 additions & 13 deletions pkg/app/piped/executor/ecs/ecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

"github.com/pipe-cd/pipecd/pkg/app/piped/deploysource"
"github.com/pipe-cd/pipecd/pkg/app/piped/executor"
"github.com/pipe-cd/pipecd/pkg/app/piped/platformprovider"
provider "github.com/pipe-cd/pipecd/pkg/app/piped/platformprovider/ecs"
"github.com/pipe-cd/pipecd/pkg/config"
"github.com/pipe-cd/pipecd/pkg/model"
Expand Down Expand Up @@ -160,6 +159,8 @@ func applyServiceDefinition(ctx context.Context, cli provider.Client, serviceDef
if err := cli.TagResource(ctx, *service.ServiceArn, serviceDefinition.Tags); err != nil {
return nil, fmt.Errorf("failed to update tags of ECS service %s: %w", *serviceDefinition.ServiceName, err)
}
// Re-assign tags to service object because UpdateService API doesn't return tags.
service.Tags = serviceDefinition.Tags

} else {
service, err = cli.CreateService(ctx, serviceDefinition)
Expand Down Expand Up @@ -214,10 +215,9 @@ func runStandaloneTask(
}

func createPrimaryTaskSet(ctx context.Context, client provider.Client, service types.Service, taskDef types.TaskDefinition, targetGroup *types.LoadBalancer) error {
// Get current PRIMARY task set.
prevPrimaryTaskSet, err := client.GetPrimaryTaskSet(ctx, service)
// Ignore error in case it's not found error, the prevPrimaryTaskSet doesn't exist for newly created Service.
if err != nil && !errors.Is(err, platformprovider.ErrNotFound) {
// Get current PRIMARY/ACTIVE task sets.
prevTaskSets, err := client.GetServiceTaskSets(ctx, service)
if err != nil {
return err
}

Expand All @@ -234,17 +234,17 @@ func createPrimaryTaskSet(ctx context.Context, client provider.Client, service t
return err
}

// Remove old taskSet if existed.
if prevPrimaryTaskSet != nil {
if err = client.DeleteTaskSet(ctx, *prevPrimaryTaskSet); err != nil {
// Remove old taskSets if existed.
for _, prevTaskSet := range prevTaskSets {
if err = client.DeleteTaskSet(ctx, *prevTaskSet); err != nil {
return err
}
}

return nil
}

func sync(ctx context.Context, in *executor.Input, platformProviderName string, platformProviderCfg *config.PlatformProviderECSConfig, taskDefinition types.TaskDefinition, serviceDefinition types.Service, targetGroup *types.LoadBalancer) bool {
func sync(ctx context.Context, in *executor.Input, platformProviderName string, platformProviderCfg *config.PlatformProviderECSConfig, recreate bool, taskDefinition types.TaskDefinition, serviceDefinition types.Service, targetGroup *types.LoadBalancer) bool {
client, err := provider.DefaultRegistry().Client(platformProviderName, platformProviderCfg, in.Logger)
if err != nil {
in.LogPersister.Errorf("Unable to create ECS client for the provider %s: %v", platformProviderName, err)
Expand All @@ -265,10 +265,35 @@ func sync(ctx context.Context, in *executor.Input, platformProviderName string,
return false
}

in.LogPersister.Infof("Start rolling out ECS task set")
if err := createPrimaryTaskSet(ctx, client, *service, *td, targetGroup); err != nil {
in.LogPersister.Errorf("Failed to rolling out ECS task set for service %s: %v", *serviceDefinition.ServiceName, err)
return false
if recreate {
cnt := service.DesiredCount
// Scale down the service tasks by set it to 0
in.LogPersister.Infof("Scale down ECS desired tasks count to 0")
service.DesiredCount = 0
if _, err = client.UpdateService(ctx, *service); err != nil {
in.LogPersister.Errorf("Failed to stop service tasks: %v", err)
return false
}

in.LogPersister.Infof("Start rolling out ECS task set")
if err := createPrimaryTaskSet(ctx, client, *service, *td, targetGroup); err != nil {
in.LogPersister.Errorf("Failed to rolling out ECS task set for service %s: %v", *serviceDefinition.ServiceName, err)
return false
}

// Scale up the service tasks count back to its desired.
in.LogPersister.Infof("Scale up ECS desired tasks count back to %d", cnt)
service.DesiredCount = cnt
if _, err = client.UpdateService(ctx, *service); err != nil {
in.LogPersister.Errorf("Failed to turning back service tasks: %v", err)
return false
}
} else {
in.LogPersister.Infof("Start rolling out ECS task set")
if err := createPrimaryTaskSet(ctx, client, *service, *td, targetGroup); err != nil {
in.LogPersister.Errorf("Failed to rolling out ECS task set for service %s: %v", *serviceDefinition.ServiceName, err)
return false
}
}

in.LogPersister.Infof("Wait service to reach stable state")
Expand Down
50 changes: 27 additions & 23 deletions pkg/app/piped/platformprovider/ecs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,13 @@ func (c *client) CreateTaskSet(ctx context.Context, service types.Service, taskD
if taskDefinition.TaskDefinitionArn == nil {
return nil, fmt.Errorf("failed to create task set of task family %s: no task definition provided", *taskDefinition.Family)
}

input := &ecs.CreateTaskSetInput{
Cluster: service.ClusterArn,
Service: service.ServiceArn,
TaskDefinition: taskDefinition.TaskDefinitionArn,
Scale: &types.Scale{Unit: types.ScaleUnitPercent, Value: float64(scale)},
Tags: service.Tags,
// If you specify the awsvpc network mode, the task is allocated an elastic network interface,
// and you must specify a NetworkConfiguration when run a task with the task definition.
NetworkConfiguration: service.NetworkConfiguration,
Expand Down Expand Up @@ -250,7 +252,7 @@ func (c *client) CreateTaskSet(ctx context.Context, service types.Service, taskD
return output.TaskSet, nil
}

func (c *client) GetPrimaryTaskSet(ctx context.Context, service types.Service) (*types.TaskSet, error) {
func (c *client) GetServiceTaskSets(ctx context.Context, service types.Service) ([]*types.TaskSet, error) {
input := &ecs.DescribeServicesInput{
Cluster: service.ClusterArn,
Services: []string{
Expand All @@ -259,42 +261,44 @@ func (c *client) GetPrimaryTaskSet(ctx context.Context, service types.Service) (
}
output, err := c.ecsClient.DescribeServices(ctx, input)
if err != nil {
return nil, fmt.Errorf("failed to get primary task set of service %s: %w", *service.ServiceName, err)
return nil, fmt.Errorf("failed to get task sets of service %s: %w", *service.ServiceName, err)
}
if len(output.Services) == 0 {
return nil, fmt.Errorf("failed to get primary task set of service %s: services empty", *service.ServiceName)
return nil, fmt.Errorf("failed to get task sets of service %s: services empty", *service.ServiceName)
}
taskSets := output.Services[0].TaskSets
for _, taskSet := range taskSets {
if aws.ToString(taskSet.Status) == "PRIMARY" {
return &taskSet, nil
svc := output.Services[0]
activeTaskSetArns := make([]string, 0, len(svc.TaskSets))
for i := range svc.TaskSets {
if aws.ToString(svc.TaskSets[i].Status) == "DRAINING" {
continue
}
activeTaskSetArns = append(activeTaskSetArns, *svc.TaskSets[i].TaskSetArn)
}
return nil, platformprovider.ErrNotFound
}

func (c *client) GetServiceTaskSets(ctx context.Context, service types.Service) ([]*types.TaskSet, error) {
input := &ecs.DescribeServicesInput{
Cluster: service.ClusterArn,
Services: []string{
*service.ServiceArn,
if len(activeTaskSetArns) == 0 {
return nil, fmt.Errorf("failed to get task sets of service %s: services empty", *service.ServiceName)
}

tsInput := &ecs.DescribeTaskSetsInput{
Cluster: service.ClusterArn,
Service: service.ServiceArn,
TaskSets: activeTaskSetArns,
Include: []types.TaskSetField{
types.TaskSetFieldTags,
},
}
output, err := c.ecsClient.DescribeServices(ctx, input)
tsOutput, err := c.ecsClient.DescribeTaskSets(ctx, tsInput)
if err != nil {
return nil, fmt.Errorf("failed to get task sets of service %s: %w", *service.ServiceName, err)
}
if len(output.Services) == 0 {
return nil, fmt.Errorf("failed to get task sets of service %s: services empty", *service.ServiceName)
}
svc := output.Services[0]
taskSets := make([]*types.TaskSet, 0, len(svc.TaskSets))
for i := range svc.TaskSets {
if aws.ToString(svc.TaskSets[i].Status) == "DRAINING" {
taskSets := make([]*types.TaskSet, 0, len(tsOutput.TaskSets))
for i := range tsOutput.TaskSets {
if !IsPipeCDManagedTaskSet(&tsOutput.TaskSets[i]) {
continue
}
taskSets = append(taskSets, &svc.TaskSets[i])
taskSets = append(taskSets, &tsOutput.TaskSets[i])
}

return taskSets, nil
}

Expand Down
1 change: 0 additions & 1 deletion pkg/app/piped/platformprovider/ecs/ecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type ECS interface {
WaitServiceStable(ctx context.Context, service types.Service) error
RegisterTaskDefinition(ctx context.Context, taskDefinition types.TaskDefinition) (*types.TaskDefinition, error)
RunTask(ctx context.Context, taskDefinition types.TaskDefinition, clusterArn string, launchType string, awsVpcConfiguration *config.ECSVpcConfiguration, tags []types.Tag) error
GetPrimaryTaskSet(ctx context.Context, service types.Service) (*types.TaskSet, error)
GetServiceTaskSets(ctx context.Context, service types.Service) ([]*types.TaskSet, error)
CreateTaskSet(ctx context.Context, service types.Service, taskDefinition types.TaskDefinition, targetGroup *types.LoadBalancer, scale int) (*types.TaskSet, error)
DeleteTaskSet(ctx context.Context, taskSet types.TaskSet) error
Expand Down
26 changes: 26 additions & 0 deletions pkg/app/piped/platformprovider/ecs/task_set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2023 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ecs

import "github.com/aws/aws-sdk-go-v2/service/ecs/types"

func IsPipeCDManagedTaskSet(ts *types.TaskSet) bool {
for _, tag := range ts.Tags {
if *tag.Key == LabelManagedBy && *tag.Value == ManagedByPiped {
return true
}
}
return false
}
61 changes: 61 additions & 0 deletions pkg/app/piped/platformprovider/ecs/task_set_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2023 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ecs

import (
"testing"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/ecs/types"
"github.com/stretchr/testify/assert"
)

func TestIsPipeCDManagedTaskSet(t *testing.T) {
t.Parallel()

testcases := []struct {
name string
ts *types.TaskSet
expected bool
}{
{
name: "managed by piped",
ts: &types.TaskSet{Tags: []types.Tag{
{Key: aws.String(LabelManagedBy), Value: aws.String(ManagedByPiped)},
}},
expected: true,
},
{
name: "nil tags",
ts: &types.TaskSet{},
expected: false,
},
{
name: "not managed by piped",
ts: &types.TaskSet{Tags: []types.Tag{
{Key: aws.String(LabelManagedBy), Value: aws.String("other")},
{Key: aws.String("hoge"), Value: aws.String("fuga")},
}},
expected: false,
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
got := IsPipeCDManagedTaskSet(tc.ts)
assert.Equal(t, tc.expected, got)
})
}
}
20 changes: 12 additions & 8 deletions pkg/app/server/grpcapi/grpcapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func getCommand(ctx context.Context, store commandstore.Store, id string, logger
func addCommand(ctx context.Context, store commandstore.Store, cmd *model.Command, logger *zap.Logger) error {
if err := store.AddCommand(ctx, cmd); err != nil {
logger.Error("failed to create command", zap.Error(err))
return status.Error(codes.Internal, "Failed to create command")
return gRPCStoreError(err, "create command")
}
return nil
}
Expand Down Expand Up @@ -166,18 +166,22 @@ func getEncriptionKey(se *model.Piped_SecretEncryption) ([]byte, error) {
}

func gRPCStoreError(err error, msg string) error {
switch err {
case nil:
if err == nil {
return nil
case datastore.ErrNotFound, filestore.ErrNotFound, stagelogstore.ErrNotFound:
}
if errors.Is(err, datastore.ErrNotFound) || errors.Is(err, filestore.ErrNotFound) || errors.Is(err, stagelogstore.ErrNotFound) {
return status.Error(codes.NotFound, fmt.Sprintf("Entity was not found to %s", msg))
case datastore.ErrInvalidArgument:
}
if errors.Is(err, datastore.ErrInvalidArgument) {
return status.Error(codes.InvalidArgument, fmt.Sprintf("Invalid argument to %s", msg))
case datastore.ErrAlreadyExists:
}
if errors.Is(err, datastore.ErrAlreadyExists) {
return status.Error(codes.AlreadyExists, fmt.Sprintf("Entity already exists to %s", msg))
default:
return status.Error(codes.Internal, fmt.Sprintf("Failed to %s", msg))
}
if errors.Is(err, datastore.ErrUserDefined) {
return status.Error(codes.FailedPrecondition, err.Error())
}
return status.Error(codes.Internal, fmt.Sprintf("Failed to %s", msg))
}

func makeUnregisteredAppsCacheKey(projectID string) string {
Expand Down
Loading

0 comments on commit b6f9d17

Please sign in to comment.