Skip to content

Improved peerguard auth support #5086

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions core/cli/api/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ import (
"strings"

"github.com/mudler/LocalAI/core/p2p"
p2pConfig "github.com/mudler/edgevpn/pkg/config"
"github.com/mudler/edgevpn/pkg/node"

"github.com/rs/zerolog/log"
)

func StartP2PStack(ctx context.Context, address, token, networkID string, federated bool) error {
func StartP2PStack(ctx context.Context, p2pCfg p2pConfig.Config, address, networkID string, federated bool) error {
var n *node.Node
// Here we are avoiding creating multiple nodes:
// - if the federated mode is enabled, we create a federated node and expose a service
Expand All @@ -29,23 +30,23 @@ func StartP2PStack(ctx context.Context, address, token, networkID string, federa

// Here a new node is created and started
// and a service is exposed by the node
node, err := p2p.ExposeService(ctx, "localhost", port, token, p2p.NetworkID(networkID, p2p.FederatedID))
node, err := p2p.ExposeService(ctx, p2pCfg, "localhost", port, p2p.NetworkID(networkID, p2p.FederatedID))
if err != nil {
return err
}

if err := p2p.ServiceDiscoverer(ctx, node, token, p2p.NetworkID(networkID, p2p.FederatedID), nil, false); err != nil {
if err := p2p.ServiceDiscoverer(ctx, node, p2p.NetworkID(networkID, p2p.FederatedID), nil, false); err != nil {
return err
}

n = node
}

// If the p2p mode is enabled, we start the service discovery
if token != "" {
if p2pCfg.NetworkToken != "" {
// If a node wasn't created previously, create it
if n == nil {
node, err := p2p.NewNode(token)
node, err := p2p.NewNode(p2pCfg)
if err != nil {
return err
}
Expand All @@ -58,7 +59,7 @@ func StartP2PStack(ctx context.Context, address, token, networkID string, federa

// Attach a ServiceDiscoverer to the p2p node
log.Info().Msg("Starting P2P server discovery...")
if err := p2p.ServiceDiscoverer(ctx, n, token, p2p.NetworkID(networkID, p2p.WorkerID), func(serviceID string, node p2p.NodeData) {
if err := p2p.ServiceDiscoverer(ctx, n, p2p.NetworkID(networkID, p2p.WorkerID), func(serviceID string, node p2p.NodeData) {
var tunnelAddresses []string
for _, v := range p2p.GetAvailableNodes(p2p.NetworkID(networkID, p2p.WorkerID)) {
if v.IsOnline() {
Expand Down
7 changes: 5 additions & 2 deletions core/cli/explorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ import (
"time"

cliContext "github.com/mudler/LocalAI/core/cli/context"
cliP2P "github.com/mudler/LocalAI/core/cli/p2p"
"github.com/mudler/LocalAI/core/explorer"
"github.com/mudler/LocalAI/core/http"
)

type ExplorerCMD struct {
cliP2P.P2PCommonFlags `embed:""`

Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"`
PoolDatabase string `env:"LOCALAI_POOL_DATABASE,POOL_DATABASE" default:"explorer.json" help:"Path to the pool database" group:"api"`
ConnectionTimeout string `env:"LOCALAI_CONNECTION_TIMEOUT,CONNECTION_TIMEOUT" default:"2m" help:"Connection timeout for the explorer" group:"api"`
Expand All @@ -33,14 +36,14 @@ func (e *ExplorerCMD) Run(ctx *cliContext.Context) error {

if e.WithSync {
ds := explorer.NewDiscoveryServer(db, dur, e.ConnectionErrorThreshold)
go ds.Start(context.Background(), true)
go ds.Start(context.Background(), e.P2PCommonFlags, true)
}

if e.OnlySync {
ds := explorer.NewDiscoveryServer(db, dur, e.ConnectionErrorThreshold)
ctx := context.Background()

return ds.Start(ctx, false)
return ds.Start(ctx, e.P2PCommonFlags, false)
}

appHTTP := http.Explorer(db)
Expand Down
33 changes: 27 additions & 6 deletions core/cli/federated.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,43 @@ package cli

import (
"context"
"fmt"

cliContext "github.com/mudler/LocalAI/core/cli/context"
cliP2P "github.com/mudler/LocalAI/core/cli/p2p"
"github.com/mudler/LocalAI/core/p2p"
"github.com/rs/zerolog/log"
)

type FederatedCLI struct {
Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"`
Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
RandomWorker bool `env:"LOCALAI_RANDOM_WORKER,RANDOM_WORKER" default:"false" help:"Select a random worker from the pool" group:"p2p"`
Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances." group:"p2p"`
TargetWorker string `env:"LOCALAI_TARGET_WORKER,TARGET_WORKER" help:"Target worker to run the federated server on" group:"p2p"`
cliP2P.P2PCommonFlags `embed:""`

Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"`
RandomWorker bool `env:"LOCALAI_RANDOM_WORKER,RANDOM_WORKER" default:"false" help:"Select a random worker from the pool" group:"p2p"`
TargetWorker string `env:"LOCALAI_TARGET_WORKER,TARGET_WORKER" help:"Target worker to run the federated server on" group:"p2p"`
}

func (f *FederatedCLI) Run(ctx *cliContext.Context) error {

if f.Peer2PeerToken == "" {
log.Info().Msg("No token provided, generating one")
connectionData, err := p2p.GenerateNewConnectionData(
f.Peer2PeerDHTInterval, f.Peer2PeerOTPInterval,
f.Peer2PeerPrivkey, f.Peer2PeerUsePeerguard,
)
if err != nil {
log.Warn().Msgf("Error generating token: %s", err.Error())
}
f.Peer2PeerToken = connectionData.Base64()

log.Info().Msg("Generated Token:")
fmt.Println(f.Peer2PeerToken)

log.Info().Msg("To use the token, you can run the following command in another node or terminal:")
fmt.Printf("export TOKEN=\"%s\"\nlocal-ai worker p2p-llama-cpp-rpc\n", f.Peer2PeerToken)
}

fs := p2p.NewFederatedServer(f.Address, p2p.NetworkID(f.Peer2PeerNetworkID, p2p.FederatedID), f.Peer2PeerToken, !f.RandomWorker, f.TargetWorker)

return fs.Start(context.Background())
return fs.Start(context.Background(), f.P2PCommonFlags)
}
17 changes: 17 additions & 0 deletions core/cli/p2p/p2p.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package cli

type P2PCommonFlags struct {
Peer2PeerNoDHT bool `env:"LOCALAI_P2P_DISABLE_DHT,P2P_DISABLE_DHT" name:"p2p-disable-dht" help:"Disable DHT" group:"p2p"`
Peer2PeerLimit bool `env:"LOCALAI_P2P_ENABLE_LIMITS,P2P_ENABLE_LIMITS" name:"p2p-enable-limits" help:"Enable Limits" group:"p2p"`
Peer2PeerListenAddrs []string `env:"LOCALAI_P2P_LISTEN_MADDRS,P2P_LISTEN_MADDRS" name:"p2p-listen-maddrs" help:"A list of listen multiaddresses" group:"p2p"`
Peer2PeerBootAddrs []string `env:"LOCALAI_P2P_BOOTSTRAP_PEERS_MADDRS,P2P_BOOTSTRAP_PEERS_MADDRS" name:"p2p-bootstrap-peers-maddrs" help:"A list of bootstrap peers multiaddresses" group:"p2p"`
Peer2PeerDHTAnnounceAddrs []string `env:"LOCALAI_P2P_DHT_ANNOUNCE_MADDRS,P2P_DHT_ANNOUNCE_MADDRS" name:"p2p-dht-announce-maddrs" help:"A list of DHT announce maddrs" group:"p2p"`
Peer2PeerLibLoglevel string `env:"LOCALAI_P2P_LIB_LOGLEVEL,P2P_LIB_LOGLEVEL" name:"p2p-lib-loglevel" help:"libp2p specific loglevel" group:"p2p"`
Peer2PeerDHTInterval int `env:"LOCALAI_P2P_DHT_INTERVAL,P2P_DHT_INTERVAL" default:"360" name:"p2p-dht-interval" help:"Interval for DHT refresh (used during token generation)" group:"p2p"`
Peer2PeerOTPInterval int `env:"LOCALAI_P2P_OTP_INTERVAL,P2P_OTP_INTERVAL" default:"9000" name:"p2p-otp-interval" help:"Interval for OTP refresh (used during token generation)" group:"p2p"`
Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
Peer2PeerPrivkey string `env:"LOCALAI_P2P_PRIVKEY,P2P_PRIVKEY" name:"p2pprivkey" help:"A base64 encoded protobuf serialized private key used for fixed ID (edgevpn can be used for generating one)" group:"p2p"`
Peer2PeerUsePeerguard bool `env:"LOCALAI_P2P_PEERGUARD,P2P_PEERGUARD" name:"p2ppeerguard" help:"Enable peerguarding through ecdsa authorization of nodes" group:"p2p"`
Peer2PeerAuthProvders string `env:"LOCALAI_P2P_PEERGATE_AUTH,P2P_PEERGATE_AUTH" name:"p2pauth" help:"JSON dict string with '{authProviderName: {providerOpt: value}}' structure, see edgevpn project" group:"p2p"`
Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances" group:"p2p"`
}
22 changes: 16 additions & 6 deletions core/cli/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/mudler/LocalAI/core/application"
cli_api "github.com/mudler/LocalAI/core/cli/api"
cliContext "github.com/mudler/LocalAI/core/cli/context"
cliP2P "github.com/mudler/LocalAI/core/cli/p2p"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/http"
"github.com/mudler/LocalAI/core/p2p"
Expand Down Expand Up @@ -55,10 +56,7 @@ type RunCMD struct {
DisableMetricsEndpoint bool `env:"LOCALAI_DISABLE_METRICS_ENDPOINT,DISABLE_METRICS_ENDPOINT" default:"false" help:"Disable the /metrics endpoint" group:"api"`
HttpGetExemptedEndpoints []string `env:"LOCALAI_HTTP_GET_EXEMPTED_ENDPOINTS" default:"^/$,^/browse/?$,^/talk/?$,^/p2p/?$,^/chat/?$,^/text2image/?$,^/tts/?$,^/static/.*$,^/swagger.*$" help:"If LOCALAI_DISABLE_API_KEY_REQUIREMENT_FOR_HTTP_GET is overriden to true, this is the list of endpoints to exempt. Only adjust this in case of a security incident or as a result of a personal security posture review" group:"hardening"`
Peer2Peer bool `env:"LOCALAI_P2P,P2P" name:"p2p" default:"false" help:"Enable P2P mode" group:"p2p"`
Peer2PeerDHTInterval int `env:"LOCALAI_P2P_DHT_INTERVAL,P2P_DHT_INTERVAL" default:"360" name:"p2p-dht-interval" help:"Interval for DHT refresh (used during token generation)" group:"p2p"`
Peer2PeerOTPInterval int `env:"LOCALAI_P2P_OTP_INTERVAL,P2P_OTP_INTERVAL" default:"9000" name:"p2p-otp-interval" help:"Interval for OTP refresh (used during token generation)" group:"p2p"`
Peer2PeerToken string `env:"LOCALAI_P2P_TOKEN,P2P_TOKEN,TOKEN" name:"p2ptoken" help:"Token for P2P mode (optional)" group:"p2p"`
Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances" group:"p2p"`
cliP2P.P2PCommonFlags `embed:""`
ParallelRequests bool `env:"LOCALAI_PARALLEL_REQUESTS,PARALLEL_REQUESTS" help:"Enable backends to handle multiple requests in parallel if they support it (e.g.: llama.cpp or vllm)" group:"backends"`
SingleActiveBackend bool `env:"LOCALAI_SINGLE_ACTIVE_BACKEND,SINGLE_ACTIVE_BACKEND" help:"Allow only one backend to be run at a time" group:"backends"`
PreloadBackendOnly bool `env:"LOCALAI_PRELOAD_BACKEND_ONLY,PRELOAD_BACKEND_ONLY" default:"false" help:"Do not launch the API services, only the preloaded models / backends are started (useful for multi-node setups)" group:"backends"`
Expand Down Expand Up @@ -114,26 +112,38 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
}

token := ""
p2pCfg := p2p.NewP2PConfig(r.P2PCommonFlags)
if r.Peer2Peer || r.Peer2PeerToken != "" {

log.Info().Msg("P2P mode enabled")
token = r.Peer2PeerToken
if token == "" {
// IF no token is provided, and p2p is enabled,
// we generate one and wait for the user to pick up the token (this is for interactive)
log.Info().Msg("No token provided, generating one")
token = p2p.GenerateToken(r.Peer2PeerDHTInterval, r.Peer2PeerOTPInterval)
connectionData, err := p2p.GenerateNewConnectionData(
r.Peer2PeerDHTInterval, r.Peer2PeerOTPInterval,
r.Peer2PeerPrivkey, r.Peer2PeerUsePeerguard,
)
if err != nil {
log.Warn().Msgf("Error generating token: %s", err.Error())
}
token = connectionData.Base64()

log.Info().Msg("Generated Token:")
fmt.Println(token)

log.Info().Msg("To use the token, you can run the following command in another node or terminal:")
fmt.Printf("export TOKEN=\"%s\"\nlocal-ai worker p2p-llama-cpp-rpc\n", token)
}
opts = append(opts, config.WithP2PToken(token))

p2pCfg.NetworkToken = token
}

backgroundCtx := context.Background()

if err := cli_api.StartP2PStack(backgroundCtx, r.Address, token, r.Peer2PeerNetworkID, r.Federated); err != nil {
if err := cli_api.StartP2PStack(backgroundCtx, p2pCfg, r.Address, r.Peer2PeerNetworkID, r.Federated); err != nil {
return err
}

Expand Down
20 changes: 12 additions & 8 deletions core/cli/worker/worker_p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

cliContext "github.com/mudler/LocalAI/core/cli/context"
cliP2P "github.com/mudler/LocalAI/core/cli/p2p"
"github.com/mudler/LocalAI/core/p2p"
"github.com/mudler/LocalAI/pkg/assets"
"github.com/mudler/LocalAI/pkg/library"
Expand All @@ -20,12 +21,13 @@ import (
)

type P2P struct {
WorkerFlags `embed:""`
Token string `env:"LOCALAI_TOKEN,LOCALAI_P2P_TOKEN,TOKEN" help:"P2P token to use"`
NoRunner bool `env:"LOCALAI_NO_RUNNER,NO_RUNNER" help:"Do not start the llama-cpp-rpc-server"`
RunnerAddress string `env:"LOCALAI_RUNNER_ADDRESS,RUNNER_ADDRESS" help:"Address of the llama-cpp-rpc-server"`
RunnerPort string `env:"LOCALAI_RUNNER_PORT,RUNNER_PORT" help:"Port of the llama-cpp-rpc-server"`
Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances" group:"p2p"`
WorkerFlags `embed:""`
cliP2P.P2PCommonFlags `embed:""`

Token string `env:"LOCALAI_TOKEN,LOCALAI_P2P_TOKEN,TOKEN" help:"P2P token to use"`
NoRunner bool `env:"LOCALAI_NO_RUNNER,NO_RUNNER" help:"Do not start the llama-cpp-rpc-server"`
RunnerAddress string `env:"LOCALAI_RUNNER_ADDRESS,RUNNER_ADDRESS" help:"Address of the llama-cpp-rpc-server"`
RunnerPort string `env:"LOCALAI_RUNNER_PORT,RUNNER_PORT" help:"Port of the llama-cpp-rpc-server"`
}

func (r *P2P) Run(ctx *cliContext.Context) error {
Expand All @@ -41,6 +43,8 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
if r.Token == "" {
return fmt.Errorf("Token is required")
}
p2pCfg := p2p.NewP2PConfig(r.P2PCommonFlags)
p2pCfg.NetworkToken = r.Token

port, err := freeport.GetFreePort()
if err != nil {
Expand All @@ -60,7 +64,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
p = r.RunnerPort
}

_, err = p2p.ExposeService(context.Background(), address, p, r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID))
_, err = p2p.ExposeService(context.Background(), p2pCfg, address, p, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID))
if err != nil {
return err
}
Expand Down Expand Up @@ -103,7 +107,7 @@ func (r *P2P) Run(ctx *cliContext.Context) error {
}
}()

_, err = p2p.ExposeService(context.Background(), address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID))
_, err = p2p.ExposeService(context.Background(), p2pCfg, address, fmt.Sprint(port), p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID))
if err != nil {
return err
}
Expand Down
12 changes: 8 additions & 4 deletions core/explorer/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/rs/zerolog/log"

cliP2P "github.com/mudler/LocalAI/core/cli/p2p"
"github.com/mudler/LocalAI/core/p2p"
"github.com/mudler/edgevpn/pkg/blockchain"
)
Expand Down Expand Up @@ -40,7 +41,7 @@ type Network struct {
Clusters []ClusterData
}

func (s *DiscoveryServer) runBackground() {
func (s *DiscoveryServer) runBackground(p2pCommonFlags cliP2P.P2PCommonFlags) {
if len(s.database.TokenList()) == 0 {
time.Sleep(5 * time.Second) // avoid busy loop
return
Expand All @@ -50,11 +51,14 @@ func (s *DiscoveryServer) runBackground() {
c, cancel := context.WithTimeout(context.Background(), s.connectionTime)
defer cancel()

p2pCfg := p2p.NewP2PConfig(p2pCommonFlags)
p2pCfg.NetworkToken = token

// Connect to the network
// Get the number of nodes
// save it in the current state (mutex)
// do not do in parallel
n, err := p2p.NewNode(token)
n, err := p2p.NewNode(p2pCfg)
if err != nil {
log.Err(err).Msg("Failed to create node")
s.failedToken(token)
Expand Down Expand Up @@ -197,14 +201,14 @@ func (s *DiscoveryServer) retrieveNetworkData(c context.Context, ledger *blockch
}

// Start the discovery server. This is meant to be run in to a goroutine.
func (s *DiscoveryServer) Start(ctx context.Context, keepRunning bool) error {
func (s *DiscoveryServer) Start(ctx context.Context, p2pCommonFlags cliP2P.P2PCommonFlags, keepRunning bool) error {
for {
select {
case <-ctx.Done():
return fmt.Errorf("context cancelled")
default:
// Collect data
s.runBackground()
s.runBackground(p2pCommonFlags)
if !keepRunning {
return nil
}
Expand Down
Loading
Loading