python/pyspark/sql/pandas/conversion.py: 80 changes (43 additions, 37 deletions)
@@ -80,7 +80,9 @@ def toPandas(self) -> "PandasDataFrameLike":
                 from pyspark.sql.pandas.utils import require_minimum_pyarrow_version

                 require_minimum_pyarrow_version()
-                to_arrow_schema(self.schema, prefers_large_types=jconf.arrowUseLargeVarTypes())
+                arrow_schema = to_arrow_schema(
+                    self.schema, prefers_large_types=jconf.arrowUseLargeVarTypes()
+                )
             except Exception as e:
                 if jconf.arrowPySparkFallbackEnabled():
                     msg = (
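
A note on the hunk above: the schema conversion result is now bound to arrow_schema because the next hunk reuses it to build a zero-row Arrow table when no record batches come back. A minimal PyArrow sketch of that behavior, assuming a hand-written schema in place of the one converted from self.schema:

    import pyarrow as pa

    # Hand-written stand-in for the schema produced by to_arrow_schema(self.schema).
    schema = pa.schema([("id", pa.int64()), ("name", pa.string())])
    batches = []  # pretend the collect step returned no record batches

    if len(batches) > 0:
        table = pa.Table.from_batches(batches)
    else:
        # empty_table() yields a zero-row table that still carries every column and type.
        table = schema.empty_table()

    print(table.num_rows, table.schema.names)  # 0 ['id', 'name']
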
@@ -112,41 +114,40 @@ def toPandas(self) -> "PandasDataFrameLike":

                     self_destruct = jconf.arrowPySparkSelfDestructEnabled()
                     batches = self._collect_as_arrow(split_batches=self_destruct)
-                    if len(batches) > 0:
-                        table = pa.Table.from_batches(batches)
-                        # Ensure only the table has a reference to the batches, so that
-                        # self_destruct (if enabled) is effective
-                        del batches
-                        # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
-                        # values, but we should use datetime.date to match the behavior with when
-                        # Arrow optimization is disabled.
-                        pandas_options = {
-                            "date_as_object": True,
-                            "coerce_temporal_nanoseconds": True,
-                        }
-                        if self_destruct:
-                            # Configure PyArrow to use as little memory as possible:
-                            # self_destruct - free columns as they are converted
-                            # split_blocks - create a separate Pandas block for each column
-                            # use_threads - convert one column at a time
-                            pandas_options.update(
-                                {
-                                    "self_destruct": True,
-                                    "split_blocks": True,
-                                    "use_threads": False,
-                                }
-                            )
-                        # Rename columns to avoid duplicated column names.
-                        pdf = table.rename_columns(
-                            [f"col_{i}" for i in range(table.num_columns)]
-                        ).to_pandas(**pandas_options)

-                        # Rename back to the original column names.
-                        pdf.columns = self.columns
+                    # Rename columns to avoid duplicated column names.
+                    temp_col_names = [f"col_{i}" for i in range(len(self.columns))]
+                    if len(batches) > 0:
+                        table = pa.Table.from_batches(batches).rename_columns(temp_col_names)
                     else:
-                        pdf = pd.DataFrame(columns=self.columns)
+                        # empty dataset
+                        table = arrow_schema.empty_table().rename_columns(temp_col_names)
+
+                    # Ensure only the table has a reference to the batches, so that
+                    # self_destruct (if enabled) is effective
+                    del batches
+
+                    # Pandas DataFrame created from PyArrow uses datetime64[ns] for date type
+                    # values, but we should use datetime.date to match the behavior with when
+                    # Arrow optimization is disabled.
+                    pandas_options = {
+                        "date_as_object": True,
+                        "coerce_temporal_nanoseconds": True,
+                    }
+                    if self_destruct:
+                        # Configure PyArrow to use as little memory as possible:
+                        # self_destruct - free columns as they are converted
+                        # split_blocks - create a separate Pandas block for each column
+                        # use_threads - convert one column at a time
+                        pandas_options.update(
+                            {
+                                "self_destruct": True,
+                                "split_blocks": True,
+                                "use_threads": False,
+                            }
+                        )

-                    if len(pdf.columns) > 0:
+                    if len(self.columns) > 0:
                         timezone = jconf.sessionLocalTimeZone()
                         struct_in_pandas = jconf.pandasStructHandlingMode()
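
A note on the temp_col_names rename in the hunk above: the Arrow-to-pandas conversion runs under positional names (col_0, col_1, ...) so that duplicated column names cannot collide, and the original names are restored afterwards. A small sketch of the same trick, using made-up duplicate names and data:

    import pyarrow as pa

    # Two columns that deliberately share the name "a" (made-up data).
    table = pa.Table.from_arrays([pa.array([1, 2]), pa.array([3, 4])], names=["a", "a"])

    temp_col_names = [f"col_{i}" for i in range(table.num_columns)]
    pdf = table.rename_columns(temp_col_names).to_pandas()

    # Restore the original, duplicated labels on the pandas side.
    pdf.columns = ["a", "a"]
    print(pdf)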

@@ -155,21 +156,26 @@ def toPandas(self) -> "PandasDataFrameLike":
                             error_on_duplicated_field_names = True
                             struct_in_pandas = "dict"

-                        return pd.concat(
+                        pdf = pd.concat(
                             [
                                 _create_converter_to_pandas(
                                     field.dataType,
                                     field.nullable,
                                     timezone=timezone,
                                     struct_in_pandas=struct_in_pandas,
                                     error_on_duplicated_field_names=error_on_duplicated_field_names,
-                                )(pser)
-                                for (_, pser), field in zip(pdf.items(), self.schema.fields)
+                                )(arrow_col.to_pandas(**pandas_options))
+                                for arrow_col, field in zip(table.columns, self.schema.fields)
                             ],
                             axis="columns",
                         )
                     else:
-                        return pdf
+                        # empty columns
+                        pdf = table.to_pandas(**pandas_options)
+
+                    pdf.columns = self.columns
+                    return pdf
+
                 except Exception as e:
                     # We might have to allow fallback here as well but multiple Spark jobs can
                     # be executed. So, simply fail in this case for now.
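
The last hunk above also switches from one whole-table to_pandas() call to a per-column conversion (arrow_col.to_pandas(**pandas_options)) whose results are stitched back together with pd.concat. A rough sketch of that per-column pattern, with trivial stand-in converters in place of the per-field converters that PySpark builds with _create_converter_to_pandas:

    import pandas as pd
    import pyarrow as pa

    # Made-up table standing in for the collected Arrow result.
    table = pa.table({"id": [1, 2, 3], "name": ["a", "b", "c"]})

    # Stand-in converters; the real ones are built per schema field.
    converters = [lambda s: s, lambda s: s.astype("string")]

    pdf = pd.concat(
        [conv(col.to_pandas()) for conv, col in zip(converters, table.columns)],
        axis="columns",
    )
    pdf.columns = table.schema.names  # rename back, as the real code does with self.columns
    print(pdf.dtypes)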