Skip to content

Commit dd2c4db

Browse files
committed
facade done
1 parent ea376be commit dd2c4db

File tree

3 files changed

+146
-15
lines changed

3 files changed

+146
-15
lines changed
+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"os"
6+
"os/signal"
7+
"syscall"
8+
"time"
9+
10+
"github.com/ozonmp/srv-verification-api/internal/app/reader"
11+
"github.com/ozonmp/srv-verification-api/internal/config"
12+
"github.com/ozonmp/srv-verification-api/internal/pkg/logger"
13+
"github.com/ozonmp/srv-verification-api/internal/tracer"
14+
)
15+
16+
func main() {
17+
sigs := make(chan os.Signal, 1)
18+
ctx := context.Background()
19+
20+
if err := config.ReadConfigYML("config.yml"); err != nil {
21+
logger.FatalKV(ctx, "Failed init configuration", "err", err)
22+
}
23+
24+
cfgInst := config.GetConfigInstance()
25+
26+
tracing, err := tracer.NewTracer(&cfgInst)
27+
if err != nil {
28+
logger.ErrorKV(ctx, "Failed init tracing", "err", err)
29+
return
30+
}
31+
defer tracing.Close()
32+
33+
initCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
34+
defer cancel()
35+
36+
kafka, err := reader.NewEventConsumer(cfgInst.Kafka.Brokers, cfgInst.Kafka.Topic, cfgInst.Kafka.GroupID)
37+
if err != nil {
38+
logger.FatalKV(ctx, "Failed init reader", "err", err)
39+
}
40+
41+
kafka.Read(initCtx)
42+
43+
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
44+
45+
<-sigs
46+
47+
}

internal/app/reader/reader.go

+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package reader
2+
3+
import (
4+
"context"
5+
"log"
6+
"sync"
7+
8+
"github.com/Shopify/sarama"
9+
)
10+
11+
type EventReader interface {
12+
Read() error
13+
}
14+
15+
type kafkaReader struct {
16+
consumer sarama.ConsumerGroup
17+
topic []string
18+
groupId string
19+
}
20+
21+
type Consumer struct {
22+
ready chan bool
23+
}
24+
25+
func NewEventConsumer(brokers []string, topic string, groupId string) (*kafkaReader, error) {
26+
config := sarama.NewConfig()
27+
28+
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
29+
config.Consumer.Offsets.Initial = sarama.OffsetOldest
30+
config.Consumer.Return.Errors = true
31+
32+
consumer, err := sarama.NewConsumerGroup(brokers, "consumer_group_1", config)
33+
34+
return &kafkaReader{
35+
consumer: consumer,
36+
topic: []string{topic},
37+
groupId: groupId,
38+
}, err
39+
}
40+
41+
func (kr *kafkaReader) Read(ctx context.Context) {
42+
wg := &sync.WaitGroup{}
43+
wg.Add(1)
44+
45+
consumer := Consumer{
46+
ready: make(chan bool),
47+
}
48+
49+
go func() {
50+
defer wg.Done()
51+
for {
52+
53+
if err := kr.consumer.Consume(ctx, kr.topic, &consumer); err != nil {
54+
log.Panicf("Error from consumer: %v", err)
55+
}
56+
57+
consumer.ready = make(chan bool)
58+
}
59+
}()
60+
61+
}
62+
63+
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
64+
// Mark the consumer as ready
65+
close(consumer.ready)
66+
return nil
67+
}
68+
69+
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
70+
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
71+
return nil
72+
}
73+
74+
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
75+
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
76+
77+
for message := range claim.Messages() {
78+
log.Printf(string(message.Value))
79+
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
80+
session.MarkMessage(message, "")
81+
}
82+
83+
return nil
84+
}

internal/app/repo/method_lock.go

+15-15
Original file line numberDiff line numberDiff line change
@@ -33,21 +33,21 @@ func (r repo) Lock(ctx context.Context, n uint64) ([]model.VerificationEvent, er
3333
}
3434

3535
func (r repo) convertToVerificationEventModel(n uint64, eventsData []struct {
36-
EventId uint64 `db:"event_id"`
37-
EventType model.EventType `db:"type"`
36+
EventId uint64 `db:"event_id"`
37+
EventType model.EventType `db:"type"`
3838
EventStatus model.EventStatus `db:"status"`
39-
VerificationId uint64 `db:"id"`
40-
VerificationName string `db:"name"`
39+
VerificationId uint64 `db:"id"`
40+
VerificationName string `db:"name"`
4141
}) []model.VerificationEvent {
4242

4343
events := make([]model.VerificationEvent, 0, n)
4444

4545
for _, event := range eventsData {
4646
events = append(events, model.VerificationEvent{
47-
ID: event.EventId,
47+
ID: event.EventId,
4848
VerificationID: event.VerificationId,
49-
Type: event.EventType,
50-
Status: event.EventStatus,
49+
Type: event.EventType,
50+
Status: event.EventStatus,
5151
Entity: &model.Verification{
5252
ID: event.VerificationId,
5353
Name: event.VerificationName,
@@ -58,11 +58,11 @@ func (r repo) convertToVerificationEventModel(n uint64, eventsData []struct {
5858
}
5959

6060
func (r repo) getEventsDataFromDB(ctx context.Context, err error, eventIds []uint64) ([]struct {
61-
EventId uint64 `db:"event_id"`
62-
EventType model.EventType `db:"type"`
61+
EventId uint64 `db:"event_id"`
62+
EventType model.EventType `db:"type"`
6363
EventStatus model.EventStatus `db:"status"`
64-
VerificationId uint64 `db:"id"`
65-
VerificationName string `db:"name"`
64+
VerificationId uint64 `db:"id"`
65+
VerificationName string `db:"name"`
6666
}, error) {
6767

6868
query, args, err := squirrel.Select("verification_events.event_id",
@@ -81,11 +81,11 @@ func (r repo) getEventsDataFromDB(ctx context.Context, err error, eventIds []uin
8181
}
8282

8383
var eventsData []struct {
84-
EventId uint64 `db:"event_id"`
85-
EventType model.EventType `db:"type"`
84+
EventId uint64 `db:"event_id"`
85+
EventType model.EventType `db:"type"`
8686
EventStatus model.EventStatus `db:"status"`
87-
VerificationId uint64 `db:"id"`
88-
VerificationName string `db:"name"`
87+
VerificationId uint64 `db:"id"`
88+
VerificationName string `db:"name"`
8989
}
9090

9191
err = r.db.SelectContext(ctx, &eventsData, query, args...)

0 commit comments

Comments
 (0)