Commit faba3a1

[Python] Support overwrite mode for writer (#6186)
1 parent e771c68 · commit faba3a1

File tree

10 files changed (+425 -138 lines)

paimon-python/pypaimon/manifest/manifest_file_manager.py

Lines changed: 36 additions & 40 deletions
@@ -24,9 +24,8 @@
 from pypaimon.manifest.schema.manifest_entry import (MANIFEST_ENTRY_SCHEMA,
                                                      ManifestEntry)
 from pypaimon.manifest.schema.simple_stats import SimpleStats
-from pypaimon.table.row.binary_row import (BinaryRow, BinaryRowDeserializer,
+from pypaimon.table.row.binary_row import (BinaryRowDeserializer,
                                            BinaryRowSerializer)
-from pypaimon.write.commit_message import CommitMessage


 class ManifestFileManager:
@@ -99,46 +98,43 @@ def read(self, manifest_file_name: str, shard_filter=None) -> List[ManifestEntry
             entries.append(entry)
         return entries

-    def write(self, file_name, commit_messages: List[CommitMessage]):
+    def write(self, file_name, entries: List[ManifestEntry]):
         avro_records = []
-        for message in commit_messages:
-            partition_bytes = BinaryRowSerializer.to_bytes(
-                BinaryRow(list(message.partition), self.table.table_schema.get_partition_key_fields()))
-            for file in message.new_files:
-                avro_record = {
-                    "_VERSION": 2,
-                    "_KIND": 0,
-                    "_PARTITION": partition_bytes,
-                    "_BUCKET": message.bucket,
-                    "_TOTAL_BUCKETS": self.table.total_buckets,
-                    "_FILE": {
-                        "_FILE_NAME": file.file_name,
-                        "_FILE_SIZE": file.file_size,
-                        "_ROW_COUNT": file.row_count,
-                        "_MIN_KEY": BinaryRowSerializer.to_bytes(file.min_key),
-                        "_MAX_KEY": BinaryRowSerializer.to_bytes(file.max_key),
-                        "_KEY_STATS": {
-                            "_MIN_VALUES": BinaryRowSerializer.to_bytes(file.key_stats.min_values),
-                            "_MAX_VALUES": BinaryRowSerializer.to_bytes(file.key_stats.max_values),
-                            "_NULL_COUNTS": file.key_stats.null_counts,
-                        },
-                        "_VALUE_STATS": {
-                            "_MIN_VALUES": BinaryRowSerializer.to_bytes(file.value_stats.min_values),
-                            "_MAX_VALUES": BinaryRowSerializer.to_bytes(file.value_stats.max_values),
-                            "_NULL_COUNTS": file.value_stats.null_counts,
-                        },
-                        "_MIN_SEQUENCE_NUMBER": file.min_sequence_number,
-                        "_MAX_SEQUENCE_NUMBER": file.max_sequence_number,
-                        "_SCHEMA_ID": file.schema_id,
-                        "_LEVEL": file.level,
-                        "_EXTRA_FILES": file.extra_files,
-                        "_CREATION_TIME": file.creation_time,
-                        "_DELETE_ROW_COUNT": file.delete_row_count,
-                        "_EMBEDDED_FILE_INDEX": file.embedded_index,
-                        "_FILE_SOURCE": file.file_source,
-                    }
+        for entry in entries:
+            avro_record = {
+                "_VERSION": 2,
+                "_KIND": entry.kind,
+                "_PARTITION": BinaryRowSerializer.to_bytes(entry.partition),
+                "_BUCKET": entry.bucket,
+                "_TOTAL_BUCKETS": entry.bucket,
+                "_FILE": {
+                    "_FILE_NAME": entry.file.file_name,
+                    "_FILE_SIZE": entry.file.file_size,
+                    "_ROW_COUNT": entry.file.row_count,
+                    "_MIN_KEY": BinaryRowSerializer.to_bytes(entry.file.min_key),
+                    "_MAX_KEY": BinaryRowSerializer.to_bytes(entry.file.max_key),
+                    "_KEY_STATS": {
+                        "_MIN_VALUES": BinaryRowSerializer.to_bytes(entry.file.key_stats.min_values),
+                        "_MAX_VALUES": BinaryRowSerializer.to_bytes(entry.file.key_stats.max_values),
+                        "_NULL_COUNTS": entry.file.key_stats.null_counts,
+                    },
+                    "_VALUE_STATS": {
+                        "_MIN_VALUES": BinaryRowSerializer.to_bytes(entry.file.value_stats.min_values),
+                        "_MAX_VALUES": BinaryRowSerializer.to_bytes(entry.file.value_stats.max_values),
+                        "_NULL_COUNTS": entry.file.value_stats.null_counts,
+                    },
+                    "_MIN_SEQUENCE_NUMBER": entry.file.min_sequence_number,
+                    "_MAX_SEQUENCE_NUMBER": entry.file.max_sequence_number,
+                    "_SCHEMA_ID": entry.file.schema_id,
+                    "_LEVEL": entry.file.level,
+                    "_EXTRA_FILES": entry.file.extra_files,
+                    "_CREATION_TIME": entry.file.creation_time,
+                    "_DELETE_ROW_COUNT": entry.file.delete_row_count,
+                    "_EMBEDDED_FILE_INDEX": entry.file.embedded_index,
+                    "_FILE_SOURCE": entry.file.file_source,
                 }
-                avro_records.append(avro_record)
+            }
+            avro_records.append(avro_record)

         manifest_path = self.manifest_path / file_name
         try:
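
This refactor moves the CommitMessage-to-Avro conversion out of the manifest writer: write() now takes ManifestEntry records directly, so an overwrite commit can hand it DELETE entries alongside ADD entries through the same path. A minimal caller sketch follows; the keyword-style ManifestEntry constructor and the placeholder variables (old_partition, old_file_meta, new_file_meta, manifest_file_manager, file_name) are assumptions for illustration, not the committed API.

# Sketch only: retract an overwritten file and register its replacement.
# The ManifestEntry constructor arguments are assumed for illustration.
delete_entry = ManifestEntry(
    kind=1,                    # non-zero kind: the scan below treats it as a delete
    partition=old_partition,   # BinaryRow of partition values
    bucket=0,
    total_buckets=1,
    file=old_file_meta,        # metadata of the file being replaced
)
add_entry = ManifestEntry(
    kind=0,                    # kind 0: file added
    partition=old_partition,
    bucket=0,
    total_buckets=1,
    file=new_file_meta,        # metadata of the replacement file
)
manifest_file_manager.write(file_name, [delete_entry, add_entry])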

paimon-python/pypaimon/read/plan.py

Lines changed: 5 additions & 0 deletions
@@ -19,13 +19,18 @@
 from dataclasses import dataclass
 from typing import List

+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
 from pypaimon.read.split import Split


 @dataclass
 class Plan:
     """Implementation of Plan for native Python reading."""
+    _files: List[ManifestEntry]
     _splits: List[Split]

+    def files(self) -> List[ManifestEntry]:
+        return self._files
+
     def splits(self) -> List[Split]:
         return self._splits
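
Plan now carries the surviving manifest entries next to the splits, so a caller can see which data files a scan resolved to before reading anything. A short usage sketch, assuming `table` is an already-opened table:

# Sketch: list the data files behind a scan, then read its splits.
read_builder = table.new_read_builder()
plan = read_builder.new_scan().plan()
for entry in plan.files():   # List[ManifestEntry]
    print(entry.bucket, entry.file.file_name, entry.file.row_count)
df = read_builder.new_read().to_pandas(plan.splits())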

paimon-python/pypaimon/read/table_scan.py

Lines changed: 13 additions & 4 deletions
@@ -73,16 +73,25 @@ def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int],
     def plan(self) -> Plan:
         latest_snapshot = self.snapshot_manager.get_latest_snapshot()
         if not latest_snapshot:
-            return Plan([])
+            return Plan([], [])
         manifest_files = self.manifest_list_manager.read_all(latest_snapshot)

-        file_entries = []
+        deleted_entries = set()
+        added_entries = []
+        # TODO: filter manifest files by predicate
         for manifest_file in manifest_files:
             manifest_entries = self.manifest_file_manager.read(manifest_file.file_name,
                                                                lambda row: self._bucket_filter(row))
             for entry in manifest_entries:
                 if entry.kind == 0:
-                    file_entries.append(entry)
+                    added_entries.append(entry)
+                else:
+                    deleted_entries.add((tuple(entry.partition.values), entry.bucket, entry.file.file_name))
+
+        file_entries = [
+            entry for entry in added_entries
+            if (tuple(entry.partition.values), entry.bucket, entry.file.file_name) not in deleted_entries
+        ]

         if self.predicate:
             file_entries = self._filter_by_predicate(file_entries)
@@ -100,7 +109,7 @@ def plan(self) -> Plan:

         splits = self._apply_push_down_limit(splits)

-        return Plan(splits)
+        return Plan(file_entries, splits)

     def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'TableScan':
         self.idx_of_this_subtask = idx_of_this_subtask
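
The reconciliation above is what makes overwrite visible to readers: an overwrite commit writes DELETE entries that cancel earlier ADD entries with the same (partition values, bucket, file name) key. The same idea as a self-contained, runnable sketch, with a simplified entry type standing in for ManifestEntry:

from dataclasses import dataclass
from typing import List, Tuple

@dataclass
class Entry:
    kind: int          # 0 = ADD, anything else is treated as DELETE
    partition: Tuple   # partition values
    bucket: int
    file_name: str

def live_files(entries: List[Entry]) -> List[Entry]:
    # Keep only ADD entries whose key was never retracted.
    deleted = {(e.partition, e.bucket, e.file_name) for e in entries if e.kind != 0}
    return [e for e in entries
            if e.kind == 0 and (e.partition, e.bucket, e.file_name) not in deleted]

# An overwrite of partition (1,) retracts its old file and adds a new one:
entries = [
    Entry(0, (1,), 0, 'f-old'),
    Entry(0, (2,), 0, 'f-keep'),
    Entry(1, (1,), 0, 'f-old'),   # DELETE written by the overwrite commit
    Entry(0, (1,), 0, 'f-new'),
]
assert {e.file_name for e in live_files(entries)} == {'f-keep', 'f-new'}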

paimon-python/pypaimon/tests/py36/ao_read_write_test.py

Lines changed: 81 additions & 0 deletions
@@ -32,6 +32,87 @@

 class RESTTableReadWritePy36Test(RESTCatalogBaseTest):

+    def test_overwrite(self):
+        simple_pa_schema = pa.schema([
+            ('f0', pa.int32()),
+            ('f1', pa.string())
+        ])
+        schema = Schema.from_pyarrow_schema(simple_pa_schema, partition_keys=['f0'],
+                                            options={'dynamic-partition-overwrite': 'false'})
+        self.rest_catalog.create_table('default.test_overwrite', schema, False)
+        table = self.rest_catalog.get_table('default.test_overwrite')
+        read_builder = table.new_read_builder()
+
+        # test normal write
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        df0 = pd.DataFrame({
+            'f0': [1, 2],
+            'f1': ['apple', 'banana'],
+        })
+
+        table_write.write_pandas(df0)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual_df0 = table_read.to_pandas(table_scan.plan().splits()).sort_values(by='f0')
+        df0['f0'] = df0['f0'].astype('int32')
+        pd.testing.assert_frame_equal(
+            actual_df0.reset_index(drop=True), df0.reset_index(drop=True))
+
+        # test partially overwrite
+        write_builder = table.new_batch_write_builder().overwrite({'f0': 1})
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        df1 = pd.DataFrame({
+            'f0': [1],
+            'f1': ['watermelon'],
+        })
+
+        table_write.write_pandas(df1)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual_df1 = table_read.to_pandas(table_scan.plan().splits()).sort_values(by='f0')
+        expected_df1 = pd.DataFrame({
+            'f0': [1, 2],
+            'f1': ['watermelon', 'banana']
+        })
+        expected_df1['f0'] = expected_df1['f0'].astype('int32')
+        pd.testing.assert_frame_equal(
+            actual_df1.reset_index(drop=True), expected_df1.reset_index(drop=True))
+
+        # test fully overwrite
+        write_builder = table.new_batch_write_builder().overwrite()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        df2 = pd.DataFrame({
+            'f0': [3],
+            'f1': ['Neo'],
+        })
+
+        table_write.write_pandas(df2)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual_df2 = table_read.to_pandas(table_scan.plan().splits())
+        df2['f0'] = df2['f0'].astype('int32')
+        pd.testing.assert_frame_equal(
+            actual_df2.reset_index(drop=True), df2.reset_index(drop=True))
+
     def testParquetAppendOnlyReader(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
         self.rest_catalog.create_table('default.test_append_only_parquet', schema, False)
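
Both new tests pin 'dynamic-partition-overwrite' to 'false', so the overwrite target is static: overwrite({'f0': 1}) replaces exactly that partition, and overwrite() with no spec replaces the whole table. With the option left at its default, partition selection would instead follow the written data; the sketch below contrasts the two calls and assumes the default behaviour mirrors Paimon's Java writer, which these tests do not exercise.

# Static overwrite, as exercised by the tests above
# (table option 'dynamic-partition-overwrite' = 'false'):
write_builder = table.new_batch_write_builder().overwrite({'f0': 1})  # one partition
write_builder = table.new_batch_write_builder().overwrite()           # whole table

# Dynamic partition overwrite (option left at its default 'true'): no spec
# is given and only the partitions present in the written data are replaced.
# (Assumed to mirror the Java writer; not covered by these tests.)
write_builder = table.new_batch_write_builder().overwrite()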

paimon-python/pypaimon/tests/reader_basic_test.py

Lines changed: 79 additions & 2 deletions
@@ -68,8 +68,85 @@ def tearDownClass(cls):
         shutil.rmtree(cls.tempdir, ignore_errors=True)

     def test_overwrite(self):
-        pass
-        # TODO: support overwrite
+        simple_pa_schema = pa.schema([
+            ('f0', pa.int32()),
+            ('f1', pa.string())
+        ])
+        schema = Schema.from_pyarrow_schema(simple_pa_schema, partition_keys=['f0'],
+                                            options={'dynamic-partition-overwrite': 'false'})
+        self.catalog.create_table('default.test_overwrite', schema, False)
+        table = self.catalog.get_table('default.test_overwrite')
+        read_builder = table.new_read_builder()
+
+        # test normal write
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        df0 = pd.DataFrame({
+            'f0': [1, 2],
+            'f1': ['apple', 'banana'],
+        })
+
+        table_write.write_pandas(df0)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual_df0 = table_read.to_pandas(table_scan.plan().splits()).sort_values(by='f0')
+        df0['f0'] = df0['f0'].astype('int32')
+        pd.testing.assert_frame_equal(
+            actual_df0.reset_index(drop=True), df0.reset_index(drop=True))
+
+        # test partially overwrite
+        write_builder = table.new_batch_write_builder().overwrite({'f0': 1})
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        df1 = pd.DataFrame({
+            'f0': [1],
+            'f1': ['watermelon'],
+        })
+
+        table_write.write_pandas(df1)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual_df1 = table_read.to_pandas(table_scan.plan().splits()).sort_values(by='f0')
+        expected_df1 = pd.DataFrame({
+            'f0': [1, 2],
+            'f1': ['watermelon', 'banana']
+        })
+        expected_df1['f0'] = expected_df1['f0'].astype('int32')
+        pd.testing.assert_frame_equal(
+            actual_df1.reset_index(drop=True), expected_df1.reset_index(drop=True))
+
+        # test fully overwrite
+        write_builder = table.new_batch_write_builder().overwrite()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        df2 = pd.DataFrame({
+            'f0': [3],
+            'f1': ['Neo'],
+        })
+
+        table_write.write_pandas(df2)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual_df2 = table_read.to_pandas(table_scan.plan().splits())
+        df2['f0'] = df2['f0'].astype('int32')
+        pd.testing.assert_frame_equal(
+            actual_df2.reset_index(drop=True), df2.reset_index(drop=True))

     def testWriteWrongSchema(self):
         self.catalog.create_table('default.test_wrong_schema',
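
Since Plan now exposes files() (see plan.py above), this test could also assert the overwrite at the manifest level: after the full overwrite, reconciliation should leave exactly the one newly written file. A possible extension, sketched here but not part of the commit:

# Hypothetical extra assertions after the full overwrite (not committed):
plan = read_builder.new_scan().plan()
self.assertEqual(len(plan.files()), 1)               # only the 'Neo' file survives
self.assertEqual(plan.files()[0].file.row_count, 1)  # it holds a single row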
