
New fanout to master (mchmarny#8)
* WIP: simple fanout

* wip: http

* v2 validated

* readme updates

* removed extra target logging

* producer cleanup

* readme updates

* added queue consumer
mchmarny authored Sep 1, 2020
1 parent f21a009 commit 22acadb
Showing 46 changed files with 1,186 additions and 484 deletions.
156 changes: 62 additions & 94 deletions fan-out/README.md
@@ -1,176 +1,144 @@
# fan-out demo

`Fan-out` is a messaging pattern where a single message source is "broadcast" to multiple targets. A common use-case is a situation where multiple teams or systems need to receive events from the same source. This is sometimes made even more complicated by differences in the formats and protocols expected by each of the target systems.

This demo will illustrate how to use Dapr's pluggable component mechanism to `fan-out` events from one Pub/Sub configured with Redis to:

* Kafka topic in CSV format
* REST endpoint in JSON format
* gRPC service in XML format

![](./img/fan-out-in-dapr.png)

This allows for incremental modifications with the ability to customize each stream according to its unique configuration needs (e.g. throughput, authentication, format, retry strategy, or even error tolerance). For more information about Dapr's pub/sub see these [docs](https://github.com/dapr/docs/tree/master/concepts/publish-subscribe-messaging)

This demo requires Dapr `v0.10` as well as Go `1.14+` and docker-compose `v1.26+`.

## App 2: Pub/Sub to Pub/Sub Publisher

To run these demos you will need access to Redis and Kafka servers. For Redis, you can use the one installed during local Dapr setup. For Kafka, you can use the included Docker Compose file. First, navigate to the `queue-format-converter` directory and start Kafka:

```shell
docker-compose -f ./config/kafka.yaml up -d
```

The result should look something like this:

```shell
Creating network "config_default" with the default driver
Creating config_kafka_1 ... done
Creating config_zookeeper_1 ... done
```

### Run

Now, start `App 2`, which will receive events from Redis, convert them to CSV, and publish them onto the Kafka topic:

```shell
dapr run \
    --app-id app2 \
    --app-port 60010 \
    --app-protocol grpc \
    --components-path ./config \
    go run main.go
```
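
For reference, the core of `App 2` boils down to subscribing to the source topic and re-publishing the converted payload. Here is a minimal sketch using the Dapr Go SDK; the component and topic names (`fanout-source-pubsub`, `fanout-queue-kafka-target`, `events`) and the `toCSV` helper are illustrative assumptions, not the demo's exact code:

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"

	dapr "github.com/dapr/go-sdk/client"
	"github.com/dapr/go-sdk/service/common"
	daprd "github.com/dapr/go-sdk/service/grpc"
)

var client dapr.Client

func main() {
	// gRPC service listening on the --app-port passed to dapr run.
	s, err := daprd.NewService(":60010")
	if err != nil {
		log.Fatalf("error creating service: %v", err)
	}

	if client, err = dapr.NewClient(); err != nil {
		log.Fatalf("error creating Dapr client: %v", err)
	}
	defer client.Close()

	// Subscribe to the source topic; Dapr routes it from the Redis component.
	sub := &common.Subscription{
		PubsubName: "fanout-source-pubsub", // assumed source component name
		Topic:      "events",               // assumed topic name
	}
	if err := s.AddTopicEventHandler(sub, eventHandler); err != nil {
		log.Fatalf("error adding topic handler: %v", err)
	}

	if err := s.Start(); err != nil {
		log.Fatalf("error starting service: %v", err)
	}
}

func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
	log.Printf("Event - PubsubName:%s, Topic:%s, ID:%s", e.PubsubName, e.Topic, e.ID)

	// Stand-in for the demo's real JSON-to-CSV conversion.
	out := toCSV(e.Data)

	// Re-publish the converted event to the Kafka-backed component.
	if err := client.PublishEvent(ctx, "fanout-queue-kafka-target", "events", out); err != nil {
		return true, err // signal Dapr to retry on publish failure
	}
	return false, nil
}

// toCSV is a placeholder converter: it re-marshals the event data and wraps
// it in a single quoted CSV field.
func toCSV(data interface{}) []byte {
	b, _ := json.Marshal(data)
	return []byte(fmt.Sprintf("%q", string(b)))
}
```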

> The source and target Pub/Sub components are defined in the [./config](./config) directory in their respective files, so the application code stays free of messaging-system SDKs and libraries, which allows for easy re-configuration at run-time.
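
For example, a Redis-backed source component along these lines would live in that directory. This is only a sketch; the actual file in [./config](./config) is authoritative, and the connection values here are assumptions based on the default local Dapr Redis install:

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: fanout-source-pubsub
spec:
  type: pubsub.redis
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""
```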

Leave the application running; we will come back to it after configuring `App 1`.

## App 1: Pub/Sub Event Producer

To demo the above `App 2` we will need events. To produce them, open another terminal session, navigate to the `queue-event-producer` directory, and start `App 1`:

```shell
dapr run \
    --app-id app1 \
    --app-port 60013 \
    --app-protocol grpc \
    --components-path ./config \
    go run main.go
```

The app will now publish one event every `3s`. To change the frequency, define the desired duration using the `THREAD_PUB_FREQ` variable and restart the app. The results should look something like this:

```shell
== APP == published: {"id":"df50a6c7-b5bb-45ce-b3a8-ad428bbbd5fe","temperature":60.46998219508215,"humidity":94.05150371362079,"time":1598960035}
```
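
The producer itself is little more than a timed loop around the Dapr client's publish call. A minimal sketch, assuming the Dapr Go SDK and the same illustrative component and topic names as above:

```go
package main

import (
	"context"
	"encoding/json"
	"log"
	"math/rand"
	"os"
	"time"

	dapr "github.com/dapr/go-sdk/client"
	"github.com/google/uuid"
)

// SourceEvent mirrors the fields seen in the demo output.
type SourceEvent struct {
	ID          string  `json:"id"`
	Temperature float64 `json:"temperature"`
	Humidity    float64 `json:"humidity"`
	Time        int64   `json:"time"`
}

func main() {
	// Default to one event every 3s; override via THREAD_PUB_FREQ (e.g. "1s").
	freq := 3 * time.Second
	if v := os.Getenv("THREAD_PUB_FREQ"); v != "" {
		if d, err := time.ParseDuration(v); err == nil {
			freq = d
		}
	}

	client, err := dapr.NewClient()
	if err != nil {
		log.Fatalf("error creating Dapr client: %v", err)
	}
	defer client.Close()

	for range time.Tick(freq) {
		e := SourceEvent{
			ID:          uuid.New().String(),
			Temperature: rand.Float64() * 100,
			Humidity:    rand.Float64() * 100,
			Time:        time.Now().Unix(),
		}
		data, _ := json.Marshal(e)
		// Publish to the source Pub/Sub component; App 2 picks it up from there.
		if err := client.PublishEvent(context.Background(), "fanout-source-pubsub", "events", data); err != nil {
			log.Printf("error publishing event: %v", err)
			continue
		}
		log.Printf("published: %s", data)
	}
}
```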

Now, in the `App 2` log, the output for each received event should look something like this:

```shell
== APP == Event - PubsubName:fanout-source-pubsub, Topic:events, ID:5ffa4502-8bf1-4bbf-927e-8b62e1949166
== APP == Target (csv): "45ecf820-705b-47c2-a2e4-7dbb3eecb728",66.459360,43.777042,"2020-09-01T04:33:58-07:00"
```

## App 3: Pub/Sub to External REST Endpoint Publisher

To add another publisher, which converts the received events into JSON and publishes them to a component-defined REST endpoint, first navigate to the `http-format-converter` directory and start `App 3`:

```shell
dapr run \
    --app-id app3 \
    --app-port 60011 \
    --app-protocol grpc \
    --components-path ./config \
    go run main.go
```

> This app is configured with an HTTP binding that defines the target and verb of the invocation. This external configuration allows for easy change of the target binding to something like email using SendGrid or AWS S3 without changing the application code. To learn more about Dapr bindings, see these [docs](https://github.com/dapr/docs/tree/master/concepts/bindings#supported-bindings-and-specs)

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: fanout-http-target-post-binding
spec:
  type: bindings.http
  metadata:
  - name: url
    value: https://postman-echo.com/post
  - name: method
    value: POST
```

If `App 1` is still running, you should see entries similar to this in the `App 3` log:

```shell
== APP == Event - PubsubName:fanout-source-pubsub, Topic:events, ID:d5bbcc9e-f0d3-46df-8d8e-f7dadb3f304c
== APP == Target (json): {"temperature":20.326656,"humidity":36.093533,"time":1598961599,"id":"b2fa85cf-1be6-4489-9aec-613cf969675e"}
```
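
Inside `App 3`, publishing through that binding is a single client call. A hedged sketch with the Dapr Go SDK; the hard-coded payload stands in for the converted event:

```go
package main

import (
	"context"
	"log"

	dapr "github.com/dapr/go-sdk/client"
)

func main() {
	client, err := dapr.NewClient()
	if err != nil {
		log.Fatalf("error creating Dapr client: %v", err)
	}
	defer client.Close()

	// jsonData stands in for the converted event payload.
	jsonData := []byte(`{"id":"demo","temperature":20.3,"humidity":36.1}`)

	// Invoke the HTTP output binding defined in ./config; the "create"
	// operation maps to the POST verb configured on the component.
	err = client.InvokeOutputBinding(context.Background(), &dapr.InvokeBindingRequest{
		Name:      "fanout-http-target-post-binding",
		Operation: "create",
		Data:      jsonData,
	})
	if err != nil {
		log.Fatalf("error invoking binding: %v", err)
	}
}
```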

## App 4: Pub/Sub to another Dapr Service Publisher

To add the final publisher, which will convert events into XML format and publish them to another Dapr service over gRPC, first navigate to the `grpc-echo-service` directory and start the target service. For demo purposes, we will use the included echo service, which simply returns whatever message it receives.

```shell
dapr run \
    --app-id grpc-echo-service \
    --app-port 60015 \
    --app-protocol grpc \
    go run main.go
```
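
The echo service is a plain Dapr gRPC service with a single invocation handler that returns its input. A minimal sketch, assuming the Dapr Go SDK; the method name `echo` matches what `App 4` invokes:

```go
package main

import (
	"context"
	"log"

	"github.com/dapr/go-sdk/service/common"
	daprd "github.com/dapr/go-sdk/service/grpc"
)

func main() {
	// gRPC service on the --app-port passed to dapr run.
	s, err := daprd.NewService(":60015")
	if err != nil {
		log.Fatalf("error creating service: %v", err)
	}

	// Register the "echo" method; it returns whatever content it receives.
	if err := s.AddServiceInvocationHandler("echo", echoHandler); err != nil {
		log.Fatalf("error adding invocation handler: %v", err)
	}

	if err := s.Start(); err != nil {
		log.Fatalf("error starting service: %v", err)
	}
}

func echoHandler(ctx context.Context, in *common.InvocationEvent) (*common.Content, error) {
	log.Printf("echoing %d bytes (%s)", len(in.Data), in.ContentType)
	return &common.Content{Data: in.Data, ContentType: in.ContentType}, nil
}
```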

Then, in yet another terminal session, navigate to the `service-format-converter` directory and start `App 4`, which will invoke the `grpc-echo-service`:

```shell
dapr run \
    --app-id app4 \
    --app-port 60012 \
    --app-protocol grpc \
    --components-path ./config \
    go run main.go
```

If everything went well, you should see something similar in the `App 4` logs:

```shell
== APP == Target: &{Data:[60 84 111 112 105 99 69 118...] ContentType:application/xml}
== APP == Response: <TopicEvent><ID>df450605-4927-407e-a2b2-61233939ee58</ID><SpecVersion>1.0</SpecVersion><Type>com.dapr.event.sent</Type><Source>app1</Source><DataContentType>application/json</DataContentType><Data>{&#34;time&#34;:1598962950,&#34;id&#34;:&#34;745e3ebe-990b-4292-9347-f84ff81f41e6&#34;,&#34;temperature&#34;:31.812637,&#34;humidity&#34;:46.894296}</Data><Subject></Subject><Topic>events</Topic><PubsubName>fanout-source-pubsub</PubsubName></TopicEvent>
```
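
On the calling side, `App 4` hands the converted XML to Dapr by service and method name; Dapr takes care of discovery, mTLS, and protocol translation. A hedged sketch with the Dapr Go SDK; the hard-coded payload stands in for the converted event:

```go
package main

import (
	"context"
	"log"

	dapr "github.com/dapr/go-sdk/client"
)

func main() {
	client, err := dapr.NewClient()
	if err != nil {
		log.Fatalf("error creating Dapr client: %v", err)
	}
	defer client.Close()

	// content stands in for the converted event payload.
	content := &dapr.DataContent{
		Data:        []byte("<SourceEvent><ID>demo</ID></SourceEvent>"),
		ContentType: "application/xml",
	}

	// Dapr resolves the target by app id and handles encryption and protocol translation.
	resp, err := client.InvokeMethodWithContent(context.Background(), "grpc-echo-service", "echo", "post", content)
	if err != nil {
		log.Fatalf("error invoking echo service: %v", err)
	}
	log.Printf("Response: %s", resp)
}
```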

> If you left all the apps running, you can go to each terminal session and see the different formats that are generated and published by each application.

## Kubernetes Deployment

> WIP: this section is currently being worked on, come back soon.

## Disclaimer

23 changes: 0 additions & 23 deletions fan-out/eventmaker/Makefile

This file was deleted.

9 changes: 0 additions & 9 deletions fan-out/eventmaker/go.mod

This file was deleted.

