From c6762d63cb75a99785387cd8ccbc0aa1ebd066e8 Mon Sep 17 00:00:00 2001
From: kbuilder
Date: Fri, 20 Dec 2024 17:22:45 -0800
Subject: [PATCH] Release 1.1.0.

---
 CHANGES.md |   2 +-
 README.md  | 222 +++------------------------------------------
 2 files changed, 21 insertions(+), 203 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 67edf9b..abf8d53 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,6 +1,6 @@
# Release Notes

-## Next
+## 1.1.0 - 2024-12-20
Add support for exporting graphs from Spanner

## 1.0.0 - 2023-11-13

diff --git a/README.md b/README.md
index 8be6bf9..b1019db 100644
--- a/README.md
+++ b/README.md
@@ -1,11 +1,6 @@
-# Apache Spark SQL Connector for Google Cloud Spanner
+# Apache Spark SQL connector for Google Cloud Spanner

-The connector supports reading
-[Google Cloud Spanner](https://cloud.google.com/spanner) tables and
-[graphs](https://cloud.google.com/spanner/docs/graph/overview) into Spark
-[DataFrames](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/dataframe.html)
-and
-[GraphFrames](https://graphframes.github.io/graphframes/docs/_site/user-guide.html).
+The connector supports reading [Google Cloud Spanner](https://cloud.google.com/spanner) tables into Spark DataFrames. It does this by using the [Spark SQL Data Source API](https://spark.apache.org/docs/latest/sql-programming-guide.html#data-sources) to communicate with the [Spanner Java library](https://github.com/googleapis/java-spanner).

## Requirements

@@ -23,9 +18,13 @@ MY_CLUSTER=...
gcloud dataproc clusters create "$MY_CLUSTER" --scopes https://www.googleapis.com/auth/cloud-platform
```

+### Permissions
+
+If you run a Spark job on a Dataproc cluster, you'll have to grant the corresponding [Spanner permissions](https://cloud.google.com/spanner/docs/iam#permissions) to the [Dataproc VM service account](https://cloud.google.com/dataproc/docs/concepts/configuring-clusters/service-accounts#dataproc_service_accounts_2). If you choose to use Dataproc Serverless, you'll have to make sure the [Serverless service account](https://cloud.google.com/dataproc-serverless/docs/concepts/service-account#console) has those permissions.
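+
+For example, here is a minimal sketch of granting read access with `gcloud`; the service account, project, instance, and database names below are placeholders for your own values:
+
+```shell
+# Grant the Spanner database-reader role to the service account that runs
+# the Spark job (all names here are placeholders).
+gcloud spanner databases add-iam-policy-binding your-database \
+  --instance=your-instance \
+  --project=your-project \
+  --member="serviceAccount:dataproc-sa@your-project.iam.gserviceaccount.com" \
+  --role="roles/spanner.databaseReader"
+```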
+
## Downloading and Using the Connector

-You can find the released jar file under the Releases tag on the right side of the GitHub page. The name follows the pattern spark-3.1-spanner-x.x.x.jar, where 3.1 indicates that the connector depends on Spark 3.1 and x.x.x is the Spark Spanner connector version. Alternatively, you can use `gs://spark-lib/spanner/spark-3.1-spanner-1.0.0.jar` directly.
+You can find the released jar file under the Releases tag on the right side of the GitHub page. The name follows the pattern spark-3.1-spanner-x.x.x.jar, where 3.1 indicates that the connector depends on Spark 3.1 and x.x.x is the Spark Spanner connector version. Alternatively, you can use `gs://spark-lib/spanner/spark-3.1-spanner-1.1.0.jar` directly.
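+
+For example, on a self-managed Spark installation you might attach the connector like this (a minimal sketch, assuming your cluster can read from Cloud Storage and `your_app.py` is your own script):
+
+```shell
+spark-submit \
+  --jars gs://spark-lib/spanner/spark-3.1-spanner-1.1.0.jar \
+  your_app.py
+```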

### Connector to Spark Compatibility Matrix
| Connector \ Spark | 2.3 | 2.4<br>(Scala 2.11) | 2.4<br>(Scala 2.12) | 3.0 | 3.1 | 3.2 | 3.3 | 3.4 | 3.5 |
@@ -47,21 +46,17 @@ You can use the standard `--jars` or `--packages` (or alternatively, the `spark.
```shell
gcloud dataproc jobs submit pyspark --cluster "$MY_CLUSTER" \
-  --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.0.0.jar \
+  --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.1.0.jar \
  --region us-central1 examples/SpannerSpark.py
```

## Usage

-The connector supports exporting both tables and graphs from Spanner.
-It uses the cross language
-[Spark SQL Data Source API](https://spark.apache.org/docs/latest/sql-data-sources.html)
-to communicate with the
-[Spanner Java library](https://github.com/googleapis/java-spanner).
+The connector uses the cross-language [Spark SQL Data Source API](https://spark.apache.org/docs/latest/sql-programming-guide.html#data-sources).

-### Exporting Spanner Tables
+### Reading data from a Spanner table
This is an example of using Python code to connect to a Spanner table. You can find more examples and documentation on [DataFrame usage](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html).
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Spanner Connect App').getOrCreate()
df = spark.read.format('cloud-spanner') \
  .option("projectId", "$YourProjectId") \
  .option("instanceId", "$YourInstanceId") \
  .option("databaseId", "$YourDatabaseId") \
  .option("enableDataBoost", "true") \
  .option("table", "$YourTableName") \
  .load()
df.show()
```
+For other language support, you can refer to [Scala](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html), [Java](https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/sql/Dataset.html), and [R](https://spark.apache.org/docs/latest/api/R/reference/SparkDataFrame.html). You can also refer to [Scala, Java](https://cloud.google.com/sdk/gcloud/reference/dataproc/jobs/submit/spark) and [R](https://cloud.google.com/sdk/gcloud/reference/dataproc/jobs/submit/spark-r) for how to submit a job in those languages.

-For support of other languages, you can refer to
-[Scala](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Dataset.html),
-[Java](https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/sql/Dataset.html),
-and
-[R](https://spark.apache.org/docs/latest/api/R/reference/SparkDataFrame.html).
-You can also refer to
-[Scala, Java](https://cloud.google.com/sdk/gcloud/reference/dataproc/jobs/submit/spark),
-and
-[R](https://cloud.google.com/sdk/gcloud/reference/dataproc/jobs/submit/spark-r)
-about how to submit a job for other languages.
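+
+For example, to submit a Scala or Java Spark job with the connector attached (a minimal sketch; the application jar path and main class are placeholders):
+
+```shell
+gcloud dataproc jobs submit spark --cluster "$MY_CLUSTER" \
+  --region us-central1 \
+  --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.1.0.jar,gs://your-bucket/your-spark-app.jar \
+  --class com.example.YourSpannerApp
+```
+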
+### Properties

-#### Table Connector Options
-
-Here are the options supported in the Spark Spanner connector for reading
-tables.
+Here are the options supported in the Spark Spanner connector.

Variable|Validation|Comments
---|---|---
@@ -98,175 +82,7 @@ databaseId|String|The databaseID of the Cloud Spanner database
table|String|The table of the Cloud Spanner database that you are reading from
enableDataBoost|Boolean|Enable [Data Boost](https://cloud.google.com/spanner/docs/databoost/databoost-overview), which provides independent compute resources to query Spanner with near-zero impact on existing workloads. Note that the option may incur an [extra charge](https://cloud.google.com/spanner/pricing#spanner-data-boost-pricing).
-### Exporting Spanner Graphs
-
-To export [Spanner Graphs](https://cloud.google.com/spanner/docs/graph/overview),
-please use the Python class `SpannerGraphConnector` included in the jar.
-
-The connector supports exporting the graph into separate node and edge
-DataFrames, or exporting the graph directly into
-[GraphFrames](https://graphframes.github.io/graphframes/docs/_site/user-guide.html).
-
-This is an example of exporting a graph from Spanner as a GraphFrame:
-
-```python
-from pyspark.sql import SparkSession
-
-spark = (SparkSession.builder.appName("spanner-graphframe-graphx-example")
-         .config("spark.jars.packages", "graphframes:graphframes:0.8.4-spark3.5-s_2.12")
-         .config("spark.jars", path_to_connector_jar)
-         .getOrCreate())
-
-spark.sparkContext.addPyFile(path_to_connector_jar)
-from spannergraph import SpannerGraphConnector
-
-connector = (SpannerGraphConnector()
-             .spark(spark)
-             .project("$YourProjectId")
-             .instance("$YourInstanceId")
-             .database("$YourDatabaseId")
-             .graph("$YourGraphId"))
-
-g = connector.load_graph()
-g.vertices.show()
-g.edges.show()
-```
-
-To export node and edge DataFrames instead of GraphFrames, use
-`load_dfs` instead:
-
-```python
-df_vertices, df_edges, df_id_map = connector.load_dfs()
-```
-
-#### Node ID Mapping
-
-While Spanner Graph allows nodes to be identified by more than one element
-key, many graph-processing libraries, including GraphFrames, expect a single
-ID field, ideally an integer.
-
-When node IDs are not integers, the connector by default assigns a unique
-integer ID to each row in the node tables and maps node keys in the edge
-tables to those integer IDs with DataFrame joins. Use `load_graph_and_mapping`
-or `load_dfs` to retrieve the mapping when loading a graph:
-
-```python
-g, df_id_map = connector.load_graph_and_mapping()
-```
-
-or
-
-```python
-df_vertices, df_edges, df_id_map = connector.load_dfs()
-```
-
-If you do not want the connector to perform this mapping, specify
-`.export_string_ids(True)` to have the connector output string concatenations
-of table IDs (generated by the connector based on the graph schema) and
-element keys directly. The format of the concatenated strings is
-`{table_id}@{key_1}|{key_2}|{key_3}|...`, where element keys are joined with
-`|` as the separator and `\` is used as the escape character. For example,
-the string ID of a node with table ID `1` and keys `(a, b|b, c\c)` will be
-`1@a|b\|b|c\\c`.
-
-#### Graph Connector Options
-
-Here is a summary of the options supported by the graph connector; an
-illustrative sketch follows the tables. Please refer to the API documentation
-of [`SpannerGraphConnector`](python/spannergraph/_connector.py) for details.
-
-##### Required
-
-| Option   | Summary of Purpose                                  |
-|----------|-----------------------------------------------------|
-| spark    | The Spark session to read the graph into            |
-| project  | ID of the Google Cloud project containing the graph |
-| instance | ID of the Spanner instance containing the graph     |
-| database | ID of the Spanner database containing the graph     |
-| graph    | Name of the graph as defined in the database schema |
-
-##### Optional
-
-| Option | Summary of Purpose | Default |
-|---|---|---|
-| data_boost | Enable [Data Boost](https://cloud.google.com/spanner/docs/databoost/databoost-overview) | Disabled |
-| partition_size_bytes | The [partitionSizeBytes](https://cloud.google.com/spanner/docs/reference/rest/v1/PartitionOptions) hint for Spanner | No hint provided |
-| repartition | Enable repartitioning of node and edge DataFrames and set the target number of partitions | No repartitioning |
-| read_timestamp | The timestamp of the snapshot to read from | Read the snapshot at the time load is called |
-| symmetrize_graph | Symmetrize the output graph by adding reverse edges | No symmetrization |
-| export_string_ids | Output string concatenations of the element keys instead of assigning integer IDs and performing joins | Output integer IDs |
-| node_label / edge_label | Specify label element filters, additional properties to fetch, and element-wise property filters (details below) | Export all nodes and edges and no element properties |
-| node_query / edge_query | Overwrite the queries used to fetch nodes and edges (details below) | Use queries generated by the connector |
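-
-For illustration, here is a hypothetical sketch of combining several optional
-settings, assuming each option above corresponds to a builder method of the
-same name on `SpannerGraphConnector` (as `node_label` and `node_query` do
-below); check the API documentation for the exact signatures:
-
-```python
-connector = (connector
-             .data_boost(True)        # read using Data Boost resources
-             .repartition(64)         # target number of output partitions
-             .symmetrize_graph(True)) # add reverse edges to the output
-```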
-
-#### Filters and Element Properties
-
-You can choose to include only graph elements with specific labels by
-providing the `node_label` and/or `edge_label` options. `node_label` and
-`edge_label` can also be used to specify element properties to include in the
-output and additional element-wise filters (i.e., WHERE clauses). The columns
-for the returned properties will be prefixed with "property_" to avoid naming
-conflicts (e.g., when fetching a property named "id").
-
-To fetch additional properties or specify an element-wise filter without
-performing any filtering by label, use `"*"` to match any label. Other
-label filters of the same type (node/edge) cannot be used if a `"*"` label
-filter is specified for that type.
-
-This example fetches all nodes with their "name" property, all "KNOWS" edges
-with their "SingerId" and "FriendId" properties, and all "CREATES_MUSIC" edges
-with a release date after 1900-01-01:
-
-```python
-connector = (connector
-             .node_label("*", properties=["name"])
-             .edge_label("KNOWS", properties=["SingerId", "FriendId"])
-             .edge_label("CREATES_MUSIC", where="release_date > '1900-01-01'"))
-```
-
-#### Direct Queries
-
-In addition to letting the connector generate queries to read nodes and edges
-from Spanner, you can provide your own GQL queries with `node_query` and
-`edge_query` to fetch the node and edge tables, with some restrictions:
-
-- The queries must be
-  [root-partitionable](https://cloud.google.com/spanner/docs/reads#read_data_in_parallel).
-- The output columns must meet the following conditions:
-  - The node DataFrame must have a column named "id", which will be used to
-    identify nodes.
-  - The edge DataFrame must have a column named "src", which will be used to
-    identify source nodes.
-  - The edge DataFrame must have a column named "dst", which will be used to
-    identify destination nodes.
-
-This example provides custom GQL queries to fetch the node and edge tables of
-the graph:
-
-```python
-node_query = "SELECT * FROM GRAPH_TABLE " \
-             "(MusicGraph MATCH (n:SINGER) RETURN n.id AS id)"
-edge_query = "SELECT * FROM GRAPH_TABLE " \
-             "(MusicGraph MATCH -[e:KNOWS]-> " \
-             "RETURN e.SingerId AS src, e.FriendId AS dst)"
-connector = (connector
-             .node_query(node_query)
-             .edge_query(edge_query))
-```
-
-#### Source and Destination Key Limitation
-
-Currently, the graph connector expects the source_key and destination_key of
-an edge to match the node_element_key of the referenced source and destination
-nodes, respectively
-([Element Definition](https://cloud.google.com/spanner/docs/reference/standard-sql/graph-schema-statements#element_definition)).
-For example, if an edge table *E* references a node table *N* as source nodes,
-and *N* has a 2-part compound key [node_c1, node_c2] as its node_element_key,
-the source_key of *E* must also be a 2-part compound key [edge_c1, edge_c2]. A
-partial match, e.g. source_key = [edge_c1], can logically form a hypergraph
-and is not supported.
-
-### Data Types
+### Data types

Here are the mappings for supported Spanner data types.

@@ -283,7 +99,7 @@ NUMERIC |DecimalType | The NUMERIC will be converted to DecimalType with 38 pr
STRING |StringType |
TIMESTAMP|TimestampType| Only microseconds will be converted to the Spark timestamp type. The range of supported timestamps is [0001-01-01 00:00:00, 9999-12-31 23:59:59.999999]
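
For illustration, here is a minimal sketch of inspecting the mapped Spark schema after a read; the table name `Orders` and its columns are hypothetical:

```python
# Load a hypothetical table and inspect how Spanner types surface in Spark.
df = spark.read.format('cloud-spanner') \
  .option("projectId", "$YourProjectId") \
  .option("instanceId", "$YourInstanceId") \
  .option("databaseId", "$YourDatabaseId") \
  .option("table", "Orders") \
  .load()

df.printSchema()
# Illustrative output, assuming Orders(OrderId STRING(36), Price NUMERIC,
# CreatedAt TIMESTAMP):
# root
#  |-- OrderId: string (nullable = true)
#  |-- Price: decimal(38,9) (nullable = true)
#  |-- CreatedAt: timestamp (nullable = true)
```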

-### Filter Pushdown
+### Filtering

The connector automatically computes column pruning and filter pushdown from
the DataFrame's `SELECT` statement, e.g.

```
df.select("word")
  .where("word = 'hamlet' or word = 'Claudius'")
```

prunes to the column `word` and pushes down the predicate filter
`word = 'hamlet' or word = 'Claudius'`. Note that filters containing an
ArrayType column are not pushed down.

-Filter pushdown is currently not supported when exporting graphs.
-
### Monitoring

When Data Boost is enabled, usage can be monitored by using Cloud Monitoring. [This page](https://cloud.google.com/spanner/docs/databoost/databoost-monitor#use_to_track_usage) explains how to do that step by step. Note, though, that usage cannot be grouped by Spark job ID.

### Debug

The Dataproc [web interfaces](https://cloud.google.com/dataproc/docs/concepts/accessing/cluster-web-interfaces) can be used for debugging, especially for tuning performance. The `YARN Application Timeline` page displays the execution timeline details for the executors and other components. You can assign more workers if many tasks are assigned to the same executor.

+### Root-partitionable Queries
+
+When Data Boost is enabled, all queries that are sent to Cloud Spanner must be root-partitionable. Please see [Read data in parallel](https://cloud.google.com/spanner/docs/reads#read_data_in_parallel) for more details. If you encounter a partitioning-related issue when using this connector, it is probably because the query on the table being read is not root-partitionable.
+
### PostgreSQL

The connector supports Spanner [PostgreSQL interface-enabled databases](https://cloud.google.com/spanner/docs/postgresql-interface#postgresql-components).
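
Since no PostgreSQL-specific connector options are documented, reading a table from a PostgreSQL-dialect database is expected to look the same as the GoogleSQL example above. A minimal sketch, assuming a hypothetical PostgreSQL-dialect database containing a table named `shakespeare`:

```python
# Read a table from a PostgreSQL-dialect Spanner database; the database and
# table names are placeholders.
df = spark.read.format('cloud-spanner') \
  .option("projectId", "$YourProjectId") \
  .option("instanceId", "$YourInstanceId") \
  .option("databaseId", "$YourPostgresDatabaseId") \
  .option("table", "shakespeare") \
  .load()
df.show()
```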