Skip to content

Adopt swift-service-lifecycle #81

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged

Conversation

felixschlegel
Copy link
Contributor

@felixschlegel felixschlegel commented Jul 3, 2023

Closes #57

Adapt swift-service-lifecycle

Modifications:

  • add swift-service-lifecycle aka ServicLifecycle dependency to
    Package.swift
  • add Sendable conformance to the following types:
    • KafkaProducerConfiguration
    • KafkaConsumerConfiguration
    • KafkaTopicConfiguration
    • KafkaPartition
    • KafkaClient
    • KafkaConsumerMessages
    • KafkaConsumerMessage
    • RDKafkaConfig.CapturedClosures
    • RDKafkaTopicHandles
  • add @unchecked Sendable conformance to the following types:
    • KafkaProducer
    • KafkaConsumer
  • add Service conformance to the following types:
    • KafkaProducer
    • KafkaConsumer
  • SwiftKafkaTests:
    • use ServiceGroups
  • KafkaProducerTests:
    • use ServiceGroups
    • remove testFlushQueuedProducerMessages as it relied on the now
      private implementation detail triggerGracefulShutdown
    • remove testProducerNotUsableAfterShutdown as it relied on the now
      private implementation detail triggerGracefulShutdown
    • testNoMemoryLeakAfterShutdown: remove obsolete wait for timeout
      as flushing in KafkaProducer is now non-blocking
  • KafkaConsumer:
    • remove triggerGracefulShutdown on deinit -> this is now
      ServicLifecycle's responsibility
  • refactor RDKafkaConfig to make RDKafkaConfig.CapturedClosures
    sendable

Remove ShutdownOnTerminate for AsyncSequences

Motivation:

Exiting the run() loop early is an error in swift-service-lifecycle.
Therefore we don't want to invoke
KafkaProducer/KafkaConsumer.triggerGracefulShutdown() when the our
NIOAsyncSequences have stopped being consumed

Modifications:

  • remove Kafka[Producer|Consumer]ShutdownOnTerminate behaviour
  • adjust tests

Modifications:

* add `swift-service-lifecycle` aka `ServicLifecycle` dependency to
  `Package.swift`
* add `Sendable` conformance to the following types:
    * `KafkaProducerConfiguration`
    * `KafkaConsumerConfiguration`
    * `KafkaTopicConfiguration`
    * `KafkaPartition`
    * `KafkaClient`
    * `KafkaConsumerMessages`
    * `KafkaConsumerMessage`
    * `RDKafkaConfig.CapturedClosures`
    * `RDKafkaTopicHandles`
* add `@unchecked Sendable` conformance to the following types:
    * `KafkaProducer`
    * `KafkaConsumer`
* add `Service` conformance to the following types:
    * `KafkaProducer`
    * `KafkaConsumer`
* `SwiftKafkaTests`:
    * use `ServiceGroup`s
* `KafkaProducerTests`:
    * use `ServiceGroup`s
    * remove `testFlushQueuedProducerMessages` as it relied on the now
      `private` implementation detail `triggerGracefulShutdown`
    * remove `testProducerNotUsableAfterShutdown` as it relied on the now
      `private` implementation detail `triggerGracefulShutdown`
    * `testNoMemoryLeakAfterShutdown`: remove obsolete wait for timeout
      as flushing in `KafkaProducer` is now non-blocking
* `KafkaConsumer`:
    * remove `triggerGracefulShutdown` on `deinit` -> this is now
      `ServicLifecycle`'s responsibility
* refactor `RDKafkaConfig` to make `RDKafkaConfig.CapturedClosures`
  sendable
Motivation:

Exiting the `run()` loop early is an error in `swift-service-lifecycle`.
Therefore we **don't** want to invoke
`KafkaProducer/KafkaConsumer.triggerGracefulShutdown()` when the our
`NIOAsyncSequence`s have stopped being consumed

Modifications:

* remove `Kafka[Producer|Consumer]ShutdownOnTerminate` behaviour
* adjust tests
README.md Outdated
### Producer API

The `send(_:)` method of `KafkaProducer` returns a message-id that can later be used to identify the corresponding acknowledgement. Acknowledgements are received through the `acknowledgements` [`AsyncSequence`](https://developer.apple.com/documentation/swift/asyncsequence). Each acknowledgement indicates that producing a message was successful or returns an error.

```swift
let config = KafkaProducerConfiguration(bootstrapServers: ["localhost:9092"])

let (producer, acknowledgements) = try KafkaProducer.makeProducerWithAcknowledgements(
let (producerService, acknowledgements) = try KafkaProducer.makeProducerWithAcknowledgements(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think we need the suffix here

Suggested change
let (producerService, acknowledgements) = try KafkaProducer.makeProducerWithAcknowledgements(
let (producer, acknowledgements) = try KafkaProducer.makeProducerWithAcknowledgements(

README.md Outdated
@@ -74,21 +80,26 @@ let config = KafkaConsumerConfiguration(
bootstrapServers: ["localhost:9092"]
)

let consumer = try KafkaConsumer(
let consumerService = try KafkaConsumer(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here

README.md Outdated
@@ -137,24 +153,29 @@ let config = KafkaConsumerConfiguration(
bootstrapServers: ["localhost:9092"]
)

let consumer = try KafkaConsumer(
let consumerService = try KafkaConsumer(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And here

@@ -81,14 +44,14 @@ public struct KafkaConsumerMessages: AsyncSequence {
// MARK: - KafkaConsumer

/// Receive messages from the Kafka cluster.
public final class KafkaConsumer {
public final class KafkaConsumer: Sendable, Service { // We can do that because our stored propery is protected by a lock
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove this comment. It is not fully true because only the stateMachine is protected and the compiler is enforcing it here and we don't use @unchchecked

Modifications:

* remove `*Service` suffix from `consumerService` and `producerService`
  in tests and README
* remove old comment in `KafkaConsumer`
@felixschlegel felixschlegel requested a review from FranzBusch July 4, 2023 13:47
@FranzBusch FranzBusch merged commit aab823f into swift-server:main Jul 5, 2023
@FranzBusch FranzBusch deleted the fs-kafka-service-lifecycle branch July 5, 2023 12:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Do we still need KafkaProducer.shutdownGracefully()?
2 participants