Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,11 @@ private static CompletableFuture<JobID> submitJob(

return dispatcherGateway
.getBlobServerPort(rpcTimeout)
.thenApply(
blobServerPort ->
.thenCombine(
dispatcherGateway.getBlobServerAddress(rpcTimeout),
(blobServerPort, blobServerAddress) ->
new InetSocketAddress(
dispatcherGateway.getHostname(), blobServerPort))
blobServerAddress.getHostName(), blobServerPort))
.thenCompose(
blobServerAddress -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -113,4 +114,9 @@ public int getPort() {
// NOTE: both blob stores connect to the same server!
return permanentBlobCache.getPort();
}

@Override
public InetAddress getAddress() {
return permanentBlobCache.serverAddress.getAddress();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.UnknownHostException;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -1009,6 +1010,24 @@ public int getPort() {
return this.serverSocket.getLocalPort();
}

/**
* Returns the address on which the server is listening.
*
* @return address on which the server is listening
*/
@Override
public InetAddress getAddress() {
InetAddress bindAddr = serverSocket.getInetAddress();
if (bindAddr.getHostAddress().equals(NetUtils.getWildcardIPAddress())) {
try {
return InetAddress.getLocalHost();
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
}
return bindAddr;
}

/**
* Returns the blob expiry times - for testing purposes only!
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.blob;

import java.io.Closeable;
import java.net.InetAddress;

/** A simple store and retrieve binary large objects (BLOBs). */
public interface BlobService extends Closeable {
Expand All @@ -43,4 +44,14 @@ public interface BlobService extends Closeable {
* @return the port of the blob server.
*/
int getPort();

/**
* Returns the network address of the BLOB server that this BLOB service is working with. This
* default implementation returns the loopback address.
*
* @return the InetAddress of the BLOB server.
*/
default InetAddress getAddress() {
return InetAddress.getLoopbackAddress();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.net.InetAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -998,6 +999,11 @@ public CompletableFuture<Integer> getBlobServerPort(Duration timeout) {
return CompletableFuture.completedFuture(blobServer.getPort());
}

@Override
public CompletableFuture<InetAddress> getBlobServerAddress(Duration timeout) {
return CompletableFuture.completedFuture(blobServer.getAddress());
}

@Override
public CompletableFuture<String> triggerCheckpoint(JobID jobID, Duration timeout) {
return performOperationOnJobMasterGateway(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.streaming.api.graph.ExecutionPlan;

import java.net.InetAddress;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -64,6 +65,14 @@ CompletableFuture<Acknowledge> submitFailedJob(
*/
CompletableFuture<Integer> getBlobServerPort(@RpcTimeout Duration timeout);

/**
* Returns the address of the blob server.
*
* @param timeout of the operation
* @return A future InetAddress of the blob server address
*/
CompletableFuture<InetAddress> getBlobServerAddress(@RpcTimeout Duration timeout);

default CompletableFuture<Acknowledge> shutDownCluster(ApplicationStatus applicationStatus) {
return shutDownCluster();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1163,10 +1163,11 @@ private CompletableFuture<InetSocketAddress> createBlobServerAddress(
dispatcherGateway ->
dispatcherGateway
.getBlobServerPort(rpcTimeout)
.thenApply(
blobServerPort ->
.thenCombine(
dispatcherGateway.getBlobServerAddress(rpcTimeout),
(blobServerPort, blobServerAddress) ->
new InetSocketAddress(
dispatcherGateway.getHostname(),
blobServerAddress.getHostName(),
blobServerPort)))
.thenCompose(Function.identity());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

import java.io.File;
import java.io.ObjectInputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -188,18 +189,22 @@ private CompletableFuture<ExecutionPlan> uploadExecutionPlanFiles(
Collection<Tuple2<String, Path>> artifacts,
Configuration configuration) {
CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);
CompletableFuture<InetAddress> blobServerAddressFuture =
gateway.getBlobServerAddress(timeout);

return executionPlanFuture.thenCombine(
blobServerPortFuture,
(ExecutionPlan executionPlan, Integer blobServerPort) -> {
final InetSocketAddress address =
new InetSocketAddress(gateway.getHostname(), blobServerPort);
blobServerPortFuture.thenCombine(
blobServerAddressFuture,
(blobServerPort, blobServerAddress) ->
new InetSocketAddress(
blobServerAddress.getHostName(), blobServerPort)),
(ExecutionPlan executionPlan, InetSocketAddress blobSocketAddress) -> {
try {
ClientUtils.uploadExecutionPlanFiles(
executionPlan,
jarFiles,
artifacts,
() -> new BlobClient(address, configuration));
() -> new BlobClient(blobSocketAddress, configuration));
} catch (FlinkException e) {
throw new CompletionException(
new RestHandlerException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.ExceptionUtils;

Expand All @@ -36,6 +37,7 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.security.MessageDigest;
Expand Down Expand Up @@ -242,7 +244,8 @@ private void testContentAddressableBuffer(BlobKey.BlobType blobType)
byte[] digest = md.digest();

InetSocketAddress serverAddress =
new InetSocketAddress("localhost", getBlobServer().getPort());
new InetSocketAddress(
getBlobServer().getAddress().getHostName(), getBlobServer().getPort());
client = new BlobClient(serverAddress, getBlobClientConfig());

JobID jobId = new JobID();
Expand Down Expand Up @@ -329,7 +332,9 @@ private void testContentAddressableStream(BlobKey.BlobType blobType)

try (BlobClient client =
new BlobClient(
new InetSocketAddress("localhost", getBlobServer().getPort()),
new InetSocketAddress(
getBlobServer().getAddress().getHostName(),
getBlobServer().getPort()),
getBlobClientConfig())) {

JobID jobId = new JobID();
Expand Down Expand Up @@ -403,7 +408,9 @@ private void testGetFailsDuringStreaming(@Nullable final JobID jobId, BlobKey.Bl

try (BlobClient client =
new BlobClient(
new InetSocketAddress("localhost", getBlobServer().getPort()),
new InetSocketAddress(
getBlobServer().getAddress().getHostName(),
getBlobServer().getPort()),
getBlobClientConfig())) {

byte[] data = new byte[5000000];
Expand Down Expand Up @@ -462,7 +469,8 @@ static void uploadJarFile(BlobServer blobServer, Configuration blobClientConfig)
testFile.deleteOnExit();
prepareTestFile(testFile);

InetSocketAddress serverAddress = new InetSocketAddress("localhost", blobServer.getPort());
InetSocketAddress serverAddress =
new InetSocketAddress(blobServer.getAddress().getHostName(), blobServer.getPort());

uploadJarFile(serverAddress, blobClientConfig, testFile);
uploadJarFile(serverAddress, blobClientConfig, testFile);
Expand Down Expand Up @@ -504,7 +512,8 @@ void testSocketTimeout() throws IOException {
10_000L)) {
testBlobServer.start();
InetSocketAddress serverAddress =
new InetSocketAddress("localhost", testBlobServer.getPort());
new InetSocketAddress(
getBlobServer().getAddress().getHostName(), testBlobServer.getPort());

try (BlobClient client = new BlobClient(serverAddress, clientConfig)) {
client.getInternal(new JobID(), BlobKey.createKey(TRANSIENT_BLOB));
Expand All @@ -523,12 +532,53 @@ void testSocketTimeout() throws IOException {
void testUnresolvedInetSocketAddress() throws Exception {
try (BlobClient client =
new BlobClient(
InetSocketAddress.createUnresolved("localhost", getBlobServer().getPort()),
InetSocketAddress.createUnresolved(
getBlobServer().getAddress().getHostName(),
getBlobServer().getPort()),
getBlobClientConfig())) {
assertThat(client.isConnected()).isTrue();
}
}

/** BlobServer should return routable address when bound to wildcard. */
@Test
void testWildcardBindingAddress() throws Exception {
Configuration config = new Configuration();
config.set(JobManagerOptions.BIND_HOST, "0.0.0.0");

File tempServerDir = tempDir.resolve("wildcard_test").toFile();
tempServerDir.mkdirs();

try (BlobServer testServer = new BlobServer(config, tempServerDir, new VoidBlobStore())) {
testServer.start();

InetAddress address = testServer.getAddress();
assertThat(address.getHostAddress())
.as("Should not return wildcard address")
.isNotEqualTo("0.0.0.0");
}
}

/** BlobServer should return the configured bind address. */
@Test
void testReturnsConfiguredBindAddress() throws Exception {
String loopbackAddress = InetAddress.getLoopbackAddress().getHostAddress();
Configuration config = new Configuration();
config.set(JobManagerOptions.BIND_HOST, loopbackAddress);

File tempServerDir = tempDir.resolve("bind_address_test").toFile();
tempServerDir.mkdirs();

try (BlobServer testServer = new BlobServer(config, tempServerDir, new VoidBlobStore())) {
testServer.start();

InetAddress address = testServer.getAddress();
assertThat(address.getHostAddress())
.as("Should return the bound address")
.isEqualTo(loopbackAddress);
}
}

static class TestBlobServer extends BlobServer {

private final long blockingMillis;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.TriFunction;

import java.net.InetAddress;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -93,6 +94,7 @@ public final class TestingDispatcherGateway extends TestingRestfulGateway
submitFailedFunction;
private final Supplier<CompletableFuture<Collection<JobID>>> listFunction;
private final int blobServerPort;
private InetAddress blobServerAddress;
private final DispatcherId fencingToken;
private final Function<JobID, CompletableFuture<ArchivedExecutionGraph>>
requestArchivedJobFunction;
Expand Down Expand Up @@ -206,6 +208,11 @@ public TestingDispatcherGateway(
this.submitFailedFunction = submitFailedFunction;
this.listFunction = listFunction;
this.blobServerPort = blobServerPort;
try {
this.blobServerAddress = InetAddress.getByName(hostname);
} catch (Exception e) {
throw new RuntimeException("Failed to resolve hostname: " + hostname, e);
}
this.fencingToken = fencingToken;
this.requestArchivedJobFunction = requestArchivedJobFunction;
this.clusterShutdownWithStatusFunction = clusterShutdownWithStatusFunction;
Expand Down Expand Up @@ -236,6 +243,11 @@ public CompletableFuture<Integer> getBlobServerPort(Duration timeout) {
return CompletableFuture.completedFuture(blobServerPort);
}

@Override
public CompletableFuture<InetAddress> getBlobServerAddress(Duration timeout) {
return CompletableFuture.completedFuture(blobServerAddress);
}

@Override
public DispatcherId getFencingToken() {
return DEFAULT_FENCING_TOKEN;
Expand Down