Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ const (
JSApiStreamRemovePeer = "$JS.API.STREAM.PEER.REMOVE.*"
JSApiStreamRemovePeerT = "$JS.API.STREAM.PEER.REMOVE.%s"

// JSApiStreamLeaderQuiesce is the endpoint to have stream leader quiesce.
// Will return JSON response.
JSApiStreamLeaderQuiesce = "$JS.API.STREAM.LEADER.QUIESCE.*"

// JSApiStreamLeaderStepDown is the endpoint to have stream leader stepdown.
// Will return JSON response.
JSApiStreamLeaderStepDown = "$JS.API.STREAM.LEADER.STEPDOWN.*"
Expand Down Expand Up @@ -612,6 +616,14 @@ type JSApiStreamRemovePeerResponse struct {

const JSApiStreamRemovePeerResponseType = "io.nats.jetstream.api.v1.stream_remove_peer_response"

// JSApiStreamLeaderQuiesceResponse is the response to a leader stepdown request.
type JSApiStreamLeaderQuiesceResponse struct {
ApiResponse
Success bool `json:"success,omitempty"`
}

const JSApiStreamLeaderQuiesceResponseType = "io.nats.jetstream.api.v1.stream_leader_quiesce_response"

// JSApiStreamLeaderStepDownResponse is the response to a leader stepdown request.
type JSApiStreamLeaderStepDownResponse struct {
ApiResponse
Expand Down Expand Up @@ -1006,6 +1018,7 @@ func (s *Server) setJetStreamExportSubs() error {
{JSApiStreamSnapshot, s.jsStreamSnapshotRequest},
{JSApiStreamRestore, s.jsStreamRestoreRequest},
{JSApiStreamRemovePeer, s.jsStreamRemovePeerRequest},
{JSApiStreamLeaderQuiesce, s.jsStreamLeaderQuiesceRequest},
{JSApiStreamLeaderStepDown, s.jsStreamLeaderStepDownRequest},
{JSApiConsumerLeaderStepDown, s.jsConsumerLeaderStepDownRequest},
{JSApiMsgDelete, s.jsMsgDeleteRequest},
Expand Down Expand Up @@ -2267,6 +2280,93 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
}

// Request a stream leader to quiesce.
func (s *Server) jsStreamLeaderQuiesceRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
if c == nil || !s.JetStreamEnabled() {
return
}

ci, acc, hdr, msg, err := s.getRequestInfo(c, rmsg)
if err != nil {
s.Warnf(badAPIRequestT, msg)
return
}

// Have extra token for this one.
name := tokenAt(subject, 6)

var resp = JSApiStreamLeaderStepDownResponse{ApiResponse: ApiResponse{Type: JSApiStreamLeaderQuiesceResponseType}}
if errorOnRequiredApiLevel(hdr) {
resp.Error = NewJSRequiredApiLevelError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

js, cc := s.getJetStreamCluster()
if js == nil || cc == nil {
return
}
if js.isLeaderless() {
resp.Error = NewJSClusterNotAvailError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

js.mu.RLock()
isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, name)
js.mu.RUnlock()

if isLeader && sa == nil {
resp.Error = NewJSStreamNotFoundError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
} else if sa == nil {
return
}

if hasJS, doErr := acc.checkJetStream(); !hasJS {
if doErr {
resp.Error = NewJSNotEnabledForAccountError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
}
return
}

// Check to see if we are a member of the group and if the group has no leader.
if js.isGroupLeaderless(sa.Group) {
resp.Error = NewJSClusterNotAvailError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

// We have the stream assigned and a leader, so only the stream leader should answer.
if !acc.JetStreamIsStreamLeader(name) {
return
}

mset, err := acc.lookupStream(name)
if err != nil || mset == nil {
resp.Error = NewJSStreamNotFoundError(Unless(err))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}

node := mset.raftNode()
if node == nil {
resp.Success = true
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
return
}

err = node.Quiesce()
if err != nil {
resp.Error = NewJSRaftGeneralError(err, Unless(err))
} else {
resp.Success = true
}
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
}

// Request to have a stream leader stepdown.
func (s *Server) jsStreamLeaderStepDownRequest(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
if c == nil || !s.JetStreamEnabled() {
Expand Down
133 changes: 116 additions & 17 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ type RaftNode interface {
Delete()
RecreateInternalSubs() error
IsSystemAccount() bool
Quiesce() error
}

type WAL interface {
Expand Down Expand Up @@ -225,6 +226,9 @@ type raft struct {
observer bool // The node is observing, i.e. not able to become leader
initializing bool // The node is new, and "empty log" checks can be temporarily relaxed.
scaleUp bool // The node is part of a scale up, puts us in observer mode until the log contains data.

quiesce chan bool // Channel to notify leader loop to quiesc
quiesced bool // The node is quiesced
}

type proposedEntry struct {
Expand Down Expand Up @@ -260,6 +264,7 @@ const (
lostQuorumCheckIntervalDefault = hbIntervalDefault * 10 // 10 seconds
observerModeIntervalDefault = 48 * time.Hour
peerRemoveTimeoutDefault = 5 * time.Minute
quiesceIntervalDefault = 15 * time.Minute
)

var (
Expand All @@ -272,6 +277,7 @@ var (
lostQuorumCheck = lostQuorumCheckIntervalDefault
observerModeInterval = observerModeIntervalDefault
peerRemoveTimeout = peerRemoveTimeoutDefault
quiesceInterval = quiesceIntervalDefault
)

type RaftConfig struct {
Expand Down Expand Up @@ -426,6 +432,7 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
leadc: make(chan bool, 32),
observer: cfg.Observer,
extSt: ps.domainExt,
quiesce: make(chan bool),
}

// Setup our internal subscriptions for proposals, votes and append entries.
Expand Down Expand Up @@ -1601,6 +1608,44 @@ func (n *raft) selectNextLeader() string {
return nextLeader
}

func (n *raft) Quiesce() error {
if n.State() != Leader {
return errNotLeader
}
n.quiesce <- true
return nil
}

// Return true if the node can be quiesced
func (n *raft) mayQuiesce() bool {
n.RLock()
defer n.RUnlock()
// TODO this test should be strengthened:
// must check that followers are up-to-date
return !n.quiesced && n.State() == Leader && n.hasQuorumLocked()
}

func (n *raft) doQuiesce() bool {
if n.mayQuiesce() {
n.sendQuiesce()
n.setQuiesced(true)
return true
}
return false
}

func (n *raft) isQuiesced() bool {
n.RLock()
defer n.RUnlock()
return n.quiesced
}

func (n *raft) setQuiesced(quiesced bool) {
n.Lock()
defer n.Unlock()
n.quiesced = quiesced
}

// StepDown will have a leader stepdown and optionally do a leader transfer.
func (n *raft) StepDown(preferred ...string) error {
if n.State() != Leader {
Expand Down Expand Up @@ -2140,8 +2185,13 @@ func (n *raft) runAsFollower() {

select {
case <-n.entry.ch:
wasQuiesced := n.isQuiesced()
// New append entries have arrived over the network.
n.processAppendEntries()
if !wasQuiesced && n.isQuiesced() {
// Avoid unquiescing immediately
continue
}
case <-n.s.quitCh:
// The server is shutting down.
return
Expand Down Expand Up @@ -2188,6 +2238,11 @@ func (n *raft) runAsFollower() {
n.processVoteRequest(voteReq)
}
}

if n.isQuiesced() {
n.setQuiesced(false)
n.debug("Follower unquiesced")
}
}
}

Expand Down Expand Up @@ -2308,6 +2363,7 @@ const (
EntryRemovePeer
EntryLeaderTransfer
EntrySnapshot
EntryQuiesce
)

func (t EntryType) String() string {
Expand All @@ -2326,6 +2382,8 @@ func (t EntryType) String() string {
return "LeaderTransfer"
case EntrySnapshot:
return "Snapshot"
case EntryQuiesce:
return "Quiesce"
}
return fmt.Sprintf("Unknown [%d]", uint8(t))
}
Expand Down Expand Up @@ -2585,10 +2643,15 @@ func (n *raft) runAsLeader() {
n.sendPeerState()

hb := time.NewTicker(hbInterval)
defer hb.Stop()

lq := time.NewTicker(lostQuorumCheck)
defer lq.Stop()
qu := time.NewTicker(quiesceInterval)

stopTicking := func() {
hb.Stop()
lq.Stop()
qu.Stop()
}
defer stopTicking()

for n.State() == Leader {
select {
Expand All @@ -2602,6 +2665,12 @@ func (n *raft) runAsLeader() {
n.processAppendEntryResponse(ar)
}
n.resp.recycle(&ars)
// TODO follower could avoid sending a response
// for EntryQuiesce
if n.isQuiesced() {
// Avoid unquiescing immediately
continue
}
case <-n.prop.ch:
const maxBatch = 256 * 1024
const maxEntries = 512
Expand Down Expand Up @@ -2664,15 +2733,31 @@ func (n *raft) runAsLeader() {
}
case <-n.entry.ch:
n.processAppendEntries()
case <-qu.C:
if time.Since(n.active) > quiesceInterval && n.doQuiesce() {
stopTicking()
continue
}
case <-n.quiesce:
if n.doQuiesce() {
stopTicking()
continue
}
}

// Any interaction unquiesces the leader
if n.isQuiesced() {
hb.Reset(hbInterval)
lq.Reset(lostQuorumInterval)
qu.Reset(quiesceInterval)
n.setQuiesced(false)
n.debug("Leader unquiesced")
}
}
}

// Quorum reports the quorum status. Will be called on former leaders.
func (n *raft) Quorum() bool {
n.RLock()
defer n.RUnlock()

// Return true if leader believes it still has a quorum.
func (n *raft) hasQuorumLocked() bool {
nc := 0
for id, peer := range n.peers {
if id == n.id || time.Since(peer.ts) < lostQuorumInterval {
Expand All @@ -2684,6 +2769,13 @@ func (n *raft) Quorum() bool {
return false
}

// Quorum reports the quorum status. Will be called on former leaders.
func (n *raft) Quorum() bool {
n.RLock()
defer n.RUnlock()
return n.hasQuorumLocked()
}

func (n *raft) lostQuorum() bool {
n.RLock()
defer n.RUnlock()
Expand All @@ -2698,15 +2790,7 @@ func (n *raft) lostQuorumLocked() bool {
return false
}

nc := 0
for id, peer := range n.peers {
if id == n.id || time.Since(peer.ts) < lostQuorumInterval {
if nc++; nc >= n.qn {
return false
}
}
}
return true
return !n.hasQuorumLocked()
}

// Check for being not active in terms of sending entries.
Expand Down Expand Up @@ -3719,6 +3803,11 @@ CONTINUE:
// Check to see if we have any related entries to process here.
for _, e := range ae.entries {
switch e.Type {
case EntryQuiesce:
if isNew && n.State() == Follower {
n.elect.Stop()
n.quiesced = true
}
case EntryLeaderTransfer:
// Only process these if they are new, so no replays or catchups.
if isNew {
Expand Down Expand Up @@ -3870,6 +3959,11 @@ func (n *raft) buildAppendEntry(entries []*Entry) *appendEntry {
// Determine if we should store an entry. This stops us from storing
// heartbeat messages.
func (ae *appendEntry) shouldStore() bool {
if len(ae.entries) == 1 {
if e := ae.entries[0]; e.Type == EntryQuiesce {
return false
}
}
return ae != nil && len(ae.entries) > 0
}

Expand Down Expand Up @@ -4033,6 +4127,11 @@ func (n *raft) sendHeartbeat() {
n.sendAppendEntry(nil)
}

// Tell the cluster to quiesce the current term
func (n *raft) sendQuiesce() {
n.sendAppendEntry([]*Entry{{EntryQuiesce, nil}})
}

type voteRequest struct {
term uint64
lastTerm uint64
Expand Down
Loading
Loading