
Commit aaf25ba

test: Add Spark write and Embucket read verification test (#1443)
* test: Add Spark write and Embucket read verification test
* refactor: update test_read_spark with pytest logic, add pytest configuration and Spark session fixtures for testing
* refactor: remove logging and AWS configuration from Spark tests

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
1 parent 0367285 commit aaf25ba

3 files changed, +176 -0 lines changed

test/integration_tests/conftest.py

Lines changed: 87 additions & 0 deletions
@@ -0,0 +1,87 @@
import pytest
import os
from pyspark.sql import SparkSession

CATALOG_URL = "http://localhost:8080"
WAREHOUSE_ID = "test_db"
AWS_REGION = os.getenv("AWS_REGION")
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")


@pytest.fixture()
def rest_spark_session():
    # Spark session backed by the Iceberg REST catalog with HadoopFileIO
    spark_session = spark_session_factory(config_type='file_catalog', app_name='FileCatalogTest')
    yield spark_session
    spark_session.stop()


@pytest.fixture()
def s3_spark_session():
    # Spark session with direct S3 access through the S3A filesystem
    spark_session = spark_session_factory(config_type='s3_catalog', app_name='S3CatalogTest')
    yield spark_session
    spark_session.stop()


def spark_session_factory(
    config_type: str,
    app_name: str,
    config_overrides: dict = None
):
    """
    Build a SparkSession configured for the requested catalog type.

    Args:
        config_type (str): The type of configuration to use.
            Expected values: 'file_catalog' or 's3_catalog'.
        app_name (str): The name for the Spark application.
        config_overrides (dict): A dictionary of Spark configs to add to or
            override in the base configuration.

    Returns:
        SparkSession: An initialized SparkSession object.
    """
    print(f"\nBuilding Spark session with config_type='{config_type}'...")
    builder = SparkSession.builder.appName(app_name)

    # --- Configuration Type 1: REST Client with SimpleAWSCredentialsProvider ---
    if config_type == 'file_catalog':
        builder.config("spark.driver.memory", "15g") \
            .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.1") \
            .config("spark.driver.extraJavaOptions", "-Dlog4j.configurationFile=log4j2.properties") \
            .config("spark.hadoop.fs.s3a.path.style.access", "true") \
            .config("spark.hadoop.fs.s3a.change.detection.mode", "error") \
            .config("spark.hadoop.fs.s3a.change.detection.version.required", "false") \
            .config("spark.hadoop.fs.s3a.multiobjectdelete.enable", "true") \
            .config("spark.hadoop.fs.s3a.impl", "org.apache.iceberg.hadoop.HadoopFileIO") \
            .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
            .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
            .config("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog") \
            .config("spark.sql.catalog.rest.catalog-impl", "org.apache.iceberg.rest.RESTCatalog") \
            .config("spark.sql.catalog.rest.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO") \
            .config("spark.sql.catalog.rest.uri", CATALOG_URL) \
            .config("spark.sql.catalog.rest.warehouse", WAREHOUSE_ID) \
            .config("spark.sql.defaultCatalog", "rest")

    # --- Configuration Type 2: Direct S3 Access with Explicit Keys ---
    elif config_type == 's3_catalog':
        builder.config("spark.driver.memory", "15g") \
            .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.9.1,org.apache.hadoop:hadoop-aws:3.3.4") \
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
            .config("spark.hadoop.fs.s3a.path.style.access", "true") \
            .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
            .config("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog") \
            .config("spark.sql.catalog.rest.catalog-impl", "org.apache.iceberg.rest.RESTCatalog") \
            .config("spark.sql.catalog.rest.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO") \
            .config("spark.sql.catalog.rest.uri", CATALOG_URL) \
            .config("spark.sql.catalog.rest.warehouse", WAREHOUSE_ID) \
            .config("spark.sql.defaultCatalog", "rest")
    else:
        raise ValueError(f"Unknown config_type: '{config_type}'. Expected 'file_catalog' or 's3_catalog'.")

    # Apply any specific overrides for the test
    if config_overrides:
        for key, value in config_overrides.items():
            builder.config(key, value)

    spark_session = builder.getOrCreate()
    return spark_session
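
Since spark_session_factory already accepts a config_overrides dict, a test that needs different Spark settings could wrap it in its own fixture. A minimal sketch, assuming such a fixture were wanted; the fixture name and override values below are illustrative and not part of this commit:

# Hypothetical fixture sketch: reuses spark_session_factory with overrides.
# The specific key/value shown is an example, not a setting from this commit.
@pytest.fixture()
def low_memory_spark_session():
    spark_session = spark_session_factory(
        config_type='file_catalog',
        app_name='LowMemoryTest',
        config_overrides={"spark.driver.memory": "4g"},  # example override
    )
    yield spark_session
    spark_session.stop()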

test/integration_tests/pytest.ini

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
[pytest]
log_cli = true
log_cli_level = INFO
log_cli_format = %(asctime)s %(levelname)s %(message)s
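
With log_cli enabled at INFO level, pytest streams log records to the terminal while tests run, so tests can report progress through the standard logging module. A minimal sketch of how that surfaces; this example test is illustrative and not part of the commit:

import logging

logger = logging.getLogger(__name__)

def test_logging_example():
    # Shown live in the pytest console because log_cli = true and the level is INFO
    logger.info("starting example test")
    assert True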
test/integration_tests/test_read_spark.py

Lines changed: 85 additions & 0 deletions
@@ -0,0 +1,85 @@
import pandas as pd
from clients import EmbucketClient

DB = "test_db"
SCHEMA = "public"
TABLE = "spark_embucket"


def get_embucket_client():
    emb = EmbucketClient()
    emb.volume()
    emb.sql(f"CREATE DATABASE IF NOT EXISTS {DB} EXTERNAL_VOLUME = 'test'")
    emb.sql(f"CREATE SCHEMA IF NOT EXISTS {DB}.{SCHEMA}")
    return emb


def perform_spark_operations(spark):
    # create table
    spark.sql(
        f"""CREATE TABLE IF NOT EXISTS {SCHEMA}.{TABLE} (
            id INT,
            page_name STRING,
            category STRING
        )"""
    )

    # clear
    spark.sql(f"DELETE FROM {SCHEMA}.{TABLE} WHERE TRUE")

    # insert rows
    spark.sql(
        f"""
        INSERT INTO {SCHEMA}.{TABLE} VALUES
        (1, 'page_1', 'category_1'),
        (2, 'is_object', 'category_2'),
        (3, 'page_3', 'Conditional_expression')
        """
    )

    # update
    spark.sql(
        f"UPDATE {SCHEMA}.{TABLE} SET category='updated_category' WHERE page_name='is_object'"
    )

    # delete
    spark.sql(
        f"DELETE FROM {SCHEMA}.{TABLE} WHERE category='Conditional_expression'"
    )


def read_and_validate_from_embucket(emb: EmbucketClient) -> pd.DataFrame:
    res = emb.sql(f"SELECT id, page_name, category FROM {DB}.{SCHEMA}.{TABLE} ORDER BY id")
    result = res.get("result", {})
    rows = result.get("rows", [])
    cols = [c["name"] for c in result.get("columns", [])]
    df = pd.DataFrame(rows, columns=cols)
    return df


# --- test ---
def test_spark_embucket_sync(rest_spark_session):
    """
    Write via Spark (create/insert/update/delete) and verify Embucket sees those changes.
    """
    spark = rest_spark_session

    # Perform Spark-side mutations
    perform_spark_operations(spark)

    # Read back through Embucket
    embucket_client = get_embucket_client()
    df = read_and_validate_from_embucket(embucket_client)

    # Validate update happened
    updated_mask = (df["page_name"] == "is_object") & (df["category"] == "updated_category")
    assert updated_mask.any(), "Expected row with page_name='is_object' to have category='updated_category'"

    # Validate delete happened
    assert not (df["category"] == "Conditional_expression").any(), (
        "Expected no rows with category='Conditional_expression' after delete"
    )

    # Check that unaffected row is still there
    assert ((df["page_name"] == "page_1") & (df["category"] == "category_1")).any(), (
        "Expected original row ('page_1','category_1') to persist"
    )
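
The conftest also defines an s3_spark_session fixture that this test does not exercise. A hypothetical companion test, not included in this commit, could reuse the same helpers to drive the write/verify cycle through the direct S3A configuration:

# Hypothetical companion test (not part of this commit): same write/verify
# flow, but using the s3_catalog Spark configuration from conftest.py.
def test_spark_embucket_sync_s3(s3_spark_session):
    perform_spark_operations(s3_spark_session)
    df = read_and_validate_from_embucket(get_embucket_client())
    assert ((df["page_name"] == "is_object") & (df["category"] == "updated_category")).any()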
