Merged
2 changes: 1 addition & 1 deletion .github/workflows/utitcase-spark-3.x.yml
@@ -59,6 +59,6 @@ jobs:
test_modules+="org.apache.paimon:paimon-spark-${suffix},"
done
test_modules="${test_modules%,}"
mvn -T 2C -B test -pl "${test_modules}" -Duser.timezone=$jvm_timezone
mvn -T 2C -B verify -pl "${test_modules}" -Duser.timezone=$jvm_timezone
env:
MAVEN_OPTS: -Xmx4096m
2 changes: 1 addition & 1 deletion .github/workflows/utitcase-spark-4.x.yml
@@ -59,6 +59,6 @@ jobs:
test_modules+="org.apache.paimon:paimon-spark-${suffix},"
done
test_modules="${test_modules%,}"
mvn -T 2C -B test -pl "${test_modules}" -Duser.timezone=$jvm_timezone -Pspark4,flink1
mvn -T 2C -B verify -pl "${test_modules}" -Duser.timezone=$jvm_timezone -Pspark4,flink1
env:
MAVEN_OPTS: -Xmx4096m
@@ -147,6 +147,8 @@ private static void adaptScanVersion(Options options, TagManager tagManager) {
} else if (version.chars().allMatch(Character::isDigit)) {
options.set(SCAN_SNAPSHOT_ID.key(), version);
} else {
// By this point, the scan version must be a tag.
options.set(SCAN_TAG_NAME.key(), version);
Inline comment from the PR author: Previously, when querying a tag with the VERSION AS OF syntax, a nonexistent tag did not make the query throw an error; it instead returned the result of the latest snapshot, which is wrong. This happened because the scan version was removed from the options during time travel.
throw new RuntimeException("Cannot find a time travel version for " + version);
}
}
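To make the fixed behavior concrete, here is a minimal sketch of the resolution order, assuming Paimon's scan.snapshot-id and scan.tag-name option keys; the standalone helper and the plain Map stand in for Paimon's Options class and are illustrative, not the actual Paimon code:

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: a VERSION AS OF value that is all digits resolves to a
// snapshot id; anything else is treated as a tag name.
class ScanVersionSketch {
    static void resolve(Map<String, String> options, String version) {
        if (version.chars().allMatch(Character::isDigit)) {
            options.put("scan.snapshot-id", version);
        } else {
            // After this fix the tag name stays in the options, so a missing
            // tag fails at tag lookup ("Tag 'unknown' doesn't exist") instead
            // of silently falling back to the latest snapshot.
            options.put("scan.tag-name", version);
        }
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        resolve(options, "5");       // resolves to snapshot 5
        resolve(options, "my-tag");  // resolves to tag "my-tag"
        System.out.println(options); // {scan.snapshot-id=5, scan.tag-name=my-tag}
    }
}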
@@ -592,8 +592,9 @@ private void innerTest(String tableName, boolean hasPk, boolean partitioned) {
Dataset<Row> dataset = spark.read().format("paimon").load(tablePath.toString());
assertThat(dataset.select("order_id", "buyer_id", "dt").collectAsList().toString())
.isEqualTo("[[1,10,2022-07-20]]");
assertThat(dataset.select("coupon_info").collectAsList().toString())
.isEqualTo("[[WrappedArray(loyalty_discount, shipping_discount)]]");

RowTestHelper.checkRowEquals(
dataset.select("coupon_info"), row(array("loyalty_discount", "shipping_discount")));

// test drop table
assertThat(
@@ -647,37 +648,38 @@ public void testCreateAndDropNamespace() {
}

private void innerTestNestedType(Dataset<Row> dataset) {
List<Row> results = dataset.collectAsList();
assertThat(results.toString())
.isEqualTo(
"[[1,WrappedArray(AAA, BBB),[[1.0,WrappedArray(null)],1]], "
+ "[2,WrappedArray(CCC, DDD),[[null,WrappedArray(true)],null]], "
+ "[3,WrappedArray(null, null),[[2.0,WrappedArray(true, false)],2]], "
+ "[4,WrappedArray(null, EEE),[[3.0,WrappedArray(true, false, true)],3]]]");
RowTestHelper.checkRowEquals(
dataset,
Arrays.asList(
row(1, array("AAA", "BBB"), row(row(1.0, array(null)), 1L)),
row(2, array("CCC", "DDD"), row(row(null, array(true)), null)),
row(3, array(null, null), row(row(2.0, array(true, false)), 2L)),
row(4, array(null, "EEE"), row(row(3.0, array(true, false, true)), 3L))));

results = dataset.select("a").collectAsList();
assertThat(results.toString()).isEqualTo("[[1], [2], [3], [4]]");
RowTestHelper.checkRowEquals(
dataset.select("a"), Arrays.asList(row(1), row(2), row(3), row(4)));

results = dataset.select("c.c1").collectAsList();
assertThat(results.toString())
.isEqualTo(
"[[[1.0,WrappedArray(null)]], [[null,WrappedArray(true)]], "
+ "[[2.0,WrappedArray(true, false)]], "
+ "[[3.0,WrappedArray(true, false, true)]]]");
RowTestHelper.checkRowEquals(
dataset.select("c.c1"),
Arrays.asList(
row(row(1.0, array(null))),
row(row(null, array(true))),
row(row(2.0, array(true, false))),
row(row(3.0, array(true, false, true)))));

results = dataset.select("c.c2").collectAsList();
assertThat(results.toString()).isEqualTo("[[1], [null], [2], [3]]");
RowTestHelper.checkRowEquals(
dataset.select("c.c2"), Arrays.asList(row(1), row(null), row(2), row(3)));

results = dataset.select("c.c1.c11").collectAsList();
assertThat(results.toString()).isEqualTo("[[1.0], [null], [2.0], [3.0]]");
RowTestHelper.checkRowEquals(
dataset.select("c.c1.c11"), Arrays.asList(row(1.0), row(null), row(2.0), row(3.0)));

results = dataset.select("c.c1.c12").collectAsList();
assertThat(results.toString())
.isEqualTo(
"[[WrappedArray(null)], "
+ "[WrappedArray(true)], "
+ "[WrappedArray(true, false)], "
+ "[WrappedArray(true, false, true)]]");
RowTestHelper.checkRowEquals(
dataset.select("c.c1.c12"),
Arrays.asList(
row(array(null)),
row(array(true)),
row(array(true, false)),
row(array(true, false, true))));
}

private void innerTestSimpleTypeFilterPushDown(Dataset<Row> dataset) {
@@ -689,28 +691,27 @@ private void innerTestSimpleTypeFilterPushDown(Dataset<Row> dataset) {
}

private void innerTestNestedTypeFilterPushDown(Dataset<Row> dataset) {
List<Row> results = dataset.filter("a < 4").select("a").collectAsList();
assertThat(results.toString()).isEqualTo("[[1], [2], [3]]");
RowTestHelper.checkRowEquals(
dataset.filter("a < 4").select("a"), Arrays.asList(row(1), row(2), row(3)));

results = dataset.filter("array_contains(b, 'AAA')").select("b").collectAsList();
assertThat(results.toString()).isEqualTo("[[WrappedArray(AAA, BBB)]]");
RowTestHelper.checkRowEquals(
dataset.filter("array_contains(b, 'AAA')").select("b"), row(array("AAA", "BBB")));

results = dataset.filter("c.c1.c11 is null").select("a", "c").collectAsList();
assertThat(results.toString()).isEqualTo("[[2,[[null,WrappedArray(true)],null]]]");
RowTestHelper.checkRowEquals(
dataset.filter("c.c1.c11 is null").select("a", "c"),
row(2, row(row(null, array(true)), null)));

results = dataset.filter("c.c1.c11 = 1.0").select("a", "c.c1").collectAsList();
assertThat(results.toString()).isEqualTo("[[1,[1.0,WrappedArray(null)]]]");
RowTestHelper.checkRowEquals(
dataset.filter("c.c1.c11 = 1.0").select("a", "c.c1"),
row(1, row(1.0, array(null))));

results = dataset.filter("c.c2 is null").select("a", "c").collectAsList();
assertThat(results.toString()).isEqualTo("[[2,[[null,WrappedArray(true)],null]]]");
RowTestHelper.checkRowEquals(
dataset.filter("c.c2 is null").select("a", "c"),
row(2, row(row(null, array(true)), null)));

results =
dataset.filter("array_contains(c.c1.c12, false)")
.select("a", "c.c1.c12", "c.c2")
.collectAsList();
assertThat(results.toString())
.isEqualTo(
"[[3,WrappedArray(true, false),2], [4,WrappedArray(true, false, true),3]]");
RowTestHelper.checkRowEquals(
dataset.filter("array_contains(c.c1.c12, false)").select("a", "c.c1.c12", "c.c2"),
Arrays.asList(row(3, array(true, false), 2), row(4, array(true, false, true), 3)));
}

@Test
@@ -51,6 +51,8 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import scala.collection.Seq;

import static org.assertj.core.api.Assertions.assertThat;

/** Base tests for spark read. */
@@ -251,4 +253,14 @@ protected String defaultShowCreateString(String table) {
protected String defaultShowCreateStringWithNonNullColumn(String table) {
return showCreateString(table, "a INT NOT NULL", "b BIGINT NOT NULL", "c STRING");
}

protected static Row row(Object... values) {
// A bare `row(null)` binds null to the varargs array itself, so normalize
// it to a single-element array holding null.
Object[] array = values != null ? values : new Object[] {null};
return RowTestHelper.row(array);
}

protected static Seq array(Object... values) {
// Same normalization as row(): `array(null)` means one null element.
Object[] array = values != null ? values : new Object[] {null};
return RowTestHelper.seq(array);
}
}
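One subtlety these helpers guard against: in Java, a bare null argument to a varargs method binds to the array parameter itself rather than becoming a single null element, so row(null) and array(null) would otherwise pass a null array through to RowTestHelper. A standalone illustration (hypothetical demo class, not part of the PR):

// Hypothetical demo showing why the helpers normalize a null varargs array
// to new Object[] {null}.
public class VarargsNullDemo {
    static int arity(Object... values) {
        return values == null ? -1 : values.length;
    }

    public static void main(String[] args) {
        // A bare null binds to the Object[] parameter itself (javac warns
        // about the inexact argument type).
        System.out.println(arity(null));          // -1: the array is null
        // Casting to Object forces a one-element array holding null.
        System.out.println(arity((Object) null)); // 1
        System.out.println(arity("a", null));     // 2
    }
}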
@@ -27,6 +27,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -183,10 +184,16 @@ public void testRenameColumn() {
Dataset<Row> table = spark.table("testRenameColumn");
results = table.select("bb", "c").collectAsList();
assertThat(results.toString()).isEqualTo("[[2,1], [6,3]]");

assertThatThrownBy(() -> table.select("b", "c"))
.isInstanceOf(AnalysisException.class)
// Messages vary across Spark versions, so only validate the common part.
// Spark 4: A column, variable, or function parameter with name `b` cannot be
// resolved. Did you mean one of the following? [`a`, `bb`, `c`]
// Spark 3.5 and earlier versions: A column or function parameter with name `b`
// cannot be resolved. Did you mean one of the following? [`a`, `bb`, `c`]
.hasMessageContaining(
"A column or function parameter with name `b` cannot be resolved. Did you mean one of the following?");
"name `b` cannot be resolved. Did you mean one of the following? [`a`, `bb`, `c`]");
}

@Test
@@ -388,13 +395,15 @@ public void testUpdateColumnPosition() {
"Cannot move itself for column b"));

// missing column
// Messages vary across Spark versions and share no common part, so only
// validate the exception class.
createTable("tableMissing");
assertThatThrownBy(() -> spark.sql("ALTER TABLE tableMissing ALTER COLUMN d FIRST"))
.hasMessageContaining("Missing field d in table paimon.default.tableMissing");
.isInstanceOf(AnalysisException.class);

createTable("tableMissingAfter");
assertThatThrownBy(() -> spark.sql("ALTER TABLE tableMissingAfter ALTER COLUMN a AFTER d"))
.hasMessageContaining("Missing field d in table paimon.default.tableMissingAfter");
.isInstanceOf(AnalysisException.class);
}

@Test
@@ -806,13 +815,12 @@ public void testAddAndDropNestedColumnInArray(String formatType) {
+ tableName
+ " VALUES (1, ARRAY(STRUCT('apple', 100), STRUCT('banana', 101))), "
+ "(2, ARRAY(STRUCT('cat', 200), STRUCT('dog', 201)))");
assertThat(
spark.sql("SELECT * FROM paimon.default." + tableName).collectAsList()
.stream()
.map(Row::toString))
.containsExactlyInAnyOrder(
"[1,WrappedArray([apple,100], [banana,101])]",
"[2,WrappedArray([cat,200], [dog,201])]");

RowTestHelper.checkRowEquals(
spark.sql("SELECT * FROM paimon.default." + tableName),
Arrays.asList(
row(1, array(row("apple", 100), row("banana", 101))),
row(2, array(row("cat", 200), row("dog", 201)))));

spark.sql(
"ALTER TABLE paimon.default."
@@ -824,14 +832,13 @@ public void testAddAndDropNestedColumnInArray(String formatType) {
+ tableName
+ " VALUES (1, ARRAY(STRUCT(110, 'APPLE'), STRUCT(111, 'BANANA'))), "
+ "(3, ARRAY(STRUCT(310, 'FLOWER')))");
assertThat(
spark.sql("SELECT * FROM paimon.default." + tableName).collectAsList()
.stream()
.map(Row::toString))
.containsExactlyInAnyOrder(
"[1,WrappedArray([110,APPLE], [111,BANANA])]",
"[2,WrappedArray([200,null], [201,null])]",
"[3,WrappedArray([310,FLOWER])]");

RowTestHelper.checkRowEquals(
spark.sql("SELECT * FROM paimon.default." + tableName),
Arrays.asList(
row(1, array(row(110, "APPLE"), row(111, "BANANA"))),
row(2, array(row(200, null), row(201, null))),
row(3, array(row(310, "FLOWER")))));
}

@ParameterizedTest()
@@ -1012,13 +1019,12 @@ public void testUpdateNestedColumnTypeInArray(String formatType) {
+ tableName
+ " VALUES (1, ARRAY(STRUCT('apple', 100), STRUCT('banana', 101))), "
+ "(2, ARRAY(STRUCT('cat', 200), STRUCT('dog', 201)))");
assertThat(
spark.sql("SELECT * FROM paimon.default." + tableName).collectAsList()
.stream()
.map(Row::toString))
.containsExactlyInAnyOrder(
"[1,WrappedArray([apple,100], [banana,101])]",
"[2,WrappedArray([cat,200], [dog,201])]");

RowTestHelper.checkRowEquals(
spark.sql("SELECT * FROM paimon.default." + tableName),
Arrays.asList(
row(1, array(row("apple", 100), row("banana", 101))),
row(2, array(row("cat", 200), row("dog", 201)))));

spark.sql(
"ALTER TABLE paimon.default."
Expand All @@ -1029,14 +1035,13 @@ public void testUpdateNestedColumnTypeInArray(String formatType) {
+ tableName
+ " VALUES (1, ARRAY(STRUCT('APPLE', 1000000000000), STRUCT('BANANA', 111))), "
+ "(3, ARRAY(STRUCT('FLOWER', 3000000000000)))");
assertThat(
spark.sql("SELECT * FROM paimon.default." + tableName).collectAsList()
.stream()
.map(Row::toString))
.containsExactlyInAnyOrder(
"[1,WrappedArray([APPLE,1000000000000], [BANANA,111])]",
"[2,WrappedArray([cat,200], [dog,201])]",
"[3,WrappedArray([FLOWER,3000000000000])]");

RowTestHelper.checkRowEquals(
spark.sql("SELECT * FROM paimon.default." + tableName),
Arrays.asList(
row(1, array(row("APPLE", 1000000000000L), row("BANANA", 111))),
row(2, array(row("cat", 200), row("dog", 201))),
row(3, array(row("FLOWER", 3000000000000L)))));
}

@ParameterizedTest()
@@ -245,8 +245,7 @@ public void testTravelToNonExistingTag() {
() -> spark.sql("SELECT * FROM t VERSION AS OF 'unknown'").collectAsList())
.satisfies(
anyCauseMatches(
RuntimeException.class,
"Cannot find a time travel version for unknown"));
IllegalArgumentException.class, "Tag 'unknown' doesn't exist"));
}

@Test
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.QueryTest.checkAnswer
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConverters._

/**
 * A helper for comparing Spark Row objects in Java unit tests. It delegates to
 * QueryTest.checkAnswer to perform the comparison.
 */
class RowTestHelper extends QueryTest {
override protected def spark: SparkSession = {
throw new UnsupportedOperationException("Not supported")
}
}

object RowTestHelper {
def checkRowEquals(df: DataFrame, expectedRows: java.util.List[Row]): Unit = {
checkAnswer(df, expectedRows)
}

def checkRowEquals(df: DataFrame, expectedRow: Row): Unit = {
checkAnswer(df, Seq(expectedRow))
}

def row(values: Array[Any]): Row = {
Row.fromSeq(values)
}

def seq(values: Array[Any]): Seq[Any] = values.toSeq
}
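A minimal usage sketch from the Java side, assuming RowTestHelper is on the test classpath and in the same package (the table and session setup are illustrative); checkAnswer collects the DataFrame and compares it against the expected rows, ignoring row order for unsorted plans:

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Hypothetical Java test snippet, not part of the PR.
public class RowTestHelperUsage {
    static void check(SparkSession spark) {
        Dataset<Row> df = spark.sql("SELECT a, b FROM paimon.default.t");
        // From Java, Scala's Array[Any] parameters accept plain Object[];
        // seq(...) wraps an array column value for comparison.
        RowTestHelper.checkRowEquals(
                df,
                Arrays.asList(
                        RowTestHelper.row(
                                new Object[] {1, RowTestHelper.seq(new Object[] {"x", "y"})}),
                        RowTestHelper.row(
                                new Object[] {2, RowTestHelper.seq(new Object[] {null})})));
    }
}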