Skip to content

Commit

Permalink
initial version of spark sink
Browse files Browse the repository at this point in the history
Contains three parts:

1. A reporter for mapping data in a metrics3
   registry into spectator. This includes trying
   to map the hierarchical names into the tagged
   approach used by spectator/atlas.
2. Spectator regitry that reports to the prana
   sidecar.
3. Implementation of sink that can be used with
   the spark metrics.properties file.

The first two parts will likely get split out into
libraries that can be used separately after some
more testing. In particular, need to see if we can
enhance the prana endpoint so we can do a cleaner
job of forwarding the data.

Still need to figure out how we want to tag system
stats like GC events for different parts. Alternatively
it could just enable those sources on spark configs
and the metrics mapping should work.
  • Loading branch information
brharrington committed Mar 10, 2015
1 parent 698e62e commit 6366ef8
Show file tree
Hide file tree
Showing 13 changed files with 1,027 additions and 0 deletions.
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ include 'spectator-api',
'spectator-ext-jvm',
'spectator-ext-log4j2',
'spectator-ext-sandbox',
'spectator-ext-spark',
'spectator-reg-metrics2',
'spectator-reg-metrics3',
'spectator-reg-servo',
Expand Down
7 changes: 7 additions & 0 deletions spectator-ext-spark/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
dependencies {
compile project(':spectator-api')
compile project(':spectator-ext-gc')
compile project(':spectator-ext-jvm')
compile 'io.dropwizard.metrics:metrics-core:3.1.0'
compile 'org.apache.spark:spark-core_2.10:1.2.1'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spectator.spark;

import com.netflix.spectator.api.Tag;

/**
* Data types for messages sent to the sidecar metrics endpoint.
*/
public enum DataType implements Tag {

/** Value reported as is, the most recent value received by the sidecar will get used. */
GAUGE,

/** Value is a delta to use when incrementing the counter. */
COUNTER,

/** Value is an amount in milliseconds that will be recorded on the timer. */
TIMER;

@Override public String key() {
return "type";
}

@Override public String value() {
return name();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spectator.spark;

import com.netflix.spectator.api.Id;

/**
* Maps a name used for the metrics registry into an id for spectator.
*/
public interface NameFunction {
/**
* Return the id corresponding to the name, or null if the name cannot be mapped and the value
* should be dropped.
*/
Id apply(String name);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spectator.spark;

import com.netflix.spectator.api.Clock;
import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Id;
import com.netflix.spectator.api.Measurement;

import java.util.Collections;
import java.util.concurrent.atomic.AtomicLong;

/**
* Counter that tracks the delta since the last measurement was taken.
*/
class SidecarCounter implements Counter {

private final Clock clock;
private final Id id;
private final AtomicLong value;

/** Create a new instance. */
SidecarCounter(Clock clock, Id id) {
this.clock = clock;
this.id = id.withTag(DataType.COUNTER);
this.value = new AtomicLong(0L);
}

@Override public Id id() {
return id;
}

@Override public void increment() {
value.incrementAndGet();
}

@Override public void increment(long amount) {
value.addAndGet(amount);
}

@Override public long count() {
return value.get();
}

@Override public Iterable<Measurement> measure() {
Measurement m = new Measurement(id, clock.wallTime(), value.getAndSet(0L));
return Collections.singletonList(m);
}

@Override public boolean hasExpired() {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spectator.spark;

import com.netflix.spectator.api.Clock;
import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.DistributionSummary;
import com.netflix.spectator.api.Id;
import com.netflix.spectator.api.Measurement;
import com.netflix.spectator.api.Statistic;

import java.util.ArrayList;
import java.util.List;

/**
* Distribution summary that is mapped to two counters: total time and count.
*/
class SidecarDistributionSummary implements DistributionSummary {

private final Id id;
private final Counter count;
private final Counter totalAmount;

/** Create a new instance. */
SidecarDistributionSummary(Clock clock, Id id) {
this.id = id;
count = new SidecarCounter(clock, id.withTag(Statistic.count));
totalAmount = new SidecarCounter(clock, id.withTag(Statistic.totalAmount));
}

@Override public Id id() {
return id;
}

@Override public void record(long amount) {
count.increment();
totalAmount.increment(amount);
}

@Override public long count() {
return count.count();
}

@Override public long totalAmount() {
return totalAmount.count();
}

@Override public Iterable<Measurement> measure() {
List<Measurement> ms = new ArrayList<>();
for (Measurement m : count.measure()) {
ms.add(m);
}
for (Measurement m : totalAmount.measure()) {
ms.add(m);
}
return ms;
}

@Override public boolean hasExpired() {
return false;
}
}
Loading

0 comments on commit 6366ef8

Please sign in to comment.