Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Redis ISB as store #2409

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions pkg/apis/numaflow/v1alpha1/get_spec_req.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ type GetRedisServiceSpecReq struct {
}

type GetVertexPodSpecReq struct {
ISBSvcType ISBSvcType `protobuf:"bytes,1,opt,name=isbSvcType"`
Image string `protobuf:"bytes,2,opt,name=image"`
PullPolicy corev1.PullPolicy `protobuf:"bytes,3,opt,name=pullPolicy,casttype=k8s.io/api/core/v1.PullPolicy"`
Env []corev1.EnvVar `protobuf:"bytes,4,rep,name=env"`
ISBSvcType ISBSvcType `protobuf:"bytes,1,opt,name=isbSvcType"`
Image string `protobuf:"bytes,2,opt,name=image"`
PullPolicy corev1.PullPolicy `protobuf:"bytes,3,opt,name=pullPolicy,casttype=k8s.io/api/core/v1.PullPolicy"`
Env []corev1.EnvVar `protobuf:"bytes,4,rep,name=env"`
ServingEnv []corev1.EnvVar
SideInputsStoreName string `protobuf:"bytes,5,opt,name=sideInputsStoreName"`
ServingSourceStreamName string `protobuf:"bytes,6,opt,name=servingSourceStreamName"`
PipelineSpec PipelineSpec `protobuf:"bytes,7,opt,name=pipelineSpec"`
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/numaflow/v1alpha1/vertex_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ func (v Vertex) GetPodSpec(req GetVertexPodSpecReq) (*corev1.PodSpec, error) {
for _, vtx := range req.PipelineSpec.Vertices {
if vtx.IsASource() && vtx.Source.Serving != nil {
commonEnvVars = append(commonEnvVars, corev1.EnvVar{Name: EnvCallbackEnabled, Value: "true"})
commonEnvVars = append(commonEnvVars, req.ServingEnv...)
}
}

Expand Down
46 changes: 37 additions & 9 deletions pkg/reconciler/vertex/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,34 @@ func (r *vertexReconciler) reconcile(ctx context.Context, vertex *dfv1.Vertex) (
return ctrl.Result{}, err
}

var servingRedisIsbSvc *dfv1.InterStepBufferService = nil
if vertex.IsASource() && vertex.Spec.Source.Serving != nil && strings.HasPrefix(*vertex.Spec.Source.Serving.Store.URL, "native-isb:") {
servingRedisIsbSvc = &dfv1.InterStepBufferService{}
isbSvcName := strings.TrimPrefix(*vertex.Spec.Source.Serving.Store.URL, "native-isb:")
err := r.client.Get(ctx, types.NamespacedName{Namespace: vertex.Namespace, Name: isbSvcName}, servingRedisIsbSvc)
if err != nil {
if apierrors.IsNotFound(err) {
e := fmt.Errorf("isbsvc %s not found", isbSvcName)
vertex.Status.MarkPhaseFailed("ISBSvcNotFound", e.Error())
return ctrl.Result{}, e
}
log.Errorw("Failed to get ISB Service", zap.String("isbsvc", isbSvcName), zap.Error(err))
vertex.Status.MarkPhaseFailed("FindISBSvcFailed", err.Error())
return ctrl.Result{}, err
}
if servingRedisIsbSvc.GetAnnotations()[dfv1.KeyInstance] != vertex.GetAnnotations()[dfv1.KeyInstance] {
log.Errorw("ISB Service is found but not managed by the same controller of this vertex", zap.String("isbsvc", isbSvcName), zap.Error(err))
return ctrl.Result{}, fmt.Errorf("isbsvc not managed by the same controller of this vertex")
}
if !servingRedisIsbSvc.Status.IsHealthy() {
log.Errorw("ISB Service is not in healthy status", zap.String("isbsvc", isbSvcName), zap.Error(err))
vertex.Status.MarkPhaseFailed("ISBSvcNotHealthy", "isbsvc not healthy")
return ctrl.Result{}, fmt.Errorf("isbsvc not healthy")
}
}

// Create pods
if err := r.orchestratePods(ctx, vertex, pipeline, isbSvc); err != nil {
if err := r.orchestratePods(ctx, vertex, pipeline, isbSvc, servingRedisIsbSvc); err != nil {
vertex.Status.MarkDeployFailed("OrchestratePodsFailed", err.Error())
r.recorder.Eventf(vertex, corev1.EventTypeWarning, "OrchestratePodsFailed", err.Error())
return ctrl.Result{}, err
Expand Down Expand Up @@ -193,7 +219,7 @@ func (r *vertexReconciler) reconcile(ctx context.Context, vertex *dfv1.Vertex) (
return ctrl.Result{}, nil
}

func (r *vertexReconciler) orchestratePods(ctx context.Context, vertex *dfv1.Vertex, pipeline *dfv1.Pipeline, isbSvc *dfv1.InterStepBufferService) error {
func (r *vertexReconciler) orchestratePods(ctx context.Context, vertex *dfv1.Vertex, pipeline *dfv1.Pipeline, isbSvc *dfv1.InterStepBufferService, servingRedisISB *dfv1.InterStepBufferService) error {
log := logging.FromContext(ctx)
desiredReplicas := vertex.GetReplicas()
vertex.Status.DesiredReplicas = uint32(desiredReplicas)
Expand All @@ -207,7 +233,7 @@ func (r *vertexReconciler) orchestratePods(ctx context.Context, vertex *dfv1.Ver
}()

// Build pod spec of the 1st replica to calculate the hash, which is used to determine whether the pod spec is changed
tmpSpec, err := r.buildPodSpec(vertex, pipeline, isbSvc.Status.Config, 0)
tmpSpec, err := r.buildPodSpec(vertex, pipeline, isbSvc.Status.Config, servingRedisISB.Status.Config.Redis, 0)
if err != nil {
return fmt.Errorf("failed to build a pod spec: %w", err)
}
Expand All @@ -234,7 +260,7 @@ func (r *vertexReconciler) orchestratePods(ctx context.Context, vertex *dfv1.Ver

if updatedReplicas > 0 {
// Make sure [0 - updatedReplicas] with hash are in place
if err := r.orchestratePodsFromTo(ctx, vertex, pipeline, isbSvc, 0, updatedReplicas, hash); err != nil {
if err := r.orchestratePodsFromTo(ctx, vertex, pipeline, isbSvc, servingRedisISB, 0, updatedReplicas, hash); err != nil {
return fmt.Errorf("failed to orchestrate vertex pods [0, %v): %w", updatedReplicas, err)
}
// Wait for the updated pods to be ready before moving on
Expand Down Expand Up @@ -263,7 +289,7 @@ func (r *vertexReconciler) orchestratePods(ctx context.Context, vertex *dfv1.Ver
// 1. Regular scaling operation 2. First time
// create (desiredReplicas-updatedReplicas) pods directly
if desiredReplicas > updatedReplicas {
if err := r.orchestratePodsFromTo(ctx, vertex, pipeline, isbSvc, updatedReplicas, desiredReplicas, hash); err != nil {
if err := r.orchestratePodsFromTo(ctx, vertex, pipeline, isbSvc, servingRedisISB, updatedReplicas, desiredReplicas, hash); err != nil {
return fmt.Errorf("failed to orchestrate vertex pods [%v, %v): %w", updatedReplicas, desiredReplicas, err)
}
}
Expand Down Expand Up @@ -294,7 +320,7 @@ func (r *vertexReconciler) orchestratePods(ctx context.Context, vertex *dfv1.Ver
log.Infof("Rolling update %d replicas, [%d, %d)", toBeUpdated, updatedReplicas, updatedReplicas+toBeUpdated)

// Create pods [updatedReplicas, updatedReplicas+toBeUpdated), and clean up any pods in that range that has a different hash
if err := r.orchestratePodsFromTo(ctx, vertex, pipeline, isbSvc, updatedReplicas, updatedReplicas+toBeUpdated, vertex.Status.UpdateHash); err != nil {
if err := r.orchestratePodsFromTo(ctx, vertex, pipeline, isbSvc, servingRedisISB, updatedReplicas, updatedReplicas+toBeUpdated, vertex.Status.UpdateHash); err != nil {
return fmt.Errorf("failed to orchestrate pods [%v, %v)]: %w", updatedReplicas, updatedReplicas+toBeUpdated, err)
}
vertex.Status.UpdatedReplicas = uint32(updatedReplicas + toBeUpdated)
Expand All @@ -317,14 +343,14 @@ func (r *vertexReconciler) orchestratePods(ctx context.Context, vertex *dfv1.Ver
return nil
}

func (r *vertexReconciler) orchestratePodsFromTo(ctx context.Context, vertex *dfv1.Vertex, pipeline *dfv1.Pipeline, isbSvc *dfv1.InterStepBufferService, fromReplica, toReplica int, newHash string) error {
func (r *vertexReconciler) orchestratePodsFromTo(ctx context.Context, vertex *dfv1.Vertex, pipeline *dfv1.Pipeline, isbSvc, servingRedisISB *dfv1.InterStepBufferService, fromReplica, toReplica int, newHash string) error {
log := logging.FromContext(ctx)
existingPods, err := r.findExistingPods(ctx, vertex, fromReplica, toReplica)
if err != nil {
return fmt.Errorf("failed to find existing pods: %w", err)
}
for replica := fromReplica; replica < toReplica; replica++ {
podSpec, err := r.buildPodSpec(vertex, pipeline, isbSvc.Status.Config, replica)
podSpec, err := r.buildPodSpec(vertex, pipeline, isbSvc.Status.Config, servingRedisISB.Status.Config.Redis, replica)
if err != nil {
return fmt.Errorf("failed to generate pod spec: %w", err)
}
Expand Down Expand Up @@ -523,13 +549,15 @@ func (r *vertexReconciler) createOrUpdateServices(ctx context.Context, vertex *d
return nil
}

func (r *vertexReconciler) buildPodSpec(vertex *dfv1.Vertex, pl *dfv1.Pipeline, isbSvcConfig dfv1.BufferServiceConfig, replicaIndex int) (*corev1.PodSpec, error) {
func (r *vertexReconciler) buildPodSpec(vertex *dfv1.Vertex, pl *dfv1.Pipeline, isbSvcConfig dfv1.BufferServiceConfig, servingRedisISBConfig *dfv1.RedisConfig, replicaIndex int) (*corev1.PodSpec, error) {
isbSvcType, envs := sharedutil.GetIsbSvcEnvVars(isbSvcConfig)
servingRedisISBEnvs := sharedutil.GetServingRedisIsbSvcEnvVars(servingRedisISBConfig)
podSpec, err := vertex.GetPodSpec(dfv1.GetVertexPodSpecReq{
ISBSvcType: isbSvcType,
Image: r.image,
PullPolicy: corev1.PullPolicy(sharedutil.LookupEnvStringOr(dfv1.EnvImagePullPolicy, "")),
Env: envs,
ServingEnv: servingRedisISBEnvs,
SideInputsStoreName: pl.GetSideInputsStoreName(),
ServingSourceStreamName: vertex.GetServingSourceStreamName(),
PipelineSpec: pl.Spec,
Expand Down
41 changes: 41 additions & 0 deletions pkg/shared/util/isbs_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,44 @@ func GetIsbSvcEnvVars(isbSvcConfig dfv1.BufferServiceConfig) (dfv1.ISBSvcType, [
}
return isbSvcType, env
}

// GetServingRedisIsbSvcEnvVars translates the given Redis configuration into
// the list of container environment variables that carry the Redis connection
// settings (URL, sentinel URL, sentinel master name, user, and secret-backed
// passwords).
//
// Only fields that are set on the config produce an environment variable:
// empty strings and nil password selectors are skipped. A nil config yields a
// nil slice instead of panicking, so callers that have no Redis configuration
// available can pass it through safely.
func GetServingRedisIsbSvcEnvVars(x *dfv1.RedisConfig) []corev1.EnvVar {
	// Guard against a missing Redis config; dereferencing it below would panic.
	if x == nil {
		return nil
	}

	var env []corev1.EnvVar

	// Plain string settings, emitted in a fixed order and only when non-empty.
	plain := []struct {
		name  string
		value string
	}{
		{dfv1.EnvISBSvcRedisURL, x.URL},
		{dfv1.EnvISBSvcRedisSentinelURL, x.SentinelURL},
		{dfv1.EnvISBSvcSentinelMaster, x.MasterName},
		{dfv1.EnvISBSvcRedisUser, x.User},
	}
	for _, p := range plain {
		if p.value != "" {
			env = append(env, corev1.EnvVar{Name: p.name, Value: p.value})
		}
	}

	// secretEnv builds an env var whose value is read from a key of a secret,
	// so the password itself never appears in the pod spec.
	secretEnv := func(envName, secretName, key string) corev1.EnvVar {
		return corev1.EnvVar{
			Name: envName,
			ValueFrom: &corev1.EnvVarSource{
				SecretKeyRef: &corev1.SecretKeySelector{
					LocalObjectReference: corev1.LocalObjectReference{
						Name: secretName,
					},
					Key: key,
				},
			},
		}
	}

	if x.Password != nil {
		env = append(env, secretEnv(dfv1.EnvISBSvcRedisPassword, x.Password.Name, x.Password.Key))
	}
	if x.SentinelPassword != nil {
		env = append(env, secretEnv(dfv1.EnvISBSvcRedisSentinelPassword, x.SentinelPassword.Name, x.SentinelPassword.Key))
	}
	return env
}
Loading