Skip to content

Commit 4407337

Browse files
committed
Make IO work
1 parent 2fd3083 commit 4407337

File tree

6 files changed

+408
-1115
lines changed

6 files changed

+408
-1115
lines changed

README.md

Lines changed: 73 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,74 @@
1-
pandas Expressions
2-
==================
1+
# pandas Expressions
32

4-
Nothing to see.
3+
Lazy pandas API POC.
4+
5+
## Reading from parquet files
6+
7+
Prepare the parquet file:
8+
9+
```python
10+
import pandas as pd
11+
12+
pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"], "c": 1, "d": 1.5}).to_parquet("test.parquet")
13+
```
14+
15+
```python
16+
from pandas_expr import read_parquet
17+
18+
df = read_parquet("test.parquet")
19+
result = df[df["b"] == "x"][["a", "c"]]
20+
```
21+
22+
Let's see what this query looks like:
23+
24+
```python
25+
result.pprint()
26+
27+
Projection: columns=['a', 'c']
28+
Filter:
29+
ReadParquet: path='test.parquet'
30+
EQ: right='x'
31+
Projection: columns='b'
32+
ReadParquet: path='test.parquet'
33+
```
34+
35+
There is no need to read all of the data — we can do better:
36+
37+
```python
38+
result.optimize().pprint()
39+
40+
ReadParquet: path='test.parquet' columns=['a', 'c'] filters=[[('b', '==', 'x')]]
41+
```
42+
43+
We pushed the column selection and the filter into the ``read_parquet`` call.
44+
45+
46+
## DataFrame constructor
47+
48+
The DataFrame constructor mirrors the regular pandas constructor, but it is
49+
lazy and does not trigger any actual computation.
50+
51+
```python
52+
from pandas_expr import DataFrame
53+
54+
df = DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"], "c": 1, "d": 1.5})
55+
df = df.replace(1, 5).fillna(100)[["a", "b"]]
56+
57+
df.pprint()
58+
59+
Projection: columns=['a', 'b']
60+
Fillna: value=100
61+
Replace: to_replace=1 value=5
62+
PandasIO: data={'a': [1, 2, 3], 'b': ['x', 'y', 'z'], 'c': 1, 'd': 1.5}
63+
```
64+
65+
We can again make this more efficient:
66+
67+
```python
68+
df.optimize(fuse=False).pprint()
69+
70+
Fillna: value=100
71+
Replace: to_replace=1 value=5
72+
Projection: columns=['a', 'b']
73+
PandasIO: data={'a': [1, 2, 3], 'b': ['x', 'y', 'z'], 'c': 1, 'd': 1.5}
74+
```

pandas_expr/_collection.py

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -616,44 +616,21 @@ def read_parquet(
616616
path=None,
617617
columns=None,
618618
filters=None,
619-
categories=None,
620-
index=None,
621619
storage_options=None,
622-
dtype_backend=None,
623-
calculate_divisions=False,
624-
ignore_metadata_file=False,
625-
metadata_task_size=None,
626-
split_row_groups="infer",
627-
blocksize="default",
628-
aggregate_files=None,
629-
parquet_file_extension=(".parq", ".parquet", ".pq"),
630-
filesystem="fsspec",
631-
**kwargs,
620+
dtype_backend=pd.api.extensions.no_default,
632621
):
633622
from pandas_expr.io.parquet import ReadParquet
634623

635624
if not isinstance(path, str):
636625
path = stringify_path(path)
637626

638-
kwargs["dtype_backend"] = dtype_backend
639-
640627
return new_collection(
641628
ReadParquet(
642629
path,
643630
columns=_convert_to_list(columns),
644631
filters=filters,
645-
categories=categories,
646-
index=index,
647632
storage_options=storage_options,
648-
calculate_divisions=calculate_divisions,
649-
ignore_metadata_file=ignore_metadata_file,
650-
metadata_task_size=metadata_task_size,
651-
split_row_groups=split_row_groups,
652-
blocksize=blocksize,
653-
aggregate_files=aggregate_files,
654-
parquet_file_extension=parquet_file_extension,
655-
filesystem=filesystem,
656-
kwargs=kwargs,
633+
dtype_backend=dtype_backend,
657634
)
658635
)
659636

pandas_expr/_concat.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,6 @@ def _simplify_up(self, parent):
5959
for frame, cols in zip(self._frames, columns_frame)
6060
]
6161
return type(parent)(
62-
type(self)(self.join, self._kwargs, *frames),
62+
type(self)(self.join, *frames),
6363
*parent.operands[1:],
6464
)

pandas_expr/io/csv.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
1+
import functools
2+
3+
import pandas as pd
4+
15
from pandas_expr.io.io import BlockwiseIO
26

37

48
class ReadCSV(BlockwiseIO):
5-
_parameters = ["filename", "usecols", "header", "_partitions", "storage_options"]
9+
_parameters = ["filename", "usecols", "header", "storage_options"]
610
_defaults = {
711
"usecols": None,
812
"header": "infer",
9-
"_partitions": None,
1013
"storage_options": None,
1114
}
15+
_keyword_only = ["usecols", "header", "storage_options"]
16+
operation = staticmethod(pd.read_csv)
1217

13-
@property
18+
@functools.cached_property
1419
def _meta(self):
15-
return self._ddf._meta
20+
return pd.read_csv(self.filename, **self._kwargs, nrows=1).iloc[:0]

0 commit comments

Comments
 (0)