Skip to content

Commit a94e709

Browse files
huanliwang-dbzhengruifeng
authored andcommitted
[SPARK-53986][PYTHON][SS][TESTS] Reorganize Python streaming TWS test
### What changes were proposed in this pull request? Reorganize the Python TWS tests: - moving tws related tests to a new `/streaming` directory - further split the Python TWS tests to smaller ones to speed up the CI ### Why are the changes needed? Code refactoring ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? test is green ### Was this patch authored or co-authored using generative AI tooling? No Closes #52691 from huanliwang-db/huanliwang-db/split-test-2. Authored-by: huanliwang-db <[email protected]> Signed-off-by: Ruifeng Zheng <[email protected]>
1 parent 2b2a2a2 commit a94e709

16 files changed

+1489
-839
lines changed

dev/sparktestsupport/modules.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -662,8 +662,14 @@ def __hash__(self):
662662
"pyspark.sql.tests.streaming.test_streaming_foreach_batch",
663663
"pyspark.sql.tests.streaming.test_streaming_listener",
664664
"pyspark.sql.tests.pandas.test_pandas_grouped_map_with_state",
665-
"pyspark.sql.tests.pandas.test_pandas_transform_with_state",
666-
"pyspark.sql.tests.pandas.test_pandas_transform_with_state_checkpoint_v2",
665+
"pyspark.sql.tests.pandas.streaming.test_pandas_transform_with_state",
666+
"pyspark.sql.tests.pandas.streaming.test_pandas_transform_with_state_checkpoint_v2",
667+
"pyspark.sql.tests.pandas.streaming.test_pandas_transform_with_state_state_variable",
668+
"pyspark.sql.tests.pandas.streaming.test_pandas_transform_with_state_state_variable_checkpoint_v2", # noqa: E501
669+
"pyspark.sql.tests.pandas.streaming.test_transform_with_state",
670+
"pyspark.sql.tests.pandas.streaming.test_transform_with_state_checkpoint_v2",
671+
"pyspark.sql.tests.pandas.streaming.test_transform_with_state_state_variable",
672+
"pyspark.sql.tests.pandas.streaming.test_transform_with_state_state_variable_checkpoint_v2",
667673
],
668674
excluded_python_implementations=[
669675
"PyPy" # Skip these tests under PyPy since they require numpy and it isn't available there
@@ -1182,9 +1188,11 @@ def __hash__(self):
11821188
"pyspark.sql.tests.connect.streaming.test_parity_listener",
11831189
"pyspark.sql.tests.connect.streaming.test_parity_foreach",
11841190
"pyspark.sql.tests.connect.streaming.test_parity_foreach_batch",
1185-
"pyspark.sql.tests.connect.streaming.test_parity_transform_with_state_pyspark",
1186-
"pyspark.sql.tests.connect.pandas.test_parity_pandas_grouped_map_with_state",
1187-
"pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state",
1191+
"pyspark.sql.tests.connect.pandas.streaming.test_parity_pandas_grouped_map_with_state",
1192+
"pyspark.sql.tests.connect.pandas.streaming.test_parity_pandas_transform_with_state",
1193+
"pyspark.sql.tests.connect.pandas.streaming.test_parity_pandas_transform_with_state_state_variable", # noqa: E501
1194+
"pyspark.sql.tests.connect.pandas.streaming.test_parity_transform_with_state",
1195+
"pyspark.sql.tests.connect.pandas.streaming.test_parity_transform_with_state_state_variable", # noqa: E501
11881196
],
11891197
excluded_python_implementations=[
11901198
"PyPy" # Skip these tests under PyPy since they require numpy and it isn't available there
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#

python/pyspark/sql/tests/connect/pandas/test_parity_pandas_grouped_map_with_state.py renamed to python/pyspark/sql/tests/connect/pandas/streaming/test_parity_pandas_grouped_map_with_state.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ class GroupedApplyInPandasWithStateTests(
2929

3030

3131
if __name__ == "__main__":
32-
from pyspark.sql.tests.connect.pandas.test_parity_pandas_grouped_map_with_state import * # noqa: F401,E501
32+
from pyspark.sql.tests.connect.pandas.streaming.test_parity_pandas_grouped_map_with_state import * # noqa: F401,E501
3333

3434
try:
3535
import xmlrunner

python/pyspark/sql/tests/connect/pandas/test_parity_pandas_transform_with_state.py renamed to python/pyspark/sql/tests/connect/pandas/streaming/test_parity_pandas_transform_with_state.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
#
1717
import unittest
1818

19-
from pyspark.sql.tests.pandas.test_pandas_transform_with_state import (
19+
from pyspark.sql.tests.pandas.streaming.test_pandas_transform_with_state import (
2020
TransformWithStateInPandasTestsMixin,
2121
)
2222
from pyspark import SparkConf
@@ -54,7 +54,7 @@ def test_schema_evolution_scenarios(self):
5454

5555

5656
if __name__ == "__main__":
57-
from pyspark.sql.tests.connect.pandas.test_parity_pandas_transform_with_state import * # noqa: F401,E501
57+
from pyspark.sql.tests.connect.pandas.streaming.test_parity_pandas_transform_with_state import * # noqa: F401,E501
5858

5959
try:
6060
import xmlrunner
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
import unittest
18+
19+
from pyspark.sql.tests.pandas.streaming.test_pandas_transform_with_state_state_variable import (
20+
TransformWithStateInPandasStateVariableTestsMixin,
21+
)
22+
from pyspark import SparkConf
23+
from pyspark.testing.connectutils import ReusedConnectTestCase
24+
25+
26+
class TransformWithStateInPandasStateVariableParityTests(
27+
TransformWithStateInPandasStateVariableTestsMixin, ReusedConnectTestCase
28+
):
29+
"""
30+
Spark connect parity tests for TransformWithStateInPandas. Run every test case in
31+
`TransformWithStateInPandasStateVariableTestsMixin` in spark connect mode.
32+
"""
33+
34+
@classmethod
35+
def conf(cls):
36+
# Due to multiple inheritance from the same level, we need to explicitly setting configs in
37+
# both TransformWithStateInPandasStateVariableTestsMixin and ReusedConnectTestCase here
38+
cfg = SparkConf(loadDefaults=False)
39+
for base in cls.__bases__:
40+
if hasattr(base, "conf"):
41+
parent_cfg = base.conf()
42+
for k, v in parent_cfg.getAll():
43+
cfg.set(k, v)
44+
45+
# Extra removing config for connect suites
46+
if cfg._jconf is not None:
47+
cfg._jconf.remove("spark.master")
48+
49+
return cfg
50+
51+
52+
if __name__ == "__main__":
53+
from pyspark.sql.tests.connect.pandas.streaming.test_parity_pandas_transform_with_state_state_variable import * # noqa: F401,E501
54+
55+
try:
56+
import xmlrunner
57+
58+
testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
59+
except ImportError:
60+
testRunner = None
61+
unittest.main(testRunner=testRunner, verbosity=2)

python/pyspark/sql/tests/connect/streaming/test_parity_transform_with_state_pyspark.py renamed to python/pyspark/sql/tests/connect/pandas/streaming/test_parity_transform_with_state.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
#
1717
import unittest
1818

19-
from pyspark.sql.tests.pandas.test_pandas_transform_with_state import (
19+
from pyspark.sql.tests.pandas.streaming.test_transform_with_state import (
2020
TransformWithStateInPySparkTestsMixin,
2121
)
2222
from pyspark import SparkConf
@@ -54,7 +54,7 @@ def test_schema_evolution_scenarios(self):
5454

5555

5656
if __name__ == "__main__":
57-
from pyspark.sql.tests.connect.streaming.test_parity_transform_with_state_pyspark import * # noqa: F401,E501
57+
from pyspark.sql.tests.connect.pandas.streaming.test_parity_transform_with_state import * # noqa: F401,E501
5858

5959
try:
6060
import xmlrunner
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
import unittest
18+
19+
from pyspark.sql.tests.pandas.streaming.test_transform_with_state_state_variable import (
20+
TransformWithStateInPySparkStateVariableTestsMixin,
21+
)
22+
from pyspark import SparkConf
23+
from pyspark.testing.connectutils import ReusedConnectTestCase
24+
25+
26+
class TransformWithStateInPySparkStateVariableParityTests(
27+
TransformWithStateInPySparkStateVariableTestsMixin, ReusedConnectTestCase
28+
):
29+
"""
30+
Spark connect parity tests for TransformWithStateInPySparkStateVariable. Run every test case in
31+
`TransformWithStateInPySparkStateVariableTestsMixin` in spark connect mode.
32+
"""
33+
34+
@classmethod
35+
def conf(cls):
36+
# Due to multiple inheritance from the same level, we need to explicitly setting configs in
37+
# both TransformWithStateInPySparkStateVariableTestsMixin and ReusedConnectTestCase here
38+
cfg = SparkConf(loadDefaults=False)
39+
for base in cls.__bases__:
40+
if hasattr(base, "conf"):
41+
parent_cfg = base.conf()
42+
for k, v in parent_cfg.getAll():
43+
cfg.set(k, v)
44+
45+
# Extra removing config for connect suites
46+
if cfg._jconf is not None:
47+
cfg._jconf.remove("spark.master")
48+
49+
return cfg
50+
51+
52+
if __name__ == "__main__":
53+
from pyspark.sql.tests.connect.pandas.streaming.test_parity_transform_with_state_state_variable import * # noqa: F401,E501
54+
55+
try:
56+
import xmlrunner
57+
58+
testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
59+
except ImportError:
60+
testRunner = None
61+
unittest.main(testRunner=testRunner, verbosity=2)
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#

0 commit comments

Comments
 (0)