Skip to content

Commit

Permalink
rm event_bus_rocketmq
Browse files Browse the repository at this point in the history
  • Loading branch information
xiandong-italki committed May 4, 2023
1 parent 7909a9b commit f37b3f2
Show file tree
Hide file tree
Showing 8 changed files with 0 additions and 856 deletions.
12 changes: 0 additions & 12 deletions event_bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/wuyazi/gddd/proto"
"strings"
)

Expand Down Expand Up @@ -54,14 +53,3 @@ func newDomainEventMessage(event DomainEvent) (msg DomainEventMessage, err error
}
return
}

func newDomainEventMessageProto(msg0 DomainEventMessage) (msg proto.DomainEventMessageProto) {
msg = proto.DomainEventMessageProto{
AggregateId: msg0.AggregateId,
AggregateName: msg0.AggregateName,
EventName: msg0.EventName,
EventId: msg0.EventId,
EventBody: msg0.EventBody,
}
return
}
81 changes: 0 additions & 81 deletions event_bus_dtm.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ import (
"database/sql"
"encoding/json"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/dtm-labs/client/dtmcli"
"github.com/dtm-labs/client/dtmgrpc"
"github.com/lithammer/shortuuid/v3"
Expand Down Expand Up @@ -130,81 +127,3 @@ type DtmEventConsumerConfig struct {
GroupName string
NameServers []string
}

type DtmEventConsumer struct {
DomainName string
GroupName string
Consumer rocketmq.PushConsumer
}

func NewDtmEventConsumer(ctx context.Context, config DtmEventConsumerConfig) (eventConsumer *DtmEventConsumer, err error) {
eventConsumer.DomainName = config.DomainName
eventConsumer.GroupName = config.GroupName
c, err := rocketmq.NewPushConsumer(
consumer.WithGroupName(eventConsumer.GroupName),
consumer.WithNsResolver(primitive.NewPassthroughResolver(config.NameServers)),
)
eventConsumer.Consumer = c
return
}

func (c *DtmEventConsumer) Start() {
err := c.Consumer.Start()
if err != nil {
fmt.Printf("DtmEventConsumer start error: %v\n", err)
panic(err)
}
}

func (c *DtmEventConsumer) Stop() {
err := c.Consumer.Shutdown()
if err != nil {
fmt.Printf("DtmEventConsumer stop error: %v\n", err)
panic(err)
}
}

func (c *DtmEventConsumer) Subscribe(topicName string, change aggregateChange, eventHandle EventHandle) {
topicName = strings.TrimSpace(topicName)
if topicName == "" {
err := fmt.Errorf("DtmEventConsumer subscribe event failed, topicName is empty")
panic(err)
}
if change == nil {
err := fmt.Errorf("DtmEventConsumer subscribe event failed, change is nil")
panic(err)
}
if eventHandle == nil {
err := fmt.Errorf("DtmEventConsumer subscribe event failed, eventHandle is nil")
panic(err)
}
err := c.Consumer.Subscribe(topicName, consumer.MessageSelector{},
func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for _, msg := range msgs {
if topicName == msg.Message.Topic {
domainEventMessage := new(DomainEventMessage)
err := json.Unmarshal(msg.Message.Body, domainEventMessage)
if err != nil {
return consumer.ConsumeRetryLater, err
}
if getAggregateChangeName(change) == domainEventMessage.EventName {
var event SampleDomainEvent
// TODO: change is read only?
newChange := change
event, err = newSampleDomainEvent(*domainEventMessage, newChange)
if err != nil {
return 0, err
}
err, _ = eventHandle(context.TODO(), &event)
if err != nil {
return consumer.ConsumeRetryLater, err
}
}
}
}
return consumer.ConsumeSuccess, nil
})
if err != nil {
panic(nil)
}
}
187 changes: 0 additions & 187 deletions event_bus_rocketmq.go

This file was deleted.

69 changes: 0 additions & 69 deletions event_bus_rocketmq_demo_listener.go

This file was deleted.

Loading

0 comments on commit f37b3f2

Please sign in to comment.