@@ -16,8 +16,10 @@ package main
16
16
17
17
import (
18
18
"fmt"
19
+ "hash/fnv"
19
20
"io/ioutil"
20
21
"net/http"
22
+ "strings"
21
23
22
24
"github.com/gin-gonic/gin"
23
25
"github.com/sirupsen/logrus"
@@ -62,11 +64,16 @@ func receiveHandler(producer *kafka.Producer, serializer Serializer) func(c *gin
62
64
return
63
65
}
64
66
65
- for topic , metrics := range metricsPerTopic {
66
- t := topic
67
+ for topicAndHashKey , metrics := range metricsPerTopic {
68
+
69
+ topic , partitionID , err := getPartitionAndTopic (topicAndHashKey )
70
+ if err != nil {
71
+ continue
72
+ }
73
+
67
74
part := kafka.TopicPartition {
68
- Partition : kafka . PartitionAny ,
69
- Topic : & t ,
75
+ Partition : partitionID ,
76
+ Topic : & topic ,
70
77
}
71
78
for _ , metric := range metrics {
72
79
objectsWritten .Add (float64 (1 ))
@@ -87,3 +94,20 @@ func receiveHandler(producer *kafka.Producer, serializer Serializer) func(c *gin
87
94
88
95
}
89
96
}
97
+
98
+ func getPartitionAndTopic (topic string ) (string , int32 , error ) {
99
+ parts := strings .Split (topic , "|" )
100
+
101
+ if len (parts ) == 1 {
102
+ return parts [0 ], kafka .PartitionAny , nil
103
+ }
104
+ h := fnv .New32a ()
105
+ h .Write ([]byte (parts [1 ]))
106
+
107
+ v , ok := topicPartitionCount .Load (parts [0 ])
108
+ if ! ok {
109
+ logrus .WithField ("topic" , parts [0 ]).Error ("did not find metadata requested topic" )
110
+ return topic , kafka .PartitionAny , fmt .Errorf ("could not" )
111
+ }
112
+ return parts [0 ], int32 (h .Sum32 () % uint32 (v .(int ))), nil
113
+ }
0 commit comments