diff --git a/settings.gradle b/settings.gradle
index bc3207c58..0937f294b 100755
--- a/settings.gradle
+++ b/settings.gradle
@@ -5,6 +5,7 @@ include 'spectator-api',
         'spectator-ext-jvm',
         'spectator-ext-log4j2',
         'spectator-ext-sandbox',
+        'spectator-ext-spark',
         'spectator-reg-metrics2',
         'spectator-reg-metrics3',
         'spectator-reg-servo',
diff --git a/spectator-ext-spark/build.gradle b/spectator-ext-spark/build.gradle
new file mode 100755
index 000000000..83baddaf8
--- /dev/null
+++ b/spectator-ext-spark/build.gradle
@@ -0,0 +1,7 @@
+dependencies {
+  compile project(':spectator-api')
+  compile project(':spectator-ext-gc')
+  compile project(':spectator-ext-jvm')
+  compile 'io.dropwizard.metrics:metrics-core:3.1.0'
+  compile 'org.apache.spark:spark-core_2.10:1.2.1'
+}
diff --git a/spectator-ext-spark/src/main/java/com/netflix/spectator/spark/DataType.java b/spectator-ext-spark/src/main/java/com/netflix/spectator/spark/DataType.java
new file mode 100644
index 000000000..2377923dd
--- /dev/null
+++ b/spectator-ext-spark/src/main/java/com/netflix/spectator/spark/DataType.java
@@ -0,0 +1,41 @@
+/**
+ * Copyright 2015 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.netflix.spectator.spark;
+
+import com.netflix.spectator.api.Tag;
+
+/**
+ * Data types for messages sent to the sidecar metrics endpoint.
+ */
+public enum DataType implements Tag {
+
+  /** Value reported as is, the most recent value received by the sidecar will get used. */
+  GAUGE,
+
+  /** Value is a delta to use when incrementing the counter. */
+  COUNTER,
+
+  /** Value is an amount in milliseconds that will be recorded on the timer. */
+  TIMER;
+
+  @Override public String key() {
+    return "type";
+  }
+
+  @Override public String value() {
+    return name();
+  }
+}
diff --git a/spectator-ext-spark/src/main/java/com/netflix/spectator/spark/NameFunction.java b/spectator-ext-spark/src/main/java/com/netflix/spectator/spark/NameFunction.java
new file mode 100644
index 000000000..3be035735
--- /dev/null
+++ b/spectator-ext-spark/src/main/java/com/netflix/spectator/spark/NameFunction.java
@@ -0,0 +1,29 @@
+/**
+ * Copyright 2015 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.netflix.spectator.spark;
+
+import com.netflix.spectator.api.Id;
+
+/**
+ * Maps a name used for the metrics registry into an id for spectator.
+ */
+public interface NameFunction {
+  /**
+   * Return the id corresponding to the name, or null if the name cannot be mapped and the value
+   * should be dropped.
+   */
+  Id apply(String name);
+}
diff --git a/spectator-ext-spark/src/main/java/com/netflix/spectator/spark/SidecarCounter.java b/spectator-ext-spark/src/main/java/com/netflix/spectator/spark/SidecarCounter.java
new file mode 100644
index 000000000..bb4adec37
--- /dev/null
+++ b/spectator-ext-spark/src/main/java/com/netflix/spectator/spark/SidecarCounter.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright 2015 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.netflix.spectator.spark;
+
+import com.netflix.spectator.api.Clock;
+import com.netflix.spectator.api.Counter;
+import com.netflix.spectator.api.Id;
+import com.netflix.spectator.api.Measurement;
+
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Counter that tracks the delta since the last measurement was taken.
+ */
+class SidecarCounter implements Counter {
+
+  private final Clock clock;
+  private final Id id;
+  private final AtomicLong value;
+
+  /** Create a new instance. */
+  SidecarCounter(Clock clock, Id id) {
+    this.clock = clock;
+    this.id = id.withTag(DataType.COUNTER);
+    this.value = new AtomicLong(0L);
+  }
+
+  @Override public Id id() {
+    return id;
+  }
+
+  @Override public void increment() {
+    value.incrementAndGet();
+  }
+
+  @Override public void increment(long amount) {
+    value.addAndGet(amount);
+  }
+
+  @Override public long count() {
+    return value.get();
+  }
+
+  @Override public Iterable<Measurement> measure() {
+    Measurement m = new Measurement(id, clock.wallTime(), value.getAndSet(0L));
+    return Collections.singletonList(m);
+  }
+
+  @Override public boolean hasExpired() {
+    return false;
+  }
+}
diff --git a/spectator-ext-spark/src/main/java/com/netflix/spectator/spark/SidecarDistributionSummary.java b/spectator-ext-spark/src/main/java/com/netflix/spectator/spark/SidecarDistributionSummary.java
new file mode 100644
index 000000000..c3dd9618b
--- /dev/null
+++ b/spectator-ext-spark/src/main/java/com/netflix/spectator/spark/SidecarDistributionSummary.java
@@ -0,0 +1,75 @@
+/**
+ * Copyright 2015 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.netflix.spectator.spark;
+
+import com.netflix.spectator.api.Clock;
+import com.netflix.spectator.api.Counter;
+import com.netflix.spectator.api.DistributionSummary;
+import com.netflix.spectator.api.Id;
+import com.netflix.spectator.api.Measurement;
+import com.netflix.spectator.api.Statistic;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Distribution summary that is mapped to two counters: total amount and count.
+ */
+class SidecarDistributionSummary implements DistributionSummary {
+
+  private final Id id;
+  private final Counter count;
+  private final Counter totalAmount;
+
+  /** Create a new instance. */
+  SidecarDistributionSummary(Clock clock, Id id) {
+    this.id = id;
+    count = new SidecarCounter(clock, id.withTag(Statistic.count));
+    totalAmount = new SidecarCounter(clock, id.withTag(Statistic.totalAmount));
+  }
+
+  @Override public Id id() {
+    return id;
+  }
+
+  @Override public void record(long amount) {
+    count.increment();
+    totalAmount.increment(amount);
+  }
+
+  @Override public long count() {
+    return count.count();
+  }
+
+  @Override public long totalAmount() {
+    return totalAmount.count();
+  }
+
+  @Override public Iterable<Measurement> measure() {
+    List<Measurement> ms = new ArrayList<>();
+    for (Measurement m : count.measure()) {
+      ms.add(m);
+    }
+    for (Measurement m : totalAmount.measure()) {
+      ms.add(m);
+    }
+    return ms;
+  }
+
+  @Override public boolean hasExpired() {
+    return false;
+  }
+}
diff --git a/spectator-ext-spark/src/main/java/com/netflix/spectator/spark/SidecarRegistry.java b/spectator-ext-spark/src/main/java/com/netflix/spectator/spark/SidecarRegistry.java
new file mode 100644
index 000000000..463c1cf1e
--- /dev/null
+++ b/spectator-ext-spark/src/main/java/com/netflix/spectator/spark/SidecarRegistry.java
@@ -0,0 +1,244 @@
+/**
+ * Copyright 2015 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.netflix.spectator.spark;
+
+import com.netflix.spectator.api.AbstractRegistry;
+import com.netflix.spectator.api.Clock;
+import com.netflix.spectator.api.Counter;
+import com.netflix.spectator.api.DistributionSummary;
+import com.netflix.spectator.api.Id;
+import com.netflix.spectator.api.Measurement;
+import com.netflix.spectator.api.Meter;
+import com.netflix.spectator.api.Tag;
+import com.netflix.spectator.api.Timer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Registry that reports values to a sidecar process via an HTTP call.
+ */
+public class SidecarRegistry extends AbstractRegistry {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SidecarRegistry.class);
+
+  private ScheduledExecutorService executor;
+
+  private final Counter numMessages;
+  private final Counter numMeasurements;
+
+  /** Create a new instance. */
+  public SidecarRegistry() {
+    this(Clock.SYSTEM);
+  }
+
+  /** Create a new instance. */
+  public SidecarRegistry(Clock clock) {
+    super(clock);
+    numMessages = counter(createId("spectator.sidecar.numMessages"));
+    numMeasurements = counter(createId("spectator.sidecar.numMeasurements"));
+  }
+
+  /**
+   * Start sending data to the sidecar.
+   *
+   * @param url
+   *     Location of the sidecar endpoint.
+   * @param pollPeriod
+   *     How frequently to poll the data and send to the sidecar.
+   * @param pollUnit
+   *     Unit for the {@code pollPeriod}.
+   */
+  public void start(final URL url, long pollPeriod, TimeUnit pollUnit) {
+    LOGGER.info("starting sidecar registry with url {} and poll period {} {}",
+        url, pollPeriod, pollUnit);
+    executor = Executors.newSingleThreadScheduledExecutor(
+        new ThreadFactory() {
+          @Override public Thread newThread(Runnable r) {
+            return new Thread(r, "spectator-sidecar");
+          }
+        }
+    );
+
+    final SidecarRegistry self = this;
+    Runnable task = new Runnable() {
+      @Override public void run() {
+        try {
+          List<Measurement> ms = new ArrayList<>();
+          for (Meter meter : self) {
+            for (Measurement m : meter.measure()) {
+              ms.add(m);
+            }
+          }
+          postJson(url, ms);
+        } catch (Exception e) {
+          LOGGER.error("failed to send data to sidecar", e);
+        }
+      }
+    };
+    executor.scheduleWithFixedDelay(task, pollPeriod, pollPeriod, pollUnit);
+  }
+
+  /**
+   * Stop sending data to the sidecar and shutdown the executor.
+   */
+  public void stop() {
+    executor.shutdown();
+    executor = null;
+  }
+
+  private String toJson(List<Measurement> ms) {
+    StringBuilder buf = new StringBuilder();
+    buf.append('[');
+    appendJson(buf, ms.get(0));
+    for (int i = 1; i < ms.size(); ++i) {
+      buf.append(',');
+      appendJson(buf, ms.get(i));
+    }
+    buf.append(']');
+    return buf.toString();
+  }
+
+  private void appendJson(StringBuilder buf, Measurement m) {
+    if (Double.isFinite(m.value())) {
+      buf.append('{');
+      appendJsonString(buf, "timestamp");
+      buf.append(':').append(m.timestamp());
+      buf.append(',');
+
+      appendJsonString(buf, "type");
+      buf.append(':');
+      appendJsonString(buf, getType(m.id()));
+      buf.append(',');
+
+      appendJsonString(buf, "name");
+      buf.append(':');
+      appendJsonString(buf, m.id().name());
+      buf.append(',');
+
+      appendJsonString(buf, "tags");
+      buf.append(":{");
+      boolean first = true;
+      for (Tag t : m.id().tags()) {
+        if (first) {
+          first = false;
+        } else {
+          buf.append(',');
+        }
+        appendJsonString(buf, t.key());
+        buf.append(':');
+        appendJsonString(buf, t.value());
+      }
+      buf.append("},");
+
+      appendJsonString(buf, "value");
+      buf.append(':');
+      buf.append(m.value());
+
+      buf.append('}');
+    }
+  }
+
+  private String getType(Id id) {
+    for (Tag t : id.tags()) {
+      if (t.key().equals("type")) {
+        return t.value();
+      }
+    }
+    return DataType.GAUGE.value();
+  }
+
+  private void appendJsonString(StringBuilder buf, String s) {
+    buf.append('"');
+    final int length = s.length();
+    for (int i = 0; i < length; ++i) {
+      final char c = s.charAt(i);
+      switch (c) {
+        case '"':
+          buf.append("\\\"");
+          break;
+        case '\b':
+          buf.append("\\b");
+          break;
+        case '\f':
+          buf.append("\\f");
+          break;
+        case '\n':
+          buf.append("\\n");
+          break;
+        case '\r':
+          buf.append("\\r");
+          break;
+        case '\t':
+          buf.append("\\t");
+          break;
+        default:
+          buf.append(c);
+          break;
+      }
+    }
+    buf.append('"');
+  }
+
+  private void postJson(URL url, List<Measurement> ms) throws Exception {
+    if (!ms.isEmpty()) {
+      LOGGER.debug("sending {} messages to sidecar {}", ms.size(), url.toString());
+      numMessages.increment();
+      numMeasurements.increment(ms.size());
+      String json = toJson(ms);
+      LOGGER.info(json);
+      HttpURLConnection con = (HttpURLConnection) url.openConnection();
+      try {
+        con.setRequestMethod("POST");
+        con.setDoInput(true);
+        con.setDoOutput(true);
+        try (OutputStream out = con.getOutputStream()) {
+          out.write(json.getBytes("UTF-8"));
+        }
+        con.connect();
+
+        int status = con.getResponseCode();
+        if (status != 200) {
+          throw new IOException("post to sidecar failed with status: " + status);
+        }
+      } finally {
+        con.disconnect();
+      }
+    }
+  }
+
+  @Override protected Counter newCounter(Id id) {
+    return new SidecarCounter(clock(), id);
+  }
+
+  @Override protected DistributionSummary newDistributionSummary(Id id) {
+    return new SidecarDistributionSummary(clock(), id);
+  }
+
+  @Override protected Timer newTimer(Id id) {
+    return new SidecarTimer(clock(), id);
+  }
+}
diff --git a/spectator-ext-spark/src/main/java/com/netflix/spectator/spark/SidecarTimer.java b/spectator-ext-spark/src/main/java/com/netflix/spectator/spark/SidecarTimer.java
new file mode 100644
index 000000000..f3183739d
--- /dev/null
+++ b/spectator-ext-spark/src/main/java/com/netflix/spectator/spark/SidecarTimer.java
@@ -0,0 +1,90 @@
+/**
+ * Copyright 2015 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.netflix.spectator.spark;
+
+import com.netflix.spectator.api.Clock;
+import com.netflix.spectator.api.Id;
+import com.netflix.spectator.api.Measurement;
+import com.netflix.spectator.api.Timer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Timer that keeps track of all samples since the last measurement to forward to the sidecar.
+ */
+class SidecarTimer implements Timer {
+
+  private static final int MAX_VALUES = 10000;
+
+  private final Clock clock;
+  private final Id id;
+  private final LinkedBlockingQueue<Measurement> values = new LinkedBlockingQueue<>(MAX_VALUES);
+
+  /** Create a new instance. */
+  SidecarTimer(Clock clock, Id id) {
+    this.clock = clock;
+    this.id = id.withTag(DataType.TIMER);
+  }
+
+  @Override public Id id() {
+    return id;
+  }
+
+  @Override public void record(long amount, TimeUnit unit) {
+    Measurement m = new Measurement(id, clock.wallTime(), unit.toMillis(amount));
+    values.offer(m);
+  }
+
+  @Override public <T> T record(Callable<T> f) throws Exception {
+    final long start = clock.monotonicTime();
+    try {
+      return f.call();
+    } finally {
+      record(clock.monotonicTime() - start, TimeUnit.NANOSECONDS);
+    }
+  }
+
+  @Override public void record(Runnable f) {
+    final long start = clock.monotonicTime();
+    try {
+      f.run();
+    } finally {
+      record(clock.monotonicTime() - start, TimeUnit.NANOSECONDS);
+    }
+  }
+
+  @Override public long count() {
+    return 0L;
+  }
+
+  @Override public long totalTime() {
+    return 0L;
+  }
+
+  @Override public Iterable<Measurement> measure() {
+    List<Measurement> ms = new ArrayList<>();
+    values.drainTo(ms);
+    return ms;
+  }
+
+  @Override public boolean hasExpired() {
+    return false;
+  }
+}
diff --git a/spectator-ext-spark/src/main/java/com/netflix/spectator/spark/SparkNameFunction.java b/spectator-ext-spark/src/main/java/com/netflix/spectator/spark/SparkNameFunction.java
new file mode 100644
index 000000000..e18c717b0
--- /dev/null
+++ b/spectator-ext-spark/src/main/java/com/netflix/spectator/spark/SparkNameFunction.java
@@ -0,0 +1,90 @@
+/**
+ * Copyright 2015 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.netflix.spectator.spark;
+
+import com.netflix.spectator.api.DefaultId;
+import com.netflix.spectator.api.Id;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Maps hierarchical spark names into tagged ids. Spark names generally follow a pattern like:
+ *
+ * <pre>[appId].[executorId].[role].[name]</pre>
+ */
+public class SparkNameFunction implements NameFunction {
+
+  private static final String PREFIX = "spark.";
+
+  private static final Set<String> ROLES = new HashSet<>();
+  static {
+    ROLES.add("master");
+    ROLES.add("worker");
+    ROLES.add("executor");
+    ROLES.add("driver");
+    //ROLES.add("application");
+  }
+
+  private static final Id DROP_METRIC = null;
+
+  @Override public Id apply(String name) {
+    int p = name.indexOf('.');
+    return (p == -1)
+        ? DROP_METRIC
+        : apply(name.substring(0, p), name.substring(p + 1));
+  }
+
+  private Id apply(String p1, String name) {
+    Id id = null;
+    if (ROLES.contains(p1)) {
+      id = new DefaultId(PREFIX + name).withTag("role", p1);
+    } else {
+      int p = name.indexOf('.');
+      id = (p == -1)
+          ? DROP_METRIC
+          : apply(p1, name.substring(0, p), name.substring(p + 1));
+    }
+    return id;
+  }
+
+  private Id apply(String appId, String p2, String name) {
+    Id id = null;
+    if (ROLES.contains(p2)) {
+      id = new DefaultId(PREFIX + name).withTag("role", p2).withTag("appId", appId);
+    } else {
+      int p = name.indexOf('.');
+      id = (p == -1)
+          ? DROP_METRIC
+          : apply(appId, p2, name.substring(0, p), name.substring(p + 1));
+    }
+    return id;
+  }
+
+  @SuppressWarnings("PMD.UseObjectForClearerAPI")
+  private Id apply(String appId, String executorId, String p3, String name) {
+    Id id = null;
+    if (ROLES.contains(p3)) {
+      id = new DefaultId(PREFIX + name)
+          .withTag("role", p3)
+          .withTag("appId", appId)
+          .withTag("executorId", executorId);
+    } else {
+      id = DROP_METRIC;
+    }
+    return id;
+  }
+}
diff --git a/spectator-ext-spark/src/main/java/com/netflix/spectator/spark/SparkSink.java b/spectator-ext-spark/src/main/java/com/netflix/spectator/spark/SparkSink.java
new file mode 100644
index 000000000..a498ac3d5
--- /dev/null
+++ b/spectator-ext-spark/src/main/java/com/netflix/spectator/spark/SparkSink.java
@@ -0,0 +1,96 @@
+/**
+ * Copyright 2015 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.netflix.spectator.spark;
+
+import com.codahale.metrics.MetricRegistry;
+import com.netflix.spectator.api.Spectator;
+import org.apache.spark.metrics.sink.Sink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URL;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Sink for exporting spark metrics to a prana sidecar.
+ */
+public class SparkSink implements Sink {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SparkSink.class);
+
+  private static final String DEFAULT_URL = "http://localhost:8078/metrics";
+
+  private final SpectatorReporter reporter;
+  private final SidecarRegistry sidecarRegistry;
+
+  private final long pollPeriod;
+  private final TimeUnit pollUnit;
+
+  private final URL url;
+
+  /**
+   * Create a new instance. Spark looks for a constructor with all three parameters, so the
+   * {@code SecurityManager} needs to be in the signature even though it isn't used.
+   */
+  @SuppressWarnings("PMD.UnusedFormalParameter")
+  public SparkSink(
+      Properties properties,
+      MetricRegistry registry,
+      org.apache.spark.SecurityManager manager) throws MalformedURLException {
+    reporter = SpectatorReporter.forRegistry(registry)
+        .withNameFunction(new SparkNameFunction())
+        .build();
+    pollPeriod = getPeriod(properties);
+    pollUnit = getUnit(properties);
+    url = URI.create(properties.getProperty("url", DEFAULT_URL)).toURL();
+    sidecarRegistry = Spectator.registry().underlying(SidecarRegistry.class);
+  }
+
+  private long getPeriod(Properties properties) {
+    final String v = properties.getProperty("period");
+    return (v == null) ? 10L : Long.parseLong(v);
+  }
+
+  private TimeUnit getUnit(Properties properties) {
+    final String v = properties.getProperty("unit");
+    return (v == null) ? TimeUnit.SECONDS : TimeUnit.valueOf(v.toUpperCase(Locale.US));
+  }
+
+  @Override public void start() {
+    LOGGER.info("starting poller");
+    reporter.start(pollPeriod, pollUnit);
+    if (sidecarRegistry != null) {
+      sidecarRegistry.start(url, pollPeriod, pollUnit);
+    }
+  }
+
+  @Override public void stop() {
+    LOGGER.info("stopping poller");
+    reporter.stop();
+    if (sidecarRegistry != null) {
+      sidecarRegistry.stop();
+    }
+  }
+
+  @Override public void report() {
+    LOGGER.info("reporting values");
+    reporter.report();
+  }
+}
diff --git a/spectator-ext-spark/src/main/java/com/netflix/spectator/spark/SpectatorReporter.java b/spectator-ext-spark/src/main/java/com/netflix/spectator/spark/SpectatorReporter.java
new file mode 100644
index 000000000..bf6f8b8f4
--- /dev/null
+++ b/spectator-ext-spark/src/main/java/com/netflix/spectator/spark/SpectatorReporter.java
@@ -0,0 +1,198 @@
+/**
+ * Copyright 2015 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.netflix.spectator.spark;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.Timer;
+import com.netflix.spectator.api.DistributionSummary;
+import com.netflix.spectator.api.ExtendedRegistry;
+import com.netflix.spectator.api.Id;
+import com.netflix.spectator.api.Spectator;
+import com.netflix.spectator.impl.AtomicDouble;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Reporter for mapping data in a metrics3 registry to spectator.
+ */
+public final class SpectatorReporter extends ScheduledReporter {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SpectatorReporter.class);
+
+  /**
+   * Return a builder for creating a spectator reporter based on {@code registry}.
+   */
+  public static Builder forRegistry(MetricRegistry registry) {
+    return new Builder(registry);
+  }
+
+  /**
+   * Builder for configuring the spectator reporter.
+   */
+  public static final class Builder {
+    private final MetricRegistry registry;
+    private ExtendedRegistry spectatorRegistry = Spectator.registry();
+    private NameFunction function = new NameFunction() {
+      @Override public Id apply(String name) {
+        return spectatorRegistry.createId(name);
+      }
+    };
+
+    /** Create a new instance. */
+    Builder(MetricRegistry registry) {
+      this.registry = registry;
+    }
+
+    /** Set the spectator registry to use. */
+    public Builder withSpectatorRegistry(ExtendedRegistry r) {
+      spectatorRegistry = r;
+      return this;
+    }
+
+    /** Set the name mapping function to use. */
+    public Builder withNameFunction(NameFunction f) {
+      function = f;
+      return this;
+    }
+
+    /** Create a new instance of the reporter. */
+    public SpectatorReporter build() {
+      return new SpectatorReporter(registry, spectatorRegistry, function);
+    }
+  }
+
+  private final ExtendedRegistry spectatorRegistry;
+  private final NameFunction nameFunction;
+
+  private final ConcurrentHashMap<String, AtomicDouble> gaugeDoubles = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, AtomicLong> previousValues = new ConcurrentHashMap<>();
+
+  /** Create a new instance. */
+  SpectatorReporter(
+      MetricRegistry metricRegistry,
+      ExtendedRegistry spectatorRegistry,
+      NameFunction nameFunction) {
+    super(metricRegistry,
+        "spectator",       // name
+        MetricFilter.ALL,  // filter
+        TimeUnit.SECONDS,  // rateUnit
+        TimeUnit.SECONDS); // durationUnit
+    this.spectatorRegistry = spectatorRegistry;
+    this.nameFunction = nameFunction;
+  }
+
+  @SuppressWarnings("PMD.NPathComplexity")
+  @Override public void report(
+      SortedMap<String, Gauge> gauges,
+      SortedMap<String, Counter> counters,
+      SortedMap<String, Histogram> histograms,
+      SortedMap<String, Meter> meters,
+      SortedMap<String, Timer> timers) {
+    LOGGER.debug("gauges {}, counters {}, histograms {}, meters {}, timers {}",
+        gauges.size(), counters.size(), histograms.size(), meters.size(), timers.size());
+
+    for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
+      final Object obj = entry.getValue().getValue();
+      LOGGER.info("type " + obj.getClass().getName());
+      if (obj instanceof Number) {
+        final double v = ((Number) obj).doubleValue();
+        setGaugeValue(entry.getKey(), v);
+      }
+    }
+
+    for (Map.Entry<String, Counter> entry : counters.entrySet()) {
+      setGaugeValue(entry.getKey(), entry.getValue().getCount());
+    }
+
+    for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
+      final Id id = nameFunction.apply(entry.getKey());
+      if (id != null) {
+        final DistributionSummary sHisto = spectatorRegistry.distributionSummary(id);
+        final Histogram mHisto = entry.getValue();
+        final long[] vs = mHisto.getSnapshot().getValues();
+        for (long v : vs) {
+          sHisto.record(v);
+        }
+      }
+    }
+
+    for (Map.Entry<String, Meter> entry : meters.entrySet()) {
+      final long curr = entry.getValue().getCount();
+      final long prev = getAndSetPrevious(entry.getKey(), curr);
+      final Id id = nameFunction.apply(entry.getKey());
+      if (id != null) {
+        spectatorRegistry.counter(id).increment(curr - prev);
+      }
+    }
+
+    for (Map.Entry<String, Timer> entry : timers.entrySet()) {
+      final Id id = nameFunction.apply(entry.getKey());
+      if (id != null) {
+        final com.netflix.spectator.api.Timer sTimer = spectatorRegistry.timer(id);
+        final Timer mTimer = entry.getValue();
+        final long[] vs = mTimer.getSnapshot().getValues();
+        for (long v : vs) {
+          sTimer.record(v, TimeUnit.NANOSECONDS);
+        }
+      }
+    }
+  }
+
+  private void setGaugeValue(String name, double v) {
+    AtomicDouble value = gaugeDoubles.get(name);
+    if (value == null) {
+      AtomicDouble tmp = new AtomicDouble(v);
+      value = gaugeDoubles.putIfAbsent(name, tmp);
+      if (value == null) {
+        value = tmp;
+        register(name, value);
+      }
+    }
+    LOGGER.debug("setting gauge {} to {}", name, v);
+    value.set(v);
+  }
+
+  private Id register(String name, AtomicDouble value) {
+    Id id = nameFunction.apply(name);
+    if (id != null) {
+      spectatorRegistry.gauge(id, value);
+    }
+    return id;
+  }
+
+  private long getAndSetPrevious(String name, long newValue) {
+    AtomicLong prev = previousValues.get(name);
+    if (prev == null) {
+      AtomicLong tmp = new AtomicLong(0L);
+      prev = previousValues.putIfAbsent(name, tmp);
+      prev = (prev == null) ? tmp : prev;
+    }
+    return prev.getAndSet(newValue);
+  }
+
+}
diff --git a/spectator-ext-spark/src/main/resources/META-INF/services/com.netflix.spectator.api.Registry b/spectator-ext-spark/src/main/resources/META-INF/services/com.netflix.spectator.api.Registry
new file mode 100644
index 000000000..6f217e30b
--- /dev/null
+++ b/spectator-ext-spark/src/main/resources/META-INF/services/com.netflix.spectator.api.Registry
@@ -0,0 +1 @@
+com.netflix.spectator.spark.SidecarRegistry
diff --git a/spectator-ext-spark/src/test/java/com/netflix/spectator/spark/SparkNameFunctionTest.java b/spectator-ext-spark/src/test/java/com/netflix/spectator/spark/SparkNameFunctionTest.java
new file mode 100644
index 000000000..45f7b36fb
--- /dev/null
+++ b/spectator-ext-spark/src/test/java/com/netflix/spectator/spark/SparkNameFunctionTest.java
@@ -0,0 +1,89 @@
+/**
+ * Copyright 2015 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.netflix.spectator.spark;
+
+import com.netflix.spectator.api.DefaultId;
+import com.netflix.spectator.api.Id;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class SparkNameFunctionTest {
+
+  @Test
+  public void executorName() {
+    final String name = "app-20150309231421-0000.0.executor.filesystem.file.largeRead_ops";
+    final Id expected = new DefaultId("spark.filesystem.file.largeRead_ops")
+        .withTag("role", "executor")
+        .withTag("appId", "app-20150309231421-0000")
+        .withTag("executorId", "0");
+    final SparkNameFunction f = new SparkNameFunction();
+    Assert.assertEquals(expected, f.apply(name));
+  }
+
+  @Test
+  public void driverName() {
+    final String name = "app-20150309231421-0000.driver.BlockManager.disk.diskSpaceUsed_MB";
+    final Id expected = new DefaultId("spark.BlockManager.disk.diskSpaceUsed_MB")
+        .withTag("role", "driver")
+        .withTag("appId", "app-20150309231421-0000");
+    final SparkNameFunction f = new SparkNameFunction();
+    Assert.assertEquals(expected, f.apply(name));
+  }
+
+  @Test
+  public void driverName2() {
+    final String name = "app-20150309231421-0000.driver.DAGScheduler.job.activeJobs";
+    final Id expected = new DefaultId("spark.DAGScheduler.job.activeJobs")
+        .withTag("role", "driver")
+        .withTag("appId", "app-20150309231421-0000");
+    final SparkNameFunction f = new SparkNameFunction();
+    Assert.assertEquals(expected, f.apply(name));
+  }
+
+  @Test
+  public void applicationName() {
+    final String name = "application.Spark shell.1425968061869.cores";
+    final SparkNameFunction f = new SparkNameFunction();
+    Assert.assertNull(f.apply(name));
+    //final Id expected = new DefaultId("spark.cores")
+    //    .withTag("role", "application")
+    //    .withTag("jobId", "Spark shell.1425968061869");
+    //final SparkNameFunction f = new SparkNameFunction();
+    //Assert.assertEquals(expected, f.apply(name));
+  }
+
+  @Test
+  public void masterName() {
+    final String name = "master.apps";
+    final Id expected = new DefaultId("spark.apps")
+        .withTag("role", "master");
+    final SparkNameFunction f = new SparkNameFunction();
+    Assert.assertEquals(expected, f.apply(name));
+  }
+
+  @Test
+  public void workerName() {
+    final String name = "worker.memFree_MB";
+    final Id expected = new DefaultId("spark.memFree_MB")
+        .withTag("role", "worker");
+    final SparkNameFunction f = new SparkNameFunction();
+    Assert.assertEquals(expected, f.apply(name));
+  }
+
+}
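
Usage note: in a Spark deployment the sink above is wired in through Spark's metrics configuration, with a metrics.properties entry along the lines of *.sink.<name>.class=com.netflix.spectator.spark.SparkSink plus the optional period, unit, and url properties read in the constructor (the sink instance name is arbitrary). Outside of Spark, the reporter can also be exercised directly against a metrics3 registry. The sketch below is illustrative only: the registry, metric name, and poll interval are made up, and it assumes the defaults used above, i.e. publishing into Spectator.registry() and mapping names with SparkNameFunction.

import com.codahale.metrics.MetricRegistry;
import com.netflix.spectator.spark.SparkNameFunction;
import com.netflix.spectator.spark.SpectatorReporter;

import java.util.concurrent.TimeUnit;

public final class ReporterSketch {
  public static void main(String[] args) {
    // Hypothetical metrics3 registry standing in for the one Spark would provide.
    MetricRegistry metrics = new MetricRegistry();
    metrics.counter("master.apps").inc();

    // Mirror the wiring done in SparkSink: map hierarchical names with
    // SparkNameFunction and publish into the default Spectator registry.
    SpectatorReporter reporter = SpectatorReporter.forRegistry(metrics)
        .withNameFunction(new SparkNameFunction())
        .build();

    // ScheduledReporter polls the metrics3 registry on a fixed interval.
    reporter.start(10, TimeUnit.SECONDS);
  }
}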