diff --git a/implementations/micrometer-registry-elastic/build.gradle b/implementations/micrometer-registry-elastic/build.gradle index 3d20de7af4..ae16f4eeb4 100644 --- a/implementations/micrometer-registry-elastic/build.gradle +++ b/implementations/micrometer-registry-elastic/build.gradle @@ -2,9 +2,7 @@ apply plugin: 'org.junit.platform.gradle.plugin' dependencies { compile project(':micrometer-core') - compile 'com.fasterxml.jackson.core:jackson-core:latest.release' - compile 'com.fasterxml.jackson.core:jackson-databind:latest.release' - compile 'com.fasterxml.jackson.module:jackson-module-afterburner:latest.release' + compile 'commons-io:commons-io:latest.release' testCompile project(':micrometer-test') } \ No newline at end of file diff --git a/implementations/micrometer-registry-elastic/src/main/java/io/micrometer/elastic/ElasticConfig.java b/implementations/micrometer-registry-elastic/src/main/java/io/micrometer/elastic/ElasticConfig.java index 47cb3f6240..afec4219c3 100644 --- a/implementations/micrometer-registry-elastic/src/main/java/io/micrometer/elastic/ElasticConfig.java +++ b/implementations/micrometer-registry-elastic/src/main/java/io/micrometer/elastic/ElasticConfig.java @@ -17,8 +17,6 @@ import io.micrometer.core.instrument.step.StepRegistryConfig; -import java.util.concurrent.TimeUnit; - /** * @author Nicolas Portmann */ @@ -32,10 +30,8 @@ public interface ElasticConfig extends StepRegistryConfig { /** * Get the value associated with a key. * - * @param key - * Key to lookup in the config. - * @return - * Value for the key or null if no key is present. + * @param key Key to lookup in the config. + * @return Value for the key or null if no key is present. */ String get(String key); @@ -55,42 +51,6 @@ default String[] hosts() { return v == null ? new String[]{"http://localhost:9200"} : v.split(","); } - /** - * Prefix all metrics with a given {@link String}. - * Default is "" - */ - default String metricPrefix() { - String v = get(prefix() + ".metricPrefix"); - return v == null ? "" : v; - } - - /** - * Convert all durations to a certain {@link TimeUnit} - * Default is {@link TimeUnit#SECONDS} - */ - default TimeUnit rateUnits() { - String v = get(prefix() + ".rateUnits"); - return v == null ? TimeUnit.SECONDS : TimeUnit.valueOf(v.toUpperCase()); - } - - /** - * Convert all durations to a certain {@link TimeUnit} - * Default is {@link TimeUnit#MILLISECONDS} - */ - default TimeUnit durationUnits() { - String v = get(prefix() + ".durationUnits"); - return v == null ? TimeUnit.MILLISECONDS : TimeUnit.valueOf(v.toUpperCase()); - } - - /** - * The timeout to wait for until a connection attempt is and the next host is tried. - * Default is: 1000 - */ - default int timeout() { - String v = get(prefix() + ".timeout"); - return v == null ? 1000 : Integer.parseInt(v); - } - /** * The index name to write metrics to. * Default is: "metrics" @@ -110,15 +70,6 @@ default String indexDateFormat() { return v == null ? "yyyy-MM" : v; } - /** - * The bulk size per request. - * Default is: 2500 - */ - default int bulkSize() { - String v = get(prefix() + ".bulkSize"); - return v == null ? 2500 : Integer.parseInt(v); - } - /** * The name of the timestamp field. * Default is: "@timestamp" diff --git a/implementations/micrometer-registry-elastic/src/main/java/io/micrometer/elastic/ElasticMeterRegistry.java b/implementations/micrometer-registry-elastic/src/main/java/io/micrometer/elastic/ElasticMeterRegistry.java index c1dda1f895..821b5b8055 100644 --- a/implementations/micrometer-registry-elastic/src/main/java/io/micrometer/elastic/ElasticMeterRegistry.java +++ b/implementations/micrometer-registry-elastic/src/main/java/io/micrometer/elastic/ElasticMeterRegistry.java @@ -15,20 +15,17 @@ */ package io.micrometer.elastic; -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectWriter; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.module.afterburner.AfterburnerModule; import io.micrometer.core.instrument.*; import io.micrometer.core.instrument.config.NamingConvention; +import io.micrometer.core.instrument.distribution.CountAtBucket; +import io.micrometer.core.instrument.distribution.HistogramSnapshot; +import io.micrometer.core.instrument.distribution.ValueAtPercentile; import io.micrometer.core.instrument.step.StepMeterRegistry; +import io.micrometer.core.instrument.util.DoubleFormat; import io.micrometer.core.instrument.util.MeterPartition; import io.micrometer.core.lang.NonNull; -import io.micrometer.elastic.ElasticMetricsModule.BulkIndexOperationHeader; -import io.micrometer.elastic.ElasticSerializableMeters.*; +import io.micrometer.core.lang.Nullable; +import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,53 +34,35 @@ import java.net.HttpURLConnection; import java.net.URL; import java.nio.charset.StandardCharsets; -import java.text.SimpleDateFormat; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; import java.util.Arrays; import java.util.Base64; -import java.util.Date; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; - -import static java.util.stream.Collectors.toList; +import java.util.stream.Stream; /** * @author Nicolas Portmann + * @author Jon Schneider */ public class ElasticMeterRegistry extends StepMeterRegistry { private final Logger logger = LoggerFactory.getLogger(ElasticMeterRegistry.class); private final ElasticConfig config; - private final SimpleDateFormat indexDateFormat; private boolean checkedForIndexTemplate = false; - private final ObjectWriter objectWriter; public ElasticMeterRegistry(ElasticConfig config, Clock clock, NamingConvention namingConvention, ThreadFactory threadFactory) { super(config, clock); this.config().namingConvention(namingConvention); this.config = config; - this.indexDateFormat = new SimpleDateFormat(config.indexDateFormat()); - - ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); - objectMapper.configure(SerializationFeature.CLOSE_CLOSEABLE, false); - objectMapper.configure(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT, false); - objectMapper.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false); - objectMapper.registerModule(new AfterburnerModule()); - objectMapper.registerModule(new ElasticMetricsModule( - namingConvention, - config.rateUnits(), - config.durationUnits(), - config.timeStampFieldName(), - config.metricPrefix()) - ); - objectWriter = objectMapper.writer(); start(threadFactory); - - logger.info("ElasticMeterRegistry started"); } public ElasticMeterRegistry(ElasticConfig config, Clock clock) { @@ -95,7 +74,7 @@ private void createIndexIfNeeded() { return; } try { - HttpURLConnection connection = openConnection( "/_template/metrics_template", "HEAD"); + HttpURLConnection connection = openConnection("/_template/metrics_template", "HEAD"); if (connection == null) { logger.error("Could not connect to any configured elasticsearch instances: {}", Arrays.asList(config.hosts())); return; @@ -110,31 +89,16 @@ private void createIndexIfNeeded() { } logger.debug("No metrics template found in elasticsearch. Adding..."); - HttpURLConnection putTemplateConnection = openConnection( "/_template/metrics_template", "PUT"); - if(putTemplateConnection == null) { + HttpURLConnection putTemplateConnection = openConnection("/_template/metrics_template", "PUT"); + if (putTemplateConnection == null) { logger.error("Error adding metrics template to elasticsearch"); return; } - JsonGenerator json = new JsonFactory().createGenerator(putTemplateConnection.getOutputStream()); - json.writeStartObject(); - json.writeStringField("template", config.index() + "*"); - json.writeObjectFieldStart("mappings"); - - json.writeObjectFieldStart("_default_"); - json.writeObjectFieldStart("_all"); - json.writeBooleanField("enabled", false); - json.writeEndObject(); - json.writeObjectFieldStart("properties"); - json.writeObjectFieldStart("name"); - json.writeObjectField("type", "keyword"); - json.writeEndObject(); - json.writeEndObject(); - json.writeEndObject(); - - json.writeEndObject(); - json.writeEndObject(); - json.flush(); + try (OutputStream outputStream = putTemplateConnection.getOutputStream()) { + outputStream.write("{\"template\":\"metrics*\",\"mappings\":{\"_default_\":{\"_all\":{\"enabled\":false},\"properties\":{\"name\":{\"type\":\"keyword\"}}}}}".getBytes()); + outputStream.flush(); + } putTemplateConnection.disconnect(); if (putTemplateConnection.getResponseCode() != 200) { @@ -153,38 +117,30 @@ protected void publish() { createIndexIfNeeded(); } - final long timestamp = clock.wallTime(); - - String currentIndexName = config.index() + "-" + indexDateFormat.format(new Date(timestamp)); - for (List batch : MeterPartition.partition(this, config.batchSize())) { - List> serializableMeters = batch.stream().map(m -> { - if (m instanceof Timer) { - return new ElasticTimer((Timer) m, clock.wallTime()); - } - if (m instanceof DistributionSummary) { - return new ElasticDistributionSummary((DistributionSummary) m, clock.wallTime()); - } - if (m instanceof FunctionTimer) { - return new ElasticFunctionTimer((FunctionTimer) m, clock.wallTime()); - } + long wallTime = config().clock().wallTime(); + + String bulkPayload = batch.stream().flatMap(m -> { if (m instanceof TimeGauge) { - return new ElasticTimeGauge((TimeGauge) m, clock.wallTime()); + return writeGauge((TimeGauge) m, wallTime); + } else if (m instanceof Gauge) { + return writeGauge((Gauge) m, wallTime); + } else if (m instanceof Counter) { + return writeCounter((Counter) m, wallTime); + } else if (m instanceof FunctionCounter) { + return writeCounter((FunctionCounter) m, wallTime); + } else if (m instanceof Timer) { + return writeTimer((Timer) m, wallTime); + } else if (m instanceof FunctionTimer) { + return writeTimer((FunctionTimer) m, wallTime); + } else if (m instanceof DistributionSummary) { + return writeSummary((DistributionSummary) m, wallTime); + } else if (m instanceof LongTaskTimer) { + return writeLongTaskTimer((LongTaskTimer) m, wallTime); + } else { + return writeMeter(m, wallTime); } - if (m instanceof Gauge) { - return new ElasticGauge((Gauge) m, clock.wallTime()); - } - if (m instanceof FunctionCounter) { - return new ElasticFunctionCounter((FunctionCounter) m, clock.wallTime()); - } - if (m instanceof Counter) { - return new ElasticCounter((Counter) m, clock.wallTime()); - } - if (m instanceof LongTaskTimer) { - return new ElasticLongTaskTimer((LongTaskTimer) m, clock.wallTime()); - } - return new ElasticMeter(m, clock.wallTime()); - }).collect(toList()); + }).collect(Collectors.joining("\n")) + "\n"; HttpURLConnection connection = openConnection("/_bulk", "POST"); if (connection == null) { @@ -192,35 +148,188 @@ protected void publish() { return; } - try { - OutputStream outputStream = connection.getOutputStream(); + try (OutputStream outputStream = connection.getOutputStream()) { + outputStream.write(bulkPayload.getBytes()); + outputStream.flush(); + + if (connection.getResponseCode() >= 400) { + try { + logger.error("failed to send metrics to elasticsearch (HTTP {}). Cause: {}", connection.getResponseCode(), IOUtils.toString(connection.getErrorStream(), StandardCharsets.UTF_8)); + } catch (IOException ignored) { + } + return; // don't try another batch + } else { + try { + // It's not enough to look at response code. ES could return {"errors":true} in body: + // {"took":16,"errors":true,"items":[{"index":{"_index":"metrics-2018-03","_type":"timer","_id":"i8kdBmIBmtn9wpUGezjX","status":400,"error":{"type":"illegal_argument_exception","reason":"Rejecting mapping update to [metrics-2018-03] as the final mapping would have more than 1 type: [metric, doc]"}}}]} + String response = IOUtils.toString(connection.getInputStream(), StandardCharsets.UTF_8); + if (response.contains("\"errors\":true")) { + logger.error("failed to send metrics to elasticsearch (HTTP {}). Cause: {}", connection.getResponseCode(), response); + return; + } + else { + logger.info("successfully sent {} metrics to elasticsearch", batch.size()); + } + } catch (IOException ignored) { + } + } + } catch (IOException e) { + logger.error("Could not serialize meter", e); + return; + } finally { + connection.disconnect(); + } + } + } - for (ElasticSerializableMeter serializableMeter : serializableMeters) { - BulkIndexOperationHeader header = new BulkIndexOperationHeader(currentIndexName, serializableMeter.getType()); + private Stream writeCounter(Counter counter, long wallTime) { + return Stream.of(index(counter, wallTime).field("count", counter.count()).build()); + } - objectWriter.writeValue(outputStream, header); - outputStream.write("\n".getBytes()); - objectWriter.writeValue(outputStream, serializableMeter); - outputStream.write("\n".getBytes()); + private Stream writeCounter(FunctionCounter counter, long wallTime) { + return Stream.of(index(counter, wallTime).field("count", counter.count()).build()); + } - outputStream.flush(); - } + private Stream writeGauge(Gauge gauge, long wallTime) { + return Stream.of(index(gauge, wallTime).field("count", gauge.value()).build()); + } + + private Stream writeGauge(TimeGauge gauge, long wallTime) { + return Stream.of(index(gauge, wallTime).field("count", gauge.value(getBaseTimeUnit())).build()); + } - outputStream.close(); + private Stream writeTimer(FunctionTimer timer, long wallTime) { + return Stream.of(index(timer, wallTime) + .field("count", timer.count()) + .field("sum", timer.totalTime(getBaseTimeUnit())) + .field("mean", timer.mean(getBaseTimeUnit())) + .build()); + } - connection.disconnect(); + private Stream writeLongTaskTimer(LongTaskTimer timer, long wallTime) { + return Stream.of(index(timer, wallTime) + .field("activeTasks", timer.activeTasks()) + .field("duration", timer.duration(getBaseTimeUnit())) + .build()); + } - if (connection.getResponseCode() != 200) { - logger.error("Reporting returned code {} {}: {}", connection.getResponseCode(), connection.getResponseMessage()); - return; - } - } catch (IOException e) { - logger.error("Could not serialize meter", e); - return; + private Stream writeTimer(Timer timer, long wallTime) { + HistogramSnapshot snap = timer.takeSnapshot(false); + Stream.Builder stream = Stream.builder(); + stream.add(index(timer, wallTime) + .field("count", snap.count()) + .field("sum", snap.total(getBaseTimeUnit())) + .field("mean", snap.mean(getBaseTimeUnit())) + .field("max", snap.max(getBaseTimeUnit())) + .build()); + + if (snap.percentileValues().length > 0) { + String percentileName = config().namingConvention().name(timer.getId().getName() + ".percentile", Meter.Type.GAUGE); + for (ValueAtPercentile valueAtPercentile : snap.percentileValues()) { + stream.add(index(percentileName, "gauge", wallTime) + .field("phi", DoubleFormat.decimalOrWhole(valueAtPercentile.percentile())) + .field("value", valueAtPercentile.value(getBaseTimeUnit())) + .build()); + } + } + + if (snap.histogramCounts().length > 0) { + String histogramName = config().namingConvention().name(timer.getId().getName() + ".histogram", Meter.Type.GAUGE); + for (CountAtBucket countAtBucket : snap.histogramCounts()) { + stream.add(index(histogramName, "gauge", wallTime) + .field("le", DoubleFormat.decimalOrWhole(countAtBucket.bucket(getBaseTimeUnit()))) + .field("value", countAtBucket.count()) + .build()); + } + } + + return stream.build(); + } + + private Stream writeSummary(DistributionSummary summary, long wallTime) { + HistogramSnapshot snap = summary.takeSnapshot(false); + Stream.Builder stream = Stream.builder(); + stream.add(index(summary, wallTime) + .field("count", snap.count()) + .field("sum", snap.total()) + .field("mean", snap.mean()) + .field("max", snap.max()) + .build()); + + if (snap.percentileValues().length > 0) { + String percentileName = config().namingConvention().name(summary.getId().getName() + ".percentile", Meter.Type.GAUGE); + for (ValueAtPercentile valueAtPercentile : snap.percentileValues()) { + stream.add(index(percentileName, "gauge", wallTime) + .field("phi", DoubleFormat.decimalOrWhole(valueAtPercentile.percentile())) + .field("value", valueAtPercentile.value()) + .build()); + } + } + + if (snap.histogramCounts().length > 0) { + String histogramName = config().namingConvention().name(summary.getId().getName() + ".histogram", Meter.Type.GAUGE); + for (CountAtBucket countAtBucket : snap.histogramCounts()) { + stream.add(index(histogramName, "gauge", wallTime) + .field("le", DoubleFormat.decimalOrWhole(countAtBucket.bucket())) + .field("value", countAtBucket.count()) + .build()); } } - logger.debug("Reported meters to elasticsearch"); + return stream.build(); + } + + private Stream writeMeter(Meter meter, long wallTime) { + IndexBuilder index = index(meter, wallTime); + for (Measurement measurement : meter.measure()) { + index.field(measurement.getStatistic().getTagValueRepresentation(), measurement.getValue()); + } + return Stream.of(index.build()); + } + + private IndexBuilder index(Meter meter, long wallTime) { + return new IndexBuilder(config, getConventionName(meter.getId()), meter.getId().getType().toString().toLowerCase(), wallTime); + } + + // VisibleForTesting + IndexBuilder index(String name, String type, long wallTime) { + return new IndexBuilder(config, name, type, wallTime); + } + + static class IndexBuilder { + private StringBuilder indexLine = new StringBuilder(); + + private IndexBuilder(ElasticConfig config, String name, String type, long wallTime) { + indexLine.append(indexLine(config, wallTime)) + .append("{\"").append(config.timeStampFieldName()).append("\":\"").append(timestamp(wallTime)).append("\"") + .append(",\"name\":\"").append(name).append("\"") + .append(",\"type\":\"").append(type).append("\""); + } + + IndexBuilder field(String name, double value) { + indexLine.append(",\"").append(name).append("\":").append(DoubleFormat.decimalOrNan(value)); + return this; + } + + IndexBuilder field(String name, String value) { + indexLine.append(",\"").append(name).append("\":\"").append(value).append("\""); + return this; + } + + // VisibleForTesting + static String timestamp(long wallTime) { + return DateTimeFormatter.ISO_INSTANT.format(Instant.ofEpochMilli(wallTime)); + } + + private static String indexLine(ElasticConfig config, long wallTime) { + ZonedDateTime dt = ZonedDateTime.ofInstant(Instant.ofEpochMilli(wallTime), ZoneId.of("UTC")); + String indexName = config.index() + "-" + DateTimeFormatter.ofPattern(config.indexDateFormat()).format(dt); + return "{\"index\":{\"_index\":\"" + indexName + "\",\"_type\":\"doc\"}}\n"; + } + + String build() { + return indexLine.toString() + "}"; + } } @Override @@ -232,18 +341,19 @@ protected TimeUnit getBaseTimeUnit() { private HttpURLConnection openConnection(String uri, String method) { for (String host : config.hosts()) { try { - URL templateUrl = new URL(host + uri); - HttpURLConnection connection = ( HttpURLConnection ) templateUrl.openConnection(); + URL templateUrl = new URL(host + uri); + HttpURLConnection connection = (HttpURLConnection) templateUrl.openConnection(); connection.setRequestMethod(method); - connection.setConnectTimeout(config.timeout()); + connection.setConnectTimeout((int) config.connectTimeout().toMillis()); + connection.setReadTimeout((int) config.readTimeout().toMillis()); connection.setUseCaches(false); connection.setRequestProperty("Content-Type", "application/json"); if (method.equalsIgnoreCase("POST") || method.equalsIgnoreCase("PUT")) { connection.setDoOutput(true); } - if (!"".equals(config.userName()) && !"".equals(config.password())) { - byte[] authBinary = (config.userName()+":"+config.password()).getBytes(StandardCharsets.UTF_8); + if (isNotBlank(config.userName()) && isNotBlank(config.password())) { + byte[] authBinary = (config.userName() + ":" + config.password()).getBytes(StandardCharsets.UTF_8); String authEncoded = Base64.getEncoder().encodeToString(authBinary); connection.setRequestProperty("Authorization", "Basic " + authEncoded); } @@ -258,4 +368,23 @@ private HttpURLConnection openConnection(String uri, String method) { return null; } + + /** + * Modified from {@link org.apache.commons.lang.StringUtils#isBlank(String)}. + * + * @param str The string to check + * @return {@code true} if the String is null or blank. + */ + private static boolean isNotBlank(@Nullable String str) { + int strLen; + if (str == null || (strLen = str.length()) == 0) { + return false; + } + for (int i = 0; i < strLen; i++) { + if (!Character.isWhitespace(str.charAt(i))) { + return true; + } + } + return false; + } } diff --git a/implementations/micrometer-registry-elastic/src/main/java/io/micrometer/elastic/ElasticMetricsModule.java b/implementations/micrometer-registry-elastic/src/main/java/io/micrometer/elastic/ElasticMetricsModule.java deleted file mode 100644 index 6cb436614e..0000000000 --- a/implementations/micrometer-registry-elastic/src/main/java/io/micrometer/elastic/ElasticMetricsModule.java +++ /dev/null @@ -1,302 +0,0 @@ -/** - * Copyright 2017 Pivotal Software, Inc. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.micrometer.elastic; - -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.Version; -import com.fasterxml.jackson.databind.Module; -import com.fasterxml.jackson.databind.SerializerProvider; -import com.fasterxml.jackson.databind.module.SimpleSerializers; -import com.fasterxml.jackson.databind.ser.std.StdSerializer; -import io.micrometer.core.instrument.*; -import io.micrometer.core.instrument.config.NamingConvention; -import io.micrometer.elastic.ElasticSerializableMeters.*; - -import java.io.IOException; -import java.text.DecimalFormat; -import java.text.DecimalFormatSymbols; -import java.util.Arrays; -import java.util.Date; -import java.util.Locale; -import java.util.concurrent.TimeUnit; - -class ElasticMetricsModule extends Module { - // TODO: there must be a better way! - private static final Version VERSION = new Version(1, 0, 0, null, "io.micrometer", "micrometer-registry-elastic"); - - private static abstract class AbstractElasticMeterSerializer, M extends Meter> extends StdSerializer { - private final DecimalFormat df = new DecimalFormat("#.####", DecimalFormatSymbols.getInstance(Locale.ENGLISH)); - private final NamingConvention namingConvention; - final TimeUnit rateUnit; - final TimeUnit durationUnit; - private final String timeStampFieldName; - private final String metricPrefix; - - AbstractElasticMeterSerializer(Class t, NamingConvention namingConvention, TimeUnit rateUnit, TimeUnit durationUnit, String timeStampFieldName, String metricPrefix) { - super(t); - this.namingConvention = namingConvention; - this.rateUnit = rateUnit; - this.durationUnit = durationUnit; - this.timeStampFieldName = timeStampFieldName; - this.metricPrefix = metricPrefix; - } - - @Override - public void serialize(T meter, JsonGenerator json, SerializerProvider provider) throws IOException { - json.writeStartObject(); - - json.writeObjectField(timeStampFieldName, new Date(meter.getTimestamp())); - json.writeStringField("type", meter.getType()); - - json.writeStringField("name", metricPrefix + meter.getMeter().getId().getConventionName(namingConvention)); - for (Tag t : meter.getMeter().getId().getTags()) { - json.writeStringField(namingConvention.tagKey(t.getKey()), namingConvention.tagValue(t.getValue())); - } - - serialize(json, meter.getMeter()); - - json.writeEndObject(); - } - - protected abstract void serialize(JsonGenerator json, M meter) throws IOException; - - void writePercentiles(JsonGenerator json, ValueAtPercentile[] percentiles) throws IOException { - for (ValueAtPercentile p : percentiles) { - json.writeNumberField(writePercentile(p.percentile()), p.value(durationUnit)); - } - } - - void writeCounts(JsonGenerator json, CountAtValue[] counts) throws IOException { - for (CountAtValue c : counts) { - json.writeNumberField(writeCount(c.count()), c.value(rateUnit)); - } - } - - private String writePercentile(double v) { - return "p" + df.format(v * 100).replace(".", ""); - } - - private String writeCount(double v) { - return "c" + df.format(v * 100).replace(".", ""); - } - } - - private static class TimerSerializer extends AbstractElasticMeterSerializer { - - TimerSerializer(NamingConvention namingConvention, TimeUnit rateUnit, TimeUnit durationUnit, String timeStampFieldName, String metricPrefix) { - super(ElasticTimer.class, namingConvention, rateUnit, durationUnit, timeStampFieldName, metricPrefix); - } - - @Override - protected void serialize(JsonGenerator json, Timer meter) throws IOException { - final HistogramSnapshot snapshot = meter.takeSnapshot(false); - - json.writeNumberField("count", snapshot.count()); - json.writeNumberField("max", snapshot.max(durationUnit)); - json.writeNumberField("mean", snapshot.mean(durationUnit)); - json.writeNumberField("sum", snapshot.total(durationUnit)); - - writePercentiles(json, snapshot.percentileValues()); - writeCounts(json, snapshot.histogramCounts()); - } - } - - private static class FunctionTimerSerializer extends AbstractElasticMeterSerializer { - - FunctionTimerSerializer(NamingConvention namingConvention, TimeUnit rateUnit, TimeUnit durationUnit, String timeStampFieldName, String metricPrefix) { - super(ElasticFunctionTimer.class, namingConvention, rateUnit, durationUnit, timeStampFieldName, metricPrefix); - } - - @Override - protected void serialize(JsonGenerator json, FunctionTimer meter) throws IOException { - json.writeNumberField("sum", meter.totalTime(durationUnit)); - json.writeNumberField("count", meter.count()); - json.writeNumberField("mean", meter.mean(durationUnit)); - } - } - - private static class DistributionSummarySerializer extends AbstractElasticMeterSerializer { - - DistributionSummarySerializer(NamingConvention namingConvention, TimeUnit rateUnit, TimeUnit durationUnit, String timeStampFieldName, String metricPrefix) { - super(ElasticDistributionSummary.class, namingConvention, rateUnit, durationUnit, timeStampFieldName, metricPrefix); - } - - @Override - protected void serialize(JsonGenerator json, DistributionSummary meter) throws IOException { - final HistogramSnapshot snapshot = meter.takeSnapshot(false); - - json.writeNumberField("count", snapshot.count()); - json.writeNumberField("max", snapshot.max(durationUnit)); - json.writeNumberField("mean", snapshot.mean(durationUnit)); - json.writeNumberField("sum", snapshot.total(durationUnit)); - - writePercentiles(json, snapshot.percentileValues()); - writeCounts(json, snapshot.histogramCounts()); - } - } - - private static class LongTaskTimerSerializer extends AbstractElasticMeterSerializer { - - LongTaskTimerSerializer(NamingConvention namingConvention, TimeUnit rateUnit, TimeUnit durationUnit, String timeStampFieldName, String metricPrefix) { - super(ElasticLongTaskTimer.class, namingConvention, rateUnit, durationUnit, timeStampFieldName, metricPrefix); - } - - @Override - protected void serialize(JsonGenerator json, LongTaskTimer meter) throws IOException { - json.writeNumberField("active_tasks", meter.activeTasks()); - json.writeNumberField("duration", meter.duration(durationUnit)); - } - } - - private static class CounterSerializer extends AbstractElasticMeterSerializer { - - CounterSerializer(NamingConvention namingConvention, TimeUnit rateUnit, TimeUnit durationUnit, String timeStampFieldName, String metricPrefix) { - super(ElasticCounter.class, namingConvention, rateUnit, durationUnit, timeStampFieldName, metricPrefix); - } - - @Override - protected void serialize(JsonGenerator json, Counter meter) throws IOException { - json.writeNumberField("count", meter.count()); - } - } - - private static class FunctionCounterSerializer extends AbstractElasticMeterSerializer { - - FunctionCounterSerializer(NamingConvention namingConvention, TimeUnit rateUnit, TimeUnit durationUnit, String timeStampFieldName, String metricPrefix) { - super(ElasticFunctionCounter.class, namingConvention, rateUnit, durationUnit, timeStampFieldName, metricPrefix); - } - - @Override - protected void serialize(JsonGenerator json, FunctionCounter meter) throws IOException { - json.writeNumberField("count", meter.count()); - } - } - - private static class GaugeSerializer extends AbstractElasticMeterSerializer { - - GaugeSerializer(NamingConvention namingConvention, TimeUnit rateUnit, TimeUnit durationUnit, String timeStampFieldName, String metricPrefix) { - super(ElasticGauge.class, namingConvention, rateUnit, durationUnit, timeStampFieldName, metricPrefix); - } - - @Override - protected void serialize(JsonGenerator json, Gauge meter) throws IOException { - json.writeNumberField("value", meter.value()); - } - } - - private static class TimeGaugeSerializer extends AbstractElasticMeterSerializer { - - TimeGaugeSerializer(NamingConvention namingConvention, TimeUnit rateUnit, TimeUnit durationUnit, String timeStampFieldName, String metricPrefix) { - super(ElasticTimeGauge.class, namingConvention, rateUnit, durationUnit, timeStampFieldName, metricPrefix); - } - - - @Override - protected void serialize(JsonGenerator json, TimeGauge meter) throws IOException { - json.writeNumberField("value", meter.value(durationUnit)); - } - } - - private static class MeterSerializer extends AbstractElasticMeterSerializer { - - MeterSerializer(NamingConvention namingConvention, TimeUnit rateUnit, TimeUnit durationUnit, String timeStampFieldName, String metricPrefix) { - super(ElasticMeter.class, namingConvention, rateUnit, durationUnit, timeStampFieldName, metricPrefix); - } - - @Override - protected void serialize(JsonGenerator json, Meter meter) throws IOException { - for (Measurement measurement : meter.measure()) { - String fieldKey = measurement.getStatistic().toString() - .replaceAll("(.)(\\p{Upper})", "$1_$2").toLowerCase(); - - json.writeNumberField(fieldKey, measurement.getValue()); - } - } - } - - - private static class BulkIndexOperationHeaderSerializer extends StdSerializer { - - protected BulkIndexOperationHeaderSerializer() { - super(BulkIndexOperationHeader.class); - } - - @Override - public void serialize(BulkIndexOperationHeader value, JsonGenerator json, SerializerProvider provider) throws IOException { - json.writeStartObject(); - json.writeObjectFieldStart("index"); - if (value.index != null) { - json.writeStringField("_index", value.index); - } - if (value.type != null) { - json.writeStringField("_type", value.type); - } - json.writeEndObject(); - json.writeEndObject(); - } - } - - public static class BulkIndexOperationHeader { - public String index; - public String type; - - public BulkIndexOperationHeader(String index, String type) { - this.index = index; - this.type = type; - } - } - - - private final NamingConvention namingConvention; - private final TimeUnit rateUnit; - private final TimeUnit durationUnit; - private final String timeStampFieldName; - private final String metricPrefix; - - ElasticMetricsModule(NamingConvention namingConvention, TimeUnit rateUnit, TimeUnit durationUnit, String timeStampFieldName, String metricPrefix) { - this.namingConvention = namingConvention; - this.rateUnit = rateUnit; - this.durationUnit = durationUnit; - this.timeStampFieldName = timeStampFieldName; - this.metricPrefix = metricPrefix; - } - - @Override - public String getModuleName() { - return VERSION.getArtifactId(); - } - - @Override - public Version version() { - return VERSION; - } - - @Override - public void setupModule(SetupContext context) { - context.addSerializers(new SimpleSerializers(Arrays.asList( - new TimerSerializer(namingConvention, rateUnit, durationUnit, timeStampFieldName, metricPrefix), - new FunctionTimerSerializer(namingConvention, rateUnit, durationUnit, timeStampFieldName, metricPrefix), - new DistributionSummarySerializer(namingConvention, rateUnit, durationUnit, timeStampFieldName, metricPrefix), - new LongTaskTimerSerializer(namingConvention, rateUnit, durationUnit, timeStampFieldName, metricPrefix), - new CounterSerializer(namingConvention, rateUnit, durationUnit, timeStampFieldName, metricPrefix), - new FunctionCounterSerializer(namingConvention, rateUnit, durationUnit, timeStampFieldName, metricPrefix), - new GaugeSerializer(namingConvention, rateUnit, durationUnit, timeStampFieldName, metricPrefix), - new TimeGaugeSerializer(namingConvention, rateUnit, durationUnit, timeStampFieldName, metricPrefix), - new MeterSerializer(namingConvention, rateUnit, durationUnit, timeStampFieldName, metricPrefix), - new BulkIndexOperationHeaderSerializer() - ))); - } -} diff --git a/implementations/micrometer-registry-elastic/src/main/java/io/micrometer/elastic/ElasticSerializableMeters.java b/implementations/micrometer-registry-elastic/src/main/java/io/micrometer/elastic/ElasticSerializableMeters.java deleted file mode 100644 index add9fa438e..0000000000 --- a/implementations/micrometer-registry-elastic/src/main/java/io/micrometer/elastic/ElasticSerializableMeters.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Copyright 2017 Pivotal Software, Inc. - *

- * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.micrometer.elastic; - -import io.micrometer.core.instrument.*; - -class ElasticSerializableMeters { - public static class ElasticSerializableMeter { - private final T meter; - private final long timestamp; - private final String type; - - ElasticSerializableMeter(T meter, long timestamp, String type) { - this.meter = meter; - this.timestamp = timestamp; - this.type = type; - } - - public T getMeter() { - return meter; - } - - public long getTimestamp() { - return timestamp; - } - - public String getType() { - return type; - } - } - - static class ElasticTimer extends ElasticSerializableMeter { - ElasticTimer(Timer meter, long timestamp) { - super(meter, timestamp, "timer"); - } - } - - static class ElasticFunctionTimer extends ElasticSerializableMeter { - ElasticFunctionTimer(FunctionTimer meter, long timestamp) { - super(meter, timestamp, "timer"); - } - } - - static class ElasticDistributionSummary extends ElasticSerializableMeter { - ElasticDistributionSummary(DistributionSummary meter, long timestamp) { - super(meter, timestamp, "histogram"); - } - } - - static class ElasticLongTaskTimer extends ElasticSerializableMeter { - ElasticLongTaskTimer(LongTaskTimer meter, long timestamp) { - super(meter, timestamp, "long_task_timer"); - } - } - - static class ElasticCounter extends ElasticSerializableMeter { - ElasticCounter(Counter meter, long timestamp) { - super(meter, timestamp, "counter"); - } - } - - static class ElasticFunctionCounter extends ElasticSerializableMeter { - ElasticFunctionCounter(FunctionCounter meter, long timestamp) { - super(meter, timestamp, "counter"); - } - } - - static class ElasticGauge extends ElasticSerializableMeter { - - ElasticGauge(Gauge meter, long timestamp) { - super(meter, timestamp, "gauge"); - } - } - - static class ElasticTimeGauge extends ElasticSerializableMeter { - - ElasticTimeGauge(TimeGauge meter, long timestamp) { - super(meter, timestamp, "gauge"); - } - } - - static class ElasticMeter extends ElasticSerializableMeter { - ElasticMeter(Meter meter, long timestamp) { - super(meter, timestamp, "unknown"); - } - } -} diff --git a/implementations/micrometer-registry-elastic/src/test/java/io/micrometer/elastic/ElasticMeterRegistryCompatibilityTest.java b/implementations/micrometer-registry-elastic/src/test/java/io/micrometer/elastic/ElasticMeterRegistryCompatibilityTest.java index 64ba700861..64f8ab7e7c 100644 --- a/implementations/micrometer-registry-elastic/src/test/java/io/micrometer/elastic/ElasticMeterRegistryCompatibilityTest.java +++ b/implementations/micrometer-registry-elastic/src/test/java/io/micrometer/elastic/ElasticMeterRegistryCompatibilityTest.java @@ -1,3 +1,18 @@ +/** + * Copyright 2017 Pivotal Software, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.micrometer.elastic; import io.micrometer.core.instrument.MeterRegistry; diff --git a/implementations/micrometer-registry-elastic/src/test/java/io/micrometer/elastic/ElasticMeterRegistryTest.java b/implementations/micrometer-registry-elastic/src/test/java/io/micrometer/elastic/ElasticMeterRegistryTest.java new file mode 100644 index 0000000000..84b90c8c7c --- /dev/null +++ b/implementations/micrometer-registry-elastic/src/test/java/io/micrometer/elastic/ElasticMeterRegistryTest.java @@ -0,0 +1,51 @@ +/** + * Copyright 2017 Pivotal Software, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micrometer.elastic; + +import io.micrometer.core.instrument.MockClock; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class ElasticMeterRegistryTest { + private MockClock clock = new MockClock(); + private ElasticConfig config = new ElasticConfig() { + @Override + public String get(String key) { + return null; + } + + @Override + public boolean enabled() { + return false; + } + }; + + private ElasticMeterRegistry registry = new ElasticMeterRegistry(config, clock); + + @Test + void timestampFormat() { + assertThat(ElasticMeterRegistry.IndexBuilder.timestamp(1)).isEqualTo("1970-01-01T00:00:00.001Z"); + } + + @Test + void buildIndex() { + assertThat(registry.index("myTimer", "timer", 0) + .field("phi", "0.99").field("value", 1).build()) + .isEqualTo("{\"index\":{\"_index\":\"metrics-1970-01\",\"_type\":\"doc\"}}\n" + + "{\"@timestamp\":\"1970-01-01T00:00:00Z\",\"name\":\"myTimer\",\"type\":\"timer\",\"phi\":\"0.99\",\"value\":1}"); + } +} \ No newline at end of file diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/cumulative/CumulativeFunctionTimer.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/cumulative/CumulativeFunctionTimer.java index daa0b3b43a..75c06b499f 100644 --- a/micrometer-core/src/main/java/io/micrometer/core/instrument/cumulative/CumulativeFunctionTimer.java +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/cumulative/CumulativeFunctionTimer.java @@ -55,7 +55,7 @@ public CumulativeFunctionTimer(Id id, T obj, ToLongFunction countFunction, To */ public double count() { T obj2 = ref.get(); - return obj2 != null ? (lastCount = countFunction.applyAsLong(obj2)) : lastCount; + return obj2 != null ? (lastCount = Math.max(countFunction.applyAsLong(obj2), 0)) : lastCount; } /** @@ -65,9 +65,9 @@ public double totalTime(TimeUnit unit) { T obj2 = ref.get(); if (obj2 == null) return lastTime; - return (lastTime = TimeUtils.convert(totalTimeFunction.applyAsDouble(obj2), + return (lastTime = Math.max(TimeUtils.convert(totalTimeFunction.applyAsDouble(obj2), totalTimeFunctionUnits, - unit)); + unit), 0)); } @Override diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/step/StepFunctionTimer.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/step/StepFunctionTimer.java index d7a6651cd3..9f04f48019 100644 --- a/micrometer-core/src/main/java/io/micrometer/core/instrument/step/StepFunctionTimer.java +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/step/StepFunctionTimer.java @@ -62,7 +62,7 @@ public double count() { T obj2 = ref.get(); if (obj2 != null) { long prevLast = lastCount; - lastCount = countFunction.applyAsLong(obj2); + lastCount = Math.max(countFunction.applyAsLong(obj2), 0); count.getCurrent().add(lastCount - prevLast); } return count.poll(); @@ -75,7 +75,8 @@ public double totalTime(TimeUnit unit) { T obj2 = ref.get(); if (obj2 != null) { double prevLast = lastTime; - lastTime = TimeUtils.convert(totalTimeFunction.applyAsDouble(obj2), totalTimeFunctionUnits, unit); + lastTime = Math.max(TimeUtils.convert(totalTimeFunction.applyAsDouble(obj2), totalTimeFunctionUnits, unit), 0); + System.out.println("prevLast: " + prevLast + " last: " + lastTime + "accum: " + (lastTime - prevLast)); total.getCurrent().add(lastTime - prevLast); } return total.poll(); diff --git a/micrometer-spring-legacy/src/main/java/io/micrometer/spring/autoconfigure/export/elastic/ElasticMetricsExportAutoConfiguration.java b/micrometer-spring-legacy/src/main/java/io/micrometer/spring/autoconfigure/export/elastic/ElasticMetricsExportAutoConfiguration.java index aa783b9a6a..8eedbbe9c3 100644 --- a/micrometer-spring-legacy/src/main/java/io/micrometer/spring/autoconfigure/export/elastic/ElasticMetricsExportAutoConfiguration.java +++ b/micrometer-spring-legacy/src/main/java/io/micrometer/spring/autoconfigure/export/elastic/ElasticMetricsExportAutoConfiguration.java @@ -18,8 +18,7 @@ import io.micrometer.core.instrument.Clock; import io.micrometer.elastic.ElasticConfig; import io.micrometer.elastic.ElasticMeterRegistry; -import io.micrometer.influx.InfluxConfig; -import io.micrometer.influx.InfluxMeterRegistry; +import io.micrometer.spring.autoconfigure.CompositeMeterRegistryAutoConfiguration; import io.micrometer.spring.autoconfigure.MetricsAutoConfiguration; import io.micrometer.spring.autoconfigure.export.StringToDurationConverter; import io.micrometer.spring.autoconfigure.export.simple.SimpleMetricsExportAutoConfiguration; @@ -40,7 +39,8 @@ * @author Nicolas Portmann */ @Configuration -@AutoConfigureBefore(SimpleMetricsExportAutoConfiguration.class) +@AutoConfigureBefore({CompositeMeterRegistryAutoConfiguration.class, + SimpleMetricsExportAutoConfiguration.class}) @AutoConfigureAfter(MetricsAutoConfiguration.class) @ConditionalOnBean(Clock.class) @ConditionalOnClass(ElasticMeterRegistry.class) diff --git a/micrometer-spring-legacy/src/main/java/io/micrometer/spring/autoconfigure/export/elastic/ElasticPropertiesConfigAdapter.java b/micrometer-spring-legacy/src/main/java/io/micrometer/spring/autoconfigure/export/elastic/ElasticPropertiesConfigAdapter.java index 1f41b4c5dd..97f2361ae6 100644 --- a/micrometer-spring-legacy/src/main/java/io/micrometer/spring/autoconfigure/export/elastic/ElasticPropertiesConfigAdapter.java +++ b/micrometer-spring-legacy/src/main/java/io/micrometer/spring/autoconfigure/export/elastic/ElasticPropertiesConfigAdapter.java @@ -16,11 +16,7 @@ package io.micrometer.spring.autoconfigure.export.elastic; import io.micrometer.elastic.ElasticConfig; -import io.micrometer.influx.InfluxConfig; import io.micrometer.spring.autoconfigure.export.StepRegistryPropertiesConfigAdapter; -import io.micrometer.spring.autoconfigure.export.influx.InfluxProperties; - -import java.util.concurrent.TimeUnit; /** * Adapter to convert {@link ElasticProperties} to an {@link ElasticConfig}. @@ -38,26 +34,6 @@ public String[] hosts() { return get(ElasticProperties::getHosts, ElasticConfig.super::hosts); } - @Override - public String metricPrefix() { - return get(ElasticProperties::getMetricPrefix, ElasticConfig.super::metricPrefix); - } - - @Override - public TimeUnit rateUnits() { - return get(ElasticProperties::getRateUnits, ElasticConfig.super::rateUnits); - } - - @Override - public TimeUnit durationUnits() { - return get(ElasticProperties::getDurationUnits, ElasticConfig.super::durationUnits); - } - - @Override - public int timeout() { - return get(ElasticProperties::getTimeout, ElasticConfig.super::timeout); - } - @Override public String index() { return get(ElasticProperties::getIndex, ElasticConfig.super::index); @@ -66,12 +42,6 @@ public String index() { @Override public String indexDateFormat() { return get(ElasticProperties::getIndexDateFormat, ElasticConfig.super::indexDateFormat); - - } - - @Override - public int bulkSize() { - return get(ElasticProperties::getBulkSize, ElasticConfig.super::bulkSize); } @Override diff --git a/samples/micrometer-samples-core/src/main/java/io/micrometer/core/samples/utils/SampleRegistries.java b/samples/micrometer-samples-core/src/main/java/io/micrometer/core/samples/utils/SampleRegistries.java index 119766e1f7..a431b1ae8c 100644 --- a/samples/micrometer-samples-core/src/main/java/io/micrometer/core/samples/utils/SampleRegistries.java +++ b/samples/micrometer-samples-core/src/main/java/io/micrometer/core/samples/utils/SampleRegistries.java @@ -45,7 +45,6 @@ import io.micrometer.wavefront.WavefrontConfig; import io.micrometer.wavefront.WavefrontMeterRegistry; -import java.io.Closeable; import java.io.IOException; import java.io.OutputStream; import java.net.InetSocketAddress; diff --git a/scripts/elastic.sh b/scripts/elastic.sh old mode 100644 new mode 100755 index fa903ec7e2..56e2d59762 --- a/scripts/elastic.sh +++ b/scripts/elastic.sh @@ -1,2 +1,2 @@ #!/usr/bin/env bash -`docker run -d -p 9200:9200 -p 9300:9300 elasticsearch` \ No newline at end of file +docker run -d -p 9200:9200 -p 5601:5601 nshou/elasticsearch-kibana \ No newline at end of file