Skip to content

Commit d2c362a

Browse files
authored
[ISSUE #3763] support custom reject in MemorySafeLinkedBlockingQueue (#3764)
* [ISSUE #3763] support custom reject in MemorySafeLinkedBlockingQueue * fix code style
1 parent 5bff837 commit d2c362a

File tree

7 files changed

+240
-2
lines changed

7 files changed

+240
-2
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.shenyu.common.concurrent;
19+
20+
/**
21+
* A handler for rejected element that throws a
22+
* {@code RejectException}.
23+
*/
24+
public class AbortPolicy<E> implements Rejector<E> {
25+
26+
@Override
27+
public void reject(final E e, final MemorySafeLinkedBlockingQueue<E> queue) {
28+
throw new RejectException("no more memory can be used !");
29+
}
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.shenyu.common.concurrent;
19+
20+
/**
21+
* A handler for rejected element that discards the oldest element.
22+
*/
23+
public class DiscardOldestPolicy<E> implements Rejector<E> {
24+
25+
@Override
26+
public void reject(final E e, final MemorySafeLinkedBlockingQueue<E> queue) {
27+
queue.poll();
28+
queue.offer(e);
29+
}
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.shenyu.common.concurrent;
19+
20+
/**
21+
* A handler for rejected element that silently discards the
22+
* rejected element.
23+
*/
24+
public class DiscardPolicy<E> implements Rejector<E> {
25+
26+
@Override
27+
public void reject(final E e, final MemorySafeLinkedBlockingQueue<E> queue) {
28+
29+
}
30+
}

shenyu-common/src/main/java/org/apache/shenyu/common/concurrent/MemorySafeLinkedBlockingQueue.java

+26-2
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,21 @@ public class MemorySafeLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
3232

3333
private int maxFreeMemory;
3434

35+
private Rejector<E> rejector;
36+
3537
public MemorySafeLinkedBlockingQueue(final int maxFreeMemory) {
3638
super(Integer.MAX_VALUE);
3739
this.maxFreeMemory = maxFreeMemory;
40+
//default as DiscardPolicy to ensure compatibility with the old version
41+
this.rejector = new DiscardPolicy<>();
3842
}
3943

4044
public MemorySafeLinkedBlockingQueue(final Collection<? extends E> c,
4145
final int maxFreeMemory) {
4246
super(c);
4347
this.maxFreeMemory = maxFreeMemory;
48+
//default as DiscardPolicy to ensure compatibility with the old version
49+
this.rejector = new DiscardPolicy<>();
4450
}
4551

4652
/**
@@ -61,6 +67,15 @@ public int getMaxFreeMemory() {
6167
return maxFreeMemory;
6268
}
6369

70+
/**
71+
* set the rejector.
72+
*
73+
* @param rejector the rejector
74+
*/
75+
public void setRejector(final Rejector<E> rejector) {
76+
this.rejector = rejector;
77+
}
78+
6479
/**
6580
* determine if there is any remaining free memory.
6681
*
@@ -75,15 +90,24 @@ public void put(final E e) throws InterruptedException {
7590
if (hasRemainedMemory()) {
7691
super.put(e);
7792
}
93+
rejector.reject(e, this);
7894
}
7995

8096
@Override
8197
public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
82-
return hasRemainedMemory() && super.offer(e, timeout, unit);
98+
if (!hasRemainedMemory()) {
99+
rejector.reject(e, this);
100+
return false;
101+
}
102+
return super.offer(e, timeout, unit);
83103
}
84104

85105
@Override
86106
public boolean offer(final E e) {
87-
return hasRemainedMemory() && super.offer(e);
107+
if (!hasRemainedMemory()) {
108+
rejector.reject(e, this);
109+
return false;
110+
}
111+
return super.offer(e);
88112
}
89113
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.shenyu.common.concurrent;
19+
20+
/**
21+
* Exception thrown by an {@link MemorySafeLinkedBlockingQueue}
22+
* when a element cannot be accepted.
23+
*/
24+
public class RejectException extends RuntimeException {
25+
26+
private static final long serialVersionUID = -3240015871717170195L;
27+
28+
/**
29+
* Constructs a {@code RejectException} with no detail message.
30+
* The cause is not initialized, and may subsequently be
31+
* initialized by a call to {@link #initCause(Throwable) initCause}.
32+
*/
33+
public RejectException() {
34+
}
35+
36+
/**
37+
* Constructs a {@code RejectException} with the
38+
* specified detail message. The cause is not initialized, and may
39+
* subsequently be initialized by a call to {@link
40+
* #initCause(Throwable) initCause}.
41+
*
42+
* @param message the detail message
43+
*/
44+
public RejectException(final String message) {
45+
super(message);
46+
}
47+
48+
/**
49+
* Constructs a {@code RejectException} with the
50+
* specified detail message and cause.
51+
*
52+
* @param message the detail message
53+
* @param cause the cause (which is saved for later retrieval by the
54+
* {@link #getCause()} method)
55+
*/
56+
public RejectException(final String message, final Throwable cause) {
57+
super(message, cause);
58+
}
59+
60+
/**
61+
* Constructs a {@code RejectException} with the
62+
* specified cause. The detail message is set to {@code (cause ==
63+
* null ? null : cause.toString())} (which typically contains
64+
* the class and detail message of {@code cause}).
65+
*
66+
* @param cause the cause (which is saved for later retrieval by the
67+
* {@link #getCause()} method)
68+
*/
69+
public RejectException(final Throwable cause) {
70+
super(cause);
71+
}
72+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.shenyu.common.concurrent;
19+
20+
/**
21+
* RejectHandler, it works when you need to custom reject action in
22+
* {@link org.apache.shenyu.common.concurrent.MemorySafeLinkedBlockingQueue}.
23+
*
24+
* @see AbortPolicy
25+
* @see DiscardPolicy
26+
* @see DiscardOldestPolicy
27+
*/
28+
public interface Rejector<E> {
29+
30+
/**
31+
* Method that may be invoked by a {@link MemorySafeLinkedBlockingQueue} when
32+
* {@link MemorySafeLinkedBlockingQueue#hasRemainedMemory} return true.
33+
* This may occur when no more memory are available because their bounds would be exceeded.
34+
*
35+
* <p>In the absence of other alternatives, the method may throw an unchecked
36+
* {@link RejectException}, which will be propagated to the caller.
37+
*
38+
* @param e the element requested to be added
39+
* @param queue the queue attempting to add this element
40+
* @throws RejectException if there is no more memory
41+
*/
42+
void reject(E e, MemorySafeLinkedBlockingQueue<E> queue);
43+
}

shenyu-common/src/test/java/org/apache/shenyu/common/concurrent/MemorySafeLinkedBlockingQueueTest.java

+9
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import static org.hamcrest.MatcherAssert.assertThat;
2323
import static org.hamcrest.Matchers.is;
24+
import static org.junit.jupiter.api.Assertions.assertThrows;
2425

2526
public class MemorySafeLinkedBlockingQueueTest {
2627
@Test
@@ -35,4 +36,12 @@ public void test() throws Exception {
3536
assertThat(queue.offer(() -> {
3637
}), is(true));
3738
}
39+
40+
@Test
41+
public void testCustomReject() throws Exception {
42+
MemorySafeLinkedBlockingQueue<Runnable> queue = new MemorySafeLinkedBlockingQueue<>(Integer.MAX_VALUE);
43+
queue.setRejector(new AbortPolicy<>());
44+
assertThrows(RejectException.class, () -> queue.offer(() -> {
45+
}));
46+
}
3847
}

0 commit comments

Comments
 (0)