Skip to content

Commit 7fe7231

Browse files
authored
REPL-139 Initial implementation for the in-memory queue (#224)
* REPL-139 Implementation for the in-memory queue Added metrics to queue Updated DDF metadata info class to be more generic
1 parent 8c2d6c2 commit 7fe7231

File tree

21 files changed

+3306
-52
lines changed

21 files changed

+3306
-52
lines changed

.gitignore

+2-1
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,5 @@ pom.xml.next
3333
release.properties
3434
dependency-reduced-pom.xml
3535
buildNumber.properties
36-
.mvn/timing.properties
36+
.mvn/timing.properties
37+
.java-version

pom.xml

-6
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@
106106
<gson.version>2.8.5</gson.version>
107107
<org.geotools.version>19.1</org.geotools.version>
108108
<junit.version>4.12</junit.version>
109-
<codice-test.version>0.9</codice-test.version>
110109
<commons.lang3.version>3.9</commons.lang3.version>
111110
<commons.collection4.version>4.4</commons.collection4.version>
112111
<commons-text.version>1.7</commons-text.version>
@@ -554,11 +553,6 @@
554553
<build>
555554
<pluginManagement>
556555
<plugins>
557-
<plugin>
558-
<groupId>org.apache.maven.plugins</groupId>
559-
<artifactId>maven-resources-plugin</artifactId>
560-
<version>3.1.0</version>
561-
</plugin>
562556
<plugin>
563557
<groupId>org.springframework.boot</groupId>
564558
<artifactId>spring-boot-maven-plugin</artifactId>

replication-api-impl/pom.xml

+7-3
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,10 @@
114114
<groupId>net.jodah</groupId>
115115
<artifactId>failsafe</artifactId>
116116
</dependency>
117+
<dependency>
118+
<groupId>io.micrometer</groupId>
119+
<artifactId>micrometer-core</artifactId>
120+
</dependency>
117121

118122
<dependency>
119123
<groupId>com.connexta.replication</groupId>
@@ -208,17 +212,17 @@
208212
<limit implementation="org.codice.jacoco.LenientLimit">
209213
<counter>INSTRUCTION</counter>
210214
<value>COVEREDRATIO</value>
211-
<minimum>0.77</minimum>
215+
<minimum>0.83</minimum>
212216
</limit>
213217
<limit implementation="org.codice.jacoco.LenientLimit">
214218
<counter>BRANCH</counter>
215219
<value>COVEREDRATIO</value>
216-
<minimum>0.75</minimum>
220+
<minimum>0.79</minimum>
217221
</limit>
218222
<limit implementation="org.codice.jacoco.LenientLimit">
219223
<counter>COMPLEXITY</counter>
220224
<value>COVEREDRATIO</value>
221-
<minimum>0.81</minimum>
225+
<minimum>0.82</minimum>
222226
</limit>
223227
</limits>
224228
</rule>

replication-api-impl/src/main/java/com/connexta/replication/api/impl/data/FilterImpl.java

+4-9
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public class FilterImpl extends AbstractPersistable<FilterPojo> implements Filte
3838

3939
private boolean isSuspended;
4040

41-
private byte priority = 1;
41+
private byte priority = 0;
4242

4343
/** Creates a default filter. */
4444
public FilterImpl() {
@@ -139,12 +139,7 @@ void setSuspended(boolean suspended) {
139139

140140
@VisibleForTesting
141141
void setPriority(byte priority) {
142-
if (priority < 1 || priority > 10) {
143-
throw new IllegalArgumentException(
144-
"Filter priority must be between 1 and 10 but was " + priority);
145-
} else {
146-
this.priority = priority;
147-
}
142+
this.priority = priority;
148143
}
149144

150145
@Override
@@ -156,7 +151,7 @@ protected FilterPojo writeTo(FilterPojo pojo) {
156151
return pojo.setVersion(FilterPojo.CURRENT_VERSION)
157152
.setDescription(description)
158153
.setSuspended(isSuspended)
159-
.setPriority(priority);
154+
.setPriority((byte) Math.min(Math.max(priority, 0), 9));
160155
}
161156

162157
@Override
@@ -180,6 +175,6 @@ private void readFromCurrentOrFutureVersion(FilterPojo pojo) {
180175
setOrFailIfNullOrEmpty("name", pojo::getName, this::setName);
181176
this.description = pojo.getDescription();
182177
this.isSuspended = pojo.isSuspended();
183-
this.priority = (byte) Math.min(Math.max(pojo.getPriority(), 1), 10);
178+
this.priority = (byte) Math.min(Math.max(pojo.getPriority(), 0), 9);
184179
}
185180
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/**
2+
* Copyright (c) Connexta
3+
*
4+
* <p>This is free software: you can redistribute it and/or modify it under the terms of the GNU
5+
* Lesser General Public License as published by the Free Software Foundation, either version 3 of
6+
* the License, or any later version.
7+
*
8+
* <p>This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
9+
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10+
* GNU Lesser General Public License for more details. A copy of the GNU Lesser General Public
11+
* License is distributed along with this program and can be found at
12+
* <http://www.gnu.org/licenses/lgpl.html>.
13+
*/
14+
package com.connexta.replication.api.impl.queue;
15+
16+
import com.connexta.replication.api.data.MetadataInfo;
17+
import com.connexta.replication.api.data.OperationType;
18+
import com.connexta.replication.api.data.ResourceInfo;
19+
import com.connexta.replication.api.data.Task;
20+
import com.connexta.replication.api.data.TaskInfo;
21+
import java.time.Instant;
22+
import java.util.Optional;
23+
import java.util.stream.Stream;
24+
25+
/**
26+
* Provides an abstract implementation of a task which delegates all information-related methods to
27+
* an associated {@link TaskInfo} object.
28+
*/
29+
public abstract class AbstractTask implements Task {
30+
protected final TaskInfo info;
31+
32+
/**
33+
* Creates an abstract task which wrapps around the specified task information.
34+
*
35+
* @param info the task info to wrap around
36+
*/
37+
public AbstractTask(TaskInfo info) {
38+
this.info = info;
39+
}
40+
41+
@Override
42+
public byte getPriority() {
43+
return info.getPriority();
44+
}
45+
46+
@Override
47+
public String getId() {
48+
return info.getId();
49+
}
50+
51+
@Override
52+
public OperationType getOperation() {
53+
return info.getOperation();
54+
}
55+
56+
@Override
57+
public Instant getLastModified() {
58+
return info.getLastModified();
59+
}
60+
61+
@Override
62+
public Optional<ResourceInfo> getResource() {
63+
return info.getResource();
64+
}
65+
66+
@Override
67+
public Stream<MetadataInfo> metadatas() {
68+
return info.metadatas();
69+
}
70+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/**
2+
* Copyright (c) Connexta
3+
*
4+
* <p>This is free software: you can redistribute it and/or modify it under the terms of the GNU
5+
* Lesser General Public License as published by the Free Software Foundation, either version 3 of
6+
* the License, or any later version.
7+
*
8+
* <p>This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
9+
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10+
* GNU Lesser General Public License for more details. A copy of the GNU Lesser General Public
11+
* License is distributed along with this program and can be found at
12+
* <http://www.gnu.org/licenses/lgpl.html>.
13+
*/
14+
package com.connexta.replication.api.impl.queue.memory;
15+
16+
import com.connexta.replication.api.data.Task;
17+
import com.connexta.replication.api.queue.QueueBroker;
18+
import com.connexta.replication.api.queue.QueueException;
19+
import java.util.Optional;
20+
import java.util.Set;
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.stream.Collectors;
23+
import java.util.stream.Stream;
24+
import javax.annotation.Nullable;
25+
26+
/**
27+
* A composite queue is an artifact created around multiple site queues allowing a worker to
28+
* retrieve tasks from any of the queues based on task priorities.
29+
*/
30+
public class MemoryCompositeQueue implements MemoryQueue {
31+
private final MemoryQueueBroker broker;
32+
private final Set<String> sites;
33+
34+
public MemoryCompositeQueue(MemoryQueueBroker broker, Stream<String> sites) {
35+
this.broker = broker;
36+
this.sites = sites.collect(Collectors.toSet());
37+
}
38+
39+
@Override
40+
public QueueBroker getBroker() {
41+
return broker;
42+
}
43+
44+
/**
45+
* Gets all site queues that are compounded together.
46+
*
47+
* @return a stream of all site queues compounded together (never empty)
48+
*/
49+
public Stream<MemorySiteQueue> queues() {
50+
return sites.stream().map(broker::getQueueIfDefined).flatMap(Optional::stream);
51+
}
52+
53+
/**
54+
* Gets all sites that have their queues compounded together.
55+
*
56+
* @return a stream of all site ids that have their queues compounded together (never empty)
57+
*/
58+
public Stream<String> sites() {
59+
return sites.stream();
60+
}
61+
62+
/**
63+
* Gets a compounded queue for a given site.
64+
*
65+
* @param site the site for which to get a queue that was compounded
66+
* @return the site queue to use for the specified site or empty if no queue was compounded for
67+
* the specified site
68+
*/
69+
public Optional<MemorySiteQueue> getQueue(String site) {
70+
return sites.contains(site) ? broker.getQueueIfDefined(site) : Optional.empty();
71+
}
72+
73+
@Override
74+
public int size() {
75+
return queues().mapToInt(MemorySiteQueue::size).sum();
76+
}
77+
78+
@Override
79+
public int pendingSize() {
80+
return queues().mapToInt(MemorySiteQueue::pendingSize).sum();
81+
}
82+
83+
@Override
84+
public int activeSize() {
85+
return queues().mapToInt(MemorySiteQueue::activeSize).sum();
86+
}
87+
88+
@Override
89+
public int remainingCapacity() {
90+
return queues().mapToInt(MemorySiteQueue::remainingCapacity).sum();
91+
}
92+
93+
@Override
94+
public Task take() throws InterruptedException {
95+
throw new QueueException("not yet supported");
96+
}
97+
98+
@Nullable
99+
@Override
100+
public Task poll(long timeout, TimeUnit unit) throws InterruptedException {
101+
throw new QueueException("not yet supported");
102+
}
103+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/**
2+
* Copyright (c) Connexta
3+
*
4+
* <p>This is free software: you can redistribute it and/or modify it under the terms of the GNU
5+
* Lesser General Public License as published by the Free Software Foundation, either version 3 of
6+
* the License, or any later version.
7+
*
8+
* <p>This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
9+
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10+
* GNU Lesser General Public License for more details. A copy of the GNU Lesser General Public
11+
* License is distributed along with this program and can be found at
12+
* <http://www.gnu.org/licenses/lgpl.html>.
13+
*/
14+
package com.connexta.replication.api.impl.queue.memory;
15+
16+
import com.connexta.replication.api.queue.Queue;
17+
18+
/** Base interface for all in-memory implementations of the {@link Queue} interface. */
19+
public interface MemoryQueue extends Queue {
20+
/**
21+
* Gets the number of tasks in this queue.
22+
*
23+
* @return the number of tasks in this queue
24+
*/
25+
public int size();
26+
27+
/**
28+
* Gets the number of tasks that are pending in this queue.
29+
*
30+
* @return the number of pending tasks in this queue
31+
*/
32+
public abstract int pendingSize();
33+
34+
/**
35+
* Gets the number of tasks that were queued in this queue and that are currently active.
36+
*
37+
* @return the number of active tasks in this queue
38+
*/
39+
public abstract int activeSize();
40+
41+
/**
42+
* Gets the number of additional tasks that this queue can ideally (in the absence of memory or
43+
* resource constraints) accept without blocking. This is always equal to the initial capacity of
44+
* this queue minus the current {@link #size()} of this queue.
45+
*
46+
* <p><i>Note:</i> You <em>cannot</em> always tell if an attempt to insert an element will succeed
47+
* by inspecting {@link #remainingCapacity} because it may be the case that another thread is
48+
* about to insert or remove an element.
49+
*
50+
* @return the current remaining capacity
51+
*/
52+
public abstract int remainingCapacity();
53+
}

0 commit comments

Comments
 (0)