Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@

[[blockchains]]
type = "anvil"
chain_id = "1337"
docker_cmd_params = ["-b", "0.5", "--mixed-mining"]
#target = "remote"

[[blockchains]]
type = "anvil"
chain_id = "2337"
port = "8546"
docker_cmd_params = ["-b", "0.5", "--mixed-mining"]
target = "remote"


[jd]
csa_encryption_key = "d1093c0060d50a3c89c189b2e485da5a3ce57f3dcb38ab7e2c0d5f0bb2314a44" # any random 32-byte hex string
# change the image tag below to the job-distributor version you are using
image = "job-distributor:0.22.1"

[fake]
port = 8171

[fake_http]
port = 8666

#[s3provider]
# # use all defaults
# port = 9000
# console_port = 9001

[infra]
# either "docker" or "kubernetes"
type = "docker"

[[nodesets]]
nodes = 4
name = "workflow"
don_types = ["workflow"]
override_mode = "all"
http_port_range_start = 10100

env_vars = { CL_EVM_CMD = "" }
capabilities = ["ocr3", "custom-compute", "web-api-target", "web-api-trigger", "vault", "cron", "http-action", "http-trigger", "consensus", "don-time", "write-evm-1337", "write-evm-2337", "evm-1337", "evm-2337", "read-contract-1337", "read-contract-2337"]

[nodesets.db]
image = "postgres:12.0"
port = 13000

[[nodesets.node_specs]]
roles = ["plugin"]
[nodesets.node_specs.node]
#docker_ctx = "../../../.."
#docker_file = "core/chainlink.Dockerfile"
#docker_build_args = { "CL_IS_PROD_BUILD" = "false" }
image = "chainlink-tmp:latest"
user_config_overrides = ""

[[nodesets]]
nodes = 1
name = "bootstrap-gateway"
don_types = ["bootstrap", "gateway"]
override_mode = "each"
http_port_range_start = 10300

env_vars = { CL_EVM_CMD = "" }
supported_evm_chains = [1337, 2337]

[nodesets.db]
image = "postgres:12.0"
port = 13200

[[nodesets.node_specs]]
roles = ["bootstrap", "gateway"]
[nodesets.node_specs.node]
#docker_ctx = "../../../.."
#docker_file = "core/chainlink.Dockerfile"
#docker_build_args = { "CL_IS_PROD_BUILD" = "false" }
image = "chainlink-tmp:latest"
# 5002 is the web API capabilities port for incoming requests
# 15002 is the vault port for incoming requests
custom_ports = ["5002:5002","15002:15002"]
# image = "chainlink-tmp:latest"
user_config_overrides = ""
193 changes: 192 additions & 1 deletion core/scripts/cre/environment/environment/environment.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package environment

import (
"bufio"
"context"
"crypto/ecdsa"
"crypto/rand"
Expand All @@ -13,6 +14,7 @@ import (
"path/filepath"
"runtime/debug"
"slices"
"strconv"
"strings"
"syscall"
"time"
Expand Down Expand Up @@ -255,6 +257,10 @@ func startCmd() *cobra.Command {
return errors.Wrap(err, "failed to set default CTF configs")
}

if err := cleanupTrackedTunnels(relativePathToRepoRoot); err != nil {
framework.L.Warn().Err(err).Msg("failed to clean up tracked SSM tunnels before start")
}

cleanUpErr := envconfig.RemoveAllEnvironmentStateDir(relativePathToRepoRoot)
if cleanUpErr != nil {
return errors.Wrap(cleanUpErr, "failed to clean up environment state files")
Expand Down Expand Up @@ -500,6 +506,9 @@ func startCmd() *cobra.Command {
if storeErr != nil {
return errors.Wrap(storeErr, "failed to store local CRE state")
}
if err := persistTunnelState(relativePathToRepoRoot, output); err != nil {
return errors.Wrap(err, "failed to store tunnel state")
}

return nil
},
Expand Down Expand Up @@ -634,6 +643,10 @@ func stopCmd() *cobra.Command {
return errors.Wrap(removeErr, "failed to remove environment containers. Please remove them manually")
}

if err := cleanupTrackedTunnels(relativePathToRepoRoot); err != nil {
framework.L.Warn().Err(err).Msg("failed to clean up tracked SSM tunnels")
}

if allFlag {
stopBeholderErr := stopBeholder()
if stopBeholderErr != nil {
Expand Down Expand Up @@ -712,7 +725,7 @@ func StartCLIEnvironment(

universalSetupInput := &creenv.SetupInput{
NodeSets: in.NodeSets,
BlockchainsInput: in.Blockchains,
Blockchains: in.Blockchains,
ContractVersions: env.ContractVersions(),
WithV2Registries: env.WithV2Registries(),
JdInput: in.JD,
Expand Down Expand Up @@ -818,6 +831,184 @@ func oneLineErrorMessage(errOrPanic any) string {
return strings.SplitN(fmt.Sprintf("%v", errOrPanic), "\n", 1)[0]
}

// cleanupTrackedTunnels stops the SSM tunnel processes recorded in the tunnel
// state for the given repository root and then clears that state.
//
// For every recorded tunnel it first asks killSessionManagerPluginByLocalPort
// to stop session-manager-plugin processes bound to the tunnel's local port.
// It then stops the recorded PID itself, but only after confirming via
// isSSMStartSessionProcess that the PID still belongs to an SSM start-session
// command, so a recycled PID is never killed. The process is sent SIGTERM,
// given up to two seconds to exit, and sent SIGKILL if it is still present.
//
// The tunnel state is cleared after the loop even when some processes could
// not be stopped. The returned error reports how many tracked processes could
// not be inspected or stopped; failures of the plugin cleanup are only logged.
func cleanupTrackedTunnels(relativePathToRepoRoot string) error {
	state, err := envconfig.LoadTunnelState(relativePathToRepoRoot)
	if err != nil {
		return errors.Wrap(err, "failed to load tracked tunnel state")
	}
	if len(state.Tunnels) == 0 {
		return nil
	}

	framework.L.Info().Msgf("Found %d tracked SSM tunnel process(es), cleaning up", len(state.Tunnels))
	failed := 0
	for _, t := range state.Tunnels {
		// First, aggressively kill known long-lived plugin children by local forwarded port.
		if pluginKilled, pluginErr := killSessionManagerPluginByLocalPort(t.LocalPort); pluginErr != nil {
			framework.L.Warn().Err(pluginErr).Msgf("failed to clean session-manager-plugin for localPort=%d", t.LocalPort)
		} else if pluginKilled {
			framework.L.Info().Msgf("stopped session-manager-plugin for localPort=%d", t.LocalPort)
		}

		// Nothing more to do when no PID was recorded or the process is already gone.
		if t.PID <= 0 || !processExists(t.PID) {
			continue
		}

		// Guard against PID reuse: only signal processes that still look like an SSM session.
		isSSM, checkErr := isSSMStartSessionProcess(t.PID)
		if checkErr != nil {
			framework.L.Warn().Err(checkErr).Msgf("failed to inspect process pid=%d before tunnel cleanup", t.PID)
			failed++
			continue
		}
		if !isSSM {
			framework.L.Warn().Msgf("refusing to kill non-SSM process pid=%d recorded in tunnel state", t.PID)
			failed++
			continue
		}

		proc, findErr := os.FindProcess(t.PID)
		if findErr != nil {
			failed++
			continue
		}

		// Signal errors are deliberately ignored: the outcome is verified with
		// processExists below, which is what decides success or failure.
		_ = proc.Signal(syscall.SIGTERM)
		termDeadline := time.Now().Add(2 * time.Second)
		for processExists(t.PID) && time.Now().Before(termDeadline) {
			time.Sleep(150 * time.Millisecond)
		}
		if processExists(t.PID) {
			_ = proc.Kill()
			// SIGKILL is delivered asynchronously, so the process can still be
			// visible right after Kill returns. Wait briefly before judging the
			// result to avoid reporting a dying process as a failure.
			killDeadline := time.Now().Add(time.Second)
			for processExists(t.PID) && time.Now().Before(killDeadline) {
				time.Sleep(50 * time.Millisecond)
			}
		}
		if processExists(t.PID) {
			failed++
			framework.L.Warn().Msgf("failed to stop tracked tunnel process pid=%d localPort=%d remotePort=%d", t.PID, t.LocalPort, t.RemotePort)
			continue
		}

		framework.L.Info().Msgf("stopped tracked tunnel process pid=%d localPort=%d remotePort=%d kind=%s", t.PID, t.LocalPort, t.RemotePort, t.Kind)
	}

	// Clearing the state is best-effort; a failure here is logged, not returned.
	if clearErr := envconfig.ClearTunnelState(relativePathToRepoRoot); clearErr != nil {
		framework.L.Warn().Err(clearErr).Msg("failed to clear tunnel state file after cleanup")
	}

	if failed > 0 {
		return fmt.Errorf("failed to clean up %d tracked tunnel process(es)", failed)
	}
	return nil
}

// processExists reports whether a process with the given PID is currently
// present on the system.
//
// The process is probed with signal 0, which asks the kernel to run its
// existence and permission checks without delivering a signal. A permission
// error (EPERM) means the process exists but is owned by another user, so it
// is reported as existing rather than as gone. Non-positive PIDs are always
// reported as absent.
func processExists(pid int) bool {
	if pid <= 0 {
		return false
	}
	proc, err := os.FindProcess(pid)
	if err != nil {
		return false
	}
	err = proc.Signal(syscall.Signal(0))
	return err == nil || errors.Is(err, syscall.EPERM)
}

// isSSMStartSessionProcess reports whether the command line that ps prints
// for the given PID contains "aws ssm start-session".
//
// An error is returned when ps cannot be run or exits unsuccessfully. Empty
// ps output yields (false, nil).
func isSSMStartSessionProcess(pid int) (bool, error) {
	psOutput, psErr := exec.Command("ps", "-o", "command=", "-p", strconv.Itoa(pid)).Output()
	if psErr != nil {
		return false, psErr
	}

	commandLine := strings.TrimSpace(string(psOutput))
	matches := commandLine != "" && strings.Contains(commandLine, "aws ssm start-session")
	return matches, nil
}

// killSessionManagerPluginByLocalPort stops every session-manager-plugin
// process whose command line refers to the given local forwarded port.
//
// Candidates are found by scanning the output of `ps -axo pid=,command=` for
// lines that contain both "session-manager-plugin" and the text
// `"localPortNumber": ["<localPort>"]`. Each match is sent SIGTERM, given up
// to two seconds to exit, and sent SIGKILL if it is still present.
//
// It returns true when at least one matching process was confirmed gone. An
// error is returned when ps fails or its output cannot be scanned. A
// non-positive localPort is a no-op that returns (false, nil).
func killSessionManagerPluginByLocalPort(localPort int) (bool, error) {
	if localPort <= 0 {
		return false, nil
	}

	out, err := exec.Command("ps", "-axo", "pid=,command=").Output()
	if err != nil {
		return false, err
	}

	// NOTE(review): this relies on the exact spacing of the port parameter as
	// it appears on the plugin's command line — confirm against how the tunnel
	// is started.
	pattern := fmt.Sprintf(`"localPortNumber": ["%d"]`, localPort)
	killedAny := false
	scanner := bufio.NewScanner(strings.NewReader(string(out)))
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		if line == "" {
			continue
		}
		if !strings.Contains(line, "session-manager-plugin") || !strings.Contains(line, pattern) {
			continue
		}

		// The first whitespace-separated field is the PID (see the ps format above).
		fields := strings.Fields(line)
		if len(fields) == 0 {
			continue
		}
		pid, parseErr := strconv.Atoi(fields[0])
		if parseErr != nil || pid <= 0 {
			continue
		}

		proc, findErr := os.FindProcess(pid)
		if findErr != nil {
			continue
		}

		// Signal errors are deliberately ignored: the outcome is verified with
		// processExists below.
		_ = proc.Signal(syscall.SIGTERM)
		termDeadline := time.Now().Add(2 * time.Second)
		for processExists(pid) && time.Now().Before(termDeadline) {
			time.Sleep(100 * time.Millisecond)
		}
		if processExists(pid) {
			_ = proc.Kill()
			// SIGKILL is delivered asynchronously, so the process can still be
			// visible right after Kill returns. Wait briefly so a dying process
			// is not misreported as still running.
			killDeadline := time.Now().Add(time.Second)
			for processExists(pid) && time.Now().Before(killDeadline) {
				time.Sleep(50 * time.Millisecond)
			}
		}
		if !processExists(pid) {
			killedAny = true
		}
	}
	if scanErr := scanner.Err(); scanErr != nil {
		return killedAny, scanErr
	}

	return killedAny, nil
}

// persistTunnelState records the tunnel processes reported by the setup
// output so that a later run can find and stop them.
//
// A nil output clears any previously stored tunnel state. Otherwise every
// tunnel binding with a positive PID is stored as an "ssm" tunnel, tagged with
// the instance ID read from the CRE_EC2_INSTANCE_ID environment variable and
// the fixed region "us-west-2". Bindings without a positive PID are skipped.
func persistTunnelState(relativePathToRepoRoot string, output *creenv.SetupOutput) error {
	if output == nil {
		return envconfig.ClearTunnelState(relativePathToRepoRoot)
	}

	bindings := output.TunnelBindings()
	// The instance ID is the same for every binding, so read it once.
	instanceID := os.Getenv("CRE_EC2_INSTANCE_ID")

	tracked := make([]envconfig.TunnelProcess, 0, len(bindings))
	for _, binding := range bindings {
		if binding.PID > 0 {
			tracked = append(tracked, envconfig.TunnelProcess{
				PID:         binding.PID,
				Kind:        "ssm",
				InstanceID:  instanceID,
				Region:      "us-west-2",
				RemotePort:  binding.Port,
				LocalPort:   binding.LocalPort,
				ComponentID: binding.ComponentID,
				Endpoint:    binding.EndpointName,
			})
		}
	}

	tunnelState := &envconfig.TunnelState{
		Version: 1,
		Tunnels: tracked,
	}
	return envconfig.StoreTunnelState(relativePathToRepoRoot, tunnelState)
}

func initDxTracker() {
if dxTracker != nil {
return
Expand Down
10 changes: 9 additions & 1 deletion core/scripts/cre/environment/environment/swap.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,14 @@ func swapNodes(ctx context.Context, forceFlag bool, waitTime time.Duration) erro
return fmt.Errorf("failed to set TESTCONTAINERS_RYUK_DISABLED environment variable: %w", setErr)
}

effectiveBlockchains, effectiveErr := config.EffectiveBlockchains()
if effectiveErr != nil {
return errors.Wrap(effectiveErr, "failed to resolve blockchain inputs")
}
if len(effectiveBlockchains) == 0 || effectiveBlockchains[0] == nil || effectiveBlockchains[0].Out == nil {
return errors.New("at least one blockchain output is required to restart node sets")
}

nerrg := errgroup.Group{}
for _, nodeSet := range config.NodeSets {
nerrg.Go(func() error {
Expand Down Expand Up @@ -290,7 +298,7 @@ func swapNodes(ctx context.Context, forceFlag bool, waitTime time.Duration) erro
nodeSet.Out = nil
var nodesetErr error
nodeSet.Input.NodeSpecs = nodeSet.ExtractCTFInputs()
nodeSet.Out, nodesetErr = ns.NewSharedDBNodeSet(nodeSet.Input, config.Blockchains[0].Out)
nodeSet.Out, nodesetErr = ns.NewSharedDBNodeSet(nodeSet.Input, effectiveBlockchains[0].Out)
if nodesetErr != nil {
framework.L.Error().Msgf("Failed to create node set named %s: %s", nodeSet.Name, nodesetErr)
framework.L.Info().Msgf("Waiting %s for the containers to be removed", waitTime.String())
Expand Down
Loading
Loading