Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy committed Feb 20, 2025
1 parent 0dd861e commit 3c0fb9d
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 5 deletions.
5 changes: 5 additions & 0 deletions pkg/reconciler/monovertex/scaling/scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ func (s *Scaler) scaleOneMonoVertex(ctx context.Context, key string, worker int)
return nil
}

if monoVtx.Spec.Scale.GetMaxReplicas() == monoVtx.Spec.Scale.GetMinReplicas() {
log.Infof("MonoVertex %s has same scale.min and scale.max, skip scaling.", monoVtx.Name)
return nil
}

if monoVtx.Status.Replicas == 0 { // Was scaled to 0
// Periodically wake them up from 0 replicas to 1, to peek for the incoming messages
if secondsSinceLastScale >= float64(monoVtx.Spec.Scale.GetZeroReplicaSleepSeconds()) {
Expand Down
12 changes: 8 additions & 4 deletions pkg/reconciler/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,10 @@ func (r *pipelineReconciler) reconcileFixedResources(ctx context.Context, pl *df
// Keep the original replicas as much as possible
if originalReplicas >= newObj.Spec.Scale.GetMinReplicas() && originalReplicas <= newObj.Spec.Scale.GetMaxReplicas() {
oldObj.Spec.Replicas = &originalReplicas
} else if originalReplicas < newObj.Spec.Scale.GetMinReplicas() {
originalReplicas = newObj.Spec.Scale.GetMinReplicas()
} else {
originalReplicas = newObj.Spec.Scale.GetMaxReplicas()
}
oldObj.Annotations[dfv1.KeyHash] = newObj.GetAnnotations()[dfv1.KeyHash]
if err := r.client.Update(ctx, &oldObj); err != nil {
Expand Down Expand Up @@ -647,11 +651,11 @@ func buildVertices(pl *dfv1.Pipeline) map[string]dfv1.Vertex {
replicas = int32(partitions)
} else {
x := vCopy.Scale
if x.Min != nil && *x.Min > 1 && replicas < *x.Min {
replicas = *x.Min
if replicas < x.GetMinReplicas() {
replicas = x.GetMinReplicas()
}
if x.Max != nil && *x.Max > 1 && replicas > *x.Max {
replicas = *x.Max
if replicas > x.GetMaxReplicas() {
replicas = x.GetMaxReplicas()
}
}

Expand Down
7 changes: 6 additions & 1 deletion pkg/reconciler/vertex/scaling/scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,16 @@ func (s *Scaler) scaleOneVertex(ctx context.Context, key string, worker int) err
log.Info("Corresponding Pipeline not in Running state, skip scaling.")
return nil
}
if int(vertex.Status.Replicas) != vertex.GetReplicas() {
if vertex.Status.Replicas != vertex.Status.DesiredReplicas {
log.Infof("Vertex %s might be under processing, replicas mismatch, skip scaling.", vertex.Name)
return nil
}

if vertex.Spec.Scale.GetMaxReplicas() == vertex.Spec.Scale.GetMinReplicas() {
log.Infof("Vertex %s has same scale.min and scale.max, skip scaling.", vertex.Name)
return nil
}

var err error
daemonClient, _ := s.daemonClientsCache.Get(pl.GetDaemonServiceURL())
if daemonClient == nil {
Expand Down

0 comments on commit 3c0fb9d

Please sign in to comment.