Skip to content

Commit

Permalink
Add setConnectionCallback (#2204)
Browse files Browse the repository at this point in the history
  • Loading branch information
fantasy-peak authored Jan 8, 2025
1 parent 152a69f commit 686f68a
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 2 deletions.
32 changes: 31 additions & 1 deletion examples/async_stream/main.cc
Original file line number Diff line number Diff line change
@@ -1,15 +1,33 @@
#include <drogon/drogon.h>
#include <chrono>
#include <functional>
#include <mutex>
#include <unordered_map>
#include <trantor/utils/Logger.h>
#include <trantor/net/callbacks.h>
#include <trantor/net/TcpConnection.h>

using namespace drogon;
using namespace std::chrono_literals;

std::mutex mutex;
std::unordered_map<trantor::TcpConnectionPtr, std::function<void()>>
connMapping;

int main()
{
app().registerHandler(
"/stream",
[](const HttpRequestPtr &,
[](const HttpRequestPtr &req,
std::function<void(const HttpResponsePtr &)> &&callback) {
const auto &weakConnPtr = req->getConnectionPtr();
if (auto connPtr = weakConnPtr.lock())
{
std::lock_guard lk(mutex);
connMapping.emplace(std::move(connPtr), [] {
LOG_INFO << "call stop or other options!!!!";
});
}
auto resp = drogon::HttpResponse::newAsyncStreamResponse(
[](drogon::ResponseStreamPtr stream) {
std::thread([stream =
Expand Down Expand Up @@ -79,5 +97,17 @@ int main()

LOG_INFO << "Server running on 127.0.0.1:8848";
app().enableRequestStream(); // This is for request stream.
app().setConnectionCallback([](const trantor::TcpConnectionPtr &conn) {
if (conn->disconnected())
{
std::lock_guard lk(mutex);
if (auto it = connMapping.find(conn); it != connMapping.end())
{
LOG_INFO << "disconnect";
connMapping[conn]();
connMapping.erase(conn);
}
}
});
app().addListener("127.0.0.1", 8848).run();
}
9 changes: 9 additions & 0 deletions lib/inc/drogon/HttpAppFramework.h
Original file line number Diff line number Diff line change
Expand Up @@ -1615,6 +1615,15 @@ class DROGON_EXPORT HttpAppFramework : public trantor::NonCopyable
virtual HttpAppFramework &setAfterAcceptSockOptCallback(
std::function<void(int)> cb) = 0;

/**
* @brief Set the client disconnect or connect callback.
*
* @param cb This callback will be called, when the client disconnect or
* connect
*/
virtual HttpAppFramework &setConnectionCallback(
std::function<void(const trantor::TcpConnectionPtr &)> cb) = 0;

virtual HttpAppFramework &enableRequestStream(bool enable = true) = 0;
virtual bool isRequestStreamEnabled() const = 0;

Expand Down
4 changes: 4 additions & 0 deletions lib/inc/drogon/HttpRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <unordered_map>
#include <optional>
#include <string_view>
#include <trantor/net/TcpConnection.h>

namespace drogon
{
Expand Down Expand Up @@ -506,6 +507,9 @@ class DROGON_EXPORT HttpRequest

virtual bool connected() const noexcept = 0;

virtual const std::weak_ptr<trantor::TcpConnection> &getConnectionPtr()
const noexcept = 0;

virtual ~HttpRequest()
{
}
Expand Down
7 changes: 7 additions & 0 deletions lib/src/HttpAppFrameworkImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1367,3 +1367,10 @@ HttpAppFramework &HttpAppFrameworkImpl::setAfterAcceptSockOptCallback(
listenerManagerPtr_->setAfterAcceptSockOptCallback(std::move(cb));
return *this;
}

HttpAppFramework &HttpAppFrameworkImpl::setConnectionCallback(
std::function<void(const trantor::TcpConnectionPtr &)> cb)
{
listenerManagerPtr_->setConnectionCallback(std::move(cb));
return *this;
}
2 changes: 2 additions & 0 deletions lib/src/HttpAppFrameworkImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,8 @@ class HttpAppFrameworkImpl final : public HttpAppFramework
std::function<void(int)> cb) override;
HttpAppFramework &setAfterAcceptSockOptCallback(
std::function<void(int)> cb) override;
HttpAppFramework &setConnectionCallback(
std::function<void(const trantor::TcpConnectionPtr &)> cb) override;

HttpAppFramework &enableRequestStream(bool enable) override;
bool isRequestStreamEnabled() const override;
Expand Down
7 changes: 7 additions & 0 deletions lib/src/HttpRequestImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "HttpUtils.h"
#include "CacheFile.h"
#include "impl_forwards.h"
#include <drogon/utils/Utilities.h>
#include <drogon/HttpRequest.h>
#include <drogon/RequestStream.h>
Expand Down Expand Up @@ -572,6 +573,12 @@ class HttpRequestImpl : public HttpRequest
return false;
}

const std::weak_ptr<trantor::TcpConnection> &getConnectionPtr()
const noexcept override
{
return connPtr_;
}

bool isOnSecureConnection() const noexcept override
{
return isOnSecureConnection_;
Expand Down
8 changes: 7 additions & 1 deletion lib/src/HttpServer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "HttpControllersRouter.h"
#include "StaticFileRouter.h"
#include "WebSocketConnectionImpl.h"
#include "impl_forwards.h"

#if COZ_PROFILING
#include <coz.h>
Expand Down Expand Up @@ -75,7 +76,12 @@ HttpServer::HttpServer(EventLoop *loop,
: server_(loop, listenAddr, std::move(name), true, app().reusePort())
#endif
{
server_.setConnectionCallback(onConnection);
server_.setConnectionCallback(
[this](const trantor::TcpConnectionPtr &conn) {
onConnection(conn);
if (connectionCallback_)
connectionCallback_(conn);
});
server_.setRecvMessageCallback(onMessage);
server_.kickoffIdleConnections(
HttpAppFrameworkImpl::instance().getIdleConnectionTimeout());
Expand Down
7 changes: 7 additions & 0 deletions lib/src/HttpServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ class HttpServer : trantor::NonCopyable
afterAcceptSetSockOptCallback_ = std::move(cb);
}

void setConnectionCallback(
std::function<void(const trantor::TcpConnectionPtr &)> cb)
{
connectionCallback_ = std::move(cb);
}

private:
friend class HttpInternalForwardHelper;

Expand Down Expand Up @@ -144,6 +150,7 @@ class HttpServer : trantor::NonCopyable

std::function<void(int)> beforeListenSetSockOptCallback_;
std::function<void(int)> afterAcceptSetSockOptCallback_;
std::function<void(const trantor::TcpConnectionPtr &)> connectionCallback_;
};

class HttpInternalForwardHelper
Expand Down
4 changes: 4 additions & 0 deletions lib/src/ListenerManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ void ListenerManager::createListeners(
serverPtr->setAfterAcceptSockOptCallback(
afterAcceptSetSockOptCallback_);
}
if (connectionCallback_)
{
serverPtr->setConnectionCallback(connectionCallback_);
}

if (listener.useSSL_ && utils::supportsTls())
{
Expand Down
7 changes: 7 additions & 0 deletions lib/src/ListenerManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ class ListenerManager : public trantor::NonCopyable
afterAcceptSetSockOptCallback_ = std::move(cb);
}

void setConnectionCallback(
std::function<void(const trantor::TcpConnectionPtr &)> cb)
{
connectionCallback_ = std::move(cb);
}

void reloadSSLFiles();

private:
Expand Down Expand Up @@ -101,6 +107,7 @@ class ListenerManager : public trantor::NonCopyable
std::unique_ptr<trantor::EventLoopThread> listeningThread_;
std::function<void(int)> beforeListenSetSockOptCallback_;
std::function<void(int)> afterAcceptSetSockOptCallback_;
std::function<void(const trantor::TcpConnectionPtr &)> connectionCallback_;
};

} // namespace drogon

0 comments on commit 686f68a

Please sign in to comment.