Skip to content

Commit

Permalink
Merge branch 'main' into feat/monitor_group_status
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouyou9505 committed Jan 15, 2024
2 parents 3a8d7a5 + 87098f5 commit 919505f
Show file tree
Hide file tree
Showing 249 changed files with 9,205 additions and 6,408 deletions.
93 changes: 47 additions & 46 deletions .github/workflows/pr-e2e-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -177,55 +177,56 @@ jobs:
include_passed: true
detailed_summary: true

test-e2e-remoting-java:
if: ${{ success() }}
name: Test E2E remoting java
needs: [ list-version, deploy ]
runs-on: private-k3s
timeout-minutes: 60
strategy:
matrix:
version: ${{ fromJSON(needs.list-version.outputs.version-json) }}
steps:
- name: Delay
env:
DELAY_SECONDS: 30
run: |
sleep $DELAY_SECONDS
- uses: actions/checkout@v3
with:
submodules: true
- name: Install flatc
run: sudo bash install_flatc.sh
- name: Build rocketmq dependency
run: |
mvn clean install -U -DskipTests
- uses: actions/checkout@v3
if: always()
with:
repository: AutoMQ/rocketmq-e2e
ref: master
path: rocketmq-e2e
- name: e2e test
env:
HELM_NAME: ${{ env.HELM_NAME }}
NAMESPACE: ${{ needs.deploy.outputs.namespace }}
run: |
cd rocketmq-e2e
cd java/e2e-v4 && mvn -B test -DnamesrvAddr=${HELM_NAME}-rocketmq-broker.${NAMESPACE}.svc.pve1.local:8081
- name: Publish Test Report
uses: mikepenz/action-junit-report@v3
if: always() # always run even if the previous step fails
with:
report_paths: '**/surefire-reports/TEST-*.xml'
annotate_only: true
include_passed: true
detailed_summary: true
# test-e2e-remoting-java:
# if: ${{ success() }}
# name: Test E2E remoting java
# needs: [ list-version, deploy ]
# runs-on: private-k3s
# timeout-minutes: 60
# strategy:
# matrix:
# version: ${{ fromJSON(needs.list-version.outputs.version-json) }}
# steps:
# - name: Delay
# env:
# DELAY_SECONDS: 30
# run: |
# sleep $DELAY_SECONDS
# - uses: actions/checkout@v3
# with:
# submodules: true
# - name: Install flatc
# run: sudo bash install_flatc.sh
# - name: Build rocketmq dependency
# run: |
# mvn clean install -U -DskipTests
# - uses: actions/checkout@v3
# if: always()
# with:
# repository: AutoMQ/rocketmq-e2e
# ref: master
# path: rocketmq-e2e
# - name: e2e test
# env:
# HELM_NAME: ${{ env.HELM_NAME }}
# NAMESPACE: ${{ needs.deploy.outputs.namespace }}
# run: |
# cd rocketmq-e2e
# cd java/e2e-v4 && mvn -B test -DnamesrvAddr=${HELM_NAME}-rocketmq-broker.${NAMESPACE}.svc.pve1.local:8081
# - name: Publish Test Report
# uses: mikepenz/action-junit-report@v3
# if: always() # always run even if the previous step fails
# with:
# report_paths: '**/surefire-reports/TEST-*.xml'
# annotate_only: true
# include_passed: true
# detailed_summary: true

clean:
if: always()
name: Clean
needs: [deploy, list-version, test-e2e-grpc-java, test-e2e-remoting-java]
# needs: [deploy, list-version, test-e2e-grpc-java, test-e2e-remoting-java]
needs: [deploy, list-version, test-e2e-grpc-java]
runs-on: private-k3s
timeout-minutes: 60
strategy:
Expand All @@ -238,4 +239,4 @@ jobs:
NAMESPACE: ${{ needs.deploy.outputs.namespace }}
run: |
bash clean-ci.sh ${NAMESPACE}
kubectl delete namespace $NAMESPACE
kubectl delete namespace $NAMESPACE
50 changes: 50 additions & 0 deletions .github/workflows/s3-stream-e2e.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
name: E2E-TEST for AutoMQ S3Stream
on:
pull_request:
types:
- opened
- reopened
- synchronize
paths:
- 's3stream/**'
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
with:
submodules: true
- uses: actions/setup-java@v4
with:
distribution: 'temurin'
java-version: '17'
cache: "maven"
- name: Build s3stream
working-directory: ./s3stream
run: mvn clean install -U -DskipTests
- uses: actions/checkout@v3
with:
repository: AutoMQ/s3stream-e2e
ref: main
path: s3stream-e2e
token: ${{ secrets.PAT_E2E }}
- name: Change s3stream version
run: |
export S3STREAM_VERSION=$(cat s3stream/target/maven-archiver/pom.properties | grep version | awk -F '=' '{print $2}')
echo "change s3stream version of e2e test to $S3STREAM_VERSION"
sed -i "s/<s3stream.version>.*<\/s3stream.version>/<s3stream.version>$S3STREAM_VERSION<\/s3stream.version>/g" s3stream-e2e/pom.xml
export LINE_START=$(awk '/<repositories>/{print NR}' s3stream-e2e/pom.xml)
export LINE_END=$(awk '/<\/repositories>/{print NR}' s3stream-e2e/pom.xml)
sed -i "${LINE_START},${LINE_END}d" s3stream-e2e/pom.xml
cat s3stream-e2e/pom.xml
- name: Run tests
working-directory: ./s3stream-e2e
run: mvn test -pl integration
- name: Publish Test Report
uses: mikepenz/action-junit-report@v3
if: success() || failure() # always run even if the previous step fails
with:
report_paths: '**/surefire-reports/TEST-*.xml'
annotate_only: true
include_passed: true
detailed_summary: true
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.automq.rocketmq.common.api.DataStore;
import com.automq.rocketmq.common.config.BrokerConfig;
import com.automq.rocketmq.common.config.ProfilerConfig;
import com.automq.rocketmq.common.config.TraceConfig;
import com.automq.rocketmq.common.util.Lifecycle;
import com.automq.rocketmq.controller.MetadataStore;
import com.automq.rocketmq.controller.server.ControllerServiceImpl;
Expand All @@ -47,23 +46,12 @@
import com.automq.rocketmq.store.MessageStoreBuilder;
import com.automq.rocketmq.store.MessageStoreImpl;
import com.automq.rocketmq.store.api.MessageStore;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.SpanProcessor;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.pyroscope.http.Format;
import io.pyroscope.javaagent.EventType;
import io.pyroscope.javaagent.PyroscopeAgent;
import io.pyroscope.javaagent.config.Config;
import io.pyroscope.labels.Pyroscope;
import java.io.InputStream;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.client.ConsumerManager;
import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.common.constant.LoggerName;
Expand All @@ -73,10 +61,6 @@
import org.apache.rocketmq.proxy.service.ServiceManager;
import org.apache.rocketmq.proxy.service.message.MessageService;

import static io.opentelemetry.semconv.ResourceAttributes.SERVICE_INSTANCE_ID;
import static io.opentelemetry.semconv.ResourceAttributes.SERVICE_NAME;
import static io.opentelemetry.semconv.ResourceAttributes.SERVICE_VERSION;

public class BrokerController implements Lifecycle {
private static final Logger LOGGER = LoggerFactory.getLogger(MetricsExporter.class);

Expand All @@ -89,6 +73,7 @@ public class BrokerController implements Lifecycle {
private final StoreMetadataService storeMetadataService;
private final ProxyMetadataService proxyMetadataService;
private final ExtendMessagingProcessor messagingProcessor;
private final TelemetryExporter telemetryExporter;
private final MetricsExporter metricsExporter;
private final DeadLetterService dlqService;
private final MessageService messageService;
Expand Down Expand Up @@ -132,56 +117,9 @@ public BrokerController(BrokerConfig brokerConfig) throws Exception {

messagingProcessor = ExtendMessagingProcessor.createForS3RocketMQ(serviceManager, brokerConfig.proxy());

// Build resource.
Properties gitProperties;
try {
ClassLoader classLoader = getClass().getClassLoader();
InputStream inputStream = classLoader.getResourceAsStream("git.properties");
gitProperties = new Properties();
gitProperties.load(inputStream);
} catch (Exception e) {
LOGGER.warn("read project version failed", e);
throw new RuntimeException(e);
}

AttributesBuilder builder = Attributes.builder();
builder.put(SERVICE_NAME, brokerConfig.name());
builder.put(SERVICE_VERSION, (String) gitProperties.get("git.build.version"));
builder.put("git.hash", (String) gitProperties.get("git.commit.id.describe"));
builder.put(SERVICE_INSTANCE_ID, brokerConfig.instanceId());

Resource resource = Resource.create(builder.build());

// Build trace provider.
TraceConfig traceConfig = brokerConfig.trace();
SdkTracerProvider sdkTracerProvider = null;
if (traceConfig.enabled()) {
OtlpGrpcSpanExporter spanExporter = OtlpGrpcSpanExporter.builder()
.setEndpoint(traceConfig.grpcExporterTarget())
.setTimeout(traceConfig.grpcExporterTimeOutInMills(), TimeUnit.MILLISECONDS)
.build();

SpanProcessor spanProcessor;
if (traceConfig.batchSize() == 0) {
spanProcessor = SimpleSpanProcessor.create(spanExporter);
} else {
spanProcessor = BatchSpanProcessor.builder(spanExporter)
.setExporterTimeout(traceConfig.grpcExporterTimeOutInMills(), TimeUnit.MILLISECONDS)
.setScheduleDelay(traceConfig.periodicExporterIntervalInMills(), TimeUnit.MILLISECONDS)
.setMaxExportBatchSize(traceConfig.batchSize())
.setMaxQueueSize(traceConfig.maxCachedSize())
.build();
}

sdkTracerProvider = SdkTracerProvider.builder()
.addSpanProcessor(spanProcessor)
.setResource(resource)
.build();
}

// Init the metrics exporter before accept requests.
metricsExporter = new MetricsExporter(brokerConfig, messageStore, messagingProcessor, resource,
sdkTracerProvider, metadataStore, s3MetadataService);
telemetryExporter = new TelemetryExporter(brokerConfig);
metricsExporter = new MetricsExporter(brokerConfig, telemetryExporter, messageStore, messagingProcessor, metadataStore, s3MetadataService);

// Init the profiler agent.
ProfilerConfig profilerConfig = brokerConfig.profiler();
Expand Down Expand Up @@ -231,6 +169,7 @@ public void shutdown() throws Exception {
messageStore.shutdown();
metadataStore.close();
metricsExporter.shutdown();
telemetryExporter.close();

// Shutdown the thread pool monitor.
ThreadPoolMonitor.shutdown();
Expand Down
Loading

0 comments on commit 919505f

Please sign in to comment.