Skip to content

Commit 7f5fdc1

Browse files
Added option to read messages from syncreply (#740)
* Add a new single queue sync messaging * Startet på MessageListener * Ser ut til at ting kommer inn på SynchronousReplyListener som ventet nå. Tar også med lokale hacks for å kopiere dll. Skal bort før merge * Ryddet opp i ny listener som er testet i featuremiljø * Lagt til tester og uppet versjonsnummer * ryddet bort kopiering av DLLs * Mergefeil * Remove dlls * fix test --------- Co-authored-by: Martin Tysdahl Gedde-Dahl <[email protected]>
1 parent 5dda1cd commit 7f5fdc1

26 files changed

+509
-41
lines changed

Directory.build.props

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<Project>
22
<PropertyGroup>
33
<LangVersion>10.0</LangVersion>
4-
<Version>5.2.1</Version>
4+
<Version>6.0.0</Version>
55
</PropertyGroup>
66
<PropertyGroup>
77
<Authors>Norsk Helsenett SF</Authors>

Documentation/MigrateFrom5To6.md

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
## Migrere fra 5.0 til 6.0
2+
3+
Hovedendringen for 6.0 er utvidelse av IMessagingClient og IMessagingServer for å kunne kontrollere synkron meldingsutveksling utenfor Helsenorge.Messaging. Dette er spesielt implementert for et konkret behov for bruk på helsenorge.no.
4+
For brukere som ikke er avhengig av å kunne kontrollere syncreply-køen, så kan det oppgraderes til 6.0 uten å gjøre noen konfigurasjonsendringer.
5+
6+
Alle endringer er dokumentert i dette migreringsdokumentet.
7+
8+
`IMessagingClient` er utvidet med `SendWithoutWaitingAsync` for å kunne sende en melding til en sync-kø. Hvis denne brukes tar man selv ansvar for å hente ned svaret fra
9+
køen spesifisert i `MessagingSettings:AmqpSettings:Synchronous:StaticReplyQueue`
10+
Hvis man ønsker å bruke den allerede eksisterende `SendAndWaitAsync` metoden, vil denne fortsatt fungere som før og trenger ikke populere opp den nye verdien.
11+
12+
### Konfigurasjonsrelaterte endringer
13+
14+
`MessagingSettings:AmqpSettings:Synchronous:StaticReplyQueue` er lagt til for å kunne spesifisere en svar-kø for synkron meldingsutveksling. Denne er ikke påkrevd
15+
16+
### Nye features
17+
18+
`IMessagingClient` er utvidet med `SendWithoutWaitingAsync`
19+
IMessagingServer er utvidet med muligheten for å lytte på syncreply-køen spesifisert i `MessagingSettings:AmqpSettings:Synchronous:StaticReplyQueue`

Messaging.sln

+3-2
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,14 @@ EndProject
2525
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "workflows", "workflows", "{FB9DDD90-D5A5-4F25-8C81-4E6C9B4A18EE}"
2626
ProjectSection(SolutionItems) = preProject
2727
.github\workflows\build.yml = .github\workflows\build.yml
28-
.github\workflows\nuget-deploy.yml = .github\workflows\nuget-deploy.yml
2928
.github\workflows\codeql-analysis.yml = .github\workflows\codeql-analysis.yml
29+
.github\workflows\nuget-deploy.yml = .github\workflows\nuget-deploy.yml
3030
EndProjectSection
3131
EndProject
3232
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "SolutionItems", "SolutionItems", "{C78E1F1C-6AE6-4E64-8211-90EEC7A5FCAD}"
3333
ProjectSection(SolutionItems) = preProject
34-
readme.md = readme.md
3534
LICENSE = LICENSE
35+
readme.md = readme.md
3636
EndProjectSection
3737
EndProject
3838
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Documentation", "Documentation", "{D65E8495-DAAD-46D6-8053-8D911A5C24F3}"
@@ -47,6 +47,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Documentation", "Documentat
4747
Documentation\KrypteringOgSignering.md = Documentation\KrypteringOgSignering.md
4848
Documentation\MigrateFrom3To4.md = Documentation\MigrateFrom3To4.md
4949
Documentation\MigrateFrom4To5.md = Documentation\MigrateFrom4To5.md
50+
Documentation\MigrateFrom5To6.md = Documentation\MigrateFrom5To6.md
5051
Documentation\MottaMeldinger.md = Documentation\MottaMeldinger.md
5152
Documentation\OppdaterProxyKlasser.md = Documentation\OppdaterProxyKlasser.md
5253
Documentation\Registre.md = Documentation\Registre.md

readme.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ Dette biblioteket støtter oppunder basisbehovet for en kommunikasjonsløsning v
2121
12. [Biztalk](Documentation/Biztalk.md "Biztalk")
2222
13. [Migrere fra 3.0 til 4.0](Documentation/MigrateFrom3To4.md)
2323
14. [Migrere fra 4.0 til 5.0](Documentation/MigrateFrom4To5.md)
24-
14. [Migrere fra SOAP til REST for 5.2 og senere versjoner](Documentation/MigrateFromSoapToRest.md)
24+
15. [Migrere fra SOAP til REST for 5.2 og senere versjoner](Documentation/MigrateFromSoapToRest.md)
25+
15. [Migrere fra 5.0 til 6.0](Documentation/MigrateFrom5To6.md)
2526

2627

2728
## Contributing

src/Helsenorge.Messaging/Abstractions/IMessagingClient.cs

+8-2
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,8 @@
66
* available at https://raw.githubusercontent.com/helsenorge/Helsenorge.Messaging/master/LICENSE
77
*/
88

9-
using System;
109
using System.Threading.Tasks;
1110
using System.Xml.Linq;
12-
using Microsoft.Extensions.Logging;
1311

1412
namespace Helsenorge.Messaging.Abstractions
1513
{
@@ -32,6 +30,14 @@ public interface IMessagingClient
3230
/// <returns></returns>
3331
Task<XDocument> SendAndWaitAsync(OutgoingMessage message);
3432

33+
/// <summary>
34+
/// Send a message without waiting for a reply (synchronous messaging)
35+
/// </summary>
36+
/// <param name="message">Details about the message being sent</param>
37+
/// <param name="correlationId">The correlation id to use when sending the message. Only relevant in synchronous messaging</param>
38+
/// <returns></returns>
39+
Task SendWithoutWaitingAsync(OutgoingMessage message, string correlationId = null);
40+
3541
/// <summary>
3642
/// Closes down links, sessions and connections.
3743
/// </summary>

src/Helsenorge.Messaging/Abstractions/IMessagingNotification.cs

+21-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,27 @@ public interface IMessagingNotification
7070
/// The client can do any necessary set up it needs, like setting up CorrelationIds, etc.
7171
/// </summary>
7272
/// <param name="message">The actual message, contains the payload in addition to metadata.</param>
73-
Task NotifySynchronousMessageReceivedStartingAsync(IncomingMessage message);
73+
Task NotifySynchronousMessageReceivedStartingAsync(IncomingMessage message);
74+
75+
/// <summary>
76+
/// Called when the syncreply message is ready for processing.
77+
/// </summary>
78+
/// <param name="message">The actual message, contains the payload in addition to metadata.</param>
79+
Task NotifySynchronousReplyMessageReceivedAsync(IncomingMessage message);
80+
81+
/// <summary>
82+
/// Called to notify that processing of an syncreply message has started. The client can do any necessary
83+
/// set up it needs, like setting up CorrelationIds, etc.
84+
/// </summary>
85+
/// <param name="listener">Reference to the listener invoking the callback.</param>
86+
/// <param name="message">The actual message, contains the payload in addition to metadata.</param>
87+
Task NotifySynchronousReplyMessageReceivedStartingAsync(MessageListener listener, IncomingMessage message);
88+
89+
/// <summary>
90+
/// Called when the syncreply message has been successfully processed.
91+
/// </summary>
92+
/// <param name="message">The actual message, contains the payload in addition to metadata.</param>
93+
Task NotifySynchronousReplyMessageReceivedCompletedAsync(IncomingMessage message);
7494

7595
/// <summary>
7696
/// Called to notifiy an unhandled exception has occurred. The client need to assert what it needs to do.

src/Helsenorge.Messaging/Abstractions/IMessagingServer.cs

+52-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
/*
1+
/*
22
* Copyright (c) 2020-2023, Norsk Helsenett SF and contributors
33
* See the file CONTRIBUTORS for details.
4-
*
4+
*
55
* This file is licensed under the MIT license
66
* available at https://raw.githubusercontent.com/helsenorge/Helsenorge.Messaging/master/LICENSE
77
*/
@@ -22,6 +22,7 @@ public interface IMessagingServer
2222
/// Start message processing
2323
/// </summary>
2424
Task StartAsync();
25+
2526
/// <summary>
2627
/// Terminate message processing
2728
/// </summary>
@@ -38,17 +39,20 @@ public interface IMessagingServer
3839
/// </summary>
3940
/// <param name="action">The delegate that should be called</param>
4041
void RegisterAsynchronousMessageReceivedStartingCallback(Action<MessageListener, IncomingMessage> action);
42+
4143
/// <summary>
4244
/// Registers a delegate that should be called as we start processing a message
4345
/// </summary>
4446
/// <param name="action">The delegate that should be called</param>
45-
void RegisterAsynchronousMessageReceivedStartingCallbackAsync(Func<MessageListener, IncomingMessage, Task> action);
47+
void RegisterAsynchronousMessageReceivedStartingCallbackAsync(
48+
Func<MessageListener, IncomingMessage, Task> action);
4649

4750
/// <summary>
4851
/// Registers a delegate that should be called when we have enough information to process the message. This is where the main processing logic hooks in.
4952
/// </summary>
5053
/// <param name="action">The delegate that should be called</param>
5154
void RegisterAsynchronousMessageReceivedCallback(Action<IncomingMessage> action);
55+
5256
/// <summary>
5357
/// Registers a delegate that should be called when we have enough information to process the message. This is where the main processing logic hooks in.
5458
/// </summary>
@@ -60,6 +64,7 @@ public interface IMessagingServer
6064
/// </summary>
6165
/// <param name="action">The delegate that should be called</param>
6266
void RegisterAsynchronousMessageReceivedCompletedCallback(Action<IncomingMessage> action);
67+
6368
/// <summary>
6469
/// Registers a delegate that should be called when we are finished processing the message.
6570
/// </summary>
@@ -71,6 +76,7 @@ public interface IMessagingServer
7176
/// </summary>
7277
/// <param name="action">The delegate that should be called</param>
7378
void RegisterSynchronousMessageReceivedStartingCallback(Action<IncomingMessage> action);
79+
7480
/// <summary>
7581
/// Registers a delegate that should be called as we start processing a message
7682
/// </summary>
@@ -82,6 +88,7 @@ public interface IMessagingServer
8288
/// </summary>
8389
/// <param name="action">The delegate that should be called</param>
8490
void RegisterSynchronousMessageReceivedCallback(Func<IncomingMessage, XDocument> action);
91+
8592
/// <summary>
8693
/// Registers a delegate that should be called when we have enough information to process the message. This is where the main processing logic hooks in.
8794
/// </summary>
@@ -93,17 +100,56 @@ public interface IMessagingServer
93100
/// </summary>
94101
/// <param name="action">The delegate that should be called</param>
95102
void RegisterSynchronousMessageReceivedCompletedCallback(Action<IncomingMessage> action);
103+
96104
/// <summary>
97105
/// Registers a delegate that should be called when we are finished processing the message.
98106
/// </summary>
99107
/// <param name="action">The delegate that should be called</param>
100108
void RegisterSynchronousMessageReceivedCompletedCallbackAsync(Func<IncomingMessage, Task> action);
101109

110+
/// <summary>
111+
/// Registers a delegate that should be called as we start processing a message
112+
/// </summary>
113+
/// <param name="action">The delegate that should be called</param>
114+
void RegisterSynchronousReplyMessageReceivedStartingCallback(Action<MessageListener, IncomingMessage> action);
115+
116+
/// <summary>
117+
/// Registers a delegate that should be called as we start processing a message
118+
/// </summary>
119+
/// <param name="action">The delegate that should be called</param>
120+
void RegisterSynchronousReplyMessageReceivedStartingCallbackAsync(
121+
Func<MessageListener, IncomingMessage, Task> action);
122+
123+
/// <summary>
124+
/// Registers a delegate that should be called when we have enough information to process the message. This is where the main processing logic hooks in.
125+
/// </summary>
126+
/// <param name="action">The delegate that should be called</param>
127+
void RegisterSynchronousReplyMessageReceivedCallback(Action<IncomingMessage> action);
128+
129+
/// <summary>
130+
/// Registers a delegate that should be called when we have enough information to process the message. This is where the main processing logic hooks in.
131+
/// </summary>
132+
/// <param name="action">The delegate that should be called</param>
133+
void RegisterSynchronousReplyMessageReceivedCallbackAsync(Func<IncomingMessage, Task> action);
134+
135+
/// <summary>
136+
/// Registers a delegate that should be called when we are finished processing the message.
137+
/// </summary>
138+
/// <param name="action">The delegate that should be called</param>
139+
void RegisterSynchronousReplyMessageReceivedCompletedCallback(Action<IncomingMessage> action);
140+
141+
/// <summary>
142+
/// Registers a delegate that should be called when we are finished processing the message.
143+
/// </summary>
144+
/// <param name="action">The delegate that should be called</param>
145+
void RegisterSynchronousReplyMessageReceivedCompletedCallbackAsync(Func<IncomingMessage, Task> action);
146+
102147
/// <summary>
103148
/// Registers a delegate that should be called when we receive an error message
104149
/// </summary>
105150
/// <param name="action">The delegate that should be called</param>
106151
void RegisterErrorMessageReceivedCallback(Action<IAmqpMessage> action);
152+
107153
/// <summary>
108154
/// Registers a delegate that should be called when we receive an error message
109155
/// </summary>
@@ -115,6 +161,7 @@ public interface IMessagingServer
115161
/// </summary>
116162
/// <param name="action">The delegate that should be called</param>
117163
void RegisterHandledExceptionCallback(Action<IAmqpMessage, Exception> action);
164+
118165
/// <summary>
119166
/// Registers a delegate that should be called when we have an handled exception
120167
/// </summary>
@@ -126,10 +173,11 @@ public interface IMessagingServer
126173
/// </summary>
127174
/// <param name="action">The delegate that should be called</param>
128175
void RegisterUnhandledExceptionCallback(Action<IAmqpMessage, Exception> action);
176+
129177
/// <summary>
130178
/// Registers a delegate that should be called when we have an unhandled exception
131179
/// </summary>
132180
/// <param name="action">The delegate that should be called</param>
133181
void RegisterUnhandledExceptionCallbackAsync(Func<IAmqpMessage, Exception, Task> action);
134182
}
135-
}
183+
}

src/Helsenorge.Messaging/Abstractions/MessageProtection.cs

-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
using System;
1010
using System.IO;
1111
using System.Security.Cryptography.X509Certificates;
12-
using System.Xml.Linq;
1312

1413
namespace Helsenorge.Messaging.Abstractions
1514
{

src/Helsenorge.Messaging/Abstractions/MessagingException.cs

-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
using System;
1010
using System.Diagnostics.CodeAnalysis;
11-
using System.Runtime.Serialization;
1211
using Microsoft.Extensions.Logging;
1312

1413
namespace Helsenorge.Messaging.Abstractions

src/Helsenorge.Messaging/Abstractions/OutgoingMessage.cs

-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
* available at https://raw.githubusercontent.com/helsenorge/Helsenorge.Messaging/master/LICENSE
77
*/
88

9-
using System;
109
using System.Xml.Linq;
1110

1211
namespace Helsenorge.Messaging.Abstractions

src/Helsenorge.Messaging/Amqp/AmqpCore.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ private void ExtractAndAddMetaDataPropertiesIfEnabled(IAmqpMessage amqpMessage,
260260
amqpMessage.SetApplicationPropertyValue(metaDataProperty.Key, metaDataProperty.Value);
261261
}
262262
sw.Stop();
263-
logger.LogInformation($"After-AddingPayloadMetadata: {amqpMessage.MessageFunction} FromHerId: {amqpMessage.FromHerId} ToHerId: {amqpMessage.ToHerId} MessageId: {amqpMessage.MessageId} Elapsed:{sw.ElapsedMilliseconds}");
263+
logger.LogInformation($"After-AddingPayloadMetadata: {amqpMessage.MessageFunction} FromHerId: {amqpMessage.FromHerId} ToHerId: {amqpMessage.ToHerId} MessageId: {amqpMessage.MessageId} Elapsed: {sw.ElapsedMilliseconds} ms");
264264
}
265265
catch (Exception e)
266266
{

src/Helsenorge.Messaging/Amqp/AmqpReceiver.cs

-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
using System;
1010
using System.Diagnostics.CodeAnalysis;
11-
using System.Threading;
1211
using System.Threading.Tasks;
1312
using Amqp;
1413
using Helsenorge.Messaging.Abstractions;

src/Helsenorge.Messaging/Amqp/Receivers/MessageListener.cs

+7-1
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,17 @@ protected virtual string GetQueueName()
6767
{
6868
QueueType.Asynchronous => _queueNames.Async,
6969
QueueType.Synchronous => _queueNames.Sync,
70+
QueueType.SynchronousReply => _queueNames.SyncReply,
7071
QueueType.Error => _queueNames.Error,
7172
_ => throw new UnknownQueueTypeException(QueueType)
7273
};
7374
}
75+
7476
/// <summary>
7577
/// Specifies what type of queue this listener is processing
7678
/// </summary>
7779
protected abstract QueueType QueueType { get; }
80+
7881
/// <summary>
7982
/// Reference to service bus related setting
8083
/// </summary>
@@ -84,6 +87,7 @@ protected virtual string GetQueueName()
8487
/// Gets a reference to the server
8588
/// </summary>
8689
protected IMessagingNotification MessagingNotification { get; }
90+
8791
/// <summary>
8892
/// Gets the logger used for diagnostics purposes
8993
/// </summary>
@@ -190,6 +194,7 @@ public async Task<IncomingMessage> ReadAndProcessMessageAsync(bool alwaysRemoveM
190194
}
191195
return await HandleRawMessageAsync(message, alwaysRemoveMessage).ConfigureAwait(false);
192196
}
197+
193198
private async Task<IncomingMessage> HandleRawMessageAsync(IAmqpMessage message, bool alwaysRemoveMessage)
194199
{
195200
if (message == null) return null;
@@ -223,9 +228,10 @@ private async Task<IncomingMessage> HandleRawMessageAsync(IAmqpMessage message,
223228
DeliveryCount = message.DeliveryCount,
224229
LockedUntil = message.LockedUntil,
225230
};
231+
226232
await NotifyMessageProcessingStartedAsync(this, incomingMessage).ConfigureAwait(false);
227233

228-
SetCorrelationIdAction?.Invoke(incomingMessage.MessageId);
234+
SetCorrelationIdAction?.Invoke(incomingMessage.CorrelationId);
229235

230236
Logger.LogStartReceive(QueueType, incomingMessage, $"Message received from host and queue: {AmqpCore.HostnameAndPath}/{queueName}");
231237

src/Helsenorge.Messaging/Amqp/Receivers/SenderHerIdMismatchException.cs

-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
*/
88

99
using System;
10-
using System.Runtime.Serialization;
1110

1211
namespace Helsenorge.Messaging.Amqp.Receivers
1312
{

src/Helsenorge.Messaging/Amqp/Receivers/SynchronousReplyListener.cs

+2
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ protected override Task NotifyMessageProcessingStartedAsync(MessageListener list
4848
// Not relevant for this implementation
4949
return Task.CompletedTask;
5050
}
51+
5152
/// <summary>
5253
/// Called when message processing is complete
5354
/// </summary>
@@ -58,6 +59,7 @@ protected override Task NotifyMessageProcessingCompletedAsync(IncomingMessage me
5859
// Not relevant for this implementation
5960
return Task.CompletedTask;
6061
}
62+
6163
/// <summary>
6264
/// Called to process message
6365
/// </summary>

0 commit comments

Comments
 (0)