Skip to content
This repository was archived by the owner on Jun 10, 2022. It is now read-only.

Choose partition by key for several brokers (to master) #218

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/Broker.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@
use Kafka\Sasl\Plain;
use Kafka\Sasl\Scram;
use function array_keys;
use function count;
use function crc32;
use function explode;
use function in_array;
use function serialize;
use function shuffle;
use function sprintf;
use function strpos;
use function trim;

class Broker
{
Expand Down Expand Up @@ -281,4 +284,20 @@ private function getSaslMechanismProvider(Config $config): SaslMechanism

throw new Exception(sprintf('"%s" is an invalid SASL mechanism', $mechanism));
}

/**
* @param mixed[] $record
*/
public function getPartitionId(array $record): int
{
$topicInfos = $this->getTopics();
$topicMeta = $topicInfos[$record['topic']];
$partNums = array_keys($topicMeta);
if (isset($record['key']) && trim($record['key'])) {
$partId = $partNums[crc32($record['key']) % count($partNums)];
} else {
$partId = isset($record['partId'], $topicMeta[$record['partId']]) ? $record['partId'] : $partNums[0];
}
return (int) $partId;
}
}
6 changes: 1 addition & 5 deletions src/Producer/Process.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
use Kafka\ProducerConfig;
use Kafka\Protocol;
use Psr\Log\LoggerAwareTrait;
use function array_keys;
use function count;
use function explode;
use function in_array;
Expand Down Expand Up @@ -320,10 +319,7 @@ protected function convertRecordSet(array $recordSet): array
$this->recordValidator->validate($record, $topics);

$topicMeta = $topics[$record['topic']];
$partNums = array_keys($topicMeta);
shuffle($partNums);

$partId = ! isset($record['partId'], $topicMeta[$record['partId']]) ? $partNums[0] : $record['partId'];
$partId = $broker->getPartitionId($record);

$brokerId = $topicMeta[$partId];
$topicData = [];
Expand Down
6 changes: 1 addition & 5 deletions src/Producer/SyncProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
use Kafka\ProducerConfig;
use Kafka\Protocol\Protocol;
use Psr\Log\LoggerAwareTrait;
use function array_keys;
use function count;
use function explode;
use function json_encode;
Expand Down Expand Up @@ -161,10 +160,7 @@ protected function convertRecordSet(array $recordSet): array
$this->recordValidator->validate($record, $topics);

$topicMeta = $topics[$record['topic']];
$partNums = array_keys($topicMeta);
shuffle($partNums);

$partId = isset($record['partId'], $topicMeta[$record['partId']]) ? $record['partId'] : $partNums[0];
$partId = $broker->getPartitionId($record);

$brokerId = $topicMeta[$partId];
$topicData = [];
Expand Down
54 changes: 54 additions & 0 deletions tests/Base/BrokerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,58 @@ private function getBroker(): Broker
{
return Broker::getInstance();
}

/**
* testGetPartitionId
* @access public
*/
public function testGetPartitionId(): void
{
$broker = Broker::getInstance();
$data = [
'brokers' => [
[
'host' => '127.0.0.1',
'port' => '9092',
'nodeId' => '0',
],
[
'host' => '127.0.0.1',
'port' => '9192',
'nodeId' => '1',
],
[
'host' => '127.0.0.1',
'port' => '9292',
'nodeId' => '2',
],
],
'topics' => [
[
'topicName' => 'test',
'errorCode' => 0,
'partitions' => [
[
'partitionId' => 0,
'errorCode' => 0,
'leader' => 0,
],
[
'partitionId' => 1,
'errorCode' => 0,
'leader' => 2,
],
],
],
],
];
$broker->setData($data['topics'], $data['brokers']);
$data = [
'partId' => '1',
'topic' => 'test',
'value' => 'test message',
];
$partId = $broker->getPartitionId($data);
$this->assertEquals('1', $partId);
}
}