forked from apache/hudi
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[HUDI-7337] Implement MetricsReporter that reports metrics to M3 (apa…
…che#10565) --------- Co-authored-by: Krishen Bhan <“[email protected]”>
- Loading branch information
Showing
16 changed files
with
584 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
126 changes: 126 additions & 0 deletions
126
...udi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsM3Config.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,126 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.hudi.config.metrics; | ||
|
||
import static org.apache.hudi.config.metrics.HoodieMetricsConfig.METRIC_PREFIX; | ||
|
||
import java.io.File; | ||
import java.io.FileReader; | ||
import java.io.IOException; | ||
import java.util.Properties; | ||
import org.apache.hudi.common.config.ConfigClassProperty; | ||
import org.apache.hudi.common.config.ConfigGroups; | ||
import org.apache.hudi.common.config.ConfigProperty; | ||
import org.apache.hudi.common.config.HoodieConfig; | ||
|
||
/** | ||
* Configs for M3 reporter type. | ||
* <p> | ||
* {@link org.apache.hudi.metrics.MetricsReporterType#M3} | ||
*/ | ||
@ConfigClassProperty(name = "Metrics Configurations for M3", | ||
groupName = ConfigGroups.Names.METRICS, | ||
description = "Enables reporting on Hudi metrics using M3. " | ||
+ " Hudi publishes metrics on every commit, clean, rollback etc.") | ||
public class HoodieMetricsM3Config extends HoodieConfig { | ||
|
||
public static final String M3_PREFIX = METRIC_PREFIX + ".m3"; | ||
|
||
public static final ConfigProperty<String> M3_SERVER_HOST_NAME = ConfigProperty | ||
.key(M3_PREFIX + ".host") | ||
.defaultValue("localhost") | ||
.withDocumentation("M3 host to connect to."); | ||
|
||
public static final ConfigProperty<Integer> M3_SERVER_PORT_NUM = ConfigProperty | ||
.key(M3_PREFIX + ".port") | ||
.defaultValue(9052) | ||
.withDocumentation("M3 port to connect to."); | ||
|
||
public static final ConfigProperty<String> M3_TAGS = ConfigProperty | ||
.key(M3_PREFIX + ".tags") | ||
.defaultValue("") | ||
.withDocumentation("Optional M3 tags applied to all metrics."); | ||
|
||
public static final ConfigProperty<String> M3_ENV = ConfigProperty | ||
.key(M3_PREFIX + ".env") | ||
.defaultValue("production") | ||
.withDocumentation("M3 tag to label the environment (defaults to 'production'), " | ||
+ "applied to all metrics."); | ||
|
||
public static final ConfigProperty<String> M3_SERVICE = ConfigProperty | ||
.key(M3_PREFIX + ".service") | ||
.defaultValue("hoodie") | ||
.withDocumentation("M3 tag to label the service name (defaults to 'hoodie'), " | ||
+ "applied to all metrics."); | ||
|
||
private HoodieMetricsM3Config() { | ||
super(); | ||
} | ||
|
||
public static HoodieMetricsM3Config.Builder newBuilder() { | ||
return new HoodieMetricsM3Config.Builder(); | ||
} | ||
|
||
public static class Builder { | ||
|
||
private final HoodieMetricsM3Config hoodieMetricsM3Config = new HoodieMetricsM3Config(); | ||
|
||
public HoodieMetricsM3Config.Builder fromFile(File propertiesFile) throws IOException { | ||
try (FileReader reader = new FileReader(propertiesFile)) { | ||
this.hoodieMetricsM3Config.getProps().load(reader); | ||
return this; | ||
} | ||
} | ||
|
||
public HoodieMetricsM3Config.Builder fromProperties(Properties props) { | ||
this.hoodieMetricsM3Config.getProps().putAll(props); | ||
return this; | ||
} | ||
|
||
public HoodieMetricsM3Config.Builder toM3Host(String host) { | ||
hoodieMetricsM3Config.setValue(M3_SERVER_HOST_NAME, host); | ||
return this; | ||
} | ||
|
||
public HoodieMetricsM3Config.Builder onM3Port(int port) { | ||
hoodieMetricsM3Config.setValue(M3_SERVER_PORT_NUM, String.valueOf(port)); | ||
return this; | ||
} | ||
|
||
public HoodieMetricsM3Config.Builder useM3Tags(String tags) { | ||
hoodieMetricsM3Config.setValue(M3_TAGS, tags); | ||
return this; | ||
} | ||
|
||
public HoodieMetricsM3Config.Builder useM3Env(String env) { | ||
hoodieMetricsM3Config.setValue(M3_ENV, env); | ||
return this; | ||
} | ||
|
||
public HoodieMetricsM3Config.Builder useM3Service(String service) { | ||
hoodieMetricsM3Config.setValue(M3_SERVICE, service); | ||
return this; | ||
} | ||
|
||
public HoodieMetricsM3Config build() { | ||
hoodieMetricsM3Config.setDefaults(HoodieMetricsM3Config.class.getName()); | ||
return hoodieMetricsM3Config; | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
120 changes: 120 additions & 0 deletions
120
...client/hudi-client-common/src/main/java/org/apache/hudi/metrics/m3/M3MetricsReporter.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.hudi.metrics.m3; | ||
|
||
import com.codahale.metrics.MetricRegistry; | ||
import com.uber.m3.tally.m3.M3Reporter; | ||
import com.uber.m3.util.Duration; | ||
import com.uber.m3.util.ImmutableMap; | ||
import com.uber.m3.tally.RootScopeBuilder; | ||
import com.uber.m3.tally.Scope; | ||
import java.net.InetSocketAddress; | ||
import java.util.Arrays; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.concurrent.TimeUnit; | ||
import org.apache.hudi.config.HoodieWriteConfig; | ||
import org.apache.hudi.metrics.MetricsReporter; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
/** | ||
* Implementation of M3 Metrics reporter, which can report metrics to a https://m3db.io/ service | ||
*/ | ||
public class M3MetricsReporter extends MetricsReporter { | ||
|
||
private static final Logger LOG = LoggerFactory.getLogger(M3MetricsReporter.class); | ||
private final HoodieWriteConfig config; | ||
private final MetricRegistry registry; | ||
private final ImmutableMap<String, String> tags; | ||
|
||
public M3MetricsReporter(HoodieWriteConfig config, MetricRegistry registry) { | ||
this.config = config; | ||
this.registry = registry; | ||
|
||
ImmutableMap.Builder tagBuilder = new ImmutableMap.Builder<>(); | ||
tagBuilder.putAll(parseOptionalTags(config.getM3Tags())); | ||
tagBuilder.put("service", config.getM3Service()); | ||
tagBuilder.put("env", config.getM3Env()); | ||
this.tags = tagBuilder.build(); | ||
LOG.info(String.format("Building M3 Reporter with M3 tags mapping: %s", tags)); | ||
} | ||
|
||
private static Map parseOptionalTags(String tagValueString) { | ||
Map parsedTags = new HashMap(); | ||
if (!tagValueString.isEmpty()) { | ||
Arrays.stream(tagValueString.split(",")).forEach((tagValuePair) -> { | ||
String[] parsedTagValuePair = Arrays.stream(tagValuePair.split("=")) | ||
.map((tagOrValue) -> tagOrValue.trim()).filter((tagOrValue) -> !tagOrValue.isEmpty()) | ||
.toArray(String[]::new); | ||
if (parsedTagValuePair.length != 2) { | ||
throw new RuntimeException(String.format( | ||
"M3 Reporter tags cannot be initialized with tags [%s] due to not being in format `tag=value, . . .`.", | ||
tagValuePair)); | ||
} | ||
parsedTags.put(parsedTagValuePair[0], parsedTagValuePair[1]); | ||
}); | ||
} | ||
return parsedTags; | ||
} | ||
|
||
@Override | ||
public void start() {} | ||
|
||
@Override | ||
public void report() { | ||
/* | ||
Although com.uber.m3.tally.Scope supports automatically submitting metrics in an interval | ||
via a background task, it does not seem to support | ||
- an API for explicitly flushing/emitting all metrics | ||
- Taking in an external com.codahale.metrics.MetricRegistry metrics registry and automatically | ||
adding any new counters/gauges whenever they are added to the registry | ||
Due to this, this implementation emits metrics by creating a Scope, adding all metrics from | ||
the HUDI metircs registry as counters/gauges to the scope, and then closing the Scope. Since | ||
closing this Scope will implicitly flush all M3 metrics, the reporting intervals | ||
are configured to be Integer.MAX_VALUE. | ||
*/ | ||
synchronized (this) { | ||
try (Scope scope = new RootScopeBuilder() | ||
.reporter(new M3Reporter.Builder( | ||
new InetSocketAddress(config.getM3ServerHost(), config.getM3ServerPort())) | ||
.includeHost(true).commonTags(tags) | ||
.build()) | ||
.reportEvery(Duration.ofSeconds(Integer.MAX_VALUE)) | ||
.tagged(tags)) { | ||
|
||
M3ScopeReporterAdaptor scopeReporter = new M3ScopeReporterAdaptor(registry, scope); | ||
scopeReporter.start(Integer.MAX_VALUE, TimeUnit.SECONDS); | ||
scopeReporter.report(); | ||
scopeReporter.stop(); | ||
} catch (Exception e) { | ||
LOG.error(String.format("Error reporting metrics to M3: %s", e)); | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public void stop() {} | ||
} | ||
|
||
|
||
|
||
|
||
|
||
|
Oops, something went wrong.