Merged
103 changes: 85 additions & 18 deletions website/docs/quickstart/flink.md
@@ -33,8 +33,19 @@ mkdir fluss-quickstart-flink
cd fluss-quickstart-flink
```

2. Create a `docker-compose.yml` file with the following content:
2. Create a `lib` directory and download the required jar files. You can adjust the Flink version as needed. Make sure to download compatible versions of the [fluss-flink connector jar](/downloads) and [flink-connector-faker](https://github.com/knaufk/flink-faker/releases).

```shell
export FLINK_VERSION="1.20"
```

```shell
mkdir lib
wget -O lib/flink-faker-0.5.3.jar https://github.com/knaufk/flink-faker/releases/download/v0.5.3/flink-faker-0.5.3.jar
wget -O "lib/fluss-flink-${FLINK_VERSION}-$FLUSS_DOCKER_VERSION$.jar" "https://repo1.maven.org/maven2/org/apache/fluss/fluss-flink-${FLINK_VERSION}/$FLUSS_DOCKER_VERSION$/fluss-flink-${FLINK_VERSION}-$FLUSS_DOCKER_VERSION$.jar"
```
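
Before starting the containers, it can help to confirm that both jars actually landed in `lib`, since `wget -O` may leave an empty file behind when a download fails. The following is a minimal sketch, not part of Fluss or Flink: `check_jars` is a hypothetical helper name, and it only checks that at least one non-empty `*.jar` exists, not that the versions are compatible.

```shell
# Hypothetical helper: verify that a directory contains only non-empty jar files.
# Returns non-zero if the directory has no jars at all or if any jar is zero bytes.
check_jars() {
  dir="$1"
  found=0
  bad=0
  for jar in "$dir"/*.jar; do
    # When nothing matches, the glob stays literal; skip that entry.
    [ -e "$jar" ] || continue
    found=$((found + 1))
    if [ ! -s "$jar" ]; then
      echo "empty jar: $jar"
      bad=$((bad + 1))
    fi
  done
  if [ "$found" -eq 0 ]; then
    echo "no jars found in $dir"
    return 1
  fi
  [ "$bad" -eq 0 ]
}
```

With the layout from this step, `check_jars lib && echo "lib looks complete"` would be run from the `fluss-quickstart-flink` directory.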

3. Create a `docker-compose.yml` file with the following content:

```yaml
services:
@@ -69,36 +80,48 @@ services:
  #end
  #begin Flink cluster
  jobmanager:
    image: apache/fluss-quickstart-flink:1.20-$FLUSS_DOCKER_VERSION$
    image: flink:${FLINK_VERSION}
    # Review comment (Contributor): IIUC, with ${FLINK_VERSION}, users can't directly copy & paste the yaml?
    # Reply (Contributor Author): Yes, but this saves effort to set the flink version for each component.
    ports:
      - "8083:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
    entrypoint: ["sh", "-c", "cp -v /tmp/lib/*.jar /opt/flink/lib && exec /docker-entrypoint.sh jobmanager"]
    volumes:
      - ./lib:/tmp/lib
  taskmanager:
    image: apache/fluss-quickstart-flink:1.20-$FLUSS_DOCKER_VERSION$
    image: flink:${FLINK_VERSION}
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 10
        taskmanager.memory.process.size: 2048m
        taskmanager.memory.framework.off-heap.size: 256m
    entrypoint: ["sh", "-c", "cp -v /tmp/lib/*.jar /opt/flink/lib && exec /docker-entrypoint.sh taskmanager"]
    volumes:
      - ./lib:/tmp/lib
  sql-client:
    image: flink:${FLINK_VERSION}
    depends_on:
      - jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        rest.address: jobmanager
    entrypoint: ["sh", "-c", "cp -v /tmp/lib/*.jar /opt/flink/lib && exec /docker-entrypoint.sh bin/sql-client.sh"]
    volumes:
      - ./lib:/tmp/lib
  #end
```

The Docker Compose environment consists of the following containers:
- **Fluss Cluster:** a Fluss `CoordinatorServer`, a Fluss `TabletServer` and a `ZooKeeper` server.
- **Flink Cluster**: a Flink `JobManager` and a Flink `TaskManager` container to execute queries.

**Note:** The `apache/fluss-quickstart-flink` image is based on [flink:1.20.1-java17](https://hub.docker.com/layers/library/flink/1.20-java17/images/sha256:bf1af6406c4f4ad8faa46efe2b3d0a0bf811d1034849c42c1e3484712bc83505) and
includes the [fluss-flink](engine-flink/getting-started.md) and
[flink-connector-faker](https://flink-packages.org/packages/flink-faker) to simplify this guide.
- **Flink Cluster**: a Flink `JobManager`, a Flink `TaskManager`, and a Flink SQL client container to execute queries.
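
Because the compose file references `${FLINK_VERSION}`, Docker Compose has to resolve that variable when it parses the file. It reads it from the shell environment (the `export` in step 2) or from a `.env` file next to `docker-compose.yml`. Below is a minimal sketch of the `.env` route, which keeps the value available in new terminal sessions; the fallback `1.20` simply mirrors the value exported earlier and is an assumption, not a requirement.

```shell
# Persist the Flink version for Docker Compose variable substitution.
# Compose reads a file named .env in the project directory by default.
# Falls back to 1.20 (the value used in step 2) if FLINK_VERSION is not set.
FLINK_VERSION="${FLINK_VERSION:-1.20}"
printf 'FLINK_VERSION=%s\n' "$FLINK_VERSION" > .env
cat .env
```

Running `docker compose config` afterwards prints the resolved file, which is a quick way to confirm that the `image:` lines no longer contain `${FLINK_VERSION}`.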

3. To start all containers, run:
```shell
@@ -116,7 +139,6 @@ You can also visit http://localhost:8083/ to see if Flink is running normally.

:::note
- If you want to additionally use an observability stack, follow one of the provided quickstart guides [here](maintenance/observability/quickstart.md) and then continue with this guide.
- If you want to run with your own Flink environment, remember to download the [fluss-flink connector jar](/downloads), [flink-connector-faker](https://github.com/knaufk/flink-faker/releases) and then put them to `FLINK_HOME/lib/`.
- All the following commands involving `docker compose` should be executed in the created working directory that contains the `docker-compose.yml` file.
:::

@@ -125,25 +147,70 @@ Congratulations, you are all set!
## Enter into SQL-Client
First, use the following command to enter the Flink SQL CLI Container:
```shell
docker compose exec jobmanager ./sql-client
docker compose run sql-client
```

**Note**:
To simplify this guide, three temporary tables have been pre-created with `faker` connector to generate data.
You can view their schemas by running the following commands:
To simplify this guide, we will create three temporary tables with the `faker` connector to generate data:

```sql title="Flink SQL"
SHOW CREATE TABLE source_customer;
CREATE TEMPORARY TABLE source_order (
`order_key` BIGINT,
`cust_key` INT,
`total_price` DECIMAL(15, 2),
`order_date` DATE,
`order_priority` STRING,
`clerk` STRING
) WITH (
'connector' = 'faker',
'rows-per-second' = '10',
'number-of-rows' = '10000',
'fields.order_key.expression' = '#{number.numberBetween ''0'',''100000000''}',
'fields.cust_key.expression' = '#{number.numberBetween ''0'',''20''}',
'fields.total_price.expression' = '#{number.randomDouble ''3'',''1'',''1000''}',
'fields.order_date.expression' = '#{date.past ''100'' ''DAYS''}',
'fields.order_priority.expression' = '#{regexify ''(low|medium|high){1}''}',
'fields.clerk.expression' = '#{regexify ''(Clerk1|Clerk2|Clerk3|Clerk4){1}''}'
);
```

```sql title="Flink SQL"
SHOW CREATE TABLE source_order;
CREATE TEMPORARY TABLE source_customer (
`cust_key` INT,
`name` STRING,
`phone` STRING,
`nation_key` INT NOT NULL,
`acctbal` DECIMAL(15, 2),
`mktsegment` STRING,
PRIMARY KEY (`cust_key`) NOT ENFORCED
) WITH (
'connector' = 'faker',
'number-of-rows' = '200',
'fields.cust_key.expression' = '#{number.numberBetween ''0'',''20''}',
'fields.name.expression' = '#{funnyName.name}',
'fields.nation_key.expression' = '#{number.numberBetween ''1'',''5''}',
'fields.phone.expression' = '#{phoneNumber.cellPhone}',
'fields.acctbal.expression' = '#{number.randomDouble ''3'',''1'',''1000''}',
'fields.mktsegment.expression' = '#{regexify ''(AUTOMOBILE|BUILDING|FURNITURE|MACHINERY|HOUSEHOLD){1}''}'
);
```

```sql title="Flink SQL"
SHOW CREATE TABLE source_nation;
CREATE TEMPORARY TABLE `source_nation` (
`nation_key` INT NOT NULL,
`name` STRING,
PRIMARY KEY (`nation_key`) NOT ENFORCED
) WITH (
'connector' = 'faker',
'number-of-rows' = '100',
'fields.nation_key.expression' = '#{number.numberBetween ''1'',''5''}',
'fields.name.expression' = '#{regexify ''(CANADA|JORDAN|CHINA|UNITED|INDIA){1}''}'
);
```

```sql title="Flink SQL"
-- drop records silently if a null value would have to be inserted into a NOT NULL column
SET 'table.exec.sink.not-null-enforcer'='DROP';
```

## Create Fluss Tables
### Create Fluss Catalog
55 changes: 48 additions & 7 deletions website/docs/quickstart/lakehouse.md
@@ -332,6 +332,10 @@ For further information how to store catalog configurations, see [Flink's Catalo
:::

### Create Tables
<Tabs groupId="lake-tabs">
<TabItem value="paimon" label="Paimon" default>


Run the following SQL to create the Fluss tables used in this guide:
```sql title="Flink SQL"
CREATE TABLE fluss_order (
@@ -366,6 +370,46 @@ CREATE TABLE fluss_nation (
);
```

</TabItem>

<TabItem value="iceberg" label="Iceberg">


Run the following SQL to create the Fluss tables used in this guide:
```sql title="Flink SQL"
CREATE TABLE fluss_order (
`order_key` BIGINT,
`cust_key` INT NOT NULL,
`total_price` DECIMAL(15, 2),
`order_date` DATE,
`order_priority` STRING,
`clerk` STRING,
`ptime` AS PROCTIME()
);
```

```sql title="Flink SQL"
CREATE TABLE fluss_customer (
`cust_key` INT NOT NULL,
`name` STRING,
`phone` STRING,
`nation_key` INT NOT NULL,
`acctbal` DECIMAL(15, 2),
`mktsegment` STRING,
PRIMARY KEY (`cust_key`) NOT ENFORCED
);
```

```sql title="Flink SQL"
CREATE TABLE fluss_nation (
`nation_key` INT NOT NULL,
`name` STRING,
PRIMARY KEY (`nation_key`) NOT ENFORCED
);
```

</TabItem>
</Tabs>
## Streaming into Fluss

First, run the following SQL to sync data from source tables to Fluss tables:
@@ -520,13 +564,10 @@ SELECT o.order_key,
c.acctbal,
c.mktsegment,
n.name
FROM (
SELECT *, PROCTIME() as ptime
FROM `default_catalog`.`default_database`.source_order
) o
LEFT JOIN fluss_customer FOR SYSTEM_TIME AS OF o.ptime AS c
FROM fluss_order o
LEFT JOIN fluss_customer FOR SYSTEM_TIME AS OF `o`.`ptime` AS `c`
ON o.cust_key = c.cust_key
LEFT JOIN fluss_nation FOR SYSTEM_TIME AS OF o.ptime AS n
LEFT JOIN fluss_nation FOR SYSTEM_TIME AS OF `o`.`ptime` AS `n`
ON c.nation_key = n.nation_key;
```

@@ -714,4 +755,4 @@ After finishing the tutorial, run `exit` to exit Flink SQL CLI Container and the
```shell
docker compose down -v
```
to stop all containers.
to stop all containers.