6
6
import io .reactiverse .awssdk .integration .LocalStackBaseSpec ;
7
7
import io .reactiverse .awssdk .reactivestreams .ReadStreamPublisher ;
8
8
import io .reactivex .Single ;
9
+ import io .vertx .codegen .annotations .Nullable ;
10
+ import io .vertx .core .AsyncResult ;
9
11
import io .vertx .core .Context ;
12
+ import io .vertx .core .Future ;
13
+ import io .vertx .core .Handler ;
10
14
import io .vertx .core .Vertx ;
11
15
import io .vertx .core .buffer .Buffer ;
12
16
import io .vertx .core .eventbus .MessageProducer ;
13
17
import io .vertx .core .file .OpenOptions ;
18
+ import io .vertx .core .streams .Pump ;
19
+ import io .vertx .core .streams .WriteStream ;
14
20
import io .vertx .junit5 .Timeout ;
15
21
import io .vertx .junit5 .VertxExtension ;
16
22
import io .vertx .junit5 .VertxTestContext ;
@@ -155,18 +161,50 @@ void downloadImageFromBucket(Vertx vertx, VertxTestContext ctx) throws Exception
155
161
void downloadImageFromBucketToPump (Vertx vertx , VertxTestContext ctx ) throws Exception {
156
162
final Context originalContext = vertx .getOrCreateContext ();
157
163
final S3AsyncClient s3 = s3 (originalContext );
158
- final String ebAddress = "s3-forwarded" ;
159
- final MessageProducer <Buffer > producer = vertx .eventBus ().sender (ebAddress );
160
- final Buffer received = Buffer .buffer ();
164
+ Buffer received = Buffer .buffer ();
161
165
AtomicBoolean handlerCalled = new AtomicBoolean (false );
162
- VertxAsyncResponseTransformer <GetObjectResponse > transformer = new VertxAsyncResponseTransformer <>(producer );
163
- transformer .setResponseHandler (resp -> {
164
- handlerCalled .set (true );
165
- });
166
- vertx .eventBus ().<Buffer >consumer (ebAddress , msg -> {
166
+ VertxAsyncResponseTransformer <GetObjectResponse > transformer = new VertxAsyncResponseTransformer <>(new WriteStream <Buffer >() {
167
+ @ Override
168
+ public WriteStream <Buffer > exceptionHandler (Handler <Throwable > handler ) {
169
+ return null ;
170
+ }
171
+
172
+ @ Override
173
+ public Future <Void > write (Buffer data ) {
174
+ received .appendBuffer (data );
175
+ return Future .succeededFuture ();
176
+ }
177
+
178
+ @ Override
179
+ public void write (Buffer data , Handler <AsyncResult <Void >> handler ) {
180
+ received .appendBuffer (data );
181
+ handler .handle (null );
182
+ }
183
+
184
+ @ Override
185
+ public void end (Handler <AsyncResult <Void >> handler ) {
167
186
assertTrue (handlerCalled .get (), "Response handler should have been called before first bytes are received" );
168
- received .appendBuffer (msg .body ());
169
187
if (received .length () == fileSize ) ctx .completeNow ();
188
+ handler .handle (null );
189
+ }
190
+
191
+ @ Override
192
+ public WriteStream <Buffer > setWriteQueueMaxSize (int maxSize ) {
193
+ return null ;
194
+ }
195
+
196
+ @ Override
197
+ public boolean writeQueueFull () {
198
+ return false ;
199
+ }
200
+
201
+ @ Override
202
+ public WriteStream <Buffer > drainHandler (@ Nullable Handler <Void > handler ) {
203
+ return null ;
204
+ }
205
+ });
206
+ transformer .setResponseHandler (resp -> {
207
+ handlerCalled .set (true );
170
208
});
171
209
single (s3 .getObject (VertxS3ClientSpec ::downloadImgReq , transformer ))
172
210
.subscribe (getRes -> {}, ctx ::failNow );
@@ -175,11 +213,53 @@ void downloadImageFromBucketToPump(Vertx vertx, VertxTestContext ctx) throws Exc
175
213
@ Test
176
214
@ Order (7 )
177
215
void downloadImageFromBucketWithoutSettingResponseHandler (Vertx vertx , VertxTestContext ctx ) throws Exception {
178
- final Context originalContext = vertx .getOrCreateContext ();
179
- final S3AsyncClient s3 = s3 (originalContext );
180
- final String ebAddress = "s3-forwarded" ;
181
- final MessageProducer <Buffer > producer = vertx .eventBus ().sender (ebAddress );
182
- VertxAsyncResponseTransformer <GetObjectResponse > transformer = new VertxAsyncResponseTransformer <>(producer );
216
+ final Context originalContext = vertx .getOrCreateContext ();
217
+ final S3AsyncClient s3 = s3 (originalContext );
218
+ final Buffer received = Buffer .buffer ();
219
+ AtomicBoolean handlerCalled = new AtomicBoolean (false );
220
+ VertxAsyncResponseTransformer <GetObjectResponse > transformer = new VertxAsyncResponseTransformer <>(new WriteStream <Buffer >() {
221
+ @ Override
222
+ public WriteStream <Buffer > exceptionHandler (Handler <Throwable > handler ) {
223
+ return null ;
224
+ }
225
+
226
+ @ Override
227
+ public Future <Void > write (Buffer data ) {
228
+ received .appendBuffer (data );
229
+ return Future .succeededFuture ();
230
+ }
231
+
232
+ @ Override
233
+ public void write (Buffer data , Handler <AsyncResult <Void >> handler ) {
234
+ received .appendBuffer (data );
235
+ handler .handle (null );
236
+ }
237
+
238
+ @ Override
239
+ public void end (Handler <AsyncResult <Void >> handler ) {
240
+ assertTrue (handlerCalled .get (), "Response handler should have been called before first bytes are received" );
241
+ if (received .length () == fileSize ) ctx .completeNow ();
242
+ handler .handle (null );
243
+ }
244
+
245
+ @ Override
246
+ public WriteStream <Buffer > setWriteQueueMaxSize (int maxSize ) {
247
+ return null ;
248
+ }
249
+
250
+ @ Override
251
+ public boolean writeQueueFull () {
252
+ return false ;
253
+ }
254
+
255
+ @ Override
256
+ public WriteStream <Buffer > drainHandler (@ Nullable Handler <Void > handler ) {
257
+ return null ;
258
+ }
259
+ });
260
+ transformer .setResponseHandler (resp -> {
261
+ handlerCalled .set (true );
262
+ });
183
263
single (s3 .getObject (VertxS3ClientSpec ::downloadImgReq , transformer ))
184
264
.subscribe (getRes -> ctx .completeNow (), ctx ::failNow );
185
265
}
0 commit comments