This repository was archived by the owner on Jul 15, 2023. It is now read-only.

Optimization and bubble up ProducerResponseStatus #53

Open · wants to merge 2 commits into master
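With this change, Producer.Send (and ICallbackHandler.Handle beneath it) returns the per-partition ProducerResponseStatus list instead of void, so a caller can read back the acknowledged offset and error code for each topic/partition it produced to. Below is a minimal sketch of how a caller might consume the new return value; the broker address, port, topic name, and the BrokerConfiguration initializer property names are illustrative assumptions, not taken from this PR:

// Sketch only: assumes a reachable Kafka 0.8 broker at host1:9092, an existing topic "test",
// and the usual Kafka.Client.* using directives; BrokerConfiguration property names are assumed.
var config = new ProducerConfiguration(new List<BrokerConfiguration>
{
    new BrokerConfiguration { BrokerId = 0, Host = "host1", Port = 9092 }
});

using (var producer = new Producer<string, Message>(config))
{
    List<ProducerResponseStatus> statuses = producer.Send(new List<ProducerData<string, Message>>
    {
        new ProducerData<string, Message>("test", new Message(new byte[100]))
    });

    foreach (var status in statuses)
    {
        if (status.Error == ErrorMapping.NoError)
            Console.WriteLine("{0}/{1} acked at offset {2}", status.Topic, status.PartitionId, status.Offset);
        else
            Console.WriteLine("{0}/{1} failed: {2}", status.Topic, status.PartitionId, status.Error);
    }
}

Note that Send still throws FailedToSendMessageException when messages remain undelivered after all retries, so the returned list is only observed on overall success.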
56 changes: 56 additions & 0 deletions src/Kafka.Client.Tests/Producers/DefaultCallbackHandlerTest.cs
@@ -111,5 +111,61 @@ public void ShouldRetryHandleWhenTopicNotFound()

Assert.Fail("Should have caught exception.");
}

[TestMethod]
[TestCategory(TestCategories.BVT)]
public void ShouldReturnOffset()
{
var partitioner = new Mock<IPartitioner<string>>();
var config = new ProducerConfiguration(new List<BrokerConfiguration>());
var pool = new Mock<ISyncProducerPool>();
var producer = new Mock<ISyncProducer>();
var partitionMetadatas = new List<PartitionMetadata>()
{
new PartitionMetadata(0, new Broker(0, "host1", 1234), Enumerable.Empty<Broker>(),
Enumerable.Empty<Broker>())
};
var metadatas = new List<TopicMetadata>() { new TopicMetadata("test", partitionMetadatas, ErrorMapping.NoError) };
producer.SetupGet(p => p.Config)
.Returns(
() =>
new SyncProducerConfiguration(new ProducerConfiguration(new List<BrokerConfiguration>()), 0,
"host1", 1234));
producer.Setup(p => p.Send(It.IsAny<TopicMetadataRequest>())).Returns(() => metadatas);
var statuses = new Dictionary<TopicAndPartition, ProducerResponseStatus>();
var producerResponseStatus = new ProducerResponseStatus()
{
PartitionId = 0,
Topic = "test",
Offset = 1,
Error = ErrorMapping.NoError
};
statuses[new TopicAndPartition("test", 0)] = producerResponseStatus;
producer.Setup(p => p.Send(It.IsAny<ProducerRequest>()))
.Returns(
() =>
new ProducerResponse(1, statuses));
pool.Setup(p => p.GetShuffledProducers()).Returns(() => new List<ISyncProducer>() { producer.Object });
pool.Setup(p => p.GetProducer(It.IsAny<int>())).Returns(() => producer.Object);
var mockPartitionInfo = new Mock<IBrokerPartitionInfo>();
mockPartitionInfo.Setup(m => m.GetBrokerPartitionInfo(0, string.Empty, It.IsAny<int>(), "test"))
.Returns(() => new List<Partition>());
var handler = new DefaultCallbackHandler<string, Message>(config, partitioner.Object, new DefaultEncoder(), mockPartitionInfo.Object, pool.Object);
List<ProducerResponseStatus> producerResponse;
try
{
producerResponse = handler.Handle(new List<ProducerData<string, Message>>()
{
new ProducerData<string, Message>("test", new Message(new byte[100]))
});
}
catch (FailedToSendMessageException<string>)
{
mockPartitionInfo.Verify(m => m.GetBrokerPartitionInfo(0, string.Empty, It.IsAny<int>(), "test"), Times.Exactly(3));
return;
}

Assert.AreEqual(1, producerResponse.FirstOrDefault().Offset);
}
}
}
74 changes: 45 additions & 29 deletions src/KafkaNET.Library/Producers/DefaultCallbackHandler.cs
@@ -67,12 +67,14 @@ private int NextCorrelationId
get { return Interlocked.Increment(ref correlationId); }
}

public void Handle(IEnumerable<ProducerData<TK, TV>> events)
public List<ProducerResponseStatus> Handle(IEnumerable<ProducerData<TK, TV>> events)
{
IEnumerable<ProducerData<TK, Message>> serializedData = this.Serialize(events);
ProduceDispatchSeralizeResult<TK> outstandingProduceRequests = new ProduceDispatchSeralizeResult<TK>(new List<Exception> { }, serializedData, null, true);
var remainingRetries = this.producerConfig.ProducerRetries;
int currentRetryMs = producerConfig.ProducerRetryExponentialBackoffMinMs;
//new up master list of ProducerResponseStatus to return
List<ProducerResponseStatus> producerResponseStatuses = new List<ProducerResponseStatus>();

var brokers = this.producerConfig.Brokers;
if (producerConfig.Verbose)
@@ -86,6 +88,8 @@ public void Handle(IEnumerable<ProducerData<TK, TV>> events)
{
ProduceDispatchSeralizeResult<TK> currentOutstandingRequests =
this.DispatchSerializedData(outstandingProduceRequests.FailedProducerDatas, remainingRetries > 1 ? false : true);
//Add producerResponseStatuses to response master list
producerResponseStatuses.AddRange(currentOutstandingRequests.ProducerResponse);
outstandingProduceRequests = currentOutstandingRequests;
if (outstandingProduceRequests.HasDataNeedDispatch)
{
@@ -121,6 +125,7 @@ public void Handle(IEnumerable<ProducerData<TK, TV>> events)
Logger.Error(message);
throw new FailedToSendMessageException<TK>(message, new List<Exception>(), outstandingProduceRequests, allCount, remainFailedCount);
}
return producerResponseStatuses;
}

private int ExponentialRetry(int currentRetryMs)
@@ -141,8 +146,10 @@ private ProduceDispatchSeralizeResult<TK> DispatchSerializedData(IEnumerable<Pro
List<Tuple<int, TopicAndPartition, ProducerResponseStatus>> failedDetail = null;
var exceptions = new List<Exception>();
bool hasDataNeedReprocess = false;
//new up master list of ProducerResponseStatus to add to ProduceDispatchSerializeResult constructor
List<ProducerResponseStatus> producerResponseStatuses = new List<ProducerResponseStatus>();
try
{
IEnumerable<KeyValuePair<int, Dictionary<TopicAndPartition, List<ProducerData<TK, Message>>>>> partitionedData = this.PartitionAndCollate(messages);
foreach (KeyValuePair<int, Dictionary<TopicAndPartition, List<ProducerData<TK, Message>>>> keyValuePair in partitionedData)
{
@@ -154,37 +161,47 @@ private ProduceDispatchSeralizeResult<TK> DispatchSerializedData(IEnumerable<Pro
Logger.DebugFormat("ProducerDispatchSeralizeResult,brokerId={0},partitionData.Count={1}", brokerId, partitionedData.Count());
}

ProducerSendResult<IEnumerable<Tuple<TopicAndPartition, ProducerResponseStatus>>> failedTopicResponse = this.Send(brokerId, messageSetPerBroker);
if (!failedTopicResponse.Success || (failedTopicResponse.ReturnVal != null && failedTopicResponse.ReturnVal.Any()))
{
    failedProduceRequests = new List<ProducerData<TK, Message>>();
    foreach (var failedTopic in failedTopicResponse.ReturnVal)
    {
        List<ProducerData<TK, Message>> failedMessages = eventsPerBrokerMap[failedTopic.Item1];
        failedProduceRequests.AddRange(failedMessages);
        hasDataNeedReprocess = true;
    }

    foreach (var topic in failedTopicResponse.ReturnVal.Select(e => e.Item1.Topic).Distinct())
    {
        // update the metadata in case the failure was caused by kafka broker failover
        this.brokerPartitionInfo.UpdateInfo(producerConfig.VersionId, NextCorrelationId,
            producerConfig.ClientId, topic);
    }

    if (lastRetry)
    {
        failedDetail = new List<Tuple<int, TopicAndPartition, ProducerResponseStatus>>();
        foreach (var failedTopic in failedTopicResponse.ReturnVal)
        {
            failedDetail.Add(new Tuple<int, TopicAndPartition, ProducerResponseStatus>(brokerId, failedTopic.Item1, failedTopic.Item2));
        }
    }
}
if (failedTopicResponse.Exception != null)
    exceptions.Add(failedTopicResponse.Exception);
//Get topic responses from send method
ProducerSendResult<IEnumerable<Tuple<TopicAndPartition, ProducerResponseStatus>>> topicResponse = this.Send(brokerId, messageSetPerBroker);

if (topicResponse?.ReturnVal != null)
{
    //New up a list of topics to ensure we do not call UpdateInfo on the same topic more than once
    var topics = new List<string>();
    failedDetail = new List<Tuple<int, TopicAndPartition, ProducerResponseStatus>>();

    //Use a single foreach to handle errors and retries, and add each topicResponseStatus to the master list
    foreach (var topicResponseStatus in topicResponse.ReturnVal)
    {
        if (topicResponseStatus.Item2.Error != ErrorMapping.NoError)
        {
            //create the list once; resetting it for each failing partition would drop earlier failures
            failedProduceRequests = failedProduceRequests ?? new List<ProducerData<TK, Message>>();

            List<ProducerData<TK, Message>> failedMessages = eventsPerBrokerMap[topicResponseStatus.Item1];
            failedProduceRequests.AddRange(failedMessages);
            hasDataNeedReprocess = true;

            //Skip topics whose metadata has already been updated
            if (!topics.Contains(topicResponseStatus.Item1.Topic))
            {
                // update the metadata in case the failure was caused by kafka broker failover
                this.brokerPartitionInfo.UpdateInfo(producerConfig.VersionId, NextCorrelationId,
                    producerConfig.ClientId, topicResponseStatus.Item1.Topic);
                topics.Add(topicResponseStatus.Item1.Topic);
            }

            if (lastRetry)
            {
                failedDetail.Add(new Tuple<int, TopicAndPartition, ProducerResponseStatus>(brokerId, topicResponseStatus.Item1, topicResponseStatus.Item2));
            }
        }
        //Add ProducerResponseStatus to master list
        producerResponseStatuses.Add(topicResponseStatus.Item2);
    }
}

if (topicResponse?.Exception != null)
    exceptions.Add(topicResponse.Exception);
}
}
catch (Exception)
@@ -193,7 +210,7 @@ private ProduceDispatchSeralizeResult<TK> DispatchSerializedData(IEnumerable<Pro
throw;
}

return new ProduceDispatchSeralizeResult<TK>(exceptions, failedProduceRequests, failedDetail, hasDataNeedReprocess);
return new ProduceDispatchSeralizeResult<TK>(exceptions, failedProduceRequests, failedDetail, hasDataNeedReprocess, producerResponseStatuses);
}
/// <summary>
/// Send messages to one broker.
@@ -268,8 +285,7 @@ private ProducerSendResult<IEnumerable<Tuple<TopicAndPartition, ProducerResponse
sb.Append(string.Join(",", response.Statuses.Where(r => r.Value.Error != (short)ErrorMapping.NoError).Select(r => r.ToString())));
throw new FailedToSendMessageException<TK>(sb.ToString());
}
return new ProducerSendResult<IEnumerable<Tuple<TopicAndPartition, ProducerResponseStatus>>>(response.Statuses.Where(s => s.Value.Error != (short)ErrorMapping.NoError)
.Select(s => new Tuple<TopicAndPartition, ProducerResponseStatus>(s.Key, s.Value)));
return new ProducerSendResult<IEnumerable<Tuple<TopicAndPartition, ProducerResponseStatus>>>(response.Statuses.Select(s => new Tuple<TopicAndPartition, ProducerResponseStatus>(s.Key, s.Value)));
}
}
}
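One subtlety when consuming the result: Handle appends currentOutstandingRequests.ProducerResponse to the master list on every pass of its retry loop, so a partition that fails and is later retried can contribute one status per attempt. A sketch (assuming `statuses` is the list returned by Send and System.Linq is imported) that keeps only the last status reported for each topic/partition pair:

var finalStatuses = statuses
    .GroupBy(s => new { s.Topic, s.PartitionId }) // one group per topic/partition pair
    .Select(g => g.Last())                        // entries are appended in attempt order, so the last reflects the final attempt
    .ToList();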
4 changes: 3 additions & 1 deletion src/KafkaNET.Library/Producers/ICallbackHandler.cs
@@ -15,6 +15,8 @@
* limitations under the License.
*/

using Kafka.Client.Responses;

namespace Kafka.Client.Producers
{
using System;
@@ -30,6 +32,6 @@ public interface ICallbackHandler<K,V> : IDisposable
/// <param name="events">
/// The sent request events.
/// </param>
void Handle(IEnumerable<ProducerData<K,V>> events);
List<ProducerResponseStatus> Handle(IEnumerable<ProducerData<K,V>> events);
}
}
3 changes: 2 additions & 1 deletion src/KafkaNET.Library/Producers/IProducer.cs
@@ -16,6 +16,7 @@
*/

using System.Collections.Generic;
using Kafka.Client.Responses;

namespace Kafka.Client.Producers
{
@@ -36,7 +37,7 @@ public interface IProducer<TKey, TData> : IDisposable
/// synchronous or the asynchronous producer.
/// </summary>
/// <param name="data">The producer data objects that encapsulate the topic, key and message data.</param>
void Send(IEnumerable<ProducerData<TKey, TData>> data);
List<ProducerResponseStatus> Send(IEnumerable<ProducerData<TKey, TData>> data);

/// <summary>
/// Sends the data to a single topic, partitioned by key, using either the
src/KafkaNET.Library/Producers/ProduceDispatchSeralizeResult.cs
@@ -14,17 +14,19 @@ namespace Kafka.Client.Producers
public class ProduceDispatchSeralizeResult<K>
{
public ProduceDispatchSeralizeResult(IEnumerable<Exception> exceptions,
IEnumerable<ProducerData<K, Message>> failedProducerDatas, List<Tuple<int, TopicAndPartition, ProducerResponseStatus>> failedDetail, bool hasDataNeedDispatch)
IEnumerable<ProducerData<K, Message>> failedProducerDatas, List<Tuple<int, TopicAndPartition, ProducerResponseStatus>> failedDetail, bool hasDataNeedDispatch, List<ProducerResponseStatus> producerResponse = null)
{
Exceptions = exceptions;
FailedProducerDatas = failedProducerDatas;
FailedDetail = failedDetail;
this.HasDataNeedDispatch = hasDataNeedDispatch;
this.ProducerResponse = producerResponse ?? new List<ProducerResponseStatus>();
}

public IEnumerable<ProducerData<K, Message>> FailedProducerDatas { get; private set; }
public IEnumerable<Exception> Exceptions { get; private set; }
public List<Tuple<int, TopicAndPartition, ProducerResponseStatus>> FailedDetail { get; private set; }
public bool HasDataNeedDispatch { get; private set; }
}
public List<ProducerResponseStatus> ProducerResponse { get; private set; }
}
}
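Giving the new producerResponse parameter a null default keeps the constructor source-compatible: existing four-argument call sites (including the one in Handle) compile unchanged and observe an empty list through the ?? fallback rather than a null ProducerResponse. A sketch, where `serializedData` stands for any IEnumerable<ProducerData<string, Message>> in scope:

// Existing four-argument call sites keep compiling; ProducerResponse
// defaults to an empty list rather than null.
var result = new ProduceDispatchSeralizeResult<string>(
    new List<Exception>(), serializedData, null, true); // no fifth argument
Console.WriteLine(result.ProducerResponse.Count);       // prints 0, no NullReferenceException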
6 changes: 4 additions & 2 deletions src/KafkaNET.Library/Producers/Producer.cs
@@ -15,6 +15,8 @@
* limitations under the License.
*/

using Kafka.Client.Responses;

namespace Kafka.Client.Producers
{
using Kafka.Client.Cfg;
@@ -70,14 +72,14 @@ public Producer(ProducerConfiguration config)
/// Sends the data to a multiple topics, partitioned by key
/// </summary>
/// <param name="data">The producer data objects that encapsulate the topic, key and message data.</param>
public void Send(IEnumerable<ProducerData<TKey, TData>> data)
public List<ProducerResponseStatus> Send(IEnumerable<ProducerData<TKey, TData>> data)
{
Guard.NotNull(data, "data");
Guard.CheckBool(data.Any(), true, "data.Any()");

this.EnsuresNotDisposed();

this.callbackHandler.Handle(data);
return this.callbackHandler.Handle(data);
}

/// <summary>
7 changes: 6 additions & 1 deletion src/KafkaNET.Library/Responses/ProducerResponse.cs
@@ -25,6 +25,8 @@ public class ProducerResponseStatus
{
public ErrorMapping Error { get; set; }
public long Offset { get; set; }
public string Topic { get; set; }
public int PartitionId { get; set; }
public override string ToString()
{
return string.Format("Error:{0} Offset:{1}", this.Error, this.Offset);
@@ -59,13 +61,16 @@ public ProducerResponse ParseFrom(KafkaBinaryReader reader)
{
var partitionId = reader.ReadInt32();
var error = reader.ReadInt16();
//the broker only returns the offset of the first message per Topic and Partition pair
var offset = reader.ReadInt64();
var topicAndPartition = new TopicAndPartition(topic, partitionId);

statuses.Add(topicAndPartition, new ProducerResponseStatus()
{
Error = ErrorMapper.ToError(error),
Offset = offset
Offset = offset,
Topic = topic,
PartitionId = partitionId
});

}
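As the parser comment notes, the broker reports a single offset per topic/partition pair: the offset assigned to the first message of the produced message set. Kafka assigns consecutive offsets to the messages of one produce request, so a caller that sent a batch to a single partition can derive the remaining offsets from that base. A sketch, where `status` is one of the returned ProducerResponseStatus entries and `messagesSent` (assumed) is the number of messages in the batch for that partition:

long baseOffset = status.Offset;                 // offset of the first message in the batch
for (int i = 0; i < messagesSent; i++)
{
    long messageOffset = baseOffset + i;         // offsets are consecutive within the batch
    Console.WriteLine("message {0} -> offset {1}", i, messageOffset);
}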