Skip to content

Commit a92be69

Browse files
authored
Producer send with header support key-value and RecordHeader (#55)
* Add product and consume with header * producer send with header support key-value and RecordHeader * Fix * Fix * Fix
1 parent 2d497bf commit a92be69

File tree

8 files changed

+109
-9
lines changed

8 files changed

+109
-9
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ jobs:
4040
run: |
4141
docker exec swoole composer update
4242
docker exec kafka1 /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper:2181 --create --partitions 3 --replication-factor 1 --topic test
43+
docker exec kafka1 /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper:2181 --create --partitions 3 --replication-factor 1 --topic test-header
4344
4445
- name: plaintext-test
4546
run: |

doc/producer.en.md

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ If partition === null && key === null, then use Round Robin to select partition
4747
```php
4848
use longlang\phpkafka\Producer\Producer;
4949
use longlang\phpkafka\Producer\ProducerConfig;
50+
use longlang\phpkafka\Protocol\RecordBatch\RecordHeader;
5051

5152
$config = new ProducerConfig();
5253
$config->setBootstrapServer('127.0.0.1:9092');
@@ -57,6 +58,14 @@ $topic = 'test';
5758
$value = (string) microtime(true);
5859
$key = uniqid('', true);
5960
$producer->send('test', $value, $key);
61+
62+
// set headers
63+
// key-value or use RecordHeader
64+
$headers = [
65+
'key1' => 'value1',
66+
(new RecordHeader())->setHeaderKey('key2')->setValue('value2'),
67+
];
68+
$producer->send('test', $value, $key, $headers);
6069
```
6170

6271
## Send batch messages
@@ -83,14 +92,17 @@ $producer->sendBatch([
8392
```
8493

8594
## SASL Support
95+
8696
### Configuration
97+
8798
| Key | Description | Default |
8899
| - | - | - |
89100
| type | SASL Authentication Type. PLAIN is ``\longlang\phpkafka\Sasl\PlainSasl::class``| ''|
90101
| username | username | '' |
91102
| password | password | '' |
92103

93104
**Example**
105+
94106
```php
95107
use longlang\phpkafka\Producer\ProducerConfig;
96108
use longlang\phpkafka\Producer\Producer;
@@ -106,13 +118,14 @@ $producer = new Producer($config);
106118
// .... Your Business Code
107119
```
108120

109-
110121
## SSL Support
122+
111123
Class `longlang\phpkafka\Config\SslConfig`
112124

113125
> You can pass an array to a constructor.
114126
115127
### Configuration keys
128+
116129
| Key | Description | Default |
117130
| - | - | - |
118131
| open | Enable SSL | `false` |
@@ -145,4 +158,4 @@ $sslConfig->setCafile("/kafka-client/.github/kafka/cert/ca-cert");
145158
$config->setSsl($sslConfig);
146159
$producer = new Producer($config);
147160
// .... Your Business Code
148-
```
161+
```

doc/producer.md

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
```php
4848
use longlang\phpkafka\Producer\Producer;
4949
use longlang\phpkafka\Producer\ProducerConfig;
50+
use longlang\phpkafka\Protocol\RecordBatch\RecordHeader;
5051

5152
$config = new ProducerConfig();
5253
$config->setBootstrapServer('127.0.0.1:9092');
@@ -57,6 +58,14 @@ $topic = 'test';
5758
$value = (string) microtime(true);
5859
$key = uniqid('', true);
5960
$producer->send('test', $value, $key);
61+
62+
// 指定 headers
63+
// key-value或使用 RecordHeader 对象,都可以
64+
$headers = [
65+
'key1' => 'value1',
66+
(new RecordHeader())->setHeaderKey('key2')->setValue('value2'),
67+
];
68+
$producer->send('test', $value, $key, $headers);
6069
```
6170

6271
## 批量发送消息
@@ -82,17 +91,18 @@ $producer->sendBatch([
8291
]);
8392
```
8493

85-
86-
8794
## SASL支持
95+
8896
### 相关配置
97+
8998
|参数名|说明|默认值|
9099
| - | - | - |
91100
| type | SASL授权对应的类。PLAIN为``\longlang\phpkafka\Sasl\PlainSasl::class``| ''|
92101
| username | 账号 | '' |
93102
| password | 密码 | '' |
94103

95104
**代码示例:**
105+
96106
```php
97107
use longlang\phpkafka\Producer\ProducerConfig;
98108

@@ -107,11 +117,12 @@ $producer = new Producer($config);
107117
// .... 你的业务代码
108118
```
109119

110-
111120
## SSL支持
121+
112122
类名:`longlang\phpkafka\Config\SslConfig`
113123

114124
> 支持构造方法传入数组赋值
125+
115126
### 配置参数
116127

117128
|参数名|说明|默认值|
@@ -146,4 +157,4 @@ $sslConfig->setCafile("/kafka-client/.github/kafka/cert/ca-cert");
146157
$config->setSsl($sslConfig);
147158
$producer = new Producer($config);
148159
// .... 你的业务代码
149-
```
160+
```

src/Consumer/ConsumeMessage.php

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
namespace longlang\phpkafka\Consumer;
66

7+
use longlang\phpkafka\Protocol\RecordBatch\RecordHeader;
8+
79
class ConsumeMessage
810
{
911
/**
@@ -32,10 +34,13 @@ class ConsumeMessage
3234
protected $value;
3335

3436
/**
35-
* @var array
37+
* @var RecordHeader[]
3638
*/
3739
protected $headers;
3840

41+
/**
42+
* @param RecordHeader[] $headers
43+
*/
3944
public function __construct(Consumer $consumer, string $topic, int $partition, ?string $key, ?string $value, array $headers)
4045
{
4146
$this->consumer = $consumer;
@@ -106,11 +111,17 @@ public function setValue(?string $value): self
106111
return $this;
107112
}
108113

114+
/**
115+
* @return RecordHeader[]
116+
*/
109117
public function getHeaders(): array
110118
{
111119
return $this->headers;
112120
}
113121

122+
/**
123+
* @param RecordHeader[] $headers
124+
*/
114125
public function setHeaders(array $headers): self
115126
{
116127
$this->headers = $headers;

src/Producer/ProduceMessage.php

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
namespace longlang\phpkafka\Producer;
66

7+
use longlang\phpkafka\Protocol\RecordBatch\RecordHeader;
8+
79
class ProduceMessage
810
{
911
/**
@@ -22,7 +24,7 @@ class ProduceMessage
2224
protected $key;
2325

2426
/**
25-
* @var array
27+
* @var RecordHeader[]|array
2628
*/
2729
protected $headers;
2830

@@ -31,6 +33,9 @@ class ProduceMessage
3133
*/
3234
protected $partitionIndex;
3335

36+
/**
37+
* @param RecordHeader[]|array $headers
38+
*/
3439
public function __construct(string $topic, ?string $value, ?string $key = null, array $headers = [], ?int $partitionIndex = null)
3540
{
3641
$this->topic = $topic;
@@ -55,6 +60,9 @@ public function getKey(): ?string
5560
return $this->key;
5661
}
5762

63+
/**
64+
* @return RecordHeader[]|array
65+
*/
5866
public function getHeaders(): array
5967
{
6068
return $this->headers;

src/Producer/Producer.php

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
use longlang\phpkafka\Protocol\Produce\TopicProduceData;
1414
use longlang\phpkafka\Protocol\RecordBatch\Record;
1515
use longlang\phpkafka\Protocol\RecordBatch\RecordBatch;
16+
use longlang\phpkafka\Protocol\RecordBatch\RecordHeader;
1617

1718
class Producer
1819
{
@@ -44,6 +45,9 @@ public function __construct(ProducerConfig $config)
4445
$this->partitioner = new $class();
4546
}
4647

48+
/**
49+
* @param RecordHeader[]|array $headers
50+
*/
4751
public function send(string $topic, ?string $value, ?string $key = null, array $headers = [], ?int $partitionIndex = null): void
4852
{
4953
$message = new ProduceMessage($topic, $value, $key, $headers, $partitionIndex);
@@ -113,7 +117,16 @@ public function sendBatch(array $messages): void
113117
$record = $records[] = new Record();
114118
$record->setKey($key);
115119
$record->setValue($value);
116-
$record->setHeaders($message->getHeaders());
120+
$headers = [];
121+
foreach ($message->getHeaders() as $key => $value) {
122+
if ($value instanceof RecordHeader) {
123+
$headers[] = $value;
124+
// @phpstan-ignore-next-line
125+
} else {
126+
$headers[] = (new RecordHeader())->setHeaderKey($key)->setValue($value);
127+
}
128+
}
129+
$record->setHeaders($headers);
117130
$record->setOffsetDelta($offsetDelta);
118131
$record->setTimestampDelta(((int) (microtime(true) * 1000)) - $timestamp);
119132
$recordBatch->setRecords($records);

tests/ConsumerTest.php

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,29 @@ public function testConsumeWithStickyAssignor(): void
7070
$consumer->start();
7171
$consumer->close();
7272
}
73+
74+
public function testConsumeWithHeader(): void
75+
{
76+
$config = new ConsumerConfig();
77+
$config->setBroker(TestUtil::getHost() . ':' . TestUtil::getPort());
78+
TestUtil::addConfigInfo($config);
79+
$config->setTopic('test-header');
80+
$config->setGroupId('testGroup');
81+
$config->setClientId('testConsumeWithHeader');
82+
$config->setGroupInstanceId('testConsumeWithHeader');
83+
$config->setInterval(0.1);
84+
$consumer = new Consumer($config, function (ConsumeMessage $message) {
85+
$consumer = $message->getConsumer();
86+
$this->assertNotEmpty($message->getValue());
87+
$headers = $message->getHeaders();
88+
$this->assertCount(2, $headers);
89+
$this->assertEquals('key1', $headers[0]->getHeaderKey());
90+
$this->assertEquals('value1', $headers[0]->getValue());
91+
$this->assertEquals('key2', $headers[1]->getHeaderKey());
92+
$this->assertEquals('value2', $headers[1]->getValue());
93+
$consumer->stop();
94+
});
95+
$consumer->start();
96+
$consumer->close();
97+
}
7398
}

tests/ProducerTest.php

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use longlang\phpkafka\Producer\ProduceMessage;
88
use longlang\phpkafka\Producer\Producer;
99
use longlang\phpkafka\Producer\ProducerConfig;
10+
use longlang\phpkafka\Protocol\RecordBatch\RecordHeader;
1011
use PHPUnit\Framework\TestCase;
1112

1213
class ProducerTest extends TestCase
@@ -48,4 +49,21 @@ public function testSendBatch(): void
4849
$producer->close();
4950
$this->assertTrue(true);
5051
}
52+
53+
public function testSendWithHeader(): void
54+
{
55+
$config = new ProducerConfig();
56+
$config->setBootstrapServer(TestUtil::getHost() . ':' . TestUtil::getPort());
57+
TestUtil::addConfigInfo($config);
58+
$config->setAcks(-1);
59+
$producer = new Producer($config);
60+
$headers = [
61+
'key1' => 'value1',
62+
(new RecordHeader())->setHeaderKey('key2')->setValue('value2'),
63+
];
64+
// @phpstan-ignore-next-line
65+
$producer->send('test-header', (string) microtime(true), uniqid('', true), $headers);
66+
$producer->close();
67+
$this->assertTrue(true);
68+
}
5169
}

0 commit comments

Comments
 (0)