Skip to content

Adds support to AWS MSK IAM auth method #74

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
"google/crc32": "^0.1.0",
"exussum12/xxhash": "^1.0.0",
"chdemko/sorted-collections": "^1.0",
"symfony/polyfill-php81": "^1.23"
"symfony/polyfill-php81": "^1.23",
"guzzlehttp/guzzle": "^7.4",
"guzzlehttp/psr7": "^2.4",
"aws/aws-sdk-php": "^3.227",
"ext-json": "*"
},
"require-dev": {
"phpunit/phpunit": "^7.5|^8.0|^9.0",
Expand Down
29 changes: 29 additions & 0 deletions examples/consumer_msk.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

declare(strict_types=1);

use longlang\phpkafka\Consumer\ConsumeMessage;
use longlang\phpkafka\Consumer\Consumer;
use longlang\phpkafka\Consumer\ConsumerConfig;
use longlang\phpkafka\Sasl\AwsMskIamSasl;

require dirname(__DIR__) . '/vendor/autoload.php';

function consume(ConsumeMessage $message): void
{
var_dump($message->getKey() . ':' . $message->getValue());
}
$config = new ConsumerConfig();
$config->setBroker('127.0.0.1:9092');
$config->setTopic('test'); // 主题名称
$config->setGroupId('testGroup'); // 分组ID
$config->setClientId('test'); // 客户端ID
$config->setGroupInstanceId('test'); // 分组实例ID
$config->setInterval(0.1);
$config->setSasl([
"type"=> AwsMskIamSasl::class,
"region"=>"eu-west-1",
"expiration" => "+5 minutes"
]);
$consumer = new Consumer($config, 'consume');
$consumer->start();
41 changes: 41 additions & 0 deletions examples/producer_msk.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php

declare(strict_types=1);

use longlang\phpkafka\Producer\Producer;
use longlang\phpkafka\Producer\ProducerConfig;
use longlang\phpkafka\Protocol\RecordBatch\RecordHeader;
use longlang\phpkafka\Config\SslConfig;
use longlang\phpkafka\Sasl\AwsMskIamSasl;

require dirname(__DIR__) . '/vendor/autoload.php';
$sslConfig = new SslConfig();
$sslConfig->setOpen(true);
$sslConfig->setCompression(true);

$config = new ProducerConfig();
$config->setBootstrapServer('b-1.fakemskcluster.kafka.eu-west-1.amazonaws.com:9098,b-2.fakemskcluster.kafka.eu-west-1.amazonaws.com:9098');
$config->setUpdateBrokers(true);
$config->setAcks(-1);
$config->setSasl([
"type" => AwsMskIamSasl::class,
"region" => "eu-west-1",
"expiration" => "+5 minutes"
]);
$config->setSsl($sslConfig);

$producer = new Producer($config);
$topic = 'MSKTutorialTopic';
$value = "It Works!!!";
$key = uniqid('', true);
$producer->send($topic, $value, $key);

for ($i = 1; $i <= 10; $i++) {
$value = "Message: " . $i;
$headers = [
'key1' => 'value1',
(new RecordHeader())->setHeaderKey('key2')->setValue('value2'),
];
$producer->send($topic, $value, $key, $headers);
}

35 changes: 23 additions & 12 deletions src/Client/SyncClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,13 @@ public function connect(): void
$this->socket->connect();
$this->waitResponseMaps = [];
$this->updateApiVersions();
$this->sendAuthInfo();
$connector = $this->getSaslConnector();
if (!$connector instanceof SaslInterface) {
return;
}

$connector->setSocket($this->socket);
$this->sendAuthInfo($connector);
}

public function close(): bool
Expand Down Expand Up @@ -152,8 +158,8 @@ public function send(AbstractRequest $request, ?RequestHeader $header = null, bo

if ($hasResponse) {
$this->waitResponseMaps[$correlationId] = [
'apiKey' => $apiKey,
'apiVersion' => $header->getRequestApiVersion(),
'apiKey' => $apiKey,
'apiVersion' => $header->getRequestApiVersion(),
'flexibleVersions' => $request->getFlexibleVersions(),
];
}
Expand Down Expand Up @@ -197,16 +203,8 @@ protected function updateApiVersions(): void
$this->setApiKeys($response->getApiKeys());
}

protected function sendAuthInfo(): void
protected function sendAuthInfo(SaslInterface $class): void
{
$config = $this->getConfig()->getSasl();
if (!isset($config['type']) || empty($config['type'])) {
return;
}
$class = new $config['type']($this->getConfig());
if (!$class instanceof SaslInterface) {
return;
}
$handshakeRequest = new SaslHandshakeRequest();
$handshakeRequest->setMechanism($class->getName());
$correlationId = $this->send($handshakeRequest);
Expand All @@ -221,4 +219,17 @@ protected function sendAuthInfo(): void
$authenticateResponse = $this->recv($correlationId);
ErrorCode::check($authenticateResponse->getErrorCode());
}

/**
* @return SaslInterface|null
*/
protected function getSaslConnector()
{
$config = $this->getConfig()->getSasl();
if (!isset($config['type']) || empty($config['type'])) {
return null;
}
$class = new $config['type']($this->getConfig());
return $class;
}
}
114 changes: 114 additions & 0 deletions src/Sasl/AwsMskIamSasl.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
<?php

declare(strict_types=1);

namespace longlang\phpkafka\Sasl;

use longlang\phpkafka\Config\CommonConfig;
use longlang\phpkafka\Exception\KafkaErrorException;
use Aws\Credentials\CredentialProvider;
use Aws\Signature\SignatureV4;
use GuzzleHttp\Psr7\Request;
use longlang\phpkafka\Socket\SocketInterface;

class AwsMskIamSasl implements SaslInterface
{
const SIGN_ACTION = "kafka-cluster:Connect";
const SIGN_SERVICE = "kafka-cluster";
const SIGN_VERSION = "2020_10_22";
const SIGN_ACTION_KEY = "action";
const SIGN_HOST_KEY = "host";
const SIGN_USER_AGENT_KEY = "user-agent";
const SIGN_VERSION_KEY = "version";
const QUERY_ACTION_KEY = "Action";


/**
* @var CommonConfig
*/
protected $config;

/**
* @var SocketInterface
*/
protected $socket;


public function __construct(CommonConfig $config)
{
$this->config = $config;
}

public function setSocket(SocketInterface $socket): void
{
$this->socket = $socket;
}

/**
* Authorization mode
*/
public function getName(): string
{
return 'AWS_MSK_IAM';
}

/**
* Generated the Signed JSON used by AWS_MSK_IAM as auth string
* @throws KafkaErrorException
*/
public function getAuthBytes(): string
{
$config = $this->config->getSasl();
if (empty($this->socket) || empty($config['region'])) {
throw new KafkaErrorException('AWS MSK config params not found');
}

$host = $this->socket->getHost();

$query = http_build_query(array(
self::QUERY_ACTION_KEY => self::SIGN_ACTION,
));

if (empty($config['expiration'])) {
$expiration = "+5 minutes";
} else {
$expiration = $config['expiration'];
}

$region = $config['region'];

$url = "kafka://" . $host . "/?" . $query;
$provider = CredentialProvider::defaultProvider();
// Returns a CredentialsInterface or throws.
$creds = $provider()->wait();

$req = new Request('GET', $url);

$signer = new SignatureV4(self::SIGN_SERVICE, $region);
$signedReq = $signer->presign($req, $creds, $expiration);
$signedUri = $signedReq->getUri();

parse_str($signedUri->getQuery(), $params);

$headers = $signedReq->getHeaders();

$signedMap = array(
self::SIGN_VERSION_KEY => self::SIGN_VERSION,
self::SIGN_USER_AGENT_KEY => "php-kafka/sasl/aws_msk_iam/" . PHP_VERSION,
self::SIGN_ACTION_KEY => self::SIGN_ACTION,
self::SIGN_HOST_KEY => $host
);

foreach ($params as $params_key => $params_value) {
$signedMap[strtolower($params_key)] = $params_value;
}

foreach ($headers as $header_key => $header_value) {
$header_key = strtolower($header_key);
if ($header_key !== self::SIGN_HOST_KEY) {
$signedMap[$header_key] = $header_value;
}
}
return json_encode($signedMap);
}
}
6 changes: 6 additions & 0 deletions src/Sasl/PlainSasl.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use longlang\phpkafka\Config\CommonConfig;
use longlang\phpkafka\Exception\KafkaErrorException;
use longlang\phpkafka\Socket\SocketInterface;

class PlainSasl implements SaslInterface
{
Expand Down Expand Up @@ -40,4 +41,9 @@ public function getAuthBytes(): string

return sprintf("\x00%s\x00%s", $config['username'], $config['password']);
}

public function setSocket(SocketInterface $socket): void
{
// we do nothing here
}
}
3 changes: 3 additions & 0 deletions src/Sasl/SaslInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace longlang\phpkafka\Sasl;

use longlang\phpkafka\Config\CommonConfig;
use longlang\phpkafka\Socket\SocketInterface;

interface SaslInterface
{
Expand All @@ -19,4 +20,6 @@ public function getName(): string;
* 返回授权信息.
*/
public function getAuthBytes(): string;

public function setSocket(SocketInterface $socket): void;
}