25
25
import io .envoyproxy .envoy .service .rate_limit_quota .v3 .RateLimitQuotaUsageReports ;
26
26
import io .envoyproxy .envoy .service .rate_limit_quota .v3 .RateLimitQuotaUsageReports .BucketQuotaUsage ;
27
27
import io .grpc .Grpc ;
28
+ import io .grpc .InternalLogId ;
28
29
import io .grpc .ManagedChannel ;
30
+ import io .grpc .internal .GrpcUtil ;
29
31
import io .grpc .stub .ClientCallStreamObserver ;
30
32
import io .grpc .stub .StreamObserver ;
31
33
import io .grpc .xds .client .Bootstrapper .RemoteServerInfo ;
34
+ import io .grpc .xds .client .XdsLogger ;
35
+ import io .grpc .xds .client .XdsLogger .XdsLogLevel ;
32
36
import io .grpc .xds .internal .rlqs .RlqsBucket .RlqsBucketUsage ;
33
37
import java .util .List ;
34
- import java .util .concurrent .TimeUnit ;
35
38
import java .util .concurrent .atomic .AtomicBoolean ;
36
39
import java .util .function .Consumer ;
37
- import java .util .logging .Level ;
38
- import java .util .logging .Logger ;
40
+ import javax .annotation .Nullable ;
39
41
40
42
public final class RlqsClient {
41
- private static final Logger logger = Logger .getLogger (RlqsClient .class .getName ());
43
+ // TODO(sergiitk): [IMPL] remove
44
+ // Do do not fail on parsing errors, only log requests.
45
+ static final boolean dryRun = GrpcUtil .getFlag ("GRPC_EXPERIMENTAL_RLQS_DRY_RUN" , false );
46
+
47
+ private final XdsLogger logger ;
42
48
43
49
private final RemoteServerInfo serverInfo ;
44
50
private final Consumer <List <RlqsUpdateBucketAction >> bucketsUpdateCallback ;
45
51
private final RlqsStream rlqsStream ;
46
52
47
53
RlqsClient (
48
54
RemoteServerInfo serverInfo , String domain ,
49
- Consumer <List <RlqsUpdateBucketAction >> bucketsUpdateCallback ) {
55
+ Consumer <List <RlqsUpdateBucketAction >> bucketsUpdateCallback , String prettyHash ) {
50
56
// TODO(sergiitk): [post] check not null.
51
57
this .serverInfo = serverInfo ;
52
58
this .bucketsUpdateCallback = bucketsUpdateCallback ;
59
+
60
+ logger = XdsLogger .withLogId (
61
+ InternalLogId .allocate (this .getClass (), "<" + prettyHash + "> " + serverInfo .target ()));
62
+
53
63
this .rlqsStream = new RlqsStream (serverInfo , domain );
54
64
}
55
65
@@ -62,7 +72,7 @@ public void sendUsageReports(List<RlqsBucketUsage> bucketUsages) {
62
72
}
63
73
64
74
public void shutdown () {
65
- logger .log (Level . FINER , "Shutting down RlqsClient to {0}" , serverInfo .target ());
75
+ logger .log (XdsLogLevel . DEBUG , "Shutting down RlqsClient to {0}" , serverInfo .target ());
66
76
// TODO(sergiitk): [IMPL] RlqsClient shutdown
67
77
}
68
78
@@ -72,18 +82,26 @@ public void handleStreamClosed() {
72
82
73
83
private class RlqsStream {
74
84
private final AtomicBoolean isFirstReport = new AtomicBoolean (true );
75
- private final ManagedChannel channel ;
76
85
private final String domain ;
86
+ @ Nullable
77
87
private final ClientCallStreamObserver <RateLimitQuotaUsageReports > clientCallStream ;
78
88
79
89
RlqsStream (RemoteServerInfo serverInfo , String domain ) {
80
90
this .domain = domain ;
81
- channel = Grpc .newChannelBuilder (serverInfo .target (), serverInfo .channelCredentials ())
82
- .keepAliveTime (10 , TimeUnit .SECONDS )
83
- .keepAliveWithoutCalls (true )
84
- .build ();
85
- // keepalive?
91
+
92
+ if (dryRun ) {
93
+ clientCallStream = null ;
94
+ logger .log (XdsLogLevel .DEBUG , "Dry run, not connecting to " + serverInfo .target ());
95
+ return ;
96
+ }
97
+
86
98
// TODO(sergiitk): [IMPL] Manage State changes?
99
+ ManagedChannel channel =
100
+ Grpc .newChannelBuilder (serverInfo .target (), serverInfo .channelCredentials ()).build ();
101
+ // keepalive?
102
+ // .keepAliveTime(10, TimeUnit.SECONDS)
103
+ // .keepAliveWithoutCalls(true)
104
+
87
105
RateLimitQuotaServiceStub stub = RateLimitQuotaServiceGrpc .newStub (channel );
88
106
clientCallStream = (ClientCallStreamObserver <RateLimitQuotaUsageReports >)
89
107
stub .streamRateLimitQuotas (new RlqsStreamObserver ());
@@ -107,6 +125,10 @@ void reportUsage(List<RlqsBucket.RlqsBucketUsage> usageReports) {
107
125
for (RlqsBucket .RlqsBucketUsage bucketUsage : usageReports ) {
108
126
report .addBucketQuotaUsages (toUsageReport (bucketUsage ));
109
127
}
128
+ if (clientCallStream == null ) {
129
+ logger .log (XdsLogLevel .DEBUG , "Dry run, skipping bucket usage report: " + report .build ());
130
+ return ;
131
+ }
110
132
clientCallStream .onNext (report .build ());
111
133
}
112
134
@@ -128,12 +150,12 @@ public void onNext(RateLimitQuotaResponse response) {
128
150
129
151
@ Override
130
152
public void onError (Throwable t ) {
131
-
153
+ logger . log ( XdsLogLevel . DEBUG , "Got error in RlqsStreamObserver: " + t . toString ());
132
154
}
133
155
134
156
@ Override
135
157
public void onCompleted () {
136
-
158
+ logger . log ( XdsLogLevel . DEBUG , "RlqsStreamObserver completed" );
137
159
}
138
160
}
139
161
}
0 commit comments