-
Notifications
You must be signed in to change notification settings - Fork 16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adds example and distribution modules #47
Adds example and distribution modules #47
Conversation
…ed table is expired and no longer accessible.
...igquery/src/main/java/com/google/cloud/flink/bigquery/source/config/BigQueryReadOptions.java
Outdated
Show resolved
Hide resolved
.../src/main/java/com/google/cloud/flink/bigquery/source/split/BigQuerySourceSplitAssigner.java
Outdated
Show resolved
Hide resolved
* launching the job from Flink's Rest API is known the single quotes are not supported and will | ||
* make the pipeline fail. As a workaround for that case using \u0027 as a replacement will make it | ||
* work, example {@code "TIMESTAMP_TRUNC(ingestion_timestamp, HOUR) = \u00272023-06-20 | ||
* 19:00:00\u0027"}. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
UnicodeEscape: Using unicode escape sequences for printable ASCII characters is obfuscated, and potentially dangerous.
* 19:00:00\u0027"}. | |
* 19:00:00'"}. |
❗❗ 2 similar findings have been found in this PR
🔎 Expand here to view all instances of this finding
File Path | Line Number |
---|---|
flink-connector-bigquery-examples/src/main/java/org/apache/flink/examples/gcp/bigquery/BigQueryExample.java | 46 |
flink-connector-bigquery-examples/src/main/java/org/apache/flink/examples/gcp/bigquery/BigQueryExample.java | 45 |
Visit the Lift Web Console to find more details in your report.
ℹ️ Expand to see all @sonatype-lift commands
You can reply with the following commands. For example, reply with @sonatype-lift ignoreall to leave out all findings.
Command | Usage |
---|---|
@sonatype-lift ignore |
Leave out the above finding from this PR |
@sonatype-lift ignoreall |
Leave out all the existing findings from this PR |
@sonatype-lift exclude <file|issue|path|tool> |
Exclude specified file|issue|path|tool from Lift findings by updating your config.toml file |
Note: When talking to LiftBot, you need to refresh the page to see its response.
Click here to add LiftBot to another repo.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sonatype-lift ignore
...bigquery/src/main/java/com/google/cloud/flink/bigquery/common/config/CredentialsOptions.java
Outdated
Show resolved
Hide resolved
...uery/src/main/java/com/google/cloud/flink/bigquery/common/config/BigQueryConnectOptions.java
Outdated
Show resolved
Hide resolved
|
||
private static Credentials createCredentialsFromFile(String file) { | ||
try { | ||
return GoogleCredentials.fromStream(new FileInputStream(file)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PATH_TRAVERSAL_IN: This API (java/io/FileInputStream.(Ljava/lang/String;)V) reads a file whose location might be specified by user input
ℹ️ Expand to see all @sonatype-lift commands
You can reply with the following commands. For example, reply with @sonatype-lift ignoreall to leave out all findings.
Command | Usage |
---|---|
@sonatype-lift ignore |
Leave out the above finding from this PR |
@sonatype-lift ignoreall |
Leave out all the existing findings from this PR |
@sonatype-lift exclude <file|issue|path|tool> |
Exclude specified file|issue|path|tool from Lift findings by updating your config.toml file |
Note: When talking to LiftBot, you need to refresh the page to see its response.
Click here to add LiftBot to another repo.
*/ | ||
@AutoValue | ||
@PublicEvolving | ||
public abstract class BigQuerySource<OUT> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AutoValueCouldNotWrite: Could not write generated class com.google.cloud.flink.bigquery.source.AutoValue_BigQuerySource: javax.annotation.processing.FilerException: Attempt to recreate a file for type com.google.cloud.flink.bigquery.source.AutoValue_BigQuerySource
❗❗ 3 similar findings have been found in this PR
🔎 Expand here to view all instances of this finding
File Path | Line Number |
---|---|
flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/common/config/CredentialsOptions.java | 34 |
flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/config/BigQueryReadOptions.java | 41 |
flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/common/config/BigQueryConnectOptions.java | 34 |
Visit the Lift Web Console to find more details in your report.
ℹ️ Expand to see all @sonatype-lift commands
You can reply with the following commands. For example, reply with @sonatype-lift ignoreall to leave out all findings.
Command | Usage |
---|---|
@sonatype-lift ignore |
Leave out the above finding from this PR |
@sonatype-lift ignoreall |
Leave out all the existing findings from this PR |
@sonatype-lift exclude <file|issue|path|tool> |
Exclude specified file|issue|path|tool from Lift findings by updating your config.toml file |
Note: When talking to LiftBot, you need to refresh the page to see its response.
Click here to add LiftBot to another repo.
...tor-bigquery/src/main/java/com/google/cloud/flink/bigquery/common/utils/SchemaTransform.java
Outdated
Show resolved
Hide resolved
🛠 Lift Auto-fixSome of the Lift findings in this PR can be automatically fixed. You can download and apply these changes in your local project directory of your branch to review the suggestions before committing.1 # Download the patch
curl https://lift.sonatype.com/api/patch/github.com/GoogleCloudDataproc/flink-bigquery-connector/47.diff -o lift-autofixes.diff
# Apply the patch with git
git apply lift-autofixes.diff
# Review the changes
git diff Want it all in a single command? Open a terminal in your project's directory and copy and paste the following command: curl https://lift.sonatype.com/api/patch/github.com/GoogleCloudDataproc/flink-bigquery-connector/47.diff | git apply Once you're satisfied, commit and push your changes in your project. Footnotes |
… field since the dynamic table source and the source impl needs to store the read options as property and it becomes not serializable after changing the read options to use an optional property.
/gcbrun |
/gcbrun |
…ed table is expired and no longer accessible.
…driguezdefino/flink-bigquery-connector into add_example_and_shadedsqljar
/gcbrun |
/gcbrun |
throws Exception { | ||
|
||
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | ||
env.enableCheckpointing(60000L); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we pass this as a user provided configuration and have this value as a default if user does not pass the value
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a configuration for the Flink application, not the connector. The default behaviour, as defined by Flink itself, is that checkpointing is disabled. Here is the official doc for more details.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant 60000L
can be passed as a configuration instead of hardcoding. So that users can run this with different checkpoint intervals
|
||
env.fromSource(bqSource, WatermarkStrategy.noWatermarks(), "BigQuerySource") | ||
.flatMap(new FlatMapper(recordPropertyToAggregate)) | ||
.keyBy(t -> t.f0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's have meaningful variable names
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed t
to mappedTuple
.
...-bigquery-examples/src/main/java/org/apache/flink/examples/gcp/bigquery/BigQueryExample.java
Show resolved
Hide resolved
/gcbrun |
throws Exception { | ||
|
||
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | ||
env.enableCheckpointing(60000L); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant 60000L
can be passed as a configuration instead of hardcoding. So that users can run this with different checkpoint intervals
/gcbrun |
Also fixes a NPE when the underlying table in BQ gets expired, now returns a proper exception.