26
26
import org .bson .BsonDocument ;
27
27
import org .bson .BsonTimestamp ;
28
28
29
+ import java .util .concurrent .atomic .AtomicBoolean ;
30
+
29
31
import static com .mongodb .assertions .Assertions .assertTrue ;
30
32
import static com .mongodb .assertions .Assertions .isTrue ;
31
33
@@ -39,20 +41,19 @@ public class BaseClientSessionImpl implements ClientSession {
39
41
private ServerSession serverSession ;
40
42
private final Object originator ;
41
43
private final ClientSessionOptions options ;
44
+ private final AtomicBoolean closed = new AtomicBoolean (false );
42
45
private BsonDocument clusterTime ;
43
46
private BsonTimestamp operationTime ;
44
47
private BsonTimestamp snapshotTimestamp ;
45
48
private ServerAddress pinnedServerAddress ;
46
49
private BsonDocument recoveryToken ;
47
50
private ReferenceCounted transactionContext ;
48
- private volatile boolean closed ;
49
51
50
52
public BaseClientSessionImpl (final ServerSessionPool serverSessionPool , final Object originator , final ClientSessionOptions options ) {
51
53
this .serverSessionPool = serverSessionPool ;
52
54
this .originator = originator ;
53
55
this .options = options ;
54
56
this .pinnedServerAddress = null ;
55
- closed = false ;
56
57
}
57
58
58
59
@ Override
@@ -121,7 +122,7 @@ public BsonTimestamp getOperationTime() {
121
122
122
123
@ Override
123
124
public ServerSession getServerSession () {
124
- isTrue ("open" , !closed );
125
+ isTrue ("open" , !closed . get () );
125
126
if (serverSession == null ) {
126
127
serverSession = serverSessionPool .get ();
127
128
}
@@ -130,19 +131,19 @@ public ServerSession getServerSession() {
130
131
131
132
@ Override
132
133
public void advanceOperationTime (@ Nullable final BsonTimestamp newOperationTime ) {
133
- isTrue ("open" , !closed );
134
+ isTrue ("open" , !closed . get () );
134
135
this .operationTime = greaterOf (newOperationTime );
135
136
}
136
137
137
138
@ Override
138
139
public void advanceClusterTime (@ Nullable final BsonDocument newClusterTime ) {
139
- isTrue ("open" , !closed );
140
+ isTrue ("open" , !closed . get () );
140
141
this .clusterTime = greaterOf (newClusterTime );
141
142
}
142
143
143
144
@ Override
144
145
public void setSnapshotTimestamp (@ Nullable final BsonTimestamp snapshotTimestamp ) {
145
- isTrue ("open" , !closed );
146
+ isTrue ("open" , !closed . get () );
146
147
if (snapshotTimestamp != null ) {
147
148
if (this .snapshotTimestamp != null && !snapshotTimestamp .equals (this .snapshotTimestamp )) {
148
149
throw new MongoClientException ("Snapshot timestamps should not change during the lifetime of the session. Current "
@@ -155,7 +156,7 @@ public void setSnapshotTimestamp(@Nullable final BsonTimestamp snapshotTimestamp
155
156
@ Override
156
157
@ Nullable
157
158
public BsonTimestamp getSnapshotTimestamp () {
158
- isTrue ("open" , !closed );
159
+ isTrue ("open" , !closed . get () );
159
160
return snapshotTimestamp ;
160
161
}
161
162
@@ -182,8 +183,10 @@ private BsonTimestamp greaterOf(@Nullable final BsonTimestamp newOperationTime)
182
183
183
184
@ Override
184
185
public void close () {
185
- if (!closed ) {
186
- closed = true ;
186
+ // While the interface implemented by this class is documented as not thread safe, it's still useful to provide thread safety here
187
+ // in order to prevent the code within the conditional from executing more than once. Doing so protects the server session pool from
188
+ // corruption, by preventing the same server session from being released to the pool more than once.
189
+ if (closed .compareAndSet (false , true )) {
187
190
if (serverSession != null ) {
188
191
serverSessionPool .release (serverSession );
189
192
}
0 commit comments