From 92bb6fdaa8e58a17db61473da2425332881b3306 Mon Sep 17 00:00:00 2001 From: Chloe He Date: Tue, 27 Feb 2024 07:33:48 -0800 Subject: [PATCH] docs(blog): fix a formatting issue (#8473) ## Description of changes Fix a small formatting issue in the blog post. --- .../unified-stream-batch/index/execute-results/html.json | 4 ++-- docs/posts/unified-stream-batch/index.qmd | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/_freeze/posts/unified-stream-batch/index/execute-results/html.json b/docs/_freeze/posts/unified-stream-batch/index/execute-results/html.json index b538d66cc2e6..17ebdb33b14b 100644 --- a/docs/_freeze/posts/unified-stream-batch/index/execute-results/html.json +++ b/docs/_freeze/posts/unified-stream-batch/index/execute-results/html.json @@ -1,8 +1,8 @@ { - "hash": "94948d88258e850666fdf492f9ee8c81", + "hash": "c05ed046ce2729019b23b5236a28ee74", "result": { "engine": "jupyter", - "markdown": "---\ntitle: \"Stream-batch unification through Ibis\"\nauthor: \"Chloe He\"\ndate: 2024-02-26\ncategories:\n - blog\n - flink\n - risingwave\n - streaming\n---\n\nOne of my focuses in the past 10 months has been to implement the Flink backend\nfor Ibis. I was working with Apache Flink and building a feature engineering\ntool, and we stumbled upon Ibis as we attempted to build our own translation\nlayer that could turn user declarations into relation trees, then optimize and\ndeploy the query plan, all while maintaining the underlying infrastructure for\nthe user. We considered and prototyped with a number of tools and eventually\nchose Ibis. It had already established a position in the batch world and had\nsupport for 10+ of the most popular batch engines (at the time). We loved the\nidea of decoupling the user-facing interface from the execution engine, so that\nusers can swap out the execution engine depending on their needs, without\nhaving to rewrite code. And, of course, it was open-source. It was everything\nwe dreamed of.\n\nA few months later, [we started introducing Apache Flink as the first streaming\nbackend into Ibis](https://github.com/ibis-project/ibis/pull/6408). We saw so\nmuch more that Ibis can do when it steps outside of batch.\n\nIbis 8.0 marks the official launch of the first streaming backends in Ibis\n([Apache Flink](https://ibis-project.org/backends/flink) and\n[RisingWave](https://ibis-project.org/backends/risingwave)). This is a very\nsignificant milestone in Ibis development.\n\nYou may be wondering: what does this mean? Why is this such a big deal? I will\nbe answering these questions in this blog post.\n\n## Ibis combines stream and batch into a single framework beyond version 8.0\n\nToday, Ibis provides support for 20+ backends including Dask, DuckDB,\nPostgreSQL, PySpark, Snowflake, and others. However - before the introduction\nof Flink and RisingWave backends - all of the supported backends derive from a\nbatch paradigm (aside from Spark, which does offer support for stream\nprocessing, albeit using micro-batches underneath the hood).\n\nThis means that Ibis is an extremely valuable tool, but it was limited to batch\nworkloads. In the case of streaming workloads, where systems are [designed with\nunbounded data in mind](https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/),\nthe batch-oriented Ibis fell short. To deal with an infinite data stream,\nstreaming data systems operate with unique concepts such as “event time”,\n“processing time”, “watermark”, etc. 
All of these were missing from Ibis.\n\nAt the same time, streaming systems (Spark Streaming, Apache Flink, RisingWave,\netc) have been gaining popularity. It drove the development of more mature\ntechnologies as well as new approaches to close the gap between batch and\nstreaming worlds. [Flink SQL, for example, was born as a part of such effort\nand, through allowing users to write streaming engines in a SQL-like manner,\nhave been vastly successful in that regard.](https://www.ververica.com/blog/apache-flink-sql-past-present-and-future)\nThe success of Flink SQL both validates the potential of stream and batch\nunification and inspires the community to push for better standards, a vision\nthat Ibis is at a unique and valuable position to help build.\n\n## Why is batch-stream unification significant?\n\nFirstly, large companies that have both batch and streaming workloads often\ndeploy\n[Lambda architecture](https://en.wikipedia.org/wiki/Lambda_architecture).\nIn a Lambda infrastructure, batch and streaming pipelines are separate, which\nrequires two codebases to be set up and maintained. If you’re a platform\nengineer, you have probably found yourself trying to duplicate batch workloads\n“in streaming code” and vice versa. If you have backfilled a streaming pipeline\ndue to a bug and needed to reimplement the logic on a batch pipeline, you know\nhow painful that all is :(\n\n[LinkedIn successfully reduced processing time by 94% and resource utilization\nby 50% after switching from a Lambda architecture to unified batch and\nstreaming pipelines.](https://www.linkedin.com/blog/engineering/data-streaming-processing/unified-streaming-and-batch-pipelines-at-linkedin-reducing-proc)\nA unified system also massively increased engineer productivity because they no\nlonger needed to develop and maintain separate codebases for different\nenvironments.\n[Uber](https://www.uber.com/blog/kappa-architecture-data-stream-processing/),\n[Alibaba](https://www.ververica.com/blog/apache-flinks-stream-batch-unification-powers-alibabas-11.11-in-2020),\nand [Lyft](https://beam.apache.org/case-studies/lyft/) have also adopted\nsimilar solutions.\n\nSecondly, in the world of machine learning, it’s common for data scientists to\ndevelop locally and experiment with a sampled, batch dataset in Python. If the\nresults look promising, the features and models would then be deployed into\nproduction. Oftentimes, there is a code handover in this process, and a\ndedicated team of developers would be responsible for rewriting the logic for\nproduction, as a streaming workload.\n\nIn both cases, there is a huge amount of technical overhead. If there is a\nstreamlined architecture, using a unified API, much of this overhead can be\navoided. As a platform engineer, you no longer need to worry about maintaining\ntwo separate architectures and codebases. As a data scientist or a machine\nlearning engineer, you can write one single workload that can execute both on\nbatch and streaming backends. Wouldn’t that be amazing?\n\n## Ibis unifies batch and streaming\n\nEnter Ibis. Ibis unifies batch and streaming with a single API. It decouples\nthe dataframe API from backend execution, so that the logic for defining data\ntransformations is unaffected by implementation discrepancies across backend\nengines. 
There is also an ongoing effort to further increase interoperability\nacross different languages and systems via a standard query plan intermediate\nrepresentation (IR), using a library called\n[`Substrait`](https://substrait.io/).\n\nWhat does this actually look like? For example, Ibis allows users to define\nwindow aggregations using the [`over()`\nmethod](../../reference/expression-tables.qmd#ibis.expr.types.groupby.GroupedTable.over).\nWhen executed on the Flink backend, this translates into [Flink’s over\naggregation query](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/over-agg/)\nand outputs an aggregated value for every input row over a range of ordered\nrows. On streaming data, aggregation results are continuously computed and\nwritten into data sinks (e.g., Kafka, Redis) as records are received at and\nconsumed from the upstream data source (e.g., Kafka, Change Data Capture). In\npandas, the conceptual analog is [windowing\noperation](https://pandas.pydata.org/docs/user_guide/window.html). Results are\ncomputed by looking back the length of the window from the current observation,\nbut can be computed all at once because batch data is static.\n\nAnother great example is deduplication. In Flink SQL, this looks something like this:\n\n```sql\nSELECT [column_list]\nFROM (\n SELECT [column_list],\n ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]\n ORDER BY time_attr [asc|desc]) AS rownum\n FROM table_name)\nWHERE rownum = 1\n```\nIn a database like Postgres, this could be as simple as\n\n```sql\nSELECT DISTINCT t0.`string_col`, t0.`int_col`\nFROM functional_alltypes t0\n```\n\nAnd in `pandas`, you would use the method `drop_duplicates()`:\n\n```python\ndf.drop_duplicates()\n```\n\n::: {.callout-note}\nWe’re working on supporting deduplication via `distinct()` in Flink backend and\nthis feature should be available soon!\n:::\n\nThese underlying discrepancies are abstracted in such a way that you, as an\nIbis user, will no longer find yourself struggling with bugs that are the\nresult of subtleties across different engines and dialects. Need to rewrite\nyour batch workload as a streaming one or vice versa? Rest assured, Ibis has\nyou covered!\n\n## See it in action\n\nNow, let’s walk through a code example together to see how simple this\nexperience is!\n\n::: {.callout-note}\nPrerequisites for running this example:\n\n* Docker Compose: This tutorial uses Docker Compose to manage an Apache Kafka\nenvironment (including sample data generation) and a Flink cluster (for remote\nexecution). You can [download and install Docker Compose from the official\nwebsite](https://docs.docker.com/compose/install/).\n* JDK 11 release: Flink requires Java 11.\n* Python 3.9 or 3.10.\n* Follow [the setup\ntutorial](../../tutorials/open-source-software/apache-flink/0_setup.qmd) to\ninstall the Flink backend for Ibis.\n* Clone the [example repository](https://github.com/ibis-project/ibis-flink-example).\n:::\n\n::: {.callout-note}\nThis example is a hypothetical scenario and we will be using simulated data.\n:::\n\nFirst, spin up the Docker containers by running `docker compose up kafka\ninit-kafka data-generator`. This will set up a mocked Kafka source that\ncontains records that look like the following:\n\n```json\n{\n \"createTime\": \"2023-09-20 22:19:02.224\",\n \"orderId\": 1695248388,\n \"payAmount\": 88694.71922270155,\n \"payPlatform\": 0,\n \"provinceId\": 6,\n}\n```\n\nThis is a streaming data source. 
Commonly, to experiment with the data, we\nwould extract a chunk of the data and load it in batch:\n\n\n::: {#5e0d3e63 .cell execution_count=2}\n``` {.python .cell-code}\nfrom kafka import KafkaConsumer\n\nconsumer = KafkaConsumer(\"payment_msg\", auto_offset_reset=\"earliest\")\nrows = []\nfor _, msg in zip(range(100), consumer):\n rows.append(msg)\n```\n:::\n\n\nThis is a tabular dataset and we can convert it into a `pandas` DataFrame:\n\n::: {#071552d6 .cell execution_count=3}\n``` {.python .cell-code}\nimport json\n\nimport pandas as pd\n\ndf = pd.DataFrame([json.loads(row.value) for row in rows])\ndf[\"createTime\"] = pd.to_datetime(df[\"createTime\"])\ndf\n```\n\n::: {.cell-output .cell-output-display execution_count=3}\n```{=html}\n
    createTime               orderId     payAmount     payPlatform  provinceId
0   2024-02-24 07:15:40.896  1708758941  85955.211753  0            3
1   2024-02-24 07:15:41.402  1708758942  70006.918133  0            3
2   2024-02-24 07:15:41.905  1708758943  4352.885704   0            3
3   2024-02-24 07:15:42.405  1708758944  20850.068597  0            2
4   2024-02-24 07:15:42.909  1708758945  72829.020076  0            4
..  ...                      ...         ...           ...          ...
95  2024-02-24 07:16:28.636  1708759036  70241.647749  0            5
96  2024-02-24 07:16:29.139  1708759037  89075.951048  1            2
97  2024-02-24 07:16:29.653  1708759038  49521.531528  0            5
98  2024-02-24 07:16:30.154  1708759039  9171.094251   0            2
99  2024-02-24 07:16:30.655  1708759040  62199.631597  0            3

100 rows × 5 columns
\n```\n:::\n:::\n\n\nWe can connect to this DataFrame in Ibis in a local execution backend:\n\n::: {#0f3b72a0 .cell execution_count=4}\n``` {.python .cell-code}\nimport ibis\n\ncon = ibis.get_backend()\ncon.create_table(\"payments\", df)\n```\n\n::: {.cell-output .cell-output-display execution_count=4}\n```{=html}\n
DatabaseTable: payments\n  createTime  timestamp(6)\n  orderId     int64\n  payAmount   float64\n  payPlatform int64\n  provinceId  int64\n
\n```\n:::\n:::\n\n\n::: {.callout-note}\nThe default execution engine for Ibis is DuckDB.\n:::\n\nThis is a series of records of order transactions. At Company Potclay, we have\njust deployed a new ad campaign, which is A/B tested by province, and we’re\ninterested in the effectiveness of this ad campaign by monitoring data\ndistribution shift over time. A crucial feature is the total transaction amount\nover the past minute, stratified by province. We would like to first experiment\nwriting this feature on a smaller set of batch data. After we make sure that\nthe logic looks correct and handles all edge cases appropriately, we want to\ndeploy this as a streaming workload.\n\nIbis allows us to write transformations on top of so-called abstract or unbound\ntables (i.e., tables that are not bound to an actual data source). This\nseparation between transformation logic and the underlying data and execution\nis one of the things that makes Ibis so powerful. It's similar to dependency\ninjection, but in this case the data is the dependency and is injected at\nruntime.\n\nTo write transformations on top of an unbound table, we need to first define an\n`ibis.table()` with a schema. Here is how we would write all of this in Ibis\ncode:\n\n::: {#aafb8b70 .cell execution_count=5}\n``` {.python .cell-code}\nimport ibis.expr.schema as sch\nimport ibis.expr.datatypes as dt\nfrom ibis import _\n\nschema = sch.Schema(\n {\n \"createTime\": dt.timestamp(scale=3),\n \"orderId\": dt.int64,\n \"payAmount\": dt.float64,\n \"payPlatform\": dt.int32,\n \"provinceId\": dt.int32,\n }\n)\nunbound_table = ibis.table(schema, name=\"payments\")\nunbound_agged = unbound_table[\n \"provinceId\",\n _.payAmount.sum()\n .over(range=(-ibis.interval(seconds=10), 0), order_by=_.createTime)\n .name(\"pay_amount\"),\n]\nunbound_agged\n```\n\n::: {.cell-output .cell-output-display execution_count=5}\n```{=html}\n
r0 := UnboundTable: payments\n  createTime  timestamp(3)\n  orderId     int64\n  payAmount   float64\n  payPlatform int32\n  provinceId  int32\n\nProject[r0]\n  provinceId: r0.provinceId\n  pay_amount: WindowFunction(func=Sum(r0.payAmount), frame=RangeWindowFrame(table=r0, start=WindowBoundary(value=10\ns, preceding=True), end=WindowBoundary(Cast(0, to=interval('s'))), order_by=[asc r0.createTime]))\n
\n```\n:::\n:::\n\n\nCarrying out the computations using the local execution backend that we\nconnected to above is as simple as:\n\n::: {#c7521f34 .cell execution_count=6}\n``` {.python .cell-code}\ncon.to_pandas(unbound_agged)\n```\n\n::: {.cell-output .cell-output-display execution_count=6}\n```{=html}\n
    provinceId  pay_amount
0   3           8.595521e+04
1   3           1.559621e+05
2   3           1.603150e+05
3   2           1.811651e+05
4   4           2.539941e+05
..  ...         ...
95  5           1.270405e+06
96  2           1.269567e+06
97  5           1.277924e+06
98  2           1.204362e+06
99  3           1.202042e+06

100 rows × 2 columns
\n```\n:::\n:::\n\n\n::: {.callout-note}\nDuckDB is much faster than `pandas`, and using Ibis you don't need to write SQL\nfor it!\n:::\n\nFor local experimentation purposes, this DataFrame only consists of 100 rows,\nso doing this in memory is easy.\n\nThe outputs look correct and we didn’t run into any errors. We are now ready to\ndeploy this as a streaming job in Flink!\n\nFirst, let’s set up the Flink environment and connect to this Kafka source:\n\n::: {.callout-note}\nKafka connector is not part of the binary distribution, so we need to download\nand link it for cluster execution explicitly:\n\n\n:::\n\n::: {#c7870162 .cell execution_count=8}\n``` {.python .cell-code}\nfrom pyflink.table import EnvironmentSettings, TableEnvironment\nfrom pyflink.common import Configuration\n\nsource_schema = sch.Schema(\n {\n \"createTime\": dt.timestamp(scale=3),\n \"orderId\": dt.int64,\n \"payAmount\": dt.float64,\n \"payPlatform\": dt.int32,\n \"provinceId\": dt.int32,\n }\n)\n\nenv_settings = EnvironmentSettings.in_streaming_mode()\ntable_env = TableEnvironment.create(env_settings)\n\ntable_config = table_env.get_config()\nconfig = Configuration()\nconfig.set_string(\"parallelism.default\", \"1\")\ntable_config.add_configuration(config)\n\nconnection = ibis.flink.connect(table_env)\n\n# add the JAR downloaded above\nconnection.raw_sql(\"ADD JAR 'flink-sql-connector-kafka-1.17.1.jar'\")\n\nsource_configs = {\n \"connector\": \"kafka\",\n \"topic\": \"payment_msg\",\n \"properties.bootstrap.servers\": \"localhost:9092\",\n \"properties.group.id\": \"test_3\",\n \"scan.startup.mode\": \"earliest-offset\",\n \"format\": \"json\",\n}\n\nconnection.create_table(\n \"payments\",\n schema=source_schema,\n tbl_properties=source_configs,\n watermark=ibis.watermark(\n time_col=\"createTime\", allowed_delay=ibis.interval(seconds=15)\n ),\n)\n```\n\n::: {.cell-output .cell-output-display execution_count=8}\n```{=html}\n
DatabaseTable: payments\n  createTime  timestamp(3)\n  orderId     int64\n  payAmount   float64\n  payPlatform int32\n  provinceId  int32\n
\n```\n:::\n:::\n\n\nHow would we write this in Flink SQL? Ibis makes this extremely easy by\nexposing a `compile()` API:\n\n::: {#db65bfb1 .cell execution_count=9}\n``` {.python .cell-code}\nsql = connection.compile(unbound_agged)\nprint(sql)\n```\n\n::: {.cell-output .cell-output-stdout}\n````\nSELECT\n `t0`.`provinceId`,\n SUM(`t0`.`payAmount`) OVER (ORDER BY `t0`.`createTime` ASC NULLS LAST RANGE BETWEEN INTERVAL '10' SECOND(2) preceding AND CURRENT ROW) AS `pay_amount`\nFROM `payments` AS `t0`\n````\n:::\n:::\n\n\nBefore we can execute this query, we need to first define a data sink where the\nresults can be written:\n\n::: {#68052729 .cell execution_count=10}\n``` {.python .cell-code}\nsink_schema = sch.Schema(\n {\n \"province_id\": dt.int32,\n \"pay_amount\": dt.float64,\n }\n)\n\nkafka_sink_configs = {\n \"connector\": \"kafka\",\n \"topic\": \"sink\",\n \"properties.bootstrap.servers\": \"localhost:9092\",\n \"format\": \"json\",\n}\n\nconnection.create_table(\n \"kafka_sink\", schema=sink_schema, tbl_properties=kafka_sink_configs\n)\n```\n\n::: {.cell-output .cell-output-display execution_count=10}\n```{=html}\n
DatabaseTable: kafka_sink\n  province_id int32\n  pay_amount  float64\n
\n```\n:::\n:::\n\n\nNow, let’s write the results into this sink. Note that we can directly reuse\nthe transformation logic that we wrote above for the local execution backend!!\n\n::: {#dc110b9d .cell execution_count=11}\n``` {.python .cell-code}\nconnection.insert(\"kafka_sink\", unbound_agged)\n```\n\n::: {.cell-output .cell-output-display execution_count=11}\n```\n\n```\n:::\n:::\n\n\n::: {.callout-tip}\nYou can examine the results either using the Kafka console consumer CLI or the\n`kafka-python` library.\n:::\n\nHow easy was it to define both batch and streaming workloads using Ibis?\nWithout Ibis, you would have needed to write a `pandas`/DuckDB workload and\nthen convert it into Flink SQL manually.\n\n## Concluding thoughts\n\nWith the introduction of the first streaming backends, Ibis is now both a batch\nand a streaming Python DataFrame API and we’re excited about what’s to come\nnext. We hope that Ibis can close the gap between batch and streaming in such a\nway that we no longer talk about the two separately, but, rather, as two parts\nof the same paradigm. Streaming naturally lends itself to batch: batch is\ntechnically just a special case of streaming, where the unbounded data flow\nstops at some point.\n\nOf course, this is only the beginning. There are still technical challenges to\nbe solved (e.g., backfill, window computations over large windows, GPU\nacceleration), and we'll definitely have more exciting updates to share with\nthe community soon!\n\nCheck out the new [Apache Flink](https://ibis-project.org/backends/flink) and\n[RisingWave](https://ibis-project.org/backends/risingwave) backends and let us\nknow what you think!\n\n", + "markdown": "---\ntitle: \"Stream-batch unification through Ibis\"\nauthor: \"Chloe He\"\ndate: 2024-02-26\ncategories:\n - blog\n - flink\n - risingwave\n - streaming\n---\n\nOne of my focuses in the past 10 months has been to implement the Flink backend\nfor Ibis. I was working with Apache Flink and building a feature engineering\ntool, and we stumbled upon Ibis as we attempted to build our own translation\nlayer that could turn user declarations into relation trees, then optimize and\ndeploy the query plan, all while maintaining the underlying infrastructure for\nthe user. We considered and prototyped with a number of tools and eventually\nchose Ibis. It had already established a position in the batch world and had\nsupport for 10+ of the most popular batch engines (at the time). We loved the\nidea of decoupling the user-facing interface from the execution engine, so that\nusers can swap out the execution engine depending on their needs, without\nhaving to rewrite code. And, of course, it was open-source. It was everything\nwe dreamed of.\n\nA few months later, [we started introducing Apache Flink as the first streaming\nbackend into Ibis](https://github.com/ibis-project/ibis/pull/6408). We saw so\nmuch more that Ibis can do when it steps outside of batch.\n\nIbis 8.0 marks the official launch of the first streaming backends in Ibis\n([Apache Flink](https://ibis-project.org/backends/flink) and\n[RisingWave](https://ibis-project.org/backends/risingwave)). This is a very\nsignificant milestone in Ibis development.\n\nYou may be wondering: what does this mean? Why is this such a big deal? I will\nbe answering these questions in this blog post.\n\n## Ibis combines stream and batch into a single framework beyond version 8.0\n\nToday, Ibis provides support for 20+ backends including Dask, DuckDB,\nPostgreSQL, PySpark, Snowflake, and others. 
However - before the introduction\nof Flink and RisingWave backends - all of the supported backends derive from a\nbatch paradigm (aside from Spark, which does offer support for stream\nprocessing, albeit using micro-batches underneath the hood).\n\nThis means that Ibis is an extremely valuable tool, but it was limited to batch\nworkloads. In the case of streaming workloads, where systems are [designed with\nunbounded data in mind](https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/),\nthe batch-oriented Ibis fell short. To deal with an infinite data stream,\nstreaming data systems operate with unique concepts such as “event time”,\n“processing time”, “watermark”, etc. All of these were missing from Ibis.\n\nAt the same time, streaming systems (Spark Streaming, Apache Flink, RisingWave,\netc) have been gaining popularity. It drove the development of more mature\ntechnologies as well as new approaches to close the gap between batch and\nstreaming worlds. [Flink SQL, for example, was born as a part of such effort\nand, through allowing users to write streaming engines in a SQL-like manner,\nhave been vastly successful in that regard.](https://www.ververica.com/blog/apache-flink-sql-past-present-and-future)\nThe success of Flink SQL both validates the potential of stream and batch\nunification and inspires the community to push for better standards, a vision\nthat Ibis is at a unique and valuable position to help build.\n\n## Why is batch-stream unification significant?\n\nFirstly, large companies that have both batch and streaming workloads often\ndeploy\n[Lambda architecture](https://en.wikipedia.org/wiki/Lambda_architecture).\nIn a Lambda infrastructure, batch and streaming pipelines are separate, which\nrequires two codebases to be set up and maintained. If you’re a platform\nengineer, you have probably found yourself trying to duplicate batch workloads\n“in streaming code” and vice versa. If you have backfilled a streaming pipeline\ndue to a bug and needed to reimplement the logic on a batch pipeline, you know\nhow painful that all is :(\n\n[LinkedIn successfully reduced processing time by 94% and resource utilization\nby 50% after switching from a Lambda architecture to unified batch and\nstreaming pipelines.](https://www.linkedin.com/blog/engineering/data-streaming-processing/unified-streaming-and-batch-pipelines-at-linkedin-reducing-proc)\nA unified system also massively increased engineer productivity because they no\nlonger needed to develop and maintain separate codebases for different\nenvironments.\n[Uber](https://www.uber.com/blog/kappa-architecture-data-stream-processing/),\n[Alibaba](https://www.ververica.com/blog/apache-flinks-stream-batch-unification-powers-alibabas-11.11-in-2020),\nand [Lyft](https://beam.apache.org/case-studies/lyft/) have also adopted\nsimilar solutions.\n\nSecondly, in the world of machine learning, it’s common for data scientists to\ndevelop locally and experiment with a sampled, batch dataset in Python. If the\nresults look promising, the features and models would then be deployed into\nproduction. Oftentimes, there is a code handover in this process, and a\ndedicated team of developers would be responsible for rewriting the logic for\nproduction, as a streaming workload.\n\nIn both cases, there is a huge amount of technical overhead. If there is a\nstreamlined architecture, using a unified API, much of this overhead can be\navoided. 
As a platform engineer, you no longer need to worry about maintaining\ntwo separate architectures and codebases. As a data scientist or a machine\nlearning engineer, you can write one single workload that can execute both on\nbatch and streaming backends. Wouldn’t that be amazing?\n\n## Ibis unifies batch and streaming\n\nEnter Ibis. Ibis unifies batch and streaming with a single API. It decouples\nthe dataframe API from backend execution, so that the logic for defining data\ntransformations is unaffected by implementation discrepancies across backend\nengines. There is also an ongoing effort to further increase interoperability\nacross different languages and systems via a standard query plan intermediate\nrepresentation (IR), using a library called\n[`Substrait`](https://substrait.io/).\n\nWhat does this actually look like? For example, Ibis allows users to define\nwindow aggregations using the [`over()`\nmethod](../../reference/expression-tables.qmd#ibis.expr.types.groupby.GroupedTable.over).\nWhen executed on the Flink backend, this translates into [Flink’s over\naggregation query](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/over-agg/)\nand outputs an aggregated value for every input row over a range of ordered\nrows. On streaming data, aggregation results are continuously computed and\nwritten into data sinks (e.g., Kafka, Redis) as records are received at and\nconsumed from the upstream data source (e.g., Kafka, Change Data Capture). In\npandas, the conceptual analog is [windowing\noperation](https://pandas.pydata.org/docs/user_guide/window.html). Results are\ncomputed by looking back the length of the window from the current observation,\nbut can be computed all at once because batch data is static.\n\nAnother great example is deduplication. In Flink SQL, this looks something like this:\n\n```sql\nSELECT [column_list]\nFROM (\n SELECT [column_list],\n ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]\n ORDER BY time_attr [asc|desc]) AS rownum\n FROM table_name)\nWHERE rownum = 1\n```\nIn a database like Postgres, this could be as simple as\n\n```sql\nSELECT DISTINCT t0.`string_col`, t0.`int_col`\nFROM functional_alltypes t0\n```\n\nAnd in `pandas`, you would use the method `drop_duplicates()`:\n\n```python\ndf.drop_duplicates()\n```\n\n::: {.callout-note}\nWe’re working on supporting deduplication via `distinct()` in Flink backend and\nthis feature should be available soon!\n:::\n\nThese underlying discrepancies are abstracted in such a way that you, as an\nIbis user, will no longer find yourself struggling with bugs that are the\nresult of subtleties across different engines and dialects. Need to rewrite\nyour batch workload as a streaming one or vice versa? Rest assured, Ibis has\nyou covered!\n\n## See it in action\n\nNow, let’s walk through a code example together to see how simple this\nexperience is!\n\n::: {.callout-note}\nPrerequisites for running this example:\n\n* Docker Compose: This tutorial uses Docker Compose to manage an Apache Kafka\nenvironment (including sample data generation) and a Flink cluster (for remote\nexecution). 
You can [download and install Docker Compose from the official\nwebsite](https://docs.docker.com/compose/install/).\n* JDK 11 release: Flink requires Java 11.\n* Python 3.9 or 3.10.\n* Follow [the setup\ntutorial](../../tutorials/open-source-software/apache-flink/0_setup.qmd) to\ninstall the Flink backend for Ibis.\n* Clone the [example repository](https://github.com/ibis-project/ibis-flink-example).\n:::\n\n::: {.callout-note}\nThis example is a hypothetical scenario and we will be using simulated data.\n:::\n\nFirst, spin up the Docker containers by running `docker compose up kafka\ninit-kafka data-generator`. This will set up a mocked Kafka source that\ncontains records that look like the following:\n\n```json\n{\n \"createTime\": \"2023-09-20 22:19:02.224\",\n \"orderId\": 1695248388,\n \"payAmount\": 88694.71922270155,\n \"payPlatform\": 0,\n \"provinceId\": 6,\n}\n```\n\nThis is a streaming data source. Commonly, to experiment with the data, we\nwould extract a chunk of the data and load it in batch:\n\n\n::: {#3892eff1 .cell execution_count=2}\n``` {.python .cell-code}\nfrom kafka import KafkaConsumer\n\nconsumer = KafkaConsumer(\"payment_msg\", auto_offset_reset=\"earliest\")\nrows = []\nfor _, msg in zip(range(100), consumer):\n rows.append(msg)\n```\n:::\n\n\nThis is a tabular dataset and we can convert it into a `pandas` DataFrame:\n\n::: {#3c408f90 .cell execution_count=3}\n``` {.python .cell-code}\nimport json\n\nimport pandas as pd\n\ndf = pd.DataFrame([json.loads(row.value) for row in rows])\ndf[\"createTime\"] = pd.to_datetime(df[\"createTime\"])\ndf\n```\n\n::: {.cell-output .cell-output-display execution_count=3}\n```{=html}\n
    createTime               orderId     payAmount     payPlatform  provinceId
0   2024-02-24 07:15:40.896  1708758941  85955.211753  0            3
1   2024-02-24 07:15:41.402  1708758942  70006.918133  0            3
2   2024-02-24 07:15:41.905  1708758943  4352.885704   0            3
3   2024-02-24 07:15:42.405  1708758944  20850.068597  0            2
4   2024-02-24 07:15:42.909  1708758945  72829.020076  0            4
..  ...                      ...         ...           ...          ...
95  2024-02-24 07:16:28.636  1708759036  70241.647749  0            5
96  2024-02-24 07:16:29.139  1708759037  89075.951048  1            2
97  2024-02-24 07:16:29.653  1708759038  49521.531528  0            5
98  2024-02-24 07:16:30.154  1708759039  9171.094251   0            2
99  2024-02-24 07:16:30.655  1708759040  62199.631597  0            3

100 rows × 5 columns
\n```\n:::\n:::\n\n\nWe can connect to this DataFrame in Ibis in a local execution backend:\n\n::: {#6b3df357 .cell execution_count=4}\n``` {.python .cell-code}\nimport ibis\n\ncon = ibis.get_backend()\ncon.create_table(\"payments\", df)\n```\n\n::: {.cell-output .cell-output-display execution_count=4}\n```{=html}\n
DatabaseTable: payments\n  createTime  timestamp(6)\n  orderId     int64\n  payAmount   float64\n  payPlatform int64\n  provinceId  int64\n
\n```\n:::\n:::\n\n\n::: {.callout-note}\nThe default execution engine for Ibis is DuckDB.\n:::\n\nThis is a series of records of order transactions. At Company Potclay, we have\njust deployed a new ad campaign, which is A/B tested by province, and we’re\ninterested in the effectiveness of this ad campaign by monitoring data\ndistribution shift over time. A crucial feature is the total transaction amount\nover the past minute, stratified by province. We would like to first experiment\nwriting this feature on a smaller set of batch data. After we make sure that\nthe logic looks correct and handles all edge cases appropriately, we want to\ndeploy this as a streaming workload.\n\nIbis allows us to write transformations on top of so-called abstract or unbound\ntables (i.e., tables that are not bound to an actual data source). This\nseparation between transformation logic and the underlying data and execution\nis one of the things that makes Ibis so powerful. It's similar to dependency\ninjection, but in this case the data is the dependency and is injected at\nruntime.\n\nTo write transformations on top of an unbound table, we need to first define an\n`ibis.table()` with a schema. Here is how we would write all of this in Ibis\ncode:\n\n::: {#771fc1be .cell execution_count=5}\n``` {.python .cell-code}\nimport ibis.expr.schema as sch\nimport ibis.expr.datatypes as dt\nfrom ibis import _\n\nschema = sch.Schema(\n {\n \"createTime\": dt.timestamp(scale=3),\n \"orderId\": dt.int64,\n \"payAmount\": dt.float64,\n \"payPlatform\": dt.int32,\n \"provinceId\": dt.int32,\n }\n)\nunbound_table = ibis.table(schema, name=\"payments\")\nunbound_agged = unbound_table[\n \"provinceId\",\n _.payAmount.sum()\n .over(range=(-ibis.interval(seconds=10), 0), order_by=_.createTime)\n .name(\"pay_amount\"),\n]\nunbound_agged\n```\n\n::: {.cell-output .cell-output-display execution_count=5}\n```{=html}\n
r0 := UnboundTable: payments\n  createTime  timestamp(3)\n  orderId     int64\n  payAmount   float64\n  payPlatform int32\n  provinceId  int32\n\nProject[r0]\n  provinceId: r0.provinceId\n  pay_amount: WindowFunction(func=Sum(r0.payAmount), frame=RangeWindowFrame(table=r0, start=WindowBoundary(value=10\ns, preceding=True), end=WindowBoundary(Cast(0, to=interval('s'))), order_by=[asc r0.createTime]))\n
\n```\n:::\n:::\n\n\nCarrying out the computations using the local execution backend that we\nconnected to above is as simple as:\n\n::: {#82ff34c1 .cell execution_count=6}\n``` {.python .cell-code}\ncon.to_pandas(unbound_agged)\n```\n\n::: {.cell-output .cell-output-display execution_count=6}\n```{=html}\n
    provinceId  pay_amount
0   3           8.595521e+04
1   3           1.559621e+05
2   3           1.603150e+05
3   2           1.811651e+05
4   4           2.539941e+05
..  ...         ...
95  5           1.270405e+06
96  2           1.269567e+06
97  5           1.277924e+06
98  2           1.204362e+06
99  3           1.202042e+06

100 rows × 2 columns
\n```\n:::\n:::\n\n\n::: {.callout-note}\nDuckDB is much faster than `pandas`, and using Ibis you don't need to write SQL\nfor it!\n:::\n\nFor local experimentation purposes, this DataFrame only consists of 100 rows,\nso doing this in memory is easy.\n\nThe outputs look correct and we didn’t run into any errors. We are now ready to\ndeploy this as a streaming job in Flink!\n\nFirst, let’s set up the Flink environment and connect to this Kafka source:\n\n::: {.callout-note}\nKafka connector is not part of the binary distribution, so we need to download\nand link it for cluster execution explicitly:\n\n::: {#2350cbf3 .cell execution_count=7}\n``` {.python .cell-code}\n!wget -N https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/flink-sql-connector-kafka-1.17.1.jar\n```\n:::\n\n\n:::\n\n::: {#f0187bee .cell execution_count=8}\n``` {.python .cell-code}\nfrom pyflink.table import EnvironmentSettings, TableEnvironment\nfrom pyflink.common import Configuration\n\nsource_schema = sch.Schema(\n {\n \"createTime\": dt.timestamp(scale=3),\n \"orderId\": dt.int64,\n \"payAmount\": dt.float64,\n \"payPlatform\": dt.int32,\n \"provinceId\": dt.int32,\n }\n)\n\nenv_settings = EnvironmentSettings.in_streaming_mode()\ntable_env = TableEnvironment.create(env_settings)\n\ntable_config = table_env.get_config()\nconfig = Configuration()\nconfig.set_string(\"parallelism.default\", \"1\")\ntable_config.add_configuration(config)\n\nconnection = ibis.flink.connect(table_env)\n\n# add the JAR downloaded above\nconnection.raw_sql(\"ADD JAR 'flink-sql-connector-kafka-1.17.1.jar'\")\n\nsource_configs = {\n \"connector\": \"kafka\",\n \"topic\": \"payment_msg\",\n \"properties.bootstrap.servers\": \"localhost:9092\",\n \"properties.group.id\": \"test_3\",\n \"scan.startup.mode\": \"earliest-offset\",\n \"format\": \"json\",\n}\n\nconnection.create_table(\n \"payments\",\n schema=source_schema,\n tbl_properties=source_configs,\n watermark=ibis.watermark(\n time_col=\"createTime\", allowed_delay=ibis.interval(seconds=15)\n ),\n)\n```\n\n::: {.cell-output .cell-output-display execution_count=8}\n```{=html}\n
DatabaseTable: payments\n  createTime  timestamp(3)\n  orderId     int64\n  payAmount   float64\n  payPlatform int32\n  provinceId  int32\n
\n```\n:::\n:::\n\n\nHow would we write this in Flink SQL? Ibis makes this extremely easy by\nexposing a `compile()` API:\n\n::: {#c2d6be4b .cell execution_count=9}\n``` {.python .cell-code}\nsql = connection.compile(unbound_agged)\nprint(sql)\n```\n\n::: {.cell-output .cell-output-stdout}\n````\nSELECT\n `t0`.`provinceId`,\n SUM(`t0`.`payAmount`) OVER (ORDER BY `t0`.`createTime` ASC NULLS LAST RANGE BETWEEN INTERVAL '10' SECOND(2) preceding AND CURRENT ROW) AS `pay_amount`\nFROM `payments` AS `t0`\n````\n:::\n:::\n\n\nBefore we can execute this query, we need to first define a data sink where the\nresults can be written:\n\n::: {#9cf5a144 .cell execution_count=10}\n``` {.python .cell-code}\nsink_schema = sch.Schema(\n {\n \"province_id\": dt.int32,\n \"pay_amount\": dt.float64,\n }\n)\n\nkafka_sink_configs = {\n \"connector\": \"kafka\",\n \"topic\": \"sink\",\n \"properties.bootstrap.servers\": \"localhost:9092\",\n \"format\": \"json\",\n}\n\nconnection.create_table(\n \"kafka_sink\", schema=sink_schema, tbl_properties=kafka_sink_configs\n)\n```\n\n::: {.cell-output .cell-output-display execution_count=10}\n```{=html}\n
DatabaseTable: kafka_sink\n  province_id int32\n  pay_amount  float64\n
\n```\n:::\n:::\n\n\nNow, let’s write the results into this sink. Note that we can directly reuse\nthe transformation logic that we wrote above for the local execution backend!!\n\n::: {#bb2cd7e7 .cell execution_count=11}\n``` {.python .cell-code}\nconnection.insert(\"kafka_sink\", unbound_agged)\n```\n\n::: {.cell-output .cell-output-display execution_count=11}\n```\n\n```\n:::\n:::\n\n\n::: {.callout-tip}\nYou can examine the results either using the Kafka console consumer CLI or the\n`kafka-python` library.\n:::\n\nHow easy was it to define both batch and streaming workloads using Ibis?\nWithout Ibis, you would have needed to write a `pandas`/DuckDB workload and\nthen convert it into Flink SQL manually.\n\n## Concluding thoughts\n\nWith the introduction of the first streaming backends, Ibis is now both a batch\nand a streaming Python DataFrame API and we’re excited about what’s to come\nnext. We hope that Ibis can close the gap between batch and streaming in such a\nway that we no longer talk about the two separately, but, rather, as two parts\nof the same paradigm. Streaming naturally lends itself to batch: batch is\ntechnically just a special case of streaming, where the unbounded data flow\nstops at some point.\n\nOf course, this is only the beginning. There are still technical challenges to\nbe solved (e.g., backfill, window computations over large windows, GPU\nacceleration), and we'll definitely have more exciting updates to share with\nthe community soon!\n\nCheck out the new [Apache Flink](https://ibis-project.org/backends/flink) and\n[RisingWave](https://ibis-project.org/backends/risingwave) backends and let us\nknow what you think!\n\n", "supporting": [ "index_files/figure-html" ], diff --git a/docs/posts/unified-stream-batch/index.qmd b/docs/posts/unified-stream-batch/index.qmd index b15295170497..3d38c3894b6c 100644 --- a/docs/posts/unified-stream-batch/index.qmd +++ b/docs/posts/unified-stream-batch/index.qmd @@ -305,7 +305,7 @@ First, let’s set up the Flink environment and connect to this Kafka source: Kafka connector is not part of the binary distribution, so we need to download and link it for cluster execution explicitly: ```{python} -# | include: false +# | output: false !wget -N https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/flink-sql-connector-kafka-1.17.1.jar ``` :::
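Why the one-line option change above counts as a formatting fix: in Quarto, the `include: false` cell option drops both a cell's code and its output from the rendered page, while `output: false` keeps the code visible and suppresses only the output, so the `wget` command now appears in the published post. A minimal sketch of the two cell styles (illustrative only, not an additional change to the post):

```{python}
# | include: false
# Hidden entirely: neither this code nor its printed result appears in the rendered page.
print("invisible cell")
```

```{python}
# | output: false
# Rendered: the code is shown, but the printed result is suppressed.
print("visible code, hidden output")
```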
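The callout-tip near the end of the post mentions inspecting the results with the `kafka-python` library but doesn't show how. A rough sketch of what that could look like, assuming the local broker and `sink` topic from the example repository's Docker setup:

```python
import json

from kafka import KafkaConsumer

# Read a handful of aggregated records that the Flink job wrote to the sink topic.
consumer = KafkaConsumer(
    "sink",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
)
for _, msg in zip(range(10), consumer):
    print(json.loads(msg.value))
```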