Skip to content

Commit 31ebc3e

Browse files
author
Mike Bridge
committed
Added console demo
1 parent 180435e commit 31ebc3e

File tree

11 files changed

+370
-7
lines changed

11 files changed

+370
-7
lines changed

Kafka.Rx.NET.Console/App.config

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
<?xml version="1.0" encoding="utf-8" ?>
2+
<configuration>
3+
<startup>
4+
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5" />
5+
</startup>
6+
</configuration>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
<?xml version="1.0" encoding="utf-8"?>
2+
<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
3+
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
4+
<PropertyGroup>
5+
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
6+
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
7+
<ProjectGuid>{D54766AF-AD83-4ED4-A6AC-AA05F0BCFEB8}</ProjectGuid>
8+
<OutputType>Exe</OutputType>
9+
<AppDesignerFolder>Properties</AppDesignerFolder>
10+
<RootNamespace>Kafka.Rx.NET.Console</RootNamespace>
11+
<AssemblyName>Kafka.Rx.NET.Console</AssemblyName>
12+
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
13+
<FileAlignment>512</FileAlignment>
14+
</PropertyGroup>
15+
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
16+
<PlatformTarget>AnyCPU</PlatformTarget>
17+
<DebugSymbols>true</DebugSymbols>
18+
<DebugType>full</DebugType>
19+
<Optimize>false</Optimize>
20+
<OutputPath>bin\Debug\</OutputPath>
21+
<DefineConstants>DEBUG;TRACE</DefineConstants>
22+
<ErrorReport>prompt</ErrorReport>
23+
<WarningLevel>4</WarningLevel>
24+
</PropertyGroup>
25+
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
26+
<PlatformTarget>AnyCPU</PlatformTarget>
27+
<DebugType>pdbonly</DebugType>
28+
<Optimize>true</Optimize>
29+
<OutputPath>bin\Release\</OutputPath>
30+
<DefineConstants>TRACE</DefineConstants>
31+
<ErrorReport>prompt</ErrorReport>
32+
<WarningLevel>4</WarningLevel>
33+
</PropertyGroup>
34+
<ItemGroup>
35+
<Reference Include="CommandLine, Version=1.9.71.2, Culture=neutral, PublicKeyToken=de6f01bd326f8c32, processorArchitecture=MSIL">
36+
<HintPath>..\packages\CommandLineParser.1.9.71\lib\net45\CommandLine.dll</HintPath>
37+
<Private>True</Private>
38+
</Reference>
39+
<Reference Include="Confluent.RestClient, Version=1.0.0.23, Culture=neutral, processorArchitecture=MSIL">
40+
<HintPath>..\packages\Confluent.RestClient.1.0.0.23\lib\net45\Confluent.RestClient.dll</HintPath>
41+
<Private>True</Private>
42+
</Reference>
43+
<Reference Include="Newtonsoft.Json, Version=6.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
44+
<HintPath>..\packages\Newtonsoft.Json.6.0.4\lib\net45\Newtonsoft.Json.dll</HintPath>
45+
<Private>True</Private>
46+
</Reference>
47+
<Reference Include="System" />
48+
<Reference Include="System.Core" />
49+
<Reference Include="System.Net.Http" />
50+
<Reference Include="System.Net.Http.Formatting, Version=5.2.3.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
51+
<HintPath>..\packages\Microsoft.AspNet.WebApi.Client.5.2.3\lib\net45\System.Net.Http.Formatting.dll</HintPath>
52+
<Private>True</Private>
53+
</Reference>
54+
<Reference Include="System.Reactive.Core, Version=2.2.5.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
55+
<HintPath>..\packages\Rx-Core.2.2.5\lib\net45\System.Reactive.Core.dll</HintPath>
56+
<Private>True</Private>
57+
</Reference>
58+
<Reference Include="System.Reactive.Interfaces, Version=2.2.5.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
59+
<HintPath>..\packages\Rx-Interfaces.2.2.5\lib\net45\System.Reactive.Interfaces.dll</HintPath>
60+
<Private>True</Private>
61+
</Reference>
62+
<Reference Include="System.Reactive.Linq, Version=2.2.5.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
63+
<HintPath>..\packages\Rx-Linq.2.2.5\lib\net45\System.Reactive.Linq.dll</HintPath>
64+
<Private>True</Private>
65+
</Reference>
66+
<Reference Include="System.Reactive.PlatformServices, Version=2.2.5.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">
67+
<HintPath>..\packages\Rx-PlatformServices.2.2.5\lib\net45\System.Reactive.PlatformServices.dll</HintPath>
68+
<Private>True</Private>
69+
</Reference>
70+
<Reference Include="System.Runtime.Serialization" />
71+
<Reference Include="System.Xml.Linq" />
72+
<Reference Include="System.Data.DataSetExtensions" />
73+
<Reference Include="Microsoft.CSharp" />
74+
<Reference Include="System.Data" />
75+
<Reference Include="System.Xml" />
76+
</ItemGroup>
77+
<ItemGroup>
78+
<Compile Include="LogMessage.cs" />
79+
<Compile Include="Options.cs" />
80+
<Compile Include="Program.cs" />
81+
<Compile Include="Properties\AssemblyInfo.cs" />
82+
<Compile Include="Setup.cs" />
83+
</ItemGroup>
84+
<ItemGroup>
85+
<None Include="App.config" />
86+
<None Include="packages.config" />
87+
</ItemGroup>
88+
<ItemGroup>
89+
<ProjectReference Include="..\Kafka.Rx.NET\Kafka.Rx.NET.csproj">
90+
<Project>{e0c34a1c-4ae8-46bf-a02e-060f3fb8cddf}</Project>
91+
<Name>Kafka.Rx.NET</Name>
92+
</ProjectReference>
93+
</ItemGroup>
94+
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
95+
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
96+
Other similar extension points exist, see Microsoft.Common.targets.
97+
<Target Name="BeforeBuild">
98+
</Target>
99+
<Target Name="AfterBuild">
100+
</Target>
101+
-->
102+
</Project>

Kafka.Rx.NET.Console/LogMessage.cs

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
using System;
2+
using System.Runtime.Serialization;
3+
4+
namespace Kafka.Rx.NET.Console
5+
{
6+
[DataContract]
7+
public class LogMessage
8+
{
9+
[DataMember(Name = "message")]
10+
public String Message { get; set; }
11+
12+
[DataMember(Name = "stacktrace")]
13+
public String StackTrace { get; set; }
14+
}
15+
}

Kafka.Rx.NET.Console/Options.cs

+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
using System;
2+
using CommandLine;
3+
using CommandLine.Text;
4+
5+
namespace Kafka.Rx.NET.Console
6+
{
7+
public class Options
8+
{
9+
10+
[Option('u', "baseurl", Required = true, HelpText = "Kafka Base Url.")]
11+
public string BaseUrl { get; set; }
12+
13+
[Option('i', "instanceid", Required = true, HelpText = "The Instance Id in the Consumer Group")]
14+
public string InstanceId { get; set; }
15+
16+
[Option('g', "consumergroup", Required = true, HelpText = "The Consumer Group")]
17+
public string ConsumerGroup { get; set; }
18+
19+
[Option('t', "topic", Required = true, HelpText = "The Topic")]
20+
public string Topic { get; set; }
21+
22+
[Option('s', "sleep", HelpText = "Sleep time between api calls in milliseconds", DefaultValue = 5000)]
23+
public int Sleep { get; set; }
24+
25+
[ParserState]
26+
public IParserState LastParserState { get; set; }
27+
28+
[HelpOption]
29+
public string GetUsage()
30+
{
31+
return HelpText.AutoBuild(this, current => HelpText.DefaultParsingErrorsHandler(this, current));
32+
}
33+
34+
}
35+
}

Kafka.Rx.NET.Console/Program.cs

+98
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Diagnostics;
4+
using System.Linq;
5+
using System.Reactive.Concurrency;
6+
using System.Reactive.Linq;
7+
using System.Text;
8+
using System.Threading;
9+
using System.Threading.Tasks;
10+
using Confluent.RestClient.Model;
11+
12+
namespace Kafka.Rx.NET.Console
13+
{
14+
public class Program
15+
{
16+
static void Main(string[] args)
17+
{
18+
var options = new Options();
19+
20+
if (CommandLine.Parser.Default.ParseArguments(args, options))
21+
{
22+
System.Console.WriteLine("Listening to " + options.BaseUrl);
23+
Listen(options);
24+
25+
}
26+
else
27+
{
28+
System.Console.WriteLine(options.GetUsage());
29+
}
30+
}
31+
32+
private static void Listen(Options options)
33+
{
34+
35+
// Arrange
36+
String instanceId = options.InstanceId;
37+
string consumerGroupName = options.ConsumerGroup;
38+
string topic = options.Topic;
39+
ConsumerInstance consumerInstance = null;
40+
41+
System.Console.WriteLine("Registering group/id: " + options.ConsumerGroup + "/" + options.InstanceId);
42+
System.Console.WriteLine("Listening to topic: " + topic);
43+
System.Console.WriteLine("Query interval: " + options.Sleep+"ms");
44+
using (var client = Setup.CreateConfluentClient(options.BaseUrl))
45+
{
46+
try
47+
{
48+
// in production this should be written without blocking.
49+
consumerInstance = Setup.ConsumerInstance(client, instanceId, consumerGroupName);
50+
var consumer = new RxConsumer(client, consumerInstance, topic);
51+
52+
// Act
53+
var observable = consumer.GetRecordStream<String, LogMessage>(
54+
TimeSpan.FromMilliseconds(options.Sleep), ThreadPoolScheduler.Instance)
55+
//.Take(10)
56+
.Subscribe(successResult =>
57+
{
58+
59+
System.Console.WriteLine("Success: " + successResult.IsSuccess);
60+
if (successResult.IsSuccess)
61+
{
62+
System.Console.WriteLine(successResult.Value.Key + "=" +
63+
successResult.Value.Value.Message);
64+
}
65+
else
66+
{
67+
System.Console.WriteLine("ERROR: " + successResult.Exception.Message);
68+
}
69+
},
70+
() => System.Console.WriteLine("COMPLETED.") // not sure how to cancel this...
71+
);
72+
73+
//Thread.Sleep(15000);
74+
System.Console.ReadLine();
75+
System.Console.WriteLine("Disposing observer");
76+
observable.Dispose();
77+
}
78+
catch (Exception ex)
79+
{
80+
System.Console.WriteLine(ex.Message);
81+
if (ex.InnerException != null)
82+
{
83+
System.Console.WriteLine(ex.InnerException.Message);
84+
}
85+
}
86+
finally
87+
{
88+
if (consumerInstance != null)
89+
{
90+
System.Console.WriteLine("Destroying instance");
91+
client.DeleteConsumerAsync(consumerInstance);
92+
}
93+
}
94+
}
95+
}
96+
97+
}
98+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
using System.Reflection;
2+
using System.Runtime.CompilerServices;
3+
using System.Runtime.InteropServices;
4+
5+
// General Information about an assembly is controlled through the following
6+
// set of attributes. Change these attribute values to modify the information
7+
// associated with an assembly.
8+
[assembly: AssemblyTitle("Kafka.Rx.NET.Console")]
9+
[assembly: AssemblyDescription("")]
10+
[assembly: AssemblyConfiguration("")]
11+
[assembly: AssemblyCompany("")]
12+
[assembly: AssemblyProduct("Kafka.Rx.NET.Console")]
13+
[assembly: AssemblyCopyright("Copyright © 2015")]
14+
[assembly: AssemblyTrademark("")]
15+
[assembly: AssemblyCulture("")]
16+
17+
// Setting ComVisible to false makes the types in this assembly not visible
18+
// to COM components. If you need to access a type in this assembly from
19+
// COM, set the ComVisible attribute to true on that type.
20+
[assembly: ComVisible(false)]
21+
22+
// The following GUID is for the ID of the typelib if this project is exposed to COM
23+
[assembly: Guid("f6800a2b-a801-4d06-b4d5-b0badcb8af8d")]
24+
25+
// Version information for an assembly consists of the following four values:
26+
//
27+
// Major Version
28+
// Minor Version
29+
// Build Number
30+
// Revision
31+
//
32+
// You can specify all the values or you can default the Build and Revision Numbers
33+
// by using the '*' as shown below:
34+
// [assembly: AssemblyVersion("1.0.*")]
35+
[assembly: AssemblyVersion("1.0.0.0")]
36+
[assembly: AssemblyFileVersion("1.0.0.0")]

Kafka.Rx.NET.Console/Setup.cs

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using Confluent.RestClient;
4+
using Confluent.RestClient.Model;
5+
6+
namespace Kafka.Rx.NET.Console
7+
{
8+
public static class Setup
9+
{
10+
11+
public class KafkaClientSettings : IConfluentClientSettings
12+
{
13+
public string KafkaBaseUrl { get; set; }
14+
}
15+
16+
public static IConfluentClient CreateConfluentClient(String baseUrl)
17+
{
18+
var settings = new KafkaClientSettings
19+
{
20+
KafkaBaseUrl = baseUrl
21+
};
22+
IConfluentClient client = new ConfluentClient(settings);
23+
return client;
24+
}
25+
26+
public static async Task<ConfluentResponse<ConsumerInstance>> CreateConsumerInstance(IConfluentClient client,
27+
string instanceId, string consumerGroupName)
28+
{
29+
30+
var request = new CreateConsumerRequest
31+
{
32+
AutoCommitEnabled = true,
33+
InstanceId = instanceId,
34+
MessageFormat = MessageFormat.Avro
35+
};
36+
37+
return await client.CreateConsumerAsync(consumerGroupName, request);
38+
}
39+
40+
public static ConsumerInstance ConsumerInstance(IConfluentClient client, string instanceId, string consumerGroupName)
41+
{
42+
var consumerInstanceTask = Setup.CreateConsumerInstance(client, instanceId, consumerGroupName).Result;
43+
44+
if (!consumerInstanceTask.IsSuccess())
45+
{
46+
throw new ApplicationException("Error " + consumerInstanceTask.Error.ErrorCode + ": " +
47+
consumerInstanceTask.Error.Message);
48+
}
49+
var consumerInstance = consumerInstanceTask.Payload;
50+
return consumerInstance;
51+
}
52+
}
53+
}

Kafka.Rx.NET.Console/packages.config

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<?xml version="1.0" encoding="utf-8"?>
2+
<packages>
3+
<package id="CommandLineParser" version="1.9.71" targetFramework="net45" />
4+
<package id="Confluent.RestClient" version="1.0.0.23" targetFramework="net45" />
5+
<package id="Microsoft.AspNet.WebApi.Client" version="5.2.3" targetFramework="net45" />
6+
<package id="Newtonsoft.Json" version="6.0.4" targetFramework="net45" />
7+
<package id="Rx-Core" version="2.2.5" targetFramework="net45" />
8+
<package id="Rx-Interfaces" version="2.2.5" targetFramework="net45" />
9+
<package id="Rx-Linq" version="2.2.5" targetFramework="net45" />
10+
<package id="Rx-Main" version="2.2.5" targetFramework="net45" />
11+
<package id="Rx-PlatformServices" version="2.2.5" targetFramework="net45" />
12+
</packages>

Kafka.Rx.NET.Tests/RxConsumerIntegrationTest.cs

+3-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
namespace Kafka.Rx.NET.Tests
1010
{
1111
[TestFixture]
12-
[Ignore("Requires a live Kafka server")]
12+
//[Ignore("Requires a live Kafka server")]
1313
public class RxConsumerIntegrationTest
1414
{
1515
/// <summary>
@@ -25,7 +25,8 @@ public void KafkaObservable_Should_Provide_An_Observable_Stream()
2525
string topic = "rxtest";
2626
ConsumerInstance consumerInstance = null;
2727

28-
using (var client = Setup.CreateConfluentClient("http://192.168.79.137:8082"))
28+
using (var client = Setup.CreateConfluentClient("http://199.204.211.42:8082"))
29+
//using (var client = Setup.CreateConfluentClient("http://192.168.79.137:8082"))
2930
{
3031
try
3132
{

0 commit comments

Comments
 (0)