Skip to content
This repository was archived by the owner on Jul 15, 2023. It is now read-only.

BCL API usage improvements (LINQ, exceptions, locking). #87

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions src/KafkaNET.Library/Consumers/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ public class Consumer : IConsumer
private KafkaConnection connection;

internal long CreatedTimeInUTC;

private readonly object _lockOp = new object();

public ConsumerConfiguration Config
{
get
Expand Down Expand Up @@ -137,7 +140,7 @@ public FetchResponse Fetch(FetchRequest request)
try
{
Logger.Debug("Fetch is waiting for send lock");
lock (this)
lock (_lockOp)
{
Logger.Debug("Fetch acquired send lock. Begin send");
return connection.Send(request);
Expand Down Expand Up @@ -212,7 +215,7 @@ public OffsetResponse GetOffsetsBefore(OffsetRequest request)
{
try
{
lock (this)
lock (_lockOp)
{
return connection.Send(request);
}
Expand Down Expand Up @@ -240,7 +243,7 @@ public IEnumerable<TopicMetadata> GetMetaData(TopicMetadataRequest request)
{
try
{
lock (this)
lock (_lockOp)
{
return connection.Send(request);
}
Expand Down Expand Up @@ -273,7 +276,7 @@ protected virtual void Dispose(bool disposing)
{
if (connection != null)
{
lock (this)
lock (_lockOp)
{
if (connection != null)
{
Expand Down
6 changes: 3 additions & 3 deletions src/KafkaNET.Library/Consumers/ConsumerIterator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ public class ConsumerIterator<TData> : IConsumerIterator<TData>
private FetchedDataChunk currentDataChunk = null;
private TData nextItem;
private long consumedOffset = -1;
private SemaphoreSlim makeNextSemaphore = new SemaphoreSlim(1, 1);
private string topic;
private IDecoder<TData> decoder;
private readonly SemaphoreSlim makeNextSemaphore = new SemaphoreSlim(1, 1);
private readonly string topic;
private readonly IDecoder<TData> decoder;

/// <summary>
/// Initializes a new instance of the <see cref="ConsumerIterator"/> class.
Expand Down
4 changes: 2 additions & 2 deletions src/KafkaNET.Library/Consumers/KafkaMessageStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ public class KafkaMessageStream<TData> : IKafkaMessageStream<TData>

public IConsumerIterator<TData> iterator { get; private set; }

private string topic;
private readonly string topic;

private IDecoder<TData> decoder;
private readonly IDecoder<TData> decoder;

internal KafkaMessageStream(string topic, BlockingCollection<FetchedDataChunk> queue, int consumerTimeoutMs, IDecoder<TData> decoder)
{
Expand Down
12 changes: 6 additions & 6 deletions src/KafkaNET.Library/Consumers/ZookeeperConsumerConnector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ public class ZookeeperConsumerConnector : KafkaClientBase, IZookeeperConsumerCon
private readonly KafkaScheduler scheduler = new KafkaScheduler();
private readonly IDictionary<string, IDictionary<int, PartitionTopicInfo>> topicRegistry = new ConcurrentDictionary<string, IDictionary<int, PartitionTopicInfo>>();
private readonly IDictionary<Tuple<string, string>, BlockingCollection<FetchedDataChunk>> queues = new Dictionary<Tuple<string, string>, BlockingCollection<FetchedDataChunk>>();
private List<Action> stopAsyncRebalancing = new List<Action>();
private readonly List<Action> stopAsyncRebalancing = new List<Action>();
private volatile bool disposed;
private EventHandler consumerRebalanceHandler;
private EventHandler zkSessionDisconnectedHandler;
private EventHandler zkSessionExpiredHandler;
private readonly EventHandler consumerRebalanceHandler;
private readonly EventHandler zkSessionDisconnectedHandler;
private readonly EventHandler zkSessionExpiredHandler;

/// <summary>
/// Initializes a new instance of the <see cref="ZookeeperConsumerConnector"/> class.
Expand All @@ -78,7 +78,7 @@ public ZookeeperConsumerConnector(ConsumerConfiguration config,
{
if (string.IsNullOrEmpty(config.GroupId))
{
throw new ArgumentNullException("GroupId of ConsumerConfiguration should not be empty.");
throw new ArgumentException("GroupId of ConsumerConfiguration should not be empty.", nameof(config));
}
Logger.Info("Enter ZookeeperConsumerConnector ...");
try
Expand Down Expand Up @@ -555,7 +555,7 @@ private IDictionary<string, IList<KafkaMessageStream<TData>>> Consume<TData>(IDi

if (topicCountDict == null)
{
throw new ArgumentNullException();
throw new ArgumentNullException(nameof(topicCountDict));
}

var dirs = new ZKGroupDirs(this.config.GroupId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ internal static string GetUsage()
{
StringBuilder sb = new StringBuilder();
sb.AppendFormat("{0} is one utility tool to try KafkaNET.Library.\r\n", AssemblyName);
sb.AppendFormat("Usage:\r\n", AssemblyName);
sb.Append("Usage:\r\n");
sb.AppendFormat("{0} <Verb> ArgumentsAndOpitons \r\n", AssemblyName);
sb.AppendFormat("Valid verbs includes: \r\n\r\n");
sb.Append("Valid verbs includes: \r\n\r\n");
sb.AppendFormat("\t{0,-30} {1}\r\n\r\n", KafkaNETExampleType.Topic.ToString().ToLowerInvariant(), new TopicHelperArguments().GetUsage(true));
sb.AppendFormat("\t{0,-30} {1}\r\n\r\n", KafkaNETExampleType.ConsumeSimple.ToString().ToLowerInvariant(), new ConsumeDataHelperArguments().GetUsage(true));
sb.AppendFormat("\t{0,-30} {1}\r\n\r\n", KafkaNETExampleType.ConsumeGroup.ToString().ToLowerInvariant(), new ConsumeGroupMonitorHelperOptions().GetUsage(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ internal class ConsumerGroupHelperUnit
internal static log4net.ILog Logger = log4net.LogManager.GetLogger(typeof(ConsumerGroupHelperUnit));
internal int ThreadID;
internal ConsumerConfiguration configSettings;
ConsumeGroupHelperOptions cgOptions;
AutoResetEvent resetEvent;
readonly ConsumeGroupHelperOptions cgOptions;
readonly AutoResetEvent resetEvent;
internal int Count = -1;
internal int consumedTotalCount = 0;
internal ConsumerGroupHelperUnit(int threadID, ConsumeGroupHelperOptions cg, AutoResetEvent e, int c)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ internal class ConsumeGroupMonitorUnit
public string group;
public string topic;

private DateTime startTime = DateTime.UtcNow;
private readonly DateTime startTime = DateTime.UtcNow;
private SortedDictionary<int, long> latestOffsetDictLastValue = null;
private SortedDictionary<int, long> latestCommitedDictLastValue = null;
private SortedDictionary<int, long> latestOffsetDictFirstValue = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,13 @@ private void RunOneThread(object parameter)

private class BatchedMessages : IDisposable
{
MemoryStream stream;
BinaryWriter writer;
int limit = 0;
readonly MemoryStream stream;
readonly BinaryWriter writer;
readonly int limit = 0;
int size = 0;

bool _disposed;

internal BatchedMessages(int limit)
{
this.stream = new MemoryStream(limit);
Expand All @@ -272,6 +274,11 @@ internal bool PutMessage(byte[] val)

internal bool PutMessage(byte[] key, byte[] val)
{
if (val == null)
{
return false;
}

if (key == null)
{
if (size + 8 + val.Length > limit) return false;
Expand All @@ -281,29 +288,17 @@ internal bool PutMessage(byte[] key, byte[] val)
if (size + 8 + key.Length + val.Length > limit) return false;
}

if (val == null)
{
return false;
}

try
if (key == null)
{
if (key == null)
{
writer.Write(0);
}
else
{
writer.Write(ReverseBytes((uint)key.Length));
writer.Write(key);
}
writer.Write(ReverseBytes((uint)val.Length));
writer.Write(val);
writer.Write(0);
}
catch (Exception ex)
else
{
throw ex;
writer.Write(ReverseBytes((uint)key.Length));
writer.Write(key);
}
writer.Write(ReverseBytes((uint)val.Length));
writer.Write(val);

if (key == null)
{
Expand All @@ -326,9 +321,13 @@ internal byte[] GetData()

public void Dispose()
{
if (stream != null)
stream.Dispose();
this.Dispose();
if (!_disposed)
{
if (stream != null)
stream.Dispose();

_disposed = true;
}
}

private uint ReverseBytes(uint value)
Expand Down Expand Up @@ -357,8 +356,8 @@ internal ResponseStatus() { }

private class Message
{
byte[] key;
byte[] val;
readonly byte[] key;
readonly byte[] val;

internal Message(byte[] key, byte[] val)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ internal class ProducePerfTestKafkaSimpleManagerWrapper
private static volatile ProducePerfTestKafkaSimpleManagerWrapper instance;
private static object syncRoot = new Object();

private object lockForDictionaryChange = new Object();
private readonly object lockForDictionaryChange = new Object();
private KafkaSimpleManagerConfiguration config;
private KafkaSimpleManager<byte[], Message> kafkaSimpleManage;
private ProducerConfiguration producerConfigTemplate;
private readonly ProducerConfiguration producerConfigTemplate;
private int correlationIDGetProducer = 0;
private string clientId = "KafkaSimpleManagerProducerWrapper";
private readonly string clientId = "KafkaSimpleManagerProducerWrapper";

private ProducePerfTestKafkaSimpleManagerWrapper()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Kafka.Client.Exceptions

public class TimeStampTooSmallException : Exception
{
private long offsetTime;
private readonly long offsetTime;


public TimeStampTooSmallException()
Expand Down
28 changes: 14 additions & 14 deletions src/KafkaNET.Library/Helper/KafkaSimpleManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,30 +65,30 @@ public class KafkaSimpleManager<TKey, TData> : IDisposable

public KafkaSimpleManagerConfiguration Config { get; private set; }

private ConcurrentDictionary<string, object> TopicLockProduce = new ConcurrentDictionary<string, object>();
private ConcurrentDictionary<string, object> TopicPartitionLockConsume = new ConcurrentDictionary<string, object>();
private readonly ConcurrentDictionary<string, object> TopicLockProduce = new ConcurrentDictionary<string, object>();
private readonly ConcurrentDictionary<string, object> TopicPartitionLockConsume = new ConcurrentDictionary<string, object>();
//topic --> TopicMetadata
private ConcurrentDictionary<string, TopicMetadata> TopicMetadatas = new ConcurrentDictionary<string, TopicMetadata>();
private readonly ConcurrentDictionary<string, TopicMetadata> TopicMetadatas = new ConcurrentDictionary<string, TopicMetadata>();
//topic --> TopicMetadatasLastUpdateTime
private ConcurrentDictionary<string, DateTime> TopicMetadatasLastUpdateTime = new ConcurrentDictionary<string, DateTime>();
private readonly ConcurrentDictionary<string, DateTime> TopicMetadatasLastUpdateTime = new ConcurrentDictionary<string, DateTime>();
//topic --> partitionid -->
private ConcurrentDictionary<string, Dictionary<int, Tuple<Broker, BrokerConfiguration>>> TopicMetadataPartitionsLeaders = new ConcurrentDictionary<string, Dictionary<int, Tuple<Broker, BrokerConfiguration>>>();
private ConcurrentDictionary<string, ConcurrentDictionary<int, long>> TopicOffsetEarliest = new ConcurrentDictionary<string, ConcurrentDictionary<int, long>>();
private ConcurrentDictionary<string, ConcurrentDictionary<int, long>> TopicOffsetLatest = new ConcurrentDictionary<string, ConcurrentDictionary<int, long>>();
private object topicProducerLock = new object();
private readonly ConcurrentDictionary<string, Dictionary<int, Tuple<Broker, BrokerConfiguration>>> TopicMetadataPartitionsLeaders = new ConcurrentDictionary<string, Dictionary<int, Tuple<Broker, BrokerConfiguration>>>();
private readonly ConcurrentDictionary<string, ConcurrentDictionary<int, long>> TopicOffsetEarliest = new ConcurrentDictionary<string, ConcurrentDictionary<int, long>>();
private readonly ConcurrentDictionary<string, ConcurrentDictionary<int, long>> TopicOffsetLatest = new ConcurrentDictionary<string, ConcurrentDictionary<int, long>>();
private readonly object topicProducerLock = new object();
//topic --> partitionid -->
private ConcurrentDictionary<string, ConcurrentDictionary<int, Producer<TKey, TData>>> TopicPartitionsLeaderProducers = new ConcurrentDictionary<string, ConcurrentDictionary<int, Producer<TKey, TData>>>();
private ConcurrentDictionary<string, Producer<TKey, TData>> TopicProducersWithPartitionerClass = new ConcurrentDictionary<string, Producer<TKey, TData>>();
private readonly ConcurrentDictionary<string, ConcurrentDictionary<int, Producer<TKey, TData>>> TopicPartitionsLeaderProducers = new ConcurrentDictionary<string, ConcurrentDictionary<int, Producer<TKey, TData>>>();
private readonly ConcurrentDictionary<string, Producer<TKey, TData>> TopicProducersWithPartitionerClass = new ConcurrentDictionary<string, Producer<TKey, TData>>();
//topic --> partitionid -->
private ConcurrentDictionary<string, ConcurrentDictionary<int, Consumer>> TopicPartitionsLeaderConsumers = new ConcurrentDictionary<string, ConcurrentDictionary<int, Consumer>>();
private readonly ConcurrentDictionary<string, ConcurrentDictionary<int, Consumer>> TopicPartitionsLeaderConsumers = new ConcurrentDictionary<string, ConcurrentDictionary<int, Consumer>>();

#region SyncProducerPool for metadata.
private volatile bool disposed = false;
// the pool of syncProducer for TopicMetaData requests, which retrieve PartitionMetaData, including leaders and ISRs.
private volatile SyncProducerPool syncProducerPoolForMetaData = null;
private object syncProducerPoolForMetadataLock = new object();
private Random random = new Random();
private Random randomForGetCachedProducer = new Random();
private readonly object syncProducerPoolForMetadataLock = new object();
private readonly Random random = new Random();
private readonly Random randomForGetCachedProducer = new Random();
#endregion

public KafkaSimpleManager(KafkaSimpleManagerConfiguration config)
Expand Down
2 changes: 1 addition & 1 deletion src/KafkaNET.Library/Messages/BufferedMessageSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ private bool MaybeComputeNext()

private MessageAndOffset MakeNextOuter()
{
if (topIterPosition >= this.Messages.Count())
if (!this.Messages.Skip(topIterPosition).Any())
{
return AllDone();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class BrokerPartitionInfo : IBrokerPartitionInfo

private readonly IDictionary<string, DateTime> topicPartitionInfoLastUpdateTime = new Dictionary<string, DateTime>();
private readonly object updateLock = new object();
private int topicMetaDataRefreshIntervalMS;
private readonly int topicMetaDataRefreshIntervalMS;
private readonly ZooKeeperClient zkClient;

public BrokerPartitionInfo(ISyncProducerPool syncProducerPool, IDictionary<string, TopicMetadata> cache, IDictionary<string, DateTime> lastUpdateTime, int topicMetaDataRefreshIntervalMS, ZooKeeperClient zkClient)
Expand Down
2 changes: 1 addition & 1 deletion src/KafkaNET.Library/Producers/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class Producer<TKey, TData> : KafkaClientBase, IProducer<TKey, TData>
private readonly object shuttingDownLock = new object();
private readonly IDictionary<string, TopicMetadata> topicPartitionInfo = new Dictionary<string, TopicMetadata>();
private readonly IDictionary<string, DateTime> topicPartitionInfoLastUpdateTime = new Dictionary<string, DateTime>();
private SyncProducerPool syncProducerPool;
private readonly SyncProducerPool syncProducerPool;

public Producer(ICallbackHandler<TKey, TData> callbackHandler)
{
Expand Down
2 changes: 1 addition & 1 deletion src/KafkaNET.Library/Producers/Sync/SyncProducerPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public override string ToString()
if (Config.ZooKeeper != null)
sb.AppendFormat("\t Broker zookeeper: {0} \t", this.Config.ZooKeeper.ZkConnect);

sb.Append(string.Join(",", this.syncProducers.Select(r => string.Format("BrokerID:{0} syncProducerCount:{1} ", r.Key, r.Value.Producers.Count())).ToArray()));
sb.Append(string.Join(",", this.syncProducers.Select(r => string.Format("BrokerID:{0} syncProducerCount:{1} ", r.Key, r.Value.Producers.Count)).ToArray()));
return sb.ToString();
}
public void AddProducer(Broker broker)
Expand Down
4 changes: 2 additions & 2 deletions src/KafkaNET.Library/Utils/Crc32Hasher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ public class Crc32Hasher : HashAlgorithm
internal const UInt32 DefaultSeed = 0xffffffff;

private UInt32 hash;
private UInt32 seed;
private UInt32[] table;
private readonly UInt32 seed;
private readonly UInt32[] table;
private static UInt32[] defaultTable;

public Crc32Hasher()
Expand Down
3 changes: 1 addition & 2 deletions src/KafkaNET.Library/Utils/KafkaConsoleUtil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ public static void DumpDataToFile(bool dumpDataAsUTF8, bool dumpOriginalData, St
sw.Flush();
fs.Write(v.Message.Payload, 0, v.Message.Payload.Length);
fs.Flush();
sw.WriteLine("\r\n==Binary END==",
offsetBase + i, payload.Count, v.Message.Payload.Length);
sw.WriteLine("\r\n==Binary END==");
i++;
totalCountOriginal++;
if (totalCountOriginal >= count && count > 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ internal class ZKRebalancerListener<TData> : IZooKeeperChildListener

public event EventHandler ConsumerRebalance;

private IDictionary<string, IList<string>> oldPartitionsPerTopicMap = new Dictionary<string, IList<string>>();
private IDictionary<string, IList<string>> oldConsumersPerTopicMap = new Dictionary<string, IList<string>>();
private IDictionary<string, IDictionary<int, PartitionTopicInfo>> topicRegistry;
private readonly IDictionary<string, IList<string>> oldPartitionsPerTopicMap = new Dictionary<string, IList<string>>();
private readonly IDictionary<string, IList<string>> oldConsumersPerTopicMap = new Dictionary<string, IList<string>>();
private readonly IDictionary<string, IDictionary<int, PartitionTopicInfo>> topicRegistry;
private readonly IDictionary<Tuple<string, string>, BlockingCollection<FetchedDataChunk>> queues;
private readonly string consumerIdString;
private readonly object syncLock = new object();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ internal class ZkPartitionLeaderListener<TData> : IZooKeeperDataListener
{
public static ILog Logger { get { return LogManager.GetLogger(typeof(ZkPartitionLeaderListener<TData>)); } }

private ZKRebalancerListener<TData> _rebalancer;
private Dictionary<string, int> _partitionLeaderMap;
private readonly ZKRebalancerListener<TData> _rebalancer;
private readonly Dictionary<string, int> _partitionLeaderMap;

public ZkPartitionLeaderListener(ZKRebalancerListener<TData> rebalancer, Dictionary<string, int> partitionLeaderMap = null)
{
Expand Down
Loading