Skip to content

Commit

Permalink
Merge pull request #153 from brharrington/spark-stream
Browse files Browse the repository at this point in the history
allow drill-down by driver name
  • Loading branch information
brharrington committed May 29, 2015
2 parents 0ef371b + 0ae74d8 commit 0e8e9f7
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ boolean matches(String name) {
}

double apply(double v) {
return v * factor;
// streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
// -1 is used for abnormal conditions
return (Math.abs(v + 1.0) <= 1e-12) ? Double.NaN : v * factor;
}
}

Expand Down
38 changes: 35 additions & 3 deletions spectator-ext-spark/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,39 @@ spectator.spark {
// core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
// core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
{
pattern = "^([^.]+)\\.<?(driver)>?\\.((DAGScheduler|BlockManager)\\..+?)(_MB|_ms)?$"
name = 3
pattern = "^([^.]+)\\.<?(driver)>?\\.(DAGScheduler|BlockManager)\\.(.+?)(_MB|_ms)?$"
name = 4
tags = {
"role" = 2
"appId" = 1
"driver" = 3
}
},

// streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
// streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
// Note: the .*Delay stats are derived from the .*Time stats. The time stats by themselves
// aren't that useful sent to the backend.
{
pattern = "^([^.]+)\\.<?(driver)>?\\.([^.]+)\\.(StreamingMetrics)\\.(.+Delay)$"
name = 5
tags = {
"role" = 2
"driver" = 4
"appId" = 1
"appName" = 3
}
},

// streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
{
pattern = "^([^.]+)\\.<?(driver)>?\\.([^.]+)\\.(StreamingMetrics)\\.(.+(?<!Time|Delay))$"
name = 5
tags = {
"role" = 2
"driver" = 4
"appId" = 1
"appName" = 3
}
},
]
Expand All @@ -52,6 +80,10 @@ spectator.spark {
{
pattern = "^.*_ms$"
factor = 0.001
},
{
pattern = "^.*streaming.*_.*Delay$"
factor = 0.001
}
]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,30 +45,61 @@ public void executorName() {
@Test
public void driverName() {
final String name = "app-20150309231421-0000.driver.BlockManager.disk.diskSpaceUsed_MB";
final Id expected = new DefaultId("spark.BlockManager.disk.diskSpaceUsed")
final Id expected = new DefaultId("spark.disk.diskSpaceUsed")
.withTag("role", "driver")
.withTag("driver", "BlockManager")
.withTag("appId", "app-20150309231421-0000");
assertEquals(expected, f.apply(name));
}

@Test
public void driverName2() {
final String name = "app-20150309231421-0000.driver.DAGScheduler.job.activeJobs";
final Id expected = new DefaultId("spark.DAGScheduler.job.activeJobs")
final Id expected = new DefaultId("spark.job.activeJobs")
.withTag("role", "driver")
.withTag("driver", "DAGScheduler")
.withTag("appId", "app-20150309231421-0000");
assertEquals(expected, f.apply(name));
}

@Test
public void driverName3() {
final String name = "local-1429219722964.<driver>.DAGScheduler.job.activeJobs";
final Id expected = new DefaultId("spark.DAGScheduler.job.activeJobs")
final Id expected = new DefaultId("spark.job.activeJobs")
.withTag("role", "driver")
.withTag("driver", "DAGScheduler")
.withTag("appId", "local-1429219722964");
assertEquals(expected, f.apply(name));
}

@Test
public void driverStreamingSimple() {
final String name = "app-20150527224111-0014.<driver>.SubscriptionEnded.StreamingMetrics.streaming.receivers";
final Id expected = new DefaultId("spark.streaming.receivers")
.withTag("role", "driver")
.withTag("driver", "StreamingMetrics")
.withTag("appId", "app-20150527224111-0014")
.withTag("appName", "SubscriptionEnded");
assertEquals(expected, f.apply(name));
}

@Test
public void driverStreamingDelay() {
final String name = "app-20150527224111-0014.<driver>.SubscriptionEnded.StreamingMetrics.streaming.lastReceivedBatch_submissionDelay";
final Id expected = new DefaultId("spark.streaming.lastReceivedBatch_submissionDelay")
.withTag("role", "driver")
.withTag("driver", "StreamingMetrics")
.withTag("appId", "app-20150527224111-0014")
.withTag("appName", "SubscriptionEnded");
assertEquals(expected, f.apply(name));
}

@Test
public void driverStreamingTime() {
final String name = "app-20150527224111-0014.<driver>.SubscriptionEnded.StreamingMetrics.streaming.lastReceivedBatch_submissionTime";
Assert.assertNull(f.apply(name));
}

@Test
public void applicationName() {
final String name = "application.Spark shell.1425968061869.cores";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@ public void driverName() {
Assert.assertEquals(42.0 * 1e6, f.convert(name, 42.0), 1e-12);
}

@Test
public void driverStreamingDelay() {
final String name = "app-20150527224111-0014.<driver>.SubscriptionEnded.StreamingMetrics.streaming.lastReceivedBatch_submissionDelay";
Assert.assertEquals(42.0 / 1000.0, f.convert(name, 42.0), 1e-12);
}

@Test
public void driverStreamingDelayAbnormal() {
final String name = "app-20150527224111-0014.<driver>.SubscriptionEnded.StreamingMetrics.streaming.lastReceivedBatch_submissionDelay";
Assert.assertEquals(Double.NaN, f.convert(name, -1.0), 1e-12);
}

@Test
public void applicationName2() {
final String name = "application.SubscriptionEnded.1429226958083.runtime_ms";
Expand Down

0 comments on commit 0e8e9f7

Please sign in to comment.