Skip to content

Commit 28b652f

Browse files
RSocket Test Bootstrap (#348)
Getting started with unit tests for the new RSocket APIs. The first test just starts a server and lets it shut down while leaving the contextual scope.
1 parent a04bd0e commit 28b652f

File tree

6 files changed

+226
-8
lines changed

6 files changed

+226
-8
lines changed

CMakeLists.txt

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -376,15 +376,9 @@ add_dependencies(tcpresumeserver gmock)
376376
add_subdirectory(experimental/yarpl)
377377

378378
########################################
379-
# Examples
379+
# RSocket Experimental
380380
########################################
381381

382-
add_library(
383-
reactivesocket_examples_util
384-
examples/util/ExampleSubscriber.cpp
385-
examples/util/ExampleSubscriber.h
386-
)
387-
388382
add_library(
389383
rsocket_experimental
390384
experimental/rsocket/RSocket.h
@@ -406,7 +400,7 @@ add_library(
406400
experimental/rsocket-src/transports/TcpConnectionAcceptor.cpp
407401
experimental/rsocket/transports/TcpConnectionFactory.h
408402
experimental/rsocket-src/transports/TcpConnectionFactory.cpp
409-
)
403+
)
410404

411405
add_dependencies(rsocket_experimental ReactiveStreams)
412406

@@ -418,6 +412,39 @@ target_include_directories(rsocket_experimental PUBLIC "${PROJECT_SOURCE_DIR}/ex
418412
#include_directories(${CMAKE_CURRENT_BINARY_DIR}/experimental)
419413

420414

415+
add_executable(
416+
rsocket_tests
417+
experimental/rsocket-test/RSocketClientServerTest.cpp
418+
experimental/rsocket-test/handlers/HelloStreamRequestHandler.h
419+
experimental/rsocket-test/handlers/HelloStreamRequestHandler.cpp
420+
experimental/rsocket-test/handlers/HelloStreamSubscription.h
421+
experimental/rsocket-test/handlers/HelloStreamSubscription.cpp
422+
)
423+
424+
target_link_libraries(
425+
rsocket_tests
426+
rsocket_experimental
427+
ReactiveSocket
428+
${FOLLY_LIBRARIES}
429+
${GMOCK_LIBS}
430+
${GFLAGS_LIBRARY}
431+
${GLOG_LIBRARY}
432+
${CMAKE_THREAD_LIBS_INIT})
433+
434+
add_dependencies(rsocket_tests gmock rsocket_experimental)
435+
436+
add_test(NAME RSocketTests COMMAND rsocket_tests)
437+
438+
########################################
439+
# Examples
440+
########################################
441+
442+
add_library(
443+
reactivesocket_examples_util
444+
examples/util/ExampleSubscriber.cpp
445+
examples/util/ExampleSubscriber.h
446+
)
447+
421448
target_link_libraries(
422449
reactivesocket_examples_util
423450
rsocket_experimental
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright 2004-present Facebook. All Rights Reserved.
2+
3+
#include <utility>
4+
5+
#include <folly/io/IOBuf.h>
6+
#include <gmock/gmock.h>
7+
8+
#include "handlers/HelloStreamRequestHandler.h"
9+
#include "rsocket/RSocket.h"
10+
#include "rsocket/RSocket.h"
11+
#include "rsocket/transports/TcpConnectionAcceptor.h"
12+
13+
using namespace rsocket;
14+
using namespace rsocket::tests;
15+
16+
TEST(RSocketClientServer, StartAndShutdown) {
17+
{
18+
TcpConnectionAcceptor::Options opts;
19+
opts.port = 9889;
20+
opts.threads = 2;
21+
22+
// RSocket server accepting on TCP
23+
auto rs = RSocket::createServer(
24+
std::make_unique<TcpConnectionAcceptor>(std::move(opts)));
25+
26+
// global request handler
27+
auto handler = std::make_shared<HelloStreamRequestHandler>();
28+
// start server
29+
rs->start([handler](auto r) { return handler; });
30+
}
31+
// everything should cleanly shut down here
32+
// TODO anything to assert here?
33+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Copyright 2004-present Facebook. All Rights Reserved.
2+
3+
#include "HelloStreamRequestHandler.h"
4+
#include <string>
5+
#include "HelloStreamSubscription.h"
6+
7+
using namespace ::reactivesocket;
8+
9+
namespace rsocket {
10+
namespace tests {
11+
/// Handles a new inbound Stream requested by the other end.
12+
void HelloStreamRequestHandler::handleRequestStream(
13+
Payload request,
14+
StreamId streamId,
15+
const std::shared_ptr<Subscriber<Payload>>& response) noexcept {
16+
LOG(INFO) << "HelloStreamRequestHandler.handleRequestStream " << request;
17+
18+
// string from payload data
19+
const char* p = reinterpret_cast<const char*>(request.data->data());
20+
auto requestString = std::string(p, request.data->length());
21+
22+
response->onSubscribe(
23+
std::make_shared<HelloStreamSubscription>(response, requestString, 10));
24+
}
25+
26+
std::shared_ptr<StreamState> HelloStreamRequestHandler::handleSetupPayload(
27+
ReactiveSocket& socket,
28+
ConnectionSetupPayload request) noexcept {
29+
LOG(INFO) << "HelloStreamRequestHandler.handleSetupPayload " << request;
30+
// TODO what should this do?
31+
return nullptr;
32+
}
33+
}
34+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
// Copyright 2004-present Facebook. All Rights Reserved.
2+
3+
#pragma once
4+
5+
#include <folly/ExceptionWrapper.h>
6+
#include "src/NullRequestHandler.h"
7+
#include "src/Payload.h"
8+
#include "src/ReactiveStreamsCompat.h"
9+
#include "src/StandardReactiveSocket.h"
10+
#include "src/SubscriptionBase.h"
11+
12+
namespace rsocket {
13+
namespace tests {
14+
15+
class HelloStreamRequestHandler : public reactivesocket::DefaultRequestHandler {
16+
public:
17+
/// Handles a new inbound Stream requested by the other end.
18+
void handleRequestStream(
19+
reactivesocket::Payload request,
20+
reactivesocket::StreamId streamId,
21+
const std::shared_ptr<
22+
reactivesocket::Subscriber<reactivesocket::Payload>>&
23+
response) noexcept override;
24+
25+
std::shared_ptr<reactivesocket::StreamState> handleSetupPayload(
26+
reactivesocket::ReactiveSocket&,
27+
reactivesocket::ConnectionSetupPayload request) noexcept override;
28+
};
29+
}
30+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright 2004-present Facebook. All Rights Reserved.
2+
3+
#include "HelloStreamSubscription.h"
4+
5+
#include <sstream>
6+
#include <string>
7+
8+
using namespace reactivesocket;
9+
10+
namespace rsocket {
11+
namespace tests {
12+
13+
// Emit a stream of ints starting at 0 until number of ints
14+
// emitted matches 'numberToEmit' at which point onComplete()
15+
// will be emitted.
16+
//
17+
// On each invocation will restrict emission to number of requested.
18+
//
19+
// This method has no concurrency since SubscriptionBase
20+
// schedules this on an Executor sequentially
21+
void HelloStreamSubscription::requestImpl(size_t n) noexcept {
22+
LOG(INFO) << "requested=" << n << " currentElem=" << currentElem_
23+
<< " numberToEmit=" << numberToEmit_;
24+
25+
if (numberToEmit_ == 0) {
26+
subscriber_->onComplete();
27+
return;
28+
}
29+
for (size_t i = 0; i < n; i++) {
30+
if (cancelled_) {
31+
LOG(INFO) << "emission stopped by cancellation";
32+
return;
33+
}
34+
std::stringstream ss;
35+
ss << "Hello " << name_ << " " << currentElem_ << "!";
36+
std::string s = ss.str();
37+
subscriber_->onNext(Payload(s));
38+
// currentElem is used to track progress across requestImpl invocations
39+
currentElem_++;
40+
// break the loop and complete the stream if numberToEmit_ is matched
41+
if (currentElem_ == numberToEmit_) {
42+
subscriber_->onComplete();
43+
return;
44+
}
45+
}
46+
}
47+
48+
void HelloStreamSubscription::cancelImpl() noexcept {
49+
LOG(INFO) << "cancellation received";
50+
// simple cancellation token (nothing to shut down, just stop next loop)
51+
cancelled_ = true;
52+
}
53+
}
54+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright 2004-present Facebook. All Rights Reserved.
2+
3+
#pragma once
4+
5+
#include <string>
6+
#include "src/DuplexConnection.h"
7+
#include "src/Payload.h"
8+
#include "src/SubscriptionBase.h"
9+
10+
namespace rsocket {
11+
namespace tests {
12+
13+
/// Emits a stream of ints
14+
class HelloStreamSubscription : public reactivesocket::SubscriptionBase {
15+
public:
16+
explicit HelloStreamSubscription(
17+
std::shared_ptr<reactivesocket::Subscriber<reactivesocket::Payload>>
18+
subscriber,
19+
std::string name,
20+
size_t numberToEmit = 2)
21+
: ExecutorBase(reactivesocket::defaultExecutor()),
22+
subscriber_(std::move(subscriber)),
23+
name_(std::move(name)),
24+
numberToEmit_(numberToEmit),
25+
cancelled_(false) {}
26+
27+
private:
28+
// Subscription methods
29+
void requestImpl(size_t n) noexcept override;
30+
void cancelImpl() noexcept override;
31+
32+
std::shared_ptr<reactivesocket::Subscriber<reactivesocket::Payload>>
33+
subscriber_;
34+
std::string name_;
35+
size_t numberToEmit_;
36+
size_t currentElem_ = 0;
37+
std::atomic_bool cancelled_;
38+
};
39+
}
40+
}

0 commit comments

Comments
 (0)