Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
# Conflicts:
#	KubeMQ.SDK.csharp/KubeMQ.SDK.csharp.csproj
  • Loading branch information
kubemq committed Apr 11, 2021
2 parents 0ca7a2c + 5444b2f commit a19d8a9
Show file tree
Hide file tree
Showing 12 changed files with 223 additions and 20 deletions.
4 changes: 2 additions & 2 deletions Examples/CommandQueryInitiator/CommandQueryInitiator.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="3.11.4" />
<PackageReference Include="Grpc.Core" Version="2.27.0" />
<PackageReference Include="Google.Protobuf" Version="3.14.0" />
<PackageReference Include="Grpc.Core" Version="2.34.0" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.11.0" />
</ItemGroup>
Expand Down
4 changes: 2 additions & 2 deletions Examples/CommandQueryResponder/CommandQueryResponder.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="3.11.4" />
<PackageReference Include="Grpc.Core" Version="2.27.0" />
<PackageReference Include="Google.Protobuf" Version="3.14.0" />
<PackageReference Include="Grpc.Core" Version="2.34.0" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.11.0" />
</ItemGroup>
Expand Down
4 changes: 2 additions & 2 deletions Examples/EventSender/EventSender.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="3.11.4" />
<PackageReference Include="Grpc.Core" Version="2.27.0" />
<PackageReference Include="Google.Protobuf" Version="3.14.0" />
<PackageReference Include="Grpc.Core" Version="2.34.0" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.11.0" />
</ItemGroup>
Expand Down
2 changes: 1 addition & 1 deletion Examples/EventSubscriber/EventSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ private void HandleIncomingEvents(EventReceive @event)
string strMsg = string.Empty;
object body = KubeMQ.SDK.csharp.Tools.Converter.FromByteArray(@event.Body);

logger.LogInformation($"Subscriber Received Event: Metadata:'{@event.Metadata}', Channel:'{@event.Channel}', Body:'{strMsg}'");
Console.WriteLine($"Subscriber Received Event: Metadata:'{@event.Metadata}', Channel:'{@event.Channel}', Body:'{body}'");
}
}

Expand Down
4 changes: 2 additions & 2 deletions Examples/EventSubscriber/EventSubscriber.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="3.11.4" />
<PackageReference Include="Grpc.Core" Version="2.27.0" />
<PackageReference Include="Google.Protobuf" Version="3.14.0" />
<PackageReference Include="Grpc.Core" Version="2.34.0" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
<PackageReference Include="System.Console" Version="4.3.1" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.11.0" />
Expand Down
15 changes: 9 additions & 6 deletions Examples/QueueLongPooling/Program.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Threading;
using Grpc.Core;
using KubeMQ.SDK.csharp.Queue.KubemqQueueErrors;
using KubeMQ.SDK.csharp.Queue.Stream;

namespace QueueLongPolling
Expand All @@ -26,7 +27,7 @@ static void Main()
Console.WriteLine($"[DemoPoll] QueueName:{QueueName}");
Console.WriteLine($"[DemoPoll] KubeMQServerAddress:{KubeMQServerAddress}");

KubeMQ.SDK.csharp.Queue.Queue queue = CreatreQueue();
KubeMQ.SDK.csharp.Queue.Queue queue = CreateQueue();
if (queue == null)
{
Console.ReadLine();
Expand All @@ -42,7 +43,7 @@ static void Main()
if (queue==null)
{
Thread.Sleep(1000);
queue = CreatreQueue();
queue = CreateQueue();

continue;
}
Expand All @@ -56,11 +57,13 @@ static void Main()
Console.WriteLine($"[DemoPoll][Tran]Transaction ready and listening");
try
{
TransactionMessagesResponse ms = transaction.Receive((int)new TimeSpan(1, 0, 0).TotalSeconds, (int)new TimeSpan(1, 0, 0).TotalSeconds);
TransactionMessagesResponse ms = transaction.Receive((int)new TimeSpan(1, 0, 0).TotalSeconds, (int)new TimeSpan(0, 0, 5).TotalSeconds);

if (ms.IsError)
{
Console.WriteLine($"[DemoPoll][Tran]message dequeue error, error:{ms.Error}");
Console.WriteLine(ms.QueueErrors == KubemqQueueErrors.ErrNoNewMessageQueue
? $"DemoPoll][Tran]no new message found"
: $"[DemoPoll][Tran]message dequeue error, error:{ms.Error}");
continue;
}

Expand All @@ -70,7 +73,7 @@ static void Main()
{
Console.WriteLine($"[DemoPoll][Tran]RPC error, error:{ex.Message}");

queue = CreatreQueue();
queue = CreateQueue();
}
transaction.Close();
Thread.Sleep(1);
Expand Down Expand Up @@ -99,7 +102,7 @@ private static void HandleMSG(TransactionMessagesResponse ms, Transaction transa

}

private static KubeMQ.SDK.csharp.Queue.Queue CreatreQueue()
private static KubeMQ.SDK.csharp.Queue.Queue CreateQueue()
{
try
{
Expand Down
87 changes: 87 additions & 0 deletions KubeMQ.SDK.csharp/Queue/KubemqQueueErrors/KubemqErrors.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Text;

namespace KubeMQ.SDK.csharp.Queue.KubemqQueueErrors
{
public enum KubemqQueueErrors
{
ErrGeneralError = 1,
ErrInvalidQueueName = 120,
ErrRegisterQueueSubscription = 121,
ErrInvalidMaxMessages = 122,
ErrAckQueueMsg = 123,
ErrNoCurrentMsgToAck = 124,
ErrInvalidAckSeq = 125,
ErrNoCurrentMsgToSend = 126,
ErrInvalidVisibility = 127,
ErrSubscriptionIsActive = 128,
ErrVisibilityExpired = 129,
ErrSendingQueueMessage = 130,
ErrInvalidQueueMessage = 131,
ErrInvalidStreamRequestType = 132,
ErrInvalidExpiration = 133,
ErrInvalidMaxReceiveCount = 134,
ErrInvalidDelay = 135,
ErrInvalidWaitTimeout = 136,
ErrNoActiveMessageToReject = 137,
ErrNoNewMessageQueue = 138,
}

internal class KubemqQueueErrorConverter
{
internal static KubemqQueueErrors GetQueueError(string errorMsg)
{
switch (errorMsg)
{
case string a when a.Contains("120"):
return KubemqQueueErrors.ErrInvalidQueueName;
case string a when a.Contains("121"):
return KubemqQueueErrors.ErrRegisterQueueSubscription;
case string a when a.Contains("122"):
return KubemqQueueErrors.ErrInvalidMaxMessages;
case string a when a.Contains("123"):
return KubemqQueueErrors.ErrAckQueueMsg;
case string a when a.Contains("124"):
return KubemqQueueErrors.ErrNoCurrentMsgToAck;
case string a when a.Contains("125"):
return KubemqQueueErrors.ErrInvalidAckSeq;
case string a when a.Contains("126"):
return KubemqQueueErrors.ErrNoCurrentMsgToSend;
case string a when a.Contains("127"):
return KubemqQueueErrors.ErrInvalidVisibility;
case string a when a.Contains("128"):
return KubemqQueueErrors.ErrSubscriptionIsActive;
case string a when a.Contains("129"):
return KubemqQueueErrors.ErrVisibilityExpired;
case string a when a.Contains("130"):
return KubemqQueueErrors.ErrSendingQueueMessage;
case string a when a.Contains("131"):
return KubemqQueueErrors.ErrInvalidQueueMessage;
case string a when a.Contains("132"):
return KubemqQueueErrors.ErrInvalidStreamRequestType;
case string a when a.Contains("133"):
return KubemqQueueErrors.ErrInvalidExpiration;
case string a when a.Contains("134"):
return KubemqQueueErrors.ErrInvalidMaxReceiveCount;
case string a when a.Contains("135"):
return KubemqQueueErrors.ErrInvalidDelay;
case string a when a.Contains("136"):
return KubemqQueueErrors.ErrInvalidWaitTimeout;
case string a when a.Contains("137"):
return KubemqQueueErrors.ErrNoActiveMessageToReject;
case string a when a.Contains("138"):
return KubemqQueueErrors.ErrNoNewMessageQueue;
case null:
return KubemqQueueErrors.ErrGeneralError;
}
return KubemqQueueErrors.ErrGeneralError;
}
}



}


1 change: 0 additions & 1 deletion KubeMQ.SDK.csharp/Queue/Queue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ public ReceiveMessagesResponse ReceiveQueueMessages (int? maxNumberOfMessagesQue
MaxNumberOfMessages = maxNumberOfMessagesQueueMessages ?? MaxNumberOfMessagesQueueMessages,
WaitTimeSeconds = WaitTimeSecondsQueueMessages
},Metadata);

return new ReceiveMessagesResponse (rec);
}

Expand Down
19 changes: 19 additions & 0 deletions KubeMQ.SDK.csharp/Queue/ReceiveMessagesResponse.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Collections.Generic;
using KubeMQ.Grpc;
using KubeMQ.SDK.csharp.Queue.KubemqQueueErrors;

namespace KubeMQ.SDK.csharp.Queue
{
Expand Down Expand Up @@ -37,6 +38,11 @@ public class ReceiveMessagesResponse
/// </summary>
public int MessagesReceived { get; }

/// <summary>
/// Queue error set internally
/// </summary>
public KubemqQueueErrors.KubemqQueueErrors QueueErrors { get; private set; }

internal ReceiveMessagesResponse(ReceiveQueueMessagesResponse receiveQueueMessagesResponse)
{
Error = receiveQueueMessagesResponse.Error;
Expand All @@ -46,7 +52,20 @@ internal ReceiveMessagesResponse(ReceiveQueueMessagesResponse receiveQueueMessag
MessagesExpired = receiveQueueMessagesResponse.MessagesExpired;
MessagesReceived = receiveQueueMessagesResponse.MessagesReceived;
RequestID = receiveQueueMessagesResponse.RequestID;
if (IsError)
{
SetQueueError(Error);
}
}

internal void SetQueueError(string errorMsg)
{
if (!string.IsNullOrEmpty(errorMsg))
{
QueueErrors = KubemqQueueErrorConverter.GetQueueError(errorMsg);
}
}



}
Expand Down
24 changes: 23 additions & 1 deletion KubeMQ.SDK.csharp/Queue/TransactionMessagesResponse.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using KubeMQ.Grpc;
using KubeMQ.SDK.csharp.Queue.KubemqQueueErrors;

namespace KubeMQ.SDK.csharp.Queue.Stream
{
Expand All @@ -23,7 +24,12 @@ public class TransactionMessagesResponse
/// The received Message
/// </summary>
public Message Message { get; }


/// <summary>
/// Queue error set internally
/// </summary>
public KubemqQueueErrors.KubemqQueueErrors QueueErrors { get; private set; }

/// <summary>
/// Request action: ReceiveMessage, AckMessage, RejectMessage, ModifyVisibility, ResendMessage, SendModifiedMessage, Unknown
/// </summary>
Expand All @@ -36,13 +42,29 @@ internal TransactionMessagesResponse(StreamQueueMessagesResponse streamQueueMess
Message = streamQueueMessagesResponse.Message!=null? new Message(streamQueueMessagesResponse.Message): null;
RequestID = streamQueueMessagesResponse.RequestID;
StreamRequestTypeData = streamQueueMessagesResponse.StreamRequestTypeData;
if (IsError)
{
SetQueueError(Error);
}
}
internal TransactionMessagesResponse(string errorMessage, Message msg=null, string requestID=null)
{
IsError = true;
Error = errorMessage;
Message = msg;
RequestID = requestID;
if (IsError)
{
SetQueueError(Error);
}
}

internal void SetQueueError(string errorMsg)
{
if (!string.IsNullOrEmpty(errorMsg))
{
QueueErrors = KubemqQueueErrorConverter.GetQueueError(errorMsg);
}
}

}
Expand Down
47 changes: 45 additions & 2 deletions KubeMQ.SDK.csharp/Tools/Converter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using KubeMQ.SDK.csharp.Queue.KubemqQueueErrors;

namespace KubeMQ.SDK.csharp.Tools
{
Expand All @@ -14,6 +16,7 @@ namespace KubeMQ.SDK.csharp.Tools
/// </summary>
public class Converter
{

/// <summary>
/// Convert from string to byte array
/// </summary>
Expand All @@ -40,9 +43,12 @@ internal static ByteString ToByteString(byte[] byteArray)
return ByteString.CopyFrom(byteArray);
}


/// <summary>
/// Convert from byte array to object
/// Convert from byte array to object in BinaryFormatter
/// </summary>
/// <param name="data">Byte Array</param>
/// <returns>object</returns>
public static object FromByteArray(byte[] data)
{
if (data == null || data.Length == 0)
Expand All @@ -55,9 +61,29 @@ public static object FromByteArray(byte[] data)
}
}


/// <summary>
/// Convert from object to byte array
/// Convert from object to byte array in selected IFormatter
/// </summary>
/// <param name="data">Byte Array</param>
/// <param name="formatter">IFormatter type</param>
/// <returns>byte array</returns>
public static object FromByteArray(byte[] data, IFormatter formatter)
{
if (data == null || data.Length == 0)
return null;
using (MemoryStream ms = new MemoryStream(data))
{
object obj = formatter.Deserialize(ms);
return obj;
}
}

/// <summary>
/// Convert from object to byte array in BinaryFormatter
/// </summary>
/// <param name="obj">Object to format </param>
/// <returns>ByteArray</returns>
public static byte[] ToByteArray(object obj)
{
if (obj == null)
Expand All @@ -70,6 +96,23 @@ public static byte[] ToByteArray(object obj)
}
}

/// <summary>
/// Convert from object to byte array in selected IFormatter
/// </summary>
/// <param name="obj">Object to return as bytearray </param>
/// <param name="formatter">IFormatter type</param>
/// <returns>ByteArray</returns>
public static byte[] ToByteArray(object obj, IFormatter formatter)
{
if (obj == null)
return null;
using (MemoryStream ms = new MemoryStream())
{
formatter.Serialize(ms, obj);
return ms.ToArray();
}
}

public static DateTime FromUnixTime(long UnixTime)
{
double UnixTimeDbl = UnixTime;
Expand Down
Loading

0 comments on commit a19d8a9

Please sign in to comment.