Skip to content

Commit 6140276

Browse files
Kiefer Cover (kcover)
Kiefer Cover
authored and committed
REP-255 Wire together service architecture
Starts and connects the Query, Queue, and Worker services and creates spring profiles for running the previous (Classic) and new (Ion) architectures.
1 parent 6e53f07 commit 6140276

File tree

32 files changed

+1106
-380
lines changed

32 files changed

+1106
-380
lines changed

README.md

+7
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ spring:
4343
data:
4444
solr:
4545
host: http://replication-solr:8983/solr
46+
profiles.active: Classic
4647
replication:
4748
period: 300
4849
connectionTimeout: 30
@@ -69,6 +70,12 @@ management:
6970
enabled: true
7071
```
7172
73+
###### Profiles
74+
75+
Replication can be run with one of two profiles. You can specify which profile to use in the 'spring.profiles.active'
76+
property as demonstrated in the example above. "Classic" will use the classic monolithic implementation.
77+
"Ion" will use the new scalable, cloud oriented implementation.
78+
7279
###### Metrics
7380
7481
Replication supports reporting metrics through Micrometer. Prometheus is used as the metrics collection platform. The replication-spring-config provides example configuration for exposing metrics from within the application.

adapters/ddf-adapter/src/main/java/com/connexta/replication/adapters/ddf/DdfNodeAdapter.java

+33-8
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.connexta.replication.adapters.ddf.csw.Constants;
1717
import com.connexta.replication.adapters.ddf.csw.Csw;
1818
import com.connexta.replication.adapters.ddf.csw.CswRecordCollection;
19+
import com.connexta.replication.adapters.ddf.csw.MetacardMarshaller;
1920
import com.connexta.replication.adapters.ddf.rest.DdfRestClient;
2021
import com.connexta.replication.adapters.ddf.rest.DdfRestClientFactory;
2122
import com.connexta.replication.api.AdapterException;
@@ -35,6 +36,7 @@
3536
import com.connexta.replication.api.data.UpdateStorageRequest;
3637
import com.connexta.replication.data.QueryRequestImpl;
3738
import com.connexta.replication.data.QueryResponseImpl;
39+
import com.connexta.replication.data.ResourceImpl;
3840
import com.connexta.replication.data.ResourceResponseImpl;
3941
import com.google.common.annotations.VisibleForTesting;
4042
import java.io.InterruptedIOException;
@@ -253,7 +255,7 @@ public boolean createRequest(CreateRequest createRequest) {
253255
try {
254256
List<Metadata> metadata = createRequest.getMetadata();
255257
DdfRestClient client = ddfRestClientFactory.create(hostUrl);
256-
metadata.forEach(this::prepareMetadata);
258+
metadata = metadata.stream().map(this::prepareMetadata).collect(Collectors.toList());
257259
return performRequestForEach(client::post, metadata);
258260
} catch (Exception e) {
259261
throw wrapException("Failed to create on remote system", e);
@@ -265,7 +267,7 @@ public boolean updateRequest(UpdateRequest updateRequest) {
265267
try {
266268
List<Metadata> metadata = updateRequest.getMetadata();
267269
DdfRestClient client = ddfRestClientFactory.create(hostUrl);
268-
metadata.forEach(this::prepareMetadata);
270+
metadata = metadata.stream().map(this::prepareMetadata).collect(Collectors.toList());
269271
return performRequestForEach(client::put, metadata);
270272
} catch (Exception e) {
271273
throw wrapException("Failed to update remote system", e);
@@ -301,7 +303,7 @@ public boolean createResource(CreateStorageRequest createStorageRequest) {
301303
try {
302304
Resource resource = createStorageRequest.getResource();
303305
DdfRestClient client = ddfRestClientFactory.createWithSubject(hostUrl);
304-
prepareMetadata(resource.getMetadata());
306+
resource = cloneResourceWithPreparedMetadata(resource);
305307
return performRequestForEach(client::post, List.of(resource));
306308
} catch (Exception e) {
307309
throw wrapException("Failed to create resource on remote system", e);
@@ -313,20 +315,41 @@ public boolean updateResource(UpdateStorageRequest updateStorageRequest) {
313315
try {
314316
Resource resource = updateStorageRequest.getResource();
315317
DdfRestClient client = ddfRestClientFactory.createWithSubject(hostUrl);
316-
prepareMetadata(resource.getMetadata());
318+
resource = cloneResourceWithPreparedMetadata(resource);
317319
return performRequestForEach(client::put, List.of(resource));
318320
} catch (Exception e) {
319321
throw wrapException("Failed to update resource on remote system", e);
320322
}
321323
}
322324

323-
private void prepareMetadata(Metadata metadata) {
325+
private Resource cloneResourceWithPreparedMetadata(Resource resource) {
326+
return new ResourceImpl(
327+
resource.getId(),
328+
resource.getName(),
329+
resource.getResourceUri(),
330+
resource.getQualifier(),
331+
resource.getInputStream(),
332+
resource.getMimeType(),
333+
resource.getSize(),
334+
prepareMetadata(resource.getMetadata()));
335+
}
324336

325-
if (!(metadata instanceof DdfMetadata)) {
337+
/**
338+
* Unmarshals the metadata containing metacard xml into a {@link DdfMetadata} which contains a map
339+
* which we can add tags and the origins attribute to.
340+
*
341+
* @param metadata The metadata to modify
342+
* @return A DdfMetadata with the metacard tags and origins
343+
*/
344+
private DdfMetadata prepareMetadata(Metadata metadata) {
345+
if (metadata.getType() != String.class) {
326346
throw new AdapterException(
327-
"DDF adapter can't process raw metadata of type " + metadata.getClass());
347+
"DDF adapter can't process raw metadata of type " + metadata.getType());
328348
}
329-
DdfMetadata ddfMetadata = (DdfMetadata) metadata;
349+
350+
DdfMetadata ddfMetadata =
351+
MetacardMarshaller.unmarshal(
352+
(String) metadata.getRawMetadata(), MetacardMarshaller.metacardNamespaceMap());
330353
Map metadataMap = ddfMetadata.getAttributes();
331354
metadataMap.put(
332355
Constants.METACARD_TAGS,
@@ -339,6 +362,7 @@ private void prepareMetadata(Metadata metadata) {
339362
Replication.ORIGINS,
340363
new MetacardAttribute(
341364
Replication.ORIGINS, "string", metadata.getLineage(), Collections.emptyList()));
365+
return ddfMetadata;
342366
}
343367

344368
@Override
@@ -349,6 +373,7 @@ public void close() {
349373
private <T> boolean performRequestForEach(Function<T, Response> request, List<T> requestBodies) {
350374
for (T body : requestBodies) {
351375
Response response = request.apply(body);
376+
LOGGER.debug("Response received for request {}. Response: {}", body, response);
352377
if (response == null || !response.getStatusInfo().getFamily().equals(Family.SUCCESSFUL)) {
353378
return false;
354379
}

adapters/ddf-adapter/src/main/java/com/connexta/replication/adapters/ddf/csw/CswRecordConverter.java

+1-236
Original file line numberDiff line numberDiff line change
@@ -13,46 +13,18 @@
1313
*/
1414
package com.connexta.replication.adapters.ddf.csw;
1515

16-
import com.connexta.replication.adapters.ddf.DdfMetadata;
17-
import com.connexta.replication.adapters.ddf.MetacardAttribute;
18-
import com.connexta.replication.api.AdapterException;
19-
import com.connexta.replication.api.Replication;
2016
import com.connexta.replication.api.data.Metadata;
21-
import com.thoughtworks.xstream.converters.ConversionException;
2217
import com.thoughtworks.xstream.converters.Converter;
2318
import com.thoughtworks.xstream.converters.MarshallingContext;
2419
import com.thoughtworks.xstream.converters.UnmarshallingContext;
2520
import com.thoughtworks.xstream.io.HierarchicalStreamReader;
2621
import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
2722
import com.thoughtworks.xstream.io.copy.HierarchicalStreamCopier;
28-
import com.thoughtworks.xstream.io.naming.NoNameCoder;
29-
import com.thoughtworks.xstream.io.xml.CompactWriter;
30-
import com.thoughtworks.xstream.io.xml.XppReader;
31-
import java.io.IOException;
32-
import java.io.InputStreamReader;
33-
import java.io.StringWriter;
34-
import java.net.URI;
35-
import java.net.URISyntaxException;
36-
import java.nio.charset.StandardCharsets;
37-
import java.text.DateFormat;
38-
import java.text.ParseException;
39-
import java.util.ArrayList;
4023
import java.util.Arrays;
41-
import java.util.Collections;
42-
import java.util.Date;
43-
import java.util.HashMap;
4424
import java.util.List;
4525
import java.util.Map;
46-
import java.util.Map.Entry;
47-
import javax.annotation.Nullable;
48-
import org.apache.commons.io.IOUtils;
49-
import org.apache.commons.lang.StringUtils;
50-
import org.joda.time.format.ISODateTimeFormat;
5126
import org.slf4j.Logger;
5227
import org.slf4j.LoggerFactory;
53-
import org.xmlpull.v1.XmlPullParser;
54-
import org.xmlpull.v1.XmlPullParserException;
55-
import org.xmlpull.v1.XmlPullParserFactory;
5628

5729
/**
5830
* Copied from DDF and modified for replication purposes.
@@ -89,213 +61,6 @@ public Object unmarshal(HierarchicalStreamReader reader, UnmarshallingContext co
8961
namespaceMap = (Map<String, String>) namespaceObj;
9062
}
9163

92-
return createMetadataFromCswRecord(reader, namespaceMap);
93-
}
94-
95-
private static void copyElementWithAttributes(
96-
HierarchicalStreamReader source,
97-
HierarchicalStreamWriter destination,
98-
Map<String, String> namespaceMap) {
99-
destination.startNode(source.getNodeName());
100-
int attributeCount = source.getAttributeCount();
101-
for (int i = 0; i < attributeCount; i++) {
102-
destination.addAttribute(source.getAttributeName(i), source.getAttribute(i));
103-
}
104-
if (namespaceMap != null && !namespaceMap.isEmpty()) {
105-
for (Entry<String, String> entry : namespaceMap.entrySet()) {
106-
if (StringUtils.isBlank(source.getAttribute(entry.getKey()))) {
107-
destination.addAttribute(entry.getKey(), entry.getValue());
108-
}
109-
}
110-
}
111-
String value = source.getValue();
112-
if (value != null && value.length() > 0) {
113-
destination.setValue(value);
114-
}
115-
while (source.hasMoreChildren()) {
116-
source.moveDown();
117-
COPIER.copy(source, destination);
118-
source.moveUp();
119-
}
120-
destination.endNode();
121-
}
122-
123-
/**
124-
* Copies the entire XML element {@code reader} is currently at into {@code writer} and returns a
125-
* new reader ready to read the copied element. After the call, {@code reader} will be at the end
126-
* of the element that was copied.
127-
*
128-
* <p>If {@code attributeMap} is provided, the attributes will be added to the copy.
129-
*
130-
* @param reader the reader currently at the XML element you want to copy
131-
* @param writer the writer that the element will be copied into
132-
* @param attributeMap the map of attribute names to values that will be added as attributes of
133-
* the copy, may be null
134-
* @return a new reader ready to read the copied element
135-
* @throws ConversionException if a parser to use for the new reader can't be created
136-
*/
137-
public static HierarchicalStreamReader copyXml(
138-
HierarchicalStreamReader reader, StringWriter writer, Map<String, String> attributeMap) {
139-
copyElementWithAttributes(reader, new CompactWriter(writer, new NoNameCoder()), attributeMap);
140-
141-
XmlPullParser parser;
142-
try {
143-
parser = XmlPullParserFactory.newInstance().newPullParser();
144-
} catch (XmlPullParserException e) {
145-
throw new ConversionException("Unable to create XmlPullParser, cannot parse XML.", e);
146-
}
147-
148-
try {
149-
// NOTE: must specify encoding here, otherwise the platform default
150-
// encoding will be used which will not always work
151-
return new XppReader(
152-
new InputStreamReader(
153-
IOUtils.toInputStream(writer.toString(), StandardCharsets.UTF_8.name())),
154-
parser);
155-
} catch (IOException e) {
156-
LOGGER.debug("Unable create reader with UTF-8 encoding", e);
157-
return new XppReader(
158-
new InputStreamReader(IOUtils.toInputStream(writer.toString(), StandardCharsets.UTF_8)),
159-
parser);
160-
}
161-
}
162-
163-
public static @Nullable Date convertToDate(@Nullable MetacardAttribute value) {
164-
if (value == null) {
165-
return null;
166-
}
167-
/* Dates are strings and expected to be in ISO8601 format, YYYY-MM-DD'T'hh:mm:ss.sss,
168-
per annotations in the CSW Record schema. At least the date portion must be present -
169-
the time zone and time are optional.*/
170-
try {
171-
return ISODateTimeFormat.dateOptionalTimeParser().parseDateTime(value.getValue()).toDate();
172-
} catch (IllegalArgumentException e) {
173-
LOGGER.debug("Failed to convert to date {} from ISO Format: {}", value, e);
174-
}
175-
176-
// try from java date serialization for the default locale
177-
try {
178-
return DateFormat.getDateInstance().parse(value.getValue());
179-
} catch (ParseException e) {
180-
LOGGER.debug("Unable to convert date {} from default locale format {} ", value, e);
181-
}
182-
183-
// default to current date
184-
LOGGER.debug("Unable to convert {} to a date object", value);
185-
return null;
186-
}
187-
188-
public static Metadata createMetadataFromCswRecord(
189-
HierarchicalStreamReader hreader, Map<String, String> namespaceMap) {
190-
191-
StringWriter metadataWriter = new StringWriter();
192-
HierarchicalStreamReader reader = copyXml(hreader, metadataWriter, namespaceMap);
193-
194-
Map<String, MetacardAttribute> metadataMap = new HashMap<>();
195-
String metadataStr = metadataWriter.toString();
196-
metadataMap.put(
197-
Constants.RAW_METADATA_KEY,
198-
new MetacardAttribute(Constants.RAW_METADATA_KEY, null, metadataStr));
199-
String id = reader.getAttribute("gml:id");
200-
metadataMap.put(Constants.METACARD_ID, new MetacardAttribute(Constants.METACARD_ID, null, id));
201-
// If we want to grab the type we will have to do so below. As you move through the child nodes
202-
// check if the node name is type and save the value.
203-
204-
parseToMap(reader, metadataMap);
205-
206-
Date metacardModified = convertToDate(metadataMap.get(Constants.METACARD_MODIFIED));
207-
if (metadataMap.get(Constants.VERSION_OF_ID) != null
208-
&& metadataMap.get(Constants.VERSION_OF_ID).getValue() != null) {
209-
// Since we are dealing with a delete revision metacard, we need to make sure the
210-
// returned metadata has the original metacard id and modified date.
211-
id = metadataMap.get(Constants.VERSION_OF_ID).getValue();
212-
metacardModified = convertToDate(metadataMap.get(Constants.VERSIONED_ON));
213-
}
214-
215-
if (metacardModified == null) {
216-
throw new AdapterException("Can't convert csw metacard without a metacard.modified field");
217-
}
218-
219-
DdfMetadata metadata =
220-
new DdfMetadata(metadataStr, String.class, id, metacardModified, metadataMap);
221-
metadata.setMetadataSize(metadataStr.length());
222-
if (metadataMap.get(Constants.RESOURCE_SIZE) != null) {
223-
metadata.setResourceSize(Long.parseLong(metadataMap.get(Constants.RESOURCE_SIZE).getValue()));
224-
metadata.setResourceModified(convertToDate(metadataMap.get(Constants.MODIFIED)));
225-
226-
try {
227-
metadata.setResourceUri(new URI(metadataMap.get(Constants.RESOURCE_URI).getValue()));
228-
} catch (URISyntaxException e) {
229-
LOGGER.warn(
230-
"Invalid resource URI of {} for {}",
231-
metadataMap.get(Constants.RESOURCE_URI).getValue(),
232-
id);
233-
}
234-
}
235-
236-
metadataMap.get(Constants.METACARD_TAGS).getValues().forEach(metadata::addTag);
237-
MetacardAttribute origin = metadataMap.get(Replication.ORIGINS);
238-
if (origin != null) {
239-
origin.getValues().forEach(metadata::addLineage);
240-
}
241-
MetacardAttribute versionAction = metadataMap.get(Constants.ACTION);
242-
if (versionAction != null) {
243-
metadata.setIsDeleted(versionAction.getValue().startsWith("Deleted"));
244-
}
245-
metadataMap.remove(Constants.DERIVED_RESOURCE_URI);
246-
metadataMap.remove(Constants.DERIVED_RESOURCE_DOWNLOAD_URL);
247-
248-
return metadata;
249-
}
250-
251-
private static void parseToMap(
252-
HierarchicalStreamReader reader, Map<String, MetacardAttribute> metadataMap) {
253-
while (reader.hasMoreChildren()) {
254-
reader.moveDown();
255-
256-
String entryType = reader.getNodeName();
257-
String attributeName = reader.getAttribute("name");
258-
List<String> xmlns = new ArrayList<>();
259-
int count = reader.getAttributeCount();
260-
for (int i = 0; i < count; i++) {
261-
if (reader.getAttributeName(i).startsWith("xmlns")) {
262-
xmlns.add(reader.getAttributeName(i) + "=" + reader.getAttribute(i));
263-
}
264-
}
265-
if (!reader.hasMoreChildren()) {
266-
metadataMap.put(entryType, new MetacardAttribute(entryType, null, reader.getValue()));
267-
reader.moveUp();
268-
continue;
269-
}
270-
271-
if (COMPLEX_TYPE.contains(entryType)) {
272-
reader.moveDown();
273-
reader.moveDown();
274-
StringWriter xmlWriter = new StringWriter();
275-
copyXml(reader, xmlWriter, null);
276-
metadataMap.put(
277-
attributeName,
278-
new MetacardAttribute(
279-
attributeName, entryType, Collections.singletonList(xmlWriter.toString()), xmlns));
280-
reader.moveUp();
281-
reader.moveUp();
282-
reader.moveUp();
283-
continue;
284-
}
285-
List<String> values = new ArrayList<>();
286-
while (reader.hasMoreChildren()) {
287-
reader.moveDown();
288-
values.add(reader.getValue());
289-
reader.moveUp();
290-
}
291-
292-
LOGGER.trace("attribute name: {} value: {}.", attributeName, values);
293-
if (StringUtils.isNotEmpty(attributeName) && !values.isEmpty()) {
294-
metadataMap.put(
295-
attributeName, new MetacardAttribute(attributeName, entryType, values, xmlns));
296-
}
297-
298-
reader.moveUp();
299-
}
64+
return MetacardMarshaller.unmarshal(reader, namespaceMap);
30065
}
30166
}

0 commit comments

Comments
 (0)