This repository was archived by the owner on Jul 15, 2023. It is now read-only.

Gfodor/add throttle time to fetch response #31

Open · wants to merge 4 commits into master
50 changes: 38 additions & 12 deletions src/Kafka.Client.Tests/Response/FetchResponseTest.cs
@@ -19,32 +19,58 @@ public class FetchResponseTest
 {
     [TestMethod]
     [TestCategory(TestCategories.BVT)]
-    public void ShouldAbleToParseFetchResponse()
+    public void ShouldAbleToParseV0FetchResponse()
     {
         var stream = new MemoryStream();
+        WriteTestFetchResponse(stream, 0);
+        var reader = new KafkaBinaryReader(stream);
+        var response = new FetchResponse.Parser(0).ParseFrom(reader);
+        response.ThrottleTime.ShouldBeEquivalentTo(0);
+        var set = response.MessageSet("topic1", 111);
+        set.Should().NotBeNull();
+        var messages = set.Messages.ToList();
+        messages.Count().Should().Be(1);
+        messages.First().Payload.Length.Should().Be(100);
+    }
+
+    [TestMethod]
+    [TestCategory(TestCategories.BVT)]
+    public void ShouldAbleToParseV1FetchResponse()
+    {
+        var stream = new MemoryStream();
+        WriteTestFetchResponse(stream, 1);
+        var reader = new KafkaBinaryReader(stream);
+        var response = new FetchResponse.Parser(1).ParseFrom(reader);
+        response.ThrottleTime.ShouldBeEquivalentTo(456);
+        var set = response.MessageSet("topic1", 111);
+        set.Should().NotBeNull();
+        var messages = set.Messages.ToList();
+        messages.Count().Should().Be(1);
+        messages.First().Payload.Length.Should().Be(100);
+    }
+
+    private static void WriteTestFetchResponse(MemoryStream stream, int versionId)
+    {
         var writer = new KafkaBinaryWriter(stream);
         writer.Write(1);
         writer.Write(123); // correlation id
+        if (versionId > 0)
+        {
+            writer.Write(456); // throttle time
+        }
         writer.Write(1); // data count
         writer.WriteShortString("topic1");
         writer.Write(1); // partition count
         writer.Write(111); //partition id
-        writer.Write((short)ErrorMapping.NoError);
+        writer.Write((short) ErrorMapping.NoError);

         writer.Write(1011L); // hw
         var messageStream = new MemoryStream();
         var messageWriter = new KafkaBinaryWriter(messageStream);
-        new BufferedMessageSet(new List<Message>() { new Message(new byte[100]) }, 0).WriteTo(messageWriter);
-        writer.Write((int)messageStream.Length);
-        writer.Write(messageStream.GetBuffer(), 0, (int)messageStream.Length);
+        new BufferedMessageSet(new List<Message>() {new Message(new byte[100])}, 0).WriteTo(messageWriter);
+        writer.Write((int) messageStream.Length);
+        writer.Write(messageStream.GetBuffer(), 0, (int) messageStream.Length);
         stream.Seek(0, SeekOrigin.Begin);
-        var reader = new KafkaBinaryReader(stream);
-        var response = new FetchResponse.Parser().ParseFrom(reader);
-        var set = response.MessageSet("topic1", 111);
-        set.Should().NotBeNull();
-        var messages = set.Messages.ToList();
-        messages.Count().Should().Be(1);
-        messages.First().Payload.Length.Should().Be(100);
     }
 }
}
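The test helper above pins down the wire-level difference between the two versions: after the length prefix and the correlation id, a v1 fetch response carries one extra 4-byte throttle_time_ms field, and everything that follows (topic count, per-partition data, message sets) is unchanged. Below is a minimal sketch of that header layout, using plain BCL streams instead of the repo's KafkaBinaryWriter and writing big-endian by hand, since Kafka's wire format is big-endian while the BCL writers are little-endian.

using System.IO;

static class FetchResponseHeader
{
    // Kafka's wire format is big-endian; BinaryWriter would write little-endian.
    static void WriteInt32BigEndian(Stream s, int value)
    {
        s.WriteByte((byte)(value >> 24));
        s.WriteByte((byte)(value >> 16));
        s.WriteByte((byte)(value >> 8));
        s.WriteByte((byte)value);
    }

    public static void Write(Stream s, int versionId, int size, int correlationId, int throttleTimeMs)
    {
        WriteInt32BigEndian(s, size);               // framing length prefix, read back as Size
        WriteInt32BigEndian(s, correlationId);      // present in every version
        if (versionId > 0)
            WriteInt32BigEndian(s, throttleTimeMs); // v1 and later only
        // topic count, topic/partition data, and message sets follow, unchanged by v1
    }
}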
2 changes: 1 addition & 1 deletion src/KafkaNET.Library/KafkaConnection.cs
@@ -139,7 +139,7 @@ public FetchResponse Send(FetchRequest request)
 {
     this.EnsuresNotDisposed();
     Guard.NotNull(request, "request");
-    return this.Handle(request.RequestBuffer.GetBuffer(), new FetchResponse.Parser());
+    return this.Handle(request.RequestBuffer.GetBuffer(), new FetchResponse.Parser(request.VersionId));
 }

 /// <summary>
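This one-line change is what threads the feature through the client: the version the FetchRequest was built with now selects the response parser, so a v1 request is decoded with the throttle-time field and a v0 request is not. A hedged usage sketch, assuming the library namespace imports and a complete raw response buffer (ReadThrottleMs and rawResponse are illustrative names, not part of the library):

static class ThrottleExample
{
    // rawResponse is assumed to hold a complete v1 fetch response, size prefix included.
    public static int ReadThrottleMs(byte[] rawResponse)
    {
        var reader = new KafkaBinaryReader(new System.IO.MemoryStream(rawResponse));
        var response = new FetchResponse.Parser(1).ParseFrom(reader);
        return response.ThrottleTime; // 456 for the fixture written in the test above
    }
}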
17 changes: 14 additions & 3 deletions src/KafkaNET.Library/Responses/FetchResponse.cs
@@ -39,17 +39,19 @@ public FetchResponse(int correlationId, IEnumerable<TopicData> data)
     this.TopicDataDict = data.GroupBy(x => x.Topic, x => x)
                              .ToDictionary(x => x.Key, x => x.ToList().FirstOrDefault());
 }
-public FetchResponse(int correlationId, IEnumerable<TopicData> data, int size)
+public FetchResponse(int correlationId, int throttleTime, IEnumerable<TopicData> data, int size)
 {
     Guard.NotNull(data, "data");
     this.CorrelationId = correlationId;
+    this.ThrottleTime = throttleTime;
     this.TopicDataDict = data.GroupBy(x => x.Topic, x => x)
                              .ToDictionary(x => x.Key, x => x.ToList().FirstOrDefault());
     this.Size = size;
 }

 public int Size { get; private set; }
 public int CorrelationId { get; private set; }
+public int ThrottleTime { get; private set; }
 public Dictionary<string, TopicData> TopicDataDict { get; private set; }

 public BufferedMessageSet MessageSet(string topic, int partition)
@@ -92,21 +92,30 @@ public PartitionData PartitionData(string topic, int partition)

 public class Parser : IResponseParser<FetchResponse>
 {
+    private int versionId;
+
+    public Parser(int versionId)
+    {
+        this.versionId = versionId;
+    }
+
     public FetchResponse ParseFrom(KafkaBinaryReader reader)
     {
-        int size = 0, correlationId = 0, dataCount = 0;
+        int size = 0, correlationId = 0, dataCount = 0, throttleTime = 0;
         try
         {
             size = reader.ReadInt32();
             correlationId = reader.ReadInt32();
+            if (versionId > 0)
+                throttleTime = reader.ReadInt32();
             dataCount = reader.ReadInt32();
             var data = new TopicData[dataCount];
             for (int i = 0; i < dataCount; i++)
             {
                 data[i] = TopicData.ParseFrom(reader);
             }

-            return new FetchResponse(correlationId, data, size);
+            return new FetchResponse(correlationId, throttleTime, data, size);
         }
         catch (OutOfMemoryException mex)
         {
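The parse side mirrors the writer: read the extra int32 only when the version says it is present, and leave throttleTime at its default of 0 for v0, so callers can always read FetchResponse.ThrottleTime without checking the version themselves. The same guarded read in a self-contained form, again hand-rolling the big-endian conversion that KafkaBinaryReader performs internally:

using System.IO;

static class FetchResponseHeaderReader
{
    static int ReadInt32BigEndian(Stream s)
    {
        // Operands of | evaluate left to right, so the bytes come off in order.
        return (s.ReadByte() << 24) | (s.ReadByte() << 16) | (s.ReadByte() << 8) | s.ReadByte();
    }

    // Returns (size, correlationId, throttleTimeMs); throttle is 0 when absent (v0).
    public static (int, int, int) Read(Stream s, int versionId)
    {
        int size = ReadInt32BigEndian(s);
        int correlationId = ReadInt32BigEndian(s);
        int throttleTimeMs = versionId > 0 ? ReadInt32BigEndian(s) : 0;
        return (size, correlationId, throttleTimeMs);
    }
}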