5
5
6
6
package org .whispersystems .textsecuregcm .storage ;
7
7
8
- import java . time . Instant ;
8
+ import io . micrometer . core . instrument . Metrics ;
9
9
import java .util .List ;
10
10
import java .util .Optional ;
11
11
import java .util .UUID ;
12
12
import java .util .concurrent .CompletableFuture ;
13
13
import org .whispersystems .textsecuregcm .entities .ECPreKey ;
14
14
import org .whispersystems .textsecuregcm .entities .ECSignedPreKey ;
15
15
import org .whispersystems .textsecuregcm .entities .KEMSignedPreKey ;
16
+ import org .whispersystems .textsecuregcm .experiment .ExperimentEnrollmentManager ;
17
+ import org .whispersystems .textsecuregcm .metrics .MetricsUtil ;
16
18
import reactor .core .publisher .Flux ;
17
19
import software .amazon .awssdk .services .dynamodb .model .TransactWriteItem ;
18
20
@@ -23,18 +25,25 @@ public class KeysManager {
23
25
private final PagedSingleUseKEMPreKeyStore pagedPqPreKeys ;
24
26
private final RepeatedUseECSignedPreKeyStore ecSignedPreKeys ;
25
27
private final RepeatedUseKEMSignedPreKeyStore pqLastResortKeys ;
28
+ private final ExperimentEnrollmentManager experimentEnrollmentManager ;
29
+
30
+ public static String PAGED_KEYS_EXPERIMENT_NAME = "pagedPreKeys" ;
31
+
32
+ private static final String TAKE_PQ_NAME = MetricsUtil .name (KeysManager .class , "takePq" );
26
33
27
34
public KeysManager (
28
35
final SingleUseECPreKeyStore ecPreKeys ,
29
36
final SingleUseKEMPreKeyStore pqPreKeys ,
30
37
final PagedSingleUseKEMPreKeyStore pagedPqPreKeys ,
31
38
final RepeatedUseECSignedPreKeyStore ecSignedPreKeys ,
32
- final RepeatedUseKEMSignedPreKeyStore pqLastResortKeys ) {
39
+ final RepeatedUseKEMSignedPreKeyStore pqLastResortKeys ,
40
+ final ExperimentEnrollmentManager experimentEnrollmentManager ) {
33
41
this .ecPreKeys = ecPreKeys ;
34
42
this .pqPreKeys = pqPreKeys ;
35
43
this .pagedPqPreKeys = pagedPqPreKeys ;
36
44
this .ecSignedPreKeys = ecSignedPreKeys ;
37
45
this .pqLastResortKeys = pqLastResortKeys ;
46
+ this .experimentEnrollmentManager = experimentEnrollmentManager ;
38
47
}
39
48
40
49
public TransactWriteItem buildWriteItemForEcSignedPreKey (final UUID identifier ,
@@ -79,33 +88,68 @@ public List<TransactWriteItem> buildWriteItemsForRemovedDevice(final UUID accoun
79
88
);
80
89
}
81
90
82
- public CompletableFuture <Void > storeEcSignedPreKeys (final UUID identifier , final byte deviceId , final ECSignedPreKey ecSignedPreKey ) {
91
+ public CompletableFuture <Void > storeEcSignedPreKeys (final UUID identifier , final byte deviceId ,
92
+ final ECSignedPreKey ecSignedPreKey ) {
83
93
return ecSignedPreKeys .store (identifier , deviceId , ecSignedPreKey );
84
94
}
85
95
86
- public CompletableFuture <Void > storePqLastResort (final UUID identifier , final byte deviceId , final KEMSignedPreKey lastResortKey ) {
96
+ public CompletableFuture <Void > storePqLastResort (final UUID identifier , final byte deviceId ,
97
+ final KEMSignedPreKey lastResortKey ) {
87
98
return pqLastResortKeys .store (identifier , deviceId , lastResortKey );
88
99
}
89
100
90
101
public CompletableFuture <Void > storeEcOneTimePreKeys (final UUID identifier , final byte deviceId ,
91
- final List <ECPreKey > preKeys ) {
102
+ final List <ECPreKey > preKeys ) {
92
103
return ecPreKeys .store (identifier , deviceId , preKeys );
93
104
}
94
105
95
106
public CompletableFuture <Void > storeKemOneTimePreKeys (final UUID identifier , final byte deviceId ,
96
- final List <KEMSignedPreKey > preKeys ) {
97
- return pqPreKeys .store (identifier , deviceId , preKeys );
107
+ final List <KEMSignedPreKey > preKeys ) {
108
+ final boolean enrolledInPagedKeys = experimentEnrollmentManager .isEnrolled (identifier , PAGED_KEYS_EXPERIMENT_NAME );
109
+ final CompletableFuture <Void > deleteOtherKeys = enrolledInPagedKeys
110
+ ? pqPreKeys .delete (identifier , deviceId )
111
+ : pagedPqPreKeys .delete (identifier , deviceId );
112
+ return deleteOtherKeys .thenCompose (ignored -> enrolledInPagedKeys
113
+ ? pagedPqPreKeys .store (identifier , deviceId , preKeys )
114
+ : pqPreKeys .store (identifier , deviceId , preKeys ));
115
+
98
116
}
99
117
100
118
public CompletableFuture <Optional <ECPreKey >> takeEC (final UUID identifier , final byte deviceId ) {
101
119
return ecPreKeys .take (identifier , deviceId );
102
120
}
103
121
104
122
public CompletableFuture <Optional <KEMSignedPreKey >> takePQ (final UUID identifier , final byte deviceId ) {
105
- return pqPreKeys .take (identifier , deviceId )
123
+ final boolean enrolledInPagedKeys = experimentEnrollmentManager .isEnrolled (identifier , PAGED_KEYS_EXPERIMENT_NAME );
124
+ return tagTakePQ (pagedPqPreKeys .take (identifier , deviceId ), PQSource .PAGE , enrolledInPagedKeys )
125
+ .thenCompose (maybeSingleUsePreKey -> maybeSingleUsePreKey
126
+ .map (ignored -> CompletableFuture .completedFuture (maybeSingleUsePreKey ))
127
+ .orElseGet (() -> tagTakePQ (pqPreKeys .take (identifier , deviceId ), PQSource .ROW , enrolledInPagedKeys )))
106
128
.thenCompose (maybeSingleUsePreKey -> maybeSingleUsePreKey
107
129
.map (singleUsePreKey -> CompletableFuture .completedFuture (maybeSingleUsePreKey ))
108
- .orElseGet (() -> pqLastResortKeys .find (identifier , deviceId )));
130
+ .orElseGet (() -> tagTakePQ (pqLastResortKeys .find (identifier , deviceId ), PQSource .LAST_RESORT , enrolledInPagedKeys )));
131
+ }
132
+
133
+ private enum PQSource {
134
+ PAGE ,
135
+ ROW ,
136
+ LAST_RESORT
137
+ }
138
+ private CompletableFuture <Optional <KEMSignedPreKey >> tagTakePQ (CompletableFuture <Optional <KEMSignedPreKey >> prekey , final PQSource source , final boolean enrolledInPagedKeys ) {
139
+ return prekey .thenApply (maybeSingleUsePreKey -> {
140
+ final Optional <String > maybeSourceTag = maybeSingleUsePreKey
141
+ // If we found a PK, use this source tag
142
+ .map (ignore -> source .name ())
143
+ // If we didn't and this is our last resort, we didn't find a PK
144
+ .or (() -> source == PQSource .LAST_RESORT ? Optional .of ("absent" ) : Optional .empty ());
145
+ maybeSourceTag .ifPresent (sourceTag -> {
146
+ Metrics .counter (TAKE_PQ_NAME ,
147
+ "source" , sourceTag ,
148
+ "enrolled" , Boolean .toString (enrolledInPagedKeys ))
149
+ .increment ();
150
+ });
151
+ return maybeSingleUsePreKey ;
152
+ });
109
153
}
110
154
111
155
public CompletableFuture <Optional <KEMSignedPreKey >> getLastResort (final UUID identifier , final byte deviceId ) {
@@ -121,20 +165,24 @@ public CompletableFuture<Integer> getEcCount(final UUID identifier, final byte d
121
165
}
122
166
123
167
public CompletableFuture <Integer > getPqCount (final UUID identifier , final byte deviceId ) {
124
- return pqPreKeys .getCount (identifier , deviceId );
168
+ return pagedPqPreKeys .getCount (identifier , deviceId ).thenCompose (count -> count == 0
169
+ ? pqPreKeys .getCount (identifier , deviceId )
170
+ : CompletableFuture .completedFuture (count ));
125
171
}
126
172
127
173
public CompletableFuture <Void > deleteSingleUsePreKeys (final UUID identifier ) {
128
174
return CompletableFuture .allOf (
129
175
ecPreKeys .delete (identifier ),
130
- pqPreKeys .delete (identifier )
176
+ pqPreKeys .delete (identifier ),
177
+ pagedPqPreKeys .delete (identifier )
131
178
);
132
179
}
133
180
134
181
public CompletableFuture <Void > deleteSingleUsePreKeys (final UUID accountUuid , final byte deviceId ) {
135
182
return CompletableFuture .allOf (
136
183
ecPreKeys .delete (accountUuid , deviceId ),
137
- pqPreKeys .delete (accountUuid , deviceId )
184
+ pqPreKeys .delete (accountUuid , deviceId ),
185
+ pagedPqPreKeys .delete (accountUuid , deviceId )
138
186
);
139
187
}
140
188
0 commit comments