-
Notifications
You must be signed in to change notification settings - Fork 8.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
S3A S3Seekable stream refactor + move S3AInputStream creation to factory under S3AStore #7295
base: feature-HADOOP-19363-analytics-accelerator-s3
Are you sure you want to change the base?
S3A S3Seekable stream refactor + move S3AInputStream creation to factory under S3AStore #7295
Conversation
First iteration * Factory interface with a parameter object creation method * Base class AbstractS3AInputStream for all streams to create * S3AInputStream subclasses that and has a factory * Production and test code to use it Not done * Input stream callbacks pushed down to S3Store * S3Store to dynamically choose factory at startup, stop in close() * S3Store to implement the factory interface, completing final binding operations (callbacks, stats) Change-Id: I8d0f86ca1f3463d4987a43924f155ce0c0644180
Revision API: Make clear this is part of the fundamental store Model: * abstract stream class is now ObjectInputStream * interface is ObjectInputStreamFactory * move to package org.apache.hadoop.fs.s3a.impl.model Implementation: Prefetching stream is created this way too; adds one extra parameter. Maybe we should pass conf down too Change-Id: I5bbb5dfe585528b047a649b6c82a9d0318c7e91e
Change-Id: If42bdd0b227c4da07c62a410a998e6d8c35581f6
Moves all prefetching stream related options into the prefetching stream factory; the standard ReadOpContext removes them, so a new PrefetchingOptions is passed around. Stream factories can now declare how many extra shared threads they want and whether or not to create a future pool around the bounded pool. This is used in S3AFileSystem when creating its thread pools -this class no longer reads in any of the prefetching options. All tests which enable/disable prefetching, or probe for its state, now use S3ATestUtils methods for this. This avoids them having to now explicitly unset two properties, set the new input stream type, and any more complications in test setup in future. Everything under S3AStore is a service, so service lifecycle matches everywhere -and store just adds to the list of managed services for start/stop/close integration. + adjust assertions in ITestS3AInputStreamLeakage for prefetching + update the prefetching.md doc for factory changs + javadocs + add string values of type names to Constants Once the analytics stream is in, a full doc on "stream performance" will be needed. package for this stuff is now impl.streams Change-Id: Id6356d2ded2c477ba16cbb9027ac0cfbece2a542
Push factory construction into the enum itself Store implements stream capabilities, which are then relayed to the active factory. This avoids the FS having to know what capabilities are available in the stream. Abstract base class for stream factories. Change-Id: Ib757e6696f29cc7e0e8edd1119e738c6adc6f98f
Change-Id: Id79f8aa019095c1601bb0b2a282c51bdb0b7b817
Conflicts: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java Change-Id: I1eddd195a9a3e3332bfaac2e225acf69774c3ce8
98bc8f4
to
6fc63b7
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot @rajdchak for this change. I put some minor comments.
@@ -230,7 +232,23 @@ public class S3AStoreImpl | |||
@Override | |||
protected void serviceInit(final Configuration conf) throws Exception { | |||
|
|||
objectInputStreamFactory = createStreamFactory(conf); | |||
if(conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, ANALYTICS_ACCELERATOR_ENABLED_DEFAULT)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are we still doing this or using the new StreamKind? See here:
Adds a new config, fs.s3a.input.stream.type. This can be set to classic, prefetch, analytics. Believe this is better than having multipleprefetch.enabled and analytics.enabled flags.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Our S3SeekableInputStreamFactory requires s3AsyncClient to be passed in the constructor S3SeekableInputStreamFactory(S3AsyncClient s3AsyncClient) {
super("S3SeekableInputStreamFactory");
this.s3AsyncClient = s3AsyncClient;
}
I couldn't create that in the InputStreamType enum that Steve made, so kept it this way for now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you comment on this in that PR, ask for ClientManager
to be passed in. This will have to come after serviceInit, as it won't exist until then.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should go into the StreamIntegration.createStreamFactory()
method in my opinion. And over there, you want to use the InputStreamType.Analytics
in the conf check.
You can update the enum, so that the factory method takes parameters. Something like:
public Function<Configuration, ObjectInputStreamFactory> factory(FactoryParameters factoryParameters) {
return factory;
}
In the createStreamFactory method, you can do:
if (conf.getEnum(INPUT_STREAM_TYPE, defaultStream) == InputStreamType.Analytics) {
new FactoryParameters.withS3Client()
}
Let's discuss in case it's not clear!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ahmarsuhail we need to create the async client and the method for that is in the S3AStoreImpl, I can may be move this if else statement to what Fuat is suggesting in the same class but a different method getOrCreateAsyncCRTClient, but cannot move this client creation to StreamIntegration.createStreamFactory()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done some modifications in the latest changes
LOG.info("Using S3SeekableInputStream"); | ||
if(analyticsAcceleratorCRTEnabled) { | ||
LOG.info("Using S3 CRT client for analytics accelerator S3"); | ||
s3AsyncClient = S3CrtAsyncClient.builder().maxConcurrency(600).build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
similar to other shall we move this to a method getOrCreateAsyncCRTClient? or maybe even change the existing method to make a decision to use CRT or not?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK for the WiP, but it will be culled later
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with Fuat, we can't merge with client creation here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved to a different method
|
||
import static org.apache.hadoop.fs.s3a.Constants.*; | ||
|
||
public class S3SeekableInputStreamFactory extends AbstractObjectInputStreamFactory { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about renaming this to S3ASeekableInputStreamFactory
. This is inline with the S3ASeekableInputStream name and also we can get rid of full-path reference in the below lines
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
...op-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/S3SeekableInputStreamFactory.java
Outdated
Show resolved
Hide resolved
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggested some changes to factory creation code
@@ -230,7 +232,23 @@ public class S3AStoreImpl | |||
@Override | |||
protected void serviceInit(final Configuration conf) throws Exception { | |||
|
|||
objectInputStreamFactory = createStreamFactory(conf); | |||
if(conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, ANALYTICS_ACCELERATOR_ENABLED_DEFAULT)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should go into the StreamIntegration.createStreamFactory()
method in my opinion. And over there, you want to use the InputStreamType.Analytics
in the conf check.
You can update the enum, so that the factory method takes parameters. Something like:
public Function<Configuration, ObjectInputStreamFactory> factory(FactoryParameters factoryParameters) {
return factory;
}
In the createStreamFactory method, you can do:
if (conf.getEnum(INPUT_STREAM_TYPE, defaultStream) == InputStreamType.Analytics) {
new FactoryParameters.withS3Client()
}
Let's discuss in case it's not clear!
LOG.info("Using S3SeekableInputStream"); | ||
if(analyticsAcceleratorCRTEnabled) { | ||
LOG.info("Using S3 CRT client for analytics accelerator S3"); | ||
s3AsyncClient = S3CrtAsyncClient.builder().maxConcurrency(600).build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with Fuat, we can't merge with client creation here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Commented. Passing down the client factory will need work, so suggest changes in the initial review.
Is it essential to use the Crt client? If so, that needs to be configurable for the whole FS (Is it already? I don't remember) And it must go into DefaultS3ClientFactory.createS3AsyncClient(), using parameters built up in S3AFS.
- add any changes you need for the factory PR to it, clearly you do need to get
ClientFactory
passed in. - And create a separate JIRA for supporting the CRT client for factories, if needed. That can go into trunk earlier.
Regarding the feature branch
- no need to merge in trunk until new stuff is needed, and even then rebases are fine if choreographed with others.
- put all the input stream factory stuff before the new work, so that things aren't mixed up.
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
Outdated
Show resolved
Hide resolved
@@ -24,28 +24,29 @@ | |||
|
|||
import org.apache.hadoop.fs.FSExceptionMessages; | |||
import org.apache.hadoop.fs.StreamCapabilities; | |||
import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream; | |||
import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters; | |||
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
keep in the "not apache, not java" imports. (there's often some hadoop precondition or org.apache.hadoop.thirdparty imports in that block -traces of the "get off guava" work)
LOG.info("Using S3SeekableInputStream"); | ||
if(analyticsAcceleratorCRTEnabled) { | ||
LOG.info("Using S3 CRT client for analytics accelerator S3"); | ||
s3AsyncClient = S3CrtAsyncClient.builder().maxConcurrency(600).build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK for the WiP, but it will be culled later
@@ -230,7 +232,23 @@ public class S3AStoreImpl | |||
@Override | |||
protected void serviceInit(final Configuration conf) throws Exception { | |||
|
|||
objectInputStreamFactory = createStreamFactory(conf); | |||
if(conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, ANALYTICS_ACCELERATOR_ENABLED_DEFAULT)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you comment on this in that PR, ask for ClientManager
to be passed in. This will have to come after serviceInit, as it won't exist until then.
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableInputStream.java
Outdated
Show resolved
Hide resolved
@@ -123,7 +123,7 @@ public void testInvalidConfigurationThrows() { | |||
|
|||
ConnectorConfiguration connectorConfiguration = | |||
new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX); | |||
assertThrows("S3ASeekableStream illegal configuration does not throw", | |||
assertThrows("S3ASeekableInputStream illegal configuration does not throw", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use LambdaTestUtils.intercept()
, here with the closure
- closing any stream created -so if it didn't throw, we clean up
- return stream.toString()
the return value of the closure is used in the exception text...it is where diagnostics should go
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we are not actually creating any stream in this test
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java
Outdated
Show resolved
Hide resolved
Renamed some files Addressed comments
5abd61f
to
ca74969
Compare
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
💔 -1 overall
This message was automatically generated. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks, no blocking comments. Still not sure what the best way to pass in the way to the factory is, we can look at that in the orginal PR.
final S3AsyncClient s3AsyncClient; | ||
boolean analyticsAcceleratorCRTEnabled = conf.getBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED, | ||
ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT); | ||
LOG.info("Using S3SeekableInputStream"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: let's remove this log as this method could be used by something that is not using analytics accelerator
ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT); | ||
LOG.info("Using S3SeekableInputStream"); | ||
if(analyticsAcceleratorCRTEnabled) { | ||
LOG.info("Using S3 CRT client for analytics accelerator S3"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
downgrade to level: debug
@steveloughran wanted to clarify a few things:
Do you mean create a new JIRA to add in CRT creation into ClientManager and S3ClientFactory? This can be done now, independent of this PR.
Do you mean in the feature branch: |
exactly! It can go into trunk. In #7214 There's a callback to ask for the async client, where the stream can explicitly request the CRT client. This will raise an exception if not present. we could tune the behaviour, but I do want to push the checks into that ClientManager class for any required use elsewhere.
yes -and make sure that #7214 satisfies all your needs |
- Add callbacks from stream factories to creator. - Initial operation is to ask for an async client. - Callbacks and wiring up done in S3AStoreImpl. Change-Id: I544f05da15e3b57e9a538d337b972e4e07dc8877
💔 -1 overall
This message was automatically generated. |
d8716bc
to
1ee9f11
Compare
💔 -1 overall
This message was automatically generated. |
3d8f4a4
to
e18d0a4
Compare
Description of PR
Move InputStreamCreation to the new Factory
How was this patch tested?
Tested using the integration tests
For code changes:
LICENSE
,LICENSE-binary
,NOTICE-binary
files?