Skip to content

Commit 7b9b95d

Browse files
authored
add metricstore send support (aliyun#261)
* add metricstore send support * add metricstore send support * add metricstore send support * add metricstore send support * add metricstore send support * add metricstore send support * add metricstore send support * add metricstore send support * add metricstore send support * add metricstore send support * add metricstore send support
1 parent 0ed38c6 commit 7b9b95d

9 files changed

+68
-34
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ go get -u github.com/aliyun/aliyun-log-go-sdk
105105

106106
5. **写数据**
107107

108-
这里展示了用sdk中原生的API接口去发送数据简单示例,但是我们不推荐用API直接向logstore写入数据,推荐使用SDK 中提供的[producer](https://github.com/aliyun/aliyun-log-go-sdk/tree/master/producer) 包向logstore 写入数据,自动压缩数据并且提供安全退出机制,不会使数据丢失。
108+
这里展示了用sdk中原生的API接口去发送数据简单示例,但是我们不推荐用API直接向logstore写入数据,推荐使用SDK 中提供的[producer](https://github.com/aliyun/aliyun-log-go-sdk/tree/master/producer) 包向logstore 写入数据,自动压缩数据并且提供安全退出机制,不会使数据丢失, 对于MetricStore类型, 使用`PutLogsWithMetricStoreURL`API 发送数据可以提升大基数时间线下查询性能
109109

110110
```go
111111
logs := []*sls.Log{}

client_interface.go

+1
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ type ClientInterface interface {
245245
MergeShards(project, logstore string, shardID int) (shards []*Shard, err error)
246246

247247
// #################### Log Operations #####################
248+
PutLogsWithMetricStoreURL(project, logstore string, lg *LogGroup) (err error)
248249
// PutLogs put logs into logstore.
249250
// The callers should transform user logs into LogGroup.
250251
PutLogs(project, logstore string, lg *LogGroup) (err error)

client_store.go

+6
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,12 @@ func (c *Client) PostLogStoreLogs(project, logstore string, lg *LogGroup, hashKe
103103
return ls.PostLogStoreLogs(lg, hashKey)
104104
}
105105

106+
func (c *Client) PutLogsWithMetricStoreURL(project, logstore string, lg *LogGroup) (err error) {
107+
ls := convertLogstore(c, project, logstore)
108+
ls.useMetricStoreURL = true
109+
return ls.PutLogs(lg)
110+
}
111+
106112
// PostRawLogWithCompressType put raw log data to log service, no marshal
107113
func (c *Client) PostRawLogWithCompressType(project, logstore string, rawLogData []byte, compressType int, hashKey *string) (err error) {
108114
ls := convertLogstore(c, project, logstore)

log_store.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type LogStore struct {
4040
putLogCompressType int
4141
EncryptConf *EncryptConf `json:"encrypt_conf,omitempty"`
4242
ProductType string `json:"productType,omitempty"`
43+
useMetricStoreURL bool
4344
}
4445

4546
// Shard defines shard struct
@@ -303,8 +304,12 @@ func (s *LogStore) PutLogs(lg *LogGroup) (err error) {
303304
}
304305
outLen = len(out)
305306
}
306-
307-
uri := fmt.Sprintf("/logstores/%v", s.Name)
307+
var uri string
308+
if s.useMetricStoreURL {
309+
uri = fmt.Sprintf("/prometheus/%s/%s/api/v1/write", s.project.Name, s.Name)
310+
} else {
311+
uri = fmt.Sprintf("/logstores/%v", s.Name)
312+
}
308313
r, err := request(s.project, "POST", uri, h, out[:outLen])
309314
if err != nil {
310315
return NewClientError(err)
@@ -329,7 +334,7 @@ func (s *LogStore) PostLogStoreLogs(lg *LogGroup, hashKey *string) (err error) {
329334
return nil
330335
}
331336

332-
if hashKey == nil || *hashKey == "" {
337+
if hashKey == nil || *hashKey == "" || s.useMetricStoreURL {
333338
// empty hash call PutLogs
334339
return s.PutLogs(lg)
335340
}

producer/README.md

+30-29
Original file line numberDiff line numberDiff line change
@@ -110,35 +110,36 @@ func(callback *Callback)Fail(result *producer.Result){
110110

111111
## **producer配置详解**
112112

113-
| 参数 | 类型 | 描述 |
114-
| ------------------- | ------ | ------------------------------------------------------------ |
115-
| TotalSizeLnBytes | Int64 | 单个 producer 实例能缓存的日志大小上限,默认为 100MB。 |
116-
| MaxIoWorkerCount | Int64 | 单个producer能并发的最多groutine的数量,默认为50,该参数用户可以根据自己实际服务器的性能去配置。 |
117-
| MaxBlockSec | Int | 如果 producer 可用空间不足,调用者在 send 方法上的最大阻塞时间,默认为 60 秒。<br/>如果超过这个时间后所需空间仍无法得到满足,send 方法会抛出TimeoutException。如果将该值设为0,当所需空间无法得到满足时,send 方法会立即抛出 TimeoutException。如果您希望 send 方法一直阻塞直到所需空间得到满足,可将该值设为负数。 |
118-
| MaxBatchSize | Int64 | 当一个 ProducerBatch 中缓存的日志大小大于等于 batchSizeThresholdInBytes 时,该 batch 将被发送,默认为 512 KB,最大可设置成 5MB。 |
119-
| MaxBatchCount | Int | 当一个 ProducerBatch 中缓存的日志条数大于等于 batchCountThreshold 时,该 batch 将被发送,默认为 4096,最大可设置成 40960。 |
120-
| LingerMs | Int64 | 一个 ProducerBatch 从创建到可发送的逗留时间,默认为 2 秒,最小可设置成 100 毫秒。 |
121-
| Retries | Int | 如果某个 ProducerBatch 首次发送失败,能够对其重试的次数,默认为 10 次。<br/>如果 retries 小于等于 0,该 ProducerBatch 首次发送失败后将直接进入失败队列。 |
122-
| MaxReservedAttempts | Int | 每个 ProducerBatch 每次被尝试发送都对应着一个 Attemp,此参数用来控制返回给用户的 attempt 个数,默认只保留最近的 11 次 attempt 信息。<br/>该参数越大能让您追溯更多的信息,但同时也会消耗更多的内存。 |
123-
| BaseRetryBackoffMs | Int64 | 首次重试的退避时间,默认为 100 毫秒。 Producer 采样指数退避算法,第 N 次重试的计划等待时间为 baseRetryBackoffMs * 2^(N-1)。 |
124-
| MaxRetryBackoffMs | Int64 | 重试的最大退避时间,默认为 50 秒。 |
125-
| AdjustShargHash | Bool | 如果调用 send 方法时指定了 shardHash,该参数用于控制是否需要对其进行调整,默认为 true。 |
126-
| Buckets | Int | 当且仅当 adjustShardHash 为 true 时,该参数才生效。此时,producer 会自动将 shardHash 重新分组,分组数量为 buckets。<br/>如果两条数据的 shardHash 不同,它们是无法合并到一起发送的,会降低 producer 吞吐量。将 shardHash 重新分组后,能让数据有更多地机会被批量发送。该参数的取值范围是 [1, 256],且必须是 2 的整数次幂,默认为 64。 |
127-
| AllowLogLevel | String | 设置日志输出级别,默认值是Info,consumer中一共有4种日志输出级别,分别为debug,info,warn和error。 |
128-
| LogFileName | String | 日志文件输出路径,不设置的话默认输出到stdout。 |
129-
| IsJsonType | Bool | 是否格式化文件输出格式,默认为false。 |
130-
| LogMaxSize | Int | 单个日志存储数量,默认为10M。 |
131-
| LogMaxBackups | Int | 日志轮转数量,默认为10。 |
132-
| LogCompass | Bool | 是否使用gzip 压缩日志,默认为false。 |
133-
| Endpoint | String | 服务入口,关于如何确定project对应的服务入口可参考文章[服务入口](https://help.aliyun.com/document_detail/29008.html?spm=a2c4e.11153940.blogcont682761.14.446e7720gs96LB)|
134-
| AccessKeyID | String | 账户的AK id。 |
135-
| AccessKeySecret | String | 账户的AK 密钥。 |
136-
|CredentialsProvider|Interface|可选,可自定义CredentialsProvider,来提供动态的 AccessKeyId/AccessKeySecret/StsToken,该接口应当缓存 AK,且必须线程安全|
137-
| NoRetryStatusCodeList | []int | 用户配置的不需要重试的错误码列表,当发送日志失败时返回的错误码在列表中,则不会重试。默认包含400,404两个值。 |
138-
| UpdateStsToken | Func | 函数类型,该函数内去实现自己的获取ststoken 的逻辑,producer 会自动刷新ststoken并放入client 当中。
139-
| StsTokenShutDown | channel| 关闭ststoken 自动刷新的通讯信道,当该信道关闭时,不再自动刷新ststoken值。当producer关闭的时候,该参数不为nil值,则会主动调用close去关闭该信道停止ststoken的自动刷新。 |
140-
| Region | String | 日志服务的区域,当签名版本使用 AuthV4 时必选。 例如cn-hangzhou。 |
141-
| AuthVersion | String | 使用的签名版本,可选枚举值为 AuthV1, AuthV4。AuthV4 签名示例可参考程序 [producer_test.go](producer_test.go)|
113+
| 参数 | 类型 | 描述 |
114+
| ------------------- |-----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
115+
| TotalSizeLnBytes | Int64 | 单个 producer 实例能缓存的日志大小上限,默认为 100MB。 |
116+
| MaxIoWorkerCount | Int64 | 单个producer能并发的最多groutine的数量,默认为50,该参数用户可以根据自己实际服务器的性能去配置。 |
117+
| MaxBlockSec | Int | 如果 producer 可用空间不足,调用者在 send 方法上的最大阻塞时间,默认为 60 秒。<br/>如果超过这个时间后所需空间仍无法得到满足,send 方法会抛出TimeoutException。如果将该值设为0,当所需空间无法得到满足时,send 方法会立即抛出 TimeoutException。如果您希望 send 方法一直阻塞直到所需空间得到满足,可将该值设为负数。 |
118+
| MaxBatchSize | Int64 | 当一个 ProducerBatch 中缓存的日志大小大于等于 batchSizeThresholdInBytes 时,该 batch 将被发送,默认为 512 KB,最大可设置成 5MB。 |
119+
| MaxBatchCount | Int | 当一个 ProducerBatch 中缓存的日志条数大于等于 batchCountThreshold 时,该 batch 将被发送,默认为 4096,最大可设置成 40960。 |
120+
| LingerMs | Int64 | 一个 ProducerBatch 从创建到可发送的逗留时间,默认为 2 秒,最小可设置成 100 毫秒。 |
121+
| Retries | Int | 如果某个 ProducerBatch 首次发送失败,能够对其重试的次数,默认为 10 次。<br/>如果 retries 小于等于 0,该 ProducerBatch 首次发送失败后将直接进入失败队列。 |
122+
| MaxReservedAttempts | Int | 每个 ProducerBatch 每次被尝试发送都对应着一个 Attemp,此参数用来控制返回给用户的 attempt 个数,默认只保留最近的 11 次 attempt 信息。<br/>该参数越大能让您追溯更多的信息,但同时也会消耗更多的内存。 |
123+
| BaseRetryBackoffMs | Int64 | 首次重试的退避时间,默认为 100 毫秒。 Producer 采样指数退避算法,第 N 次重试的计划等待时间为 baseRetryBackoffMs * 2^(N-1)。 |
124+
| MaxRetryBackoffMs | Int64 | 重试的最大退避时间,默认为 50 秒。 |
125+
| AdjustShargHash | Bool | 如果调用 send 方法时指定了 shardHash,该参数用于控制是否需要对其进行调整,默认为 true。 |
126+
| Buckets | Int | 当且仅当 adjustShardHash 为 true 时,该参数才生效。此时,producer 会自动将 shardHash 重新分组,分组数量为 buckets。<br/>如果两条数据的 shardHash 不同,它们是无法合并到一起发送的,会降低 producer 吞吐量。将 shardHash 重新分组后,能让数据有更多地机会被批量发送。该参数的取值范围是 [1, 256],且必须是 2 的整数次幂,默认为 64。 |
127+
| AllowLogLevel | String | 设置日志输出级别,默认值是Info,consumer中一共有4种日志输出级别,分别为debug,info,warn和error。 |
128+
| LogFileName | String | 日志文件输出路径,不设置的话默认输出到stdout。 |
129+
| IsJsonType | Bool | 是否格式化文件输出格式,默认为false。 |
130+
| LogMaxSize | Int | 单个日志存储数量,默认为10M。 |
131+
| LogMaxBackups | Int | 日志轮转数量,默认为10。 |
132+
| LogCompass | Bool | 是否使用gzip 压缩日志,默认为false。 |
133+
| Endpoint | String | 服务入口,关于如何确定project对应的服务入口可参考文章[服务入口](https://help.aliyun.com/document_detail/29008.html?spm=a2c4e.11153940.blogcont682761.14.446e7720gs96LB)|
134+
| AccessKeyID | String | 账户的AK id。 |
135+
| AccessKeySecret | String | 账户的AK 密钥。 |
136+
|CredentialsProvider| Interface | 可选,可自定义CredentialsProvider,来提供动态的 AccessKeyId/AccessKeySecret/StsToken,该接口应当缓存 AK,且必须线程安全 |
137+
| NoRetryStatusCodeList | []int | 用户配置的不需要重试的错误码列表,当发送日志失败时返回的错误码在列表中,则不会重试。默认包含400,404两个值。 |
138+
| UpdateStsToken | Func | 函数类型,该函数内去实现自己的获取ststoken 的逻辑,producer 会自动刷新ststoken并放入client 当中。
139+
| StsTokenShutDown | channel | 关闭ststoken 自动刷新的通讯信道,当该信道关闭时,不再自动刷新ststoken值。当producer关闭的时候,该参数不为nil值,则会主动调用close去关闭该信道停止ststoken的自动刷新。 |
140+
| Region | String | 日志服务的区域,当签名版本使用 AuthV4 时必选。 例如cn-hangzhou。 |
141+
| AuthVersion | String | 使用的签名版本,可选枚举值为 AuthV1, AuthV4。AuthV4 签名示例可参考程序 [producer_test.go](producer_test.go)|
142+
| UseMetricStoreURL | bool | 使用 Metricstore地址进行发送日志,可以提升大基数时间线下的查询性能。 |
142143

143144
## 关于性能
144145

producer/io_worker.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@ func (ioWorker *IoWorker) sendToServer(producerBatch *ProducerBatch) {
4545
level.Debug(ioWorker.logger).Log("msg", "ioworker send data to server")
4646
beginMs := GetTimeMs(time.Now().UnixNano())
4747
var err error
48-
if producerBatch.shardHash != nil {
48+
if producerBatch.isUseMetricStoreUrl() {
49+
err = ioWorker.client.PutLogsWithMetricStoreURL(producerBatch.getProject(), producerBatch.getLogstore(), producerBatch.logGroup)
50+
} else if producerBatch.shardHash != nil {
4951
err = ioWorker.client.PostLogStoreLogs(producerBatch.getProject(), producerBatch.getLogstore(), producerBatch.logGroup, producerBatch.getShardHash())
5052
} else {
5153
err = ioWorker.client.PutLogs(producerBatch.getProject(), producerBatch.getLogstore(), producerBatch.logGroup)

producer/producer_batch.go

+8
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ type ProducerBatch struct {
2828
shardHash *string
2929
result *Result
3030
maxReservedAttempts int
31+
useMetricStoreUrl bool
3132
}
3233

3334
func generatePackId(source string) string {
@@ -76,6 +77,7 @@ func initProducerBatch(logData interface{}, callBackFunc CallBack, project, logs
7677
logstore: logstore,
7778
result: initResult(),
7879
maxReservedAttempts: config.MaxReservedAttempts,
80+
useMetricStoreUrl: config.UseMetricStoreURL,
7981
}
8082
if shardHash == "" {
8183
producerBatch.shardHash = nil
@@ -114,6 +116,12 @@ func (producerBatch *ProducerBatch) getLogGroupCount() int {
114116
return len(producerBatch.logGroup.GetLogs())
115117
}
116118

119+
func (producerBatch *ProducerBatch) isUseMetricStoreUrl() bool {
120+
defer producerBatch.lock.RUnlock()
121+
producerBatch.lock.RLock()
122+
return producerBatch.useMetricStoreUrl
123+
}
124+
117125
func (producerBatch *ProducerBatch) addLogToLogGroup(log interface{}) {
118126
defer producerBatch.lock.Unlock()
119127
producerBatch.lock.Lock()

producer/producer_config.go

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type ProducerConfig struct {
3838
LogTags []*sls.LogTag
3939
GeneratePackId bool
4040
CredentialsProvider sls.CredentialsProvider
41+
UseMetricStoreURL bool
4142

4243
packLock sync.Mutex
4344
packPrefix string

0 commit comments

Comments
 (0)