Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
wzshiming committed Jan 20, 2025
1 parent bf6f98f commit a355adf
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 42 deletions.
13 changes: 10 additions & 3 deletions queue/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,11 @@ func (c *MessageClient) WatchList(ctx context.Context) (chan MessageResponse, er
if err != nil {
return
}

messageChannel <- message
select {
case <-ctx.Done():
return
case messageChannel <- message:
}
}
}()

Expand Down Expand Up @@ -232,7 +235,11 @@ func (c *MessageClient) Watch(ctx context.Context, messageID int64) (chan Messag
if err != nil {
return
}
messageChannel <- message
select {
case <-ctx.Done():
return
case messageChannel <- message:
}
}
}()

Expand Down
44 changes: 38 additions & 6 deletions queue/controller/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,15 +187,15 @@ func (mc *MessageController) RegisterRoutes(ws *restful.WebService) {
Returns(http.StatusNotFound, "Message not found.", Error{}).
Returns(http.StatusBadRequest, "Invalid request format.", Error{}))

ws.Route(ws.PATCH("/messages/{message_id}/complete").To(mc.Completed).
ws.Route(ws.PATCH("/messages/{message_id}/complete").To(mc.Complete).
Doc("Set a message as completed by ID.").
Operation("setCompleted").
Produces(restful.MIME_JSON).
Consumes(restful.MIME_JSON).
Param(ws.PathParameter("message_id", "message ID").DataType("integer")).
Reads(CompletedRequest{}).
Writes(Error{}).
Returns(http.StatusNoContent, "Message failed successfully.", nil).
Returns(http.StatusNoContent, "Message complete successfully.", nil).
Returns(http.StatusBadRequest, "Invalid request format.", Error{}))

ws.Route(ws.PATCH("/messages/{message_id}/failed").To(mc.Failed).
Expand Down Expand Up @@ -453,13 +453,25 @@ func (mc *MessageController) Consume(req *restful.Request, resp *restful.Respons
return
}

message, err := mc.messageService.Consume(req.Request.Context(), messageID, completedRequest.Lease)
if completedRequest.Lease == "" {
resp.WriteHeaderAndEntity(http.StatusBadRequest, Error{Code: "CompletedRequestError", Message: "Lease cannot be empty."})
return
}

err = mc.messageService.Consume(req.Request.Context(), messageID, completedRequest.Lease)
if err != nil {
resp.WriteHeaderAndEntity(http.StatusNotAcceptable, Error{Code: "MessageNotAcceptableError", Message: "Message not found: " + err.Error()})
return
}

data := MessageResponse{MessageID: message.MessageID, Content: message.Content, Priority: message.Priority, Status: message.Status, Data: message.Data, LastHeartbeat: message.LastHeartbeat}
curr, err := mc.messageService.GetByID(context.Background(), messageID)
if err != nil {
resp.WriteHeaderAndEntity(http.StatusNotFound, Error{Code: "MessageNotFoundError", Message: "Message not found after heartbeat: " + err.Error()})
return
}

data := MessageResponse{MessageID: curr.MessageID, Content: curr.Content, Priority: curr.Priority, Status: curr.Status, Data: curr.Data, LastHeartbeat: curr.LastHeartbeat}

mc.updateWatchChannel(messageID, data)
mc.updateWatchListChannels(data)

Expand All @@ -480,6 +492,11 @@ func (mc *MessageController) Heartbeat(req *restful.Request, resp *restful.Respo
return
}

if heartbeatRequest.Lease == "" {
resp.WriteHeaderAndEntity(http.StatusBadRequest, Error{Code: "CompletedRequestError", Message: "Lease cannot be empty."})
return
}

if err := mc.messageService.Heartbeat(req.Request.Context(), messageID, heartbeatRequest.Data, heartbeatRequest.Lease); err != nil {
resp.WriteHeaderAndEntity(http.StatusNotAcceptable, Error{Code: "MessageNotAcceptableError", Message: "Message not found: " + err.Error()})
return
Expand All @@ -499,7 +516,7 @@ func (mc *MessageController) Heartbeat(req *restful.Request, resp *restful.Respo
resp.WriteHeader(http.StatusNoContent)
}

func (mc *MessageController) Completed(req *restful.Request, resp *restful.Response) {
func (mc *MessageController) Complete(req *restful.Request, resp *restful.Response) {
messageIDStr := req.PathParameter("message_id")
messageID, err := strconv.ParseInt(messageIDStr, 10, 64)
if err != nil {
Expand All @@ -513,7 +530,12 @@ func (mc *MessageController) Completed(req *restful.Request, resp *restful.Respo
return
}

if err := mc.messageService.Completed(req.Request.Context(), messageID, completedRequest.Lease); err != nil {
if completedRequest.Lease == "" {
resp.WriteHeaderAndEntity(http.StatusBadRequest, Error{Code: "CompletedRequestError", Message: "Lease cannot be empty."})
return
}

if err := mc.messageService.Complete(req.Request.Context(), messageID, completedRequest.Lease); err != nil {
resp.WriteHeaderAndEntity(http.StatusNotAcceptable, Error{Code: "MessageNotAcceptabledError", Message: "Message not found: " + err.Error()})
return
}
Expand Down Expand Up @@ -546,6 +568,11 @@ func (mc *MessageController) Failed(req *restful.Request, resp *restful.Response
return
}

if failedRequest.Lease == "" {
resp.WriteHeaderAndEntity(http.StatusBadRequest, Error{Code: "CompletedRequestError", Message: "Lease cannot be empty."})
return
}

if err := mc.messageService.Failed(req.Request.Context(), messageID, failedRequest.Lease, failedRequest.Data); err != nil {
resp.WriteHeaderAndEntity(http.StatusNotAcceptable, Error{Code: "MessageNotAcceptableError", Message: "Message not found: " + err.Error()})
return
Expand Down Expand Up @@ -579,6 +606,11 @@ func (mc *MessageController) Cancel(req *restful.Request, resp *restful.Response
return
}

if cancelRequest.Lease == "" {
resp.WriteHeaderAndEntity(http.StatusBadRequest, Error{Code: "CompletedRequestError", Message: "Lease cannot be empty."})
return
}

if err := mc.messageService.Cancel(req.Request.Context(), messageID, cancelRequest.Lease); err != nil {
resp.WriteHeaderAndEntity(http.StatusNotAcceptable, Error{Code: "MessageNotAcceptableError", Message: "Message not found: " + err.Error()})
return
Expand Down
34 changes: 16 additions & 18 deletions queue/dao/messgae.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (m *Message) List(ctx context.Context) ([]model.Message, error) {
}

const cleanUpSQL = `
DELETE FROM messages WHERE delete_at IS NOT NULL
DELETE FROM messages WHERE delete_at IS NOT NULL AND delete_at < NOW() - INTERVAL 8 HOUR
`

func (m *Message) CleanUp(ctx context.Context) error {
Expand Down Expand Up @@ -223,46 +223,44 @@ func (m *Message) GetStale(ctx context.Context) ([]model.Message, error) {
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("error occurred during rows iteration: %w", err)
}

return messages, nil
}

const setStatusAndLeaseSQL = `
const consumeSQL = `
UPDATE messages SET status = ?, lease = ? WHERE id = ? AND status = ? AND lease = ? AND delete_at IS NULL
`

func (m *Message) SetStatusAndLease(ctx context.Context, id int64, status model.MessageStatus, lease string) (int64, error) {
func (m *Message) Consume(ctx context.Context, id int64, lease string) (int64, error) {
db := GetDB(ctx)
results, err := db.ExecContext(ctx, setStatusAndLeaseSQL, status, lease, id, model.StatusPending, "")
results, err := db.ExecContext(ctx, consumeSQL, model.StatusProcessing, lease, id, model.StatusPending, "")
if err != nil {
return 0, fmt.Errorf("failed to set status and lease: %w", err)
return 0, fmt.Errorf("failed to set consume: %w", err)
}

return results.RowsAffected()
}

const setHeartbeatAndDataSQL = `
const heartbeatSQL = `
UPDATE messages SET last_heartbeat = NOW(), data = ? WHERE id = ? AND lease = ? AND status = ? AND delete_at IS NULL
`

func (m *Message) SetHeartbeatAndData(ctx context.Context, id int64, data model.MessageAttr, lease string) (int64, error) {
func (m *Message) Heartbeat(ctx context.Context, id int64, data model.MessageAttr, lease string) (int64, error) {
db := GetDB(ctx)
results, err := db.ExecContext(ctx, setHeartbeatAndDataSQL, data, id, lease, model.StatusProcessing)
results, err := db.ExecContext(ctx, heartbeatSQL, data, id, lease, model.StatusProcessing)
if err != nil {
return 0, fmt.Errorf("failed to set heartbeat and data: %w", err)
return 0, fmt.Errorf("failed to set heartbeat: %w", err)
}
return results.RowsAffected()
}

const setCompletedSQL = `
const completedSQL = `
UPDATE messages SET status = ?, lease = ? WHERE id = ? AND lease = ? AND status = ? AND delete_at IS NULL
`

func (m *Message) SetCompleted(ctx context.Context, id int64, lease string) (int64, error) {
func (m *Message) Complete(ctx context.Context, id int64, lease string) (int64, error) {
db := GetDB(ctx)
results, err := db.ExecContext(ctx, setCompletedSQL, model.StatusCompleted, "", id, lease, model.StatusProcessing)
results, err := db.ExecContext(ctx, completedSQL, model.StatusCompleted, "", id, lease, model.StatusProcessing)
if err != nil {
return 0, fmt.Errorf("failed to set completed: %w", err)
return 0, fmt.Errorf("failed to ser complete: %w", err)
}
return results.RowsAffected()
}
Expand All @@ -271,11 +269,11 @@ const setFailedSQL = `
UPDATE messages SET status = ?, lease = ?, data = ? WHERE id = ? AND lease = ? AND status = ? AND delete_at IS NULL
`

func (m *Message) SetFailed(ctx context.Context, id int64, lease string, data model.MessageAttr) (int64, error) {
func (m *Message) Failed(ctx context.Context, id int64, lease string, data model.MessageAttr) (int64, error) {
db := GetDB(ctx)
results, err := db.ExecContext(ctx, setFailedSQL, model.StatusFailed, "", data, id, lease, model.StatusProcessing)
if err != nil {
return 0, fmt.Errorf("failed to set status, lease, and data: %w", err)
return 0, fmt.Errorf("failed to set failed: %w", err)
}
return results.RowsAffected()
}
Expand All @@ -288,7 +286,7 @@ func (m *Message) Cancel(ctx context.Context, id int64, lease string) (int64, er
db := GetDB(ctx)
results, err := db.ExecContext(ctx, cancelSQL, "", model.StatusPending, id, lease, model.StatusProcessing)
if err != nil {
return 0, fmt.Errorf("failed to set status, lease, and data: %w", err)
return 0, fmt.Errorf("failed to set cancel: %w", err)
}
return results.RowsAffected()
}
Expand Down
23 changes: 9 additions & 14 deletions queue/service/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,28 +56,23 @@ func (s *MessageService) List(ctx context.Context) ([]model.Message, error) {
return s.messageDao.List(ctx)
}

func (s *MessageService) Consume(ctx context.Context, id int64, lease string) (model.Message, error) {
func (s *MessageService) Consume(ctx context.Context, id int64, lease string) error {
ctx = dao.WithDB(ctx, s.db)

rowsAffected, err := s.messageDao.SetStatusAndLease(ctx, id, model.StatusProcessing, lease)
rowsAffected, err := s.messageDao.Consume(ctx, id, lease)
if err != nil {
return model.Message{}, err
return err
}

if rowsAffected == 0 {
return model.Message{}, fmt.Errorf("no rows affected when consuming message with id %d", id)
return fmt.Errorf("no rows affected when consuming message with id %d", id)
}

message, err := s.messageDao.GetByID(ctx, id)
if err != nil {
return model.Message{}, err
}
return message, nil
return nil
}

func (s *MessageService) Heartbeat(ctx context.Context, id int64, data model.MessageAttr, lease string) error {
ctx = dao.WithDB(ctx, s.db)
rowsAffected, err := s.messageDao.SetHeartbeatAndData(ctx, id, data, lease)
rowsAffected, err := s.messageDao.Heartbeat(ctx, id, data, lease)
if err != nil {
return err
}
Expand All @@ -87,9 +82,9 @@ func (s *MessageService) Heartbeat(ctx context.Context, id int64, data model.Mes
return nil
}

func (s *MessageService) Completed(ctx context.Context, id int64, lease string) error {
func (s *MessageService) Complete(ctx context.Context, id int64, lease string) error {
ctx = dao.WithDB(ctx, s.db)
rowsAffected, err := s.messageDao.SetCompleted(ctx, id, lease)
rowsAffected, err := s.messageDao.Complete(ctx, id, lease)
if err != nil {
return err
}
Expand All @@ -101,7 +96,7 @@ func (s *MessageService) Completed(ctx context.Context, id int64, lease string)

func (s *MessageService) Failed(ctx context.Context, id int64, lease string, data model.MessageAttr) error {
ctx = dao.WithDB(ctx, s.db)
rowsAffected, err := s.messageDao.SetFailed(ctx, id, lease, data)
rowsAffected, err := s.messageDao.Failed(ctx, id, lease, data)
if err != nil {
return err
}
Expand Down
20 changes: 19 additions & 1 deletion runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package runner

import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
Expand Down Expand Up @@ -31,7 +32,8 @@ func identity() (string, error) {
if err != nil {
return "", fmt.Errorf("unable to get hostname: %w", err)
}
hnHex := hex.EncodeToString([]byte(hostname))
h := sha256.Sum256([]byte(hostname))
hnHex := hex.EncodeToString(h[:])
return fmt.Sprintf("%s-%d", hnHex[:16], time.Now().Unix()), nil
}

Expand Down Expand Up @@ -173,13 +175,15 @@ func (r *Runner) runOnceSync(ctx context.Context, id string, logger *slog.Logger

var bmMut sync.Mutex
var bm []model.Blob
var updated bool

var errCh = make(chan error, 1)

go func() {
errCh <- r.syncManager.ImageWithCallback(ctx, resp.Content, func(blob string, progress, size int64) {
bmMut.Lock()
defer bmMut.Unlock()
updated = true
for i, m := range bm {
if m.Digest == blob {
bm[i].Progress = progress
Expand All @@ -202,6 +206,7 @@ func (r *Runner) runOnceSync(ctx context.Context, id string, logger *slog.Logger
case <-ticker.C:
bmMut.Lock()
nbm := append([]model.Blob{}, bm...)
updated = false
bmMut.Unlock()

err := r.client.Heartbeat(ctx, resp.MessageID, client.HeartbeatRequest{
Expand All @@ -217,6 +222,19 @@ func (r *Runner) runOnceSync(ctx context.Context, id string, logger *slog.Logger

case err := <-errCh:
if err == nil {
if updated {
err = r.client.Heartbeat(ctx, resp.MessageID, client.HeartbeatRequest{
Lease: id,
Data: model.MessageAttr{
Blobs: bm,
},
})

if err != nil {
logger.Error("Heartbeat", "error", err)
}
}

return r.client.Complete(ctx, resp.MessageID, client.CompletedRequest{
Lease: id,
})
Expand Down

0 comments on commit a355adf

Please sign in to comment.