Skip to content

Commit 16ae4f8

Browse files
authored
Add support for monitoring for S3 crt client (#3138)
Add support for monitoring for S3 crt client
1 parent 7c44fbe commit 16ae4f8

File tree

6 files changed

+346
-24
lines changed

6 files changed

+346
-24
lines changed

generated/src/aws-cpp-sdk-s3-crt/source/S3CrtClient.cpp

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -713,9 +713,24 @@ void S3CrtClient::InitCommonCrtRequestOption(CrtRequestCallbackUserData *userDat
713713

714714
static void CopyObjectRequestShutdownCallback(void *user_data)
715715
{
716+
if(!user_data)
717+
{
718+
AWS_LOGSTREAM_ERROR("CopyObject", "user data passed is NULL ");
719+
return;
720+
}
716721
auto *userData = static_cast<S3CrtClient::CrtRequestCallbackUserData*>(user_data);
722+
717723
// call user callback and release user_data
718724
S3Crt::Model::CopyObjectOutcome outcome(userData->s3CrtClient->GenerateXmlOutcome(userData->response));
725+
//log into monitor
726+
if(!outcome.IsSuccess())
727+
{
728+
userData->asyncCallerContext->GetMonitorContext().OnRequestFailed(userData->request,userData->response);
729+
}
730+
else
731+
{
732+
userData->asyncCallerContext->GetMonitorContext().OnRequestSucceeded(userData->request, userData->response);
733+
}
719734
userData->copyResponseHandler(userData->s3CrtClient, *(reinterpret_cast<const CopyObjectRequest*>(userData->originalRequest)), std::move(outcome), userData->asyncCallerContext);
720735

721736
Aws::Delete(userData);
@@ -782,6 +797,7 @@ void S3CrtClient::CopyObjectAsync(const CopyObjectRequest& request, const CopyOb
782797
{
783798
return handler(this, request, CopyObjectOutcome(Aws::Client::AWSError<S3CrtErrors>(S3CrtErrors::INTERNAL_FAILURE, "INTERNAL_FAILURE", "Unable to create s3 meta request", false)), handlerContext);
784799
}
800+
handlerContext->GetMonitorContext().StartMonitorContext(Aws::String{"S3CrtClient"},request.GetServiceRequestName(), userData->request);
785801
options.shutdown_callback = CopyObjectRequestShutdownCallback;
786802
options.type = AWS_S3_META_REQUEST_TYPE_COPY_OBJECT;
787803
struct aws_signing_config_aws signing_config_override = m_s3CrtSigningConfig;
@@ -852,13 +868,13 @@ CopyObjectOutcome S3CrtClient::CopyObject(const CopyObjectRequest& request) cons
852868
[&]()-> CopyObjectOutcome {
853869
Aws::Utils::Threading::Semaphore sem(0, 1);
854870
CopyObjectOutcome res;
855-
871+
auto handlerContext = Aws::MakeShared<AsyncCallerContext>(ALLOCATION_TAG);
856872
auto handler = CopyObjectResponseReceivedHandler{[&](const S3CrtClient*, const CopyObjectRequest&, const CopyObjectOutcome& outcome, const std::shared_ptr<const Aws::Client::AsyncCallerContext> &) {
857873
res = std::move(outcome);
858874
sem.ReleaseAll();
859875
}};
860876

861-
S3CrtClient::CopyObjectAsync(request, handler, nullptr);
877+
S3CrtClient::CopyObjectAsync(request, handler, handlerContext);
862878
sem.WaitOne();
863879
return res;
864880
},
@@ -869,9 +885,24 @@ CopyObjectOutcome S3CrtClient::CopyObject(const CopyObjectRequest& request) cons
869885

870886
static void GetObjectRequestShutdownCallback(void *user_data)
871887
{
888+
if(!user_data)
889+
{
890+
AWS_LOGSTREAM_ERROR("GetObject", "user data passed is NULL ");
891+
return;
892+
}
872893
auto *userData = static_cast<S3CrtClient::CrtRequestCallbackUserData*>(user_data);
894+
873895
// call user callback and release user_data
874896
S3Crt::Model::GetObjectOutcome outcome(userData->s3CrtClient->GenerateStreamOutcome(userData->response));
897+
//log into monitor
898+
if(!outcome.IsSuccess())
899+
{
900+
userData->asyncCallerContext->GetMonitorContext().OnRequestFailed(userData->request,userData->response);
901+
}
902+
else
903+
{
904+
userData->asyncCallerContext->GetMonitorContext().OnRequestSucceeded(userData->request, userData->response);
905+
}
875906
userData->getResponseHandler(userData->s3CrtClient, *(reinterpret_cast<const GetObjectRequest*>(userData->originalRequest)), std::move(outcome), userData->asyncCallerContext);
876907

877908
Aws::Delete(userData);
@@ -933,6 +964,7 @@ void S3CrtClient::GetObjectAsync(const GetObjectRequest& request, const GetObjec
933964
{
934965
return handler(this, request, GetObjectOutcome(Aws::Client::AWSError<S3CrtErrors>(S3CrtErrors::INVALID_PARAMETER_VALUE, "INVALID_PARAMETER_VALUE", "Output stream in bad state", false)), handlerContext);
935966
}
967+
handlerContext->GetMonitorContext().StartMonitorContext(Aws::String{"S3CrtClient"},request.GetServiceRequestName(), userData->request);
936968
options.shutdown_callback = GetObjectRequestShutdownCallback;
937969
options.type = AWS_S3_META_REQUEST_TYPE_GET_OBJECT;
938970
struct aws_signing_config_aws signing_config_override = m_s3CrtSigningConfig;
@@ -972,13 +1004,13 @@ GetObjectOutcome S3CrtClient::GetObject(const GetObjectRequest& request) const
9721004
[&]()-> GetObjectOutcome {
9731005
Aws::Utils::Threading::Semaphore sem(0, 1);
9741006
GetObjectOutcome res;
975-
1007+
auto handlerContext = Aws::MakeShared<AsyncCallerContext>(ALLOCATION_TAG);
9761008
auto handler = GetObjectResponseReceivedHandler{[&](const S3CrtClient*, const GetObjectRequest&, GetObjectOutcome outcome, const std::shared_ptr<const Aws::Client::AsyncCallerContext> &) {
9771009
res = std::move(outcome);
9781010
sem.ReleaseAll();
9791011
}};
9801012

981-
S3CrtClient::GetObjectAsync(request, handler, nullptr);
1013+
S3CrtClient::GetObjectAsync(request, handler, handlerContext);
9821014
sem.WaitOne();
9831015
return res;
9841016
},
@@ -989,9 +1021,24 @@ GetObjectOutcome S3CrtClient::GetObject(const GetObjectRequest& request) const
9891021

9901022
static void PutObjectRequestShutdownCallback(void *user_data)
9911023
{
1024+
if(!user_data)
1025+
{
1026+
AWS_LOGSTREAM_ERROR("PutObject", "user data passed is NULL ");
1027+
return;
1028+
}
9921029
auto *userData = static_cast<S3CrtClient::CrtRequestCallbackUserData*>(user_data);
1030+
9931031
// call user callback and release user_data
9941032
S3Crt::Model::PutObjectOutcome outcome(userData->s3CrtClient->GenerateXmlOutcome(userData->response));
1033+
//log into monitor
1034+
if(!outcome.IsSuccess())
1035+
{
1036+
userData->asyncCallerContext->GetMonitorContext().OnRequestFailed(userData->request,userData->response);
1037+
}
1038+
else
1039+
{
1040+
userData->asyncCallerContext->GetMonitorContext().OnRequestSucceeded(userData->request, userData->response);
1041+
}
9951042
userData->putResponseHandler(userData->s3CrtClient, *(reinterpret_cast<const PutObjectRequest*>(userData->originalRequest)), std::move(outcome), userData->asyncCallerContext);
9961043

9971044
Aws::Delete(userData);
@@ -1057,6 +1104,7 @@ void S3CrtClient::PutObjectAsync(const PutObjectRequest& request, const PutObjec
10571104
{
10581105
return handler(this, request, PutObjectOutcome(Aws::Client::AWSError<S3CrtErrors>(S3CrtErrors::INVALID_PARAMETER_VALUE, "INVALID_PARAMETER_VALUE", "Input stream in bad state", false)), handlerContext);
10591106
}
1107+
handlerContext->GetMonitorContext().StartMonitorContext(Aws::String{"S3CrtClient"},request.GetServiceRequestName(), userData->request);
10601108
options.shutdown_callback = PutObjectRequestShutdownCallback;
10611109
options.type = AWS_S3_META_REQUEST_TYPE_PUT_OBJECT;
10621110
struct aws_signing_config_aws signing_config_override = m_s3CrtSigningConfig;
@@ -1127,13 +1175,13 @@ PutObjectOutcome S3CrtClient::PutObject(const PutObjectRequest& request) const
11271175
[&]()-> PutObjectOutcome {
11281176
Aws::Utils::Threading::Semaphore sem(0, 1);
11291177
PutObjectOutcome res;
1130-
1178+
auto handlerContext = Aws::MakeShared<AsyncCallerContext>(ALLOCATION_TAG);
11311179
auto handler = PutObjectResponseReceivedHandler{[&](const S3CrtClient*, const PutObjectRequest&, const PutObjectOutcome& outcome, const std::shared_ptr<const Aws::Client::AsyncCallerContext> &) {
11321180
res = std::move(outcome);
11331181
sem.ReleaseAll();
11341182
}};
11351183

1136-
S3CrtClient::PutObjectAsync(request, handler, nullptr);
1184+
S3CrtClient::PutObjectAsync(request, handler, handlerContext);
11371185
sem.WaitOne();
11381186
return res;
11391187
},

src/aws-cpp-sdk-core/include/aws/core/client/AsyncCallerContext.h

Lines changed: 103 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,106 @@
77

88
#include <aws/core/utils/memory/stl/AWSString.h>
99

10+
#include <aws/crt/http/HttpRequestResponse.h>
11+
#include <aws/core/monitoring/MonitoringManager.h>
12+
1013
namespace Aws
1114
{
1215
namespace Client
1316
{
17+
18+
class AWS_CORE_API MonitorContext{
19+
20+
private:
21+
mutable Aws::String clientName;
22+
mutable Aws::String requestName;
23+
mutable Aws::Vector<void*> contexts;
24+
25+
public:
26+
~MonitorContext() = default;
27+
MonitorContext() = default;
28+
MonitorContext(const MonitorContext&) = delete;
29+
MonitorContext( MonitorContext &&) = delete;
30+
MonitorContext& operator=(const MonitorContext&) = delete;
31+
MonitorContext& operator=(MonitorContext&&) = delete;
32+
33+
void StartMonitorContext(const Aws::String& client, const Aws::String& request, std::shared_ptr<Aws::Http::HttpRequest>& httpRequest) const
34+
{
35+
clientName = client;
36+
requestName = request;
37+
contexts = Aws::Monitoring::OnRequestStarted(clientName, requestName, httpRequest);
38+
}
39+
40+
inline void OnRequestFailed(std::shared_ptr<Aws::Http::HttpRequest>& httpRequest, const Aws::Client::HttpResponseOutcome& outcome) const
41+
{
42+
if(!httpRequest)
43+
{
44+
return;
45+
}
46+
Aws::Monitoring::CoreMetricsCollection coreMetrics;
47+
coreMetrics.httpClientMetrics = httpRequest->GetRequestMetrics();
48+
49+
Aws::Monitoring::OnRequestFailed(
50+
clientName,
51+
requestName,
52+
httpRequest ,
53+
outcome,
54+
coreMetrics,
55+
contexts);
56+
57+
}
58+
59+
inline void OnRequestSucceeded(std::shared_ptr<Aws::Http::HttpRequest> httpRequest, const Aws::Client::HttpResponseOutcome& outcome) const
60+
{
61+
62+
if(!httpRequest)
63+
{
64+
return;
65+
}
66+
67+
Aws::Monitoring::CoreMetricsCollection coreMetrics;
68+
coreMetrics.httpClientMetrics = httpRequest->GetRequestMetrics();
69+
70+
Aws::Monitoring::OnRequestSucceeded(
71+
clientName,
72+
requestName,
73+
httpRequest ,
74+
outcome,
75+
coreMetrics,
76+
contexts);
77+
}
78+
79+
inline void OnRetry(std::shared_ptr<Aws::Http::HttpRequest> httpRequest) const
80+
{
81+
if(!httpRequest)
82+
{
83+
return;
84+
}
85+
Aws::Monitoring::CoreMetricsCollection coreMetrics;
86+
coreMetrics.httpClientMetrics = httpRequest->GetRequestMetrics();
87+
88+
Aws::Monitoring::OnRequestRetry(
89+
clientName,
90+
requestName,
91+
httpRequest,
92+
contexts);
93+
}
94+
95+
inline void OnFinish(std::shared_ptr<Aws::Http::HttpRequest> httpRequest) const
96+
{
97+
if(!httpRequest)
98+
{
99+
return;
100+
}
101+
Aws::Monitoring::OnFinish(
102+
clientName,
103+
requestName,
104+
httpRequest ,
105+
contexts);
106+
}
107+
108+
};
109+
14110
/**
15111
* Call-back context for all async client methods. This allows you to pass a context to your callbacks so that you can identify your requests.
16112
* It is entirely intended that you override this class in-lieu of using a void* for the user context. The base class just gives you the ability to
@@ -19,6 +115,7 @@ namespace Aws
19115
class AWS_CORE_API AsyncCallerContext
20116
{
21117
public:
118+
22119
/**
23120
* Initializes object with generated UUID
24121
*/
@@ -27,7 +124,7 @@ namespace Aws
27124
/**
28125
* Initializes object with UUID
29126
*/
30-
AsyncCallerContext(const Aws::String& uuid) : m_uuid(uuid) {}
127+
AsyncCallerContext(const Aws::String& uuid) : m_uuid(uuid){}
31128

32129
/**
33130
* Initializes object with UUID
@@ -51,8 +148,13 @@ namespace Aws
51148
*/
52149
inline void SetUUID(const char* value) { m_uuid.assign(value); }
53150

151+
inline const MonitorContext& GetMonitorContext() const{
152+
return monitorContext;
153+
}
154+
54155
private:
55156
Aws::String m_uuid;
157+
mutable MonitorContext monitorContext;
56158
};
57159
}
58160
}

src/aws-cpp-sdk-core/include/aws/core/monitoring/MonitoringManager.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,5 +61,11 @@ namespace Aws
6161
* testing whether the global Monitoring instance has been destructed.
6262
*/
6363
AWS_CORE_API void CleanupMonitoring();
64+
65+
/**
66+
* Add monitoring using supplied factories
67+
*/
68+
AWS_CORE_API void AddMonitoring(const std::vector<MonitoringFactoryCreateFunction>& monitoringFactoryCreateFunctions);
69+
6470
}
6571
}

src/aws-cpp-sdk-core/source/monitoring/MonitoringManager.cpp

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -97,26 +97,37 @@ namespace Aws
9797
}
9898
}
9999

100+
void AddMonitoring(const std::vector<MonitoringFactoryCreateFunction>& monitoringFactoryCreateFunctions)
101+
{
102+
//allocate monitors only if there are valid factory functions
103+
if(s_monitors && !monitoringFactoryCreateFunctions.empty())
104+
{
105+
for (const auto& function: monitoringFactoryCreateFunctions)
106+
{
107+
auto factory = function();
108+
if (factory)
109+
{
110+
auto instance = factory->CreateMonitoringInstance();
111+
if (instance)
112+
{
113+
s_monitors->emplace_back(std::move(instance));
114+
}
115+
}
116+
}
117+
}
118+
}
119+
120+
100121
void InitMonitoring(const std::vector<MonitoringFactoryCreateFunction>& monitoringFactoryCreateFunctions)
101122
{
102123
if (s_monitors)
103124
{
104125
return;
105126
}
106127
assert(Aws::get_aws_allocator() != nullptr);
128+
107129
s_monitors = Aws::New<Monitors>(MonitoringTag);
108-
for (const auto& function: monitoringFactoryCreateFunctions)
109-
{
110-
auto factory = function();
111-
if (factory)
112-
{
113-
auto instance = factory->CreateMonitoringInstance();
114-
if (instance)
115-
{
116-
s_monitors->emplace_back(std::move(instance));
117-
}
118-
}
119-
}
130+
AddMonitoring(monitoringFactoryCreateFunctions);
120131

121132
auto defaultMonitoringFactory = Aws::MakeShared<DefaultMonitoringFactory>(MonitoringTag);
122133
auto instance = defaultMonitoringFactory->CreateMonitoringInstance();
@@ -128,8 +139,11 @@ namespace Aws
128139

129140
void CleanupMonitoring()
130141
{
131-
Aws::Delete(s_monitors);
132-
s_monitors = nullptr;
142+
if(s_monitors)
143+
{
144+
Aws::Delete(s_monitors);
145+
s_monitors = nullptr;
146+
}
133147
}
134148
} // namespace Monitoring
135149

0 commit comments

Comments
 (0)