@@ -124,8 +124,57 @@ func (t *FSTree) extractHeaderAndStream(id oid.ID, f *os.File) (*objectSDK.Objec
124
124
125
125
// readHeaderAndPayload reads an object header from the file and returns reader for payload.
126
126
// This function takes ownership of the io.ReadCloser and will close it if it does not return it.
127
- func (t * FSTree ) readHeaderAndPayload (f io.ReadCloser , initial []byte ) (* objectSDK.Object , io.ReadSeekCloser , error ) {
127
+ func (t * FSTree ) readHeaderAndPayload (f io.ReadSeekCloser , initial []byte ) (* objectSDK.Object , io.ReadSeekCloser , error ) {
128
128
var err error
129
+ var hLen , pLen uint32
130
+ if len (initial ) >= streamDataOff {
131
+ hLen , pLen = parseStreamPrefix (initial )
132
+ } else {
133
+ var p []byte
134
+ copy (p [:], initial )
135
+ _ , err := io .ReadFull (f , p [len (initial ):])
136
+ if err != nil && ! errors .Is (err , io .EOF ) && ! errors .Is (err , io .ErrUnexpectedEOF ) {
137
+ return nil , f , fmt .Errorf ("read stream prefix: %w" , err )
138
+ }
139
+ hLen , pLen = parseStreamPrefix (p )
140
+ if hLen == 0 {
141
+ initial = p [:]
142
+ }
143
+ }
144
+ if hLen > 0 {
145
+ initial = initial [streamDataOff :]
146
+ var header []byte
147
+ if len (initial ) < int (hLen ) {
148
+ header = make ([]byte , hLen )
149
+ copy (header , initial )
150
+ _ , err = io .ReadFull (f , header [len (initial ):])
151
+ if err != nil {
152
+ return nil , nil , fmt .Errorf ("read stream header: %w" , err )
153
+ }
154
+ initial = header
155
+ }
156
+ header = initial [:hLen ]
157
+ var obj objectSDK.Object
158
+ err = obj .Unmarshal (header )
159
+ if err != nil {
160
+ return nil , nil , fmt .Errorf ("unmarshal object: %w" , err )
161
+ }
162
+
163
+ data := initial [hLen :]
164
+ reader := io .LimitReader (io .MultiReader (bytes .NewReader (data ), f ), int64 (pLen ))
165
+ if t .IsCompressed (data ) {
166
+ decoder , err := zstd .NewReader (reader )
167
+ if err != nil {
168
+ return nil , nil , fmt .Errorf ("zstd decoder: %w" , err )
169
+ }
170
+ reader = decoder .IOReadCloser ()
171
+ }
172
+ return & obj , & payloadReader {
173
+ Reader : reader ,
174
+ close : f .Close ,
175
+ }, nil
176
+ }
177
+
129
178
if len (initial ) < objectSDK .MaxHeaderLen {
130
179
_ = f .Close ()
131
180
initial , err = t .Decompress (initial )
@@ -168,80 +217,98 @@ func (t *FSTree) readUntilPayload(f io.ReadCloser, initial []byte) (*objectSDK.O
168
217
initial = buf [:n ]
169
218
}
170
219
171
- obj , rest , err := extractHeaderAndPayload (initial )
220
+ var (
221
+ obj object.Object
222
+ res objectSDK.Object
223
+ )
224
+
225
+ _ , offset , err := extractHeaderAndPayload (initial , func (num int , val []byte ) error {
226
+ switch num {
227
+ case fieldObjectID :
228
+ obj .ObjectId = new (refs.ObjectID )
229
+ err := proto .Unmarshal (val , obj .ObjectId )
230
+ if err != nil {
231
+ return fmt .Errorf ("unmarshal object ID: %w" , err )
232
+ }
233
+ case fieldObjectSignature :
234
+ obj .Signature = new (refs.Signature )
235
+ err := proto .Unmarshal (val , obj .Signature )
236
+ if err != nil {
237
+ return fmt .Errorf ("unmarshal object signature: %w" , err )
238
+ }
239
+ case fieldObjectHeader :
240
+ obj .Header = new (object.Header )
241
+ err := proto .Unmarshal (val , obj .Header )
242
+ if err != nil {
243
+ return fmt .Errorf ("unmarshal object header: %w" , err )
244
+ }
245
+ default :
246
+ return fmt .Errorf ("unknown field number: %d" , num )
247
+ }
248
+ return nil
249
+ })
172
250
if err != nil {
173
251
_ = reader .Close ()
174
252
return nil , nil , fmt .Errorf ("extract header and payload: %w" , err )
175
253
}
176
254
177
- return obj , & payloadReader {
178
- Reader : io .MultiReader (bytes .NewReader (rest ), reader ),
255
+ err = res .FromProtoMessage (& obj )
256
+ if err != nil {
257
+ _ = reader .Close ()
258
+ return nil , nil , fmt .Errorf ("convert to objectSDK.Object: %w" , err )
259
+ }
260
+
261
+ return & res , & payloadReader {
262
+ Reader : io .MultiReader (bytes .NewReader (initial [offset :]), reader ),
179
263
close : reader .Close ,
180
264
}, nil
181
265
}
182
266
183
- // extractHeaderAndPayload extracts the header of an object from the given byte slice and returns rest of the data.
184
- func extractHeaderAndPayload (data []byte ) (* objectSDK.Object , []byte , error ) {
185
- var (
186
- offset int
187
- res objectSDK.Object
188
- obj object.Object
189
- )
267
+ // extractHeaderAndPayload processes the initial data to extract the header and payload
268
+ // fields of an object. It calls the provided dataHandler for each field found in the data.
269
+ // It returns the start offset of the header, the end offset of the payload, and an error if any.
270
+ func extractHeaderAndPayload (data []byte , dataHandler func (int , []byte ) error ) (int , int , error ) {
271
+ var offset , headerEnd int
190
272
191
273
if len (data ) == 0 {
192
- return nil , nil , fmt .Errorf ("empty data" )
274
+ return 0 , 0 , fmt .Errorf ("empty data" )
193
275
}
194
276
195
277
for offset < len (data ) {
196
278
num , typ , n := protowire .ConsumeTag (data [offset :])
197
279
if err := protowire .ParseError (n ); err != nil {
198
- return nil , nil , fmt .Errorf ("invalid tag at offset %d: %w" , offset , err )
280
+ return 0 , 0 , fmt .Errorf ("invalid tag at offset %d: %w" , offset , err )
199
281
}
200
282
offset += n
201
283
202
284
if typ != protowire .BytesType {
203
- return nil , nil , fmt .Errorf ("unexpected wire type: %v" , typ )
285
+ return 0 , 0 , fmt .Errorf ("unexpected wire type: %v" , typ )
204
286
}
205
287
206
288
if num == fieldObjectPayload {
289
+ headerEnd = offset - n
207
290
_ , n = binary .Varint (data [offset :])
208
291
if err := protowire .ParseError (n ); err != nil {
209
- return nil , nil , fmt .Errorf ("invalid varint at offset %d: %w" , offset , err )
292
+ return 0 , 0 , fmt .Errorf ("invalid varint at offset %d: %w" , offset , err )
210
293
}
211
294
offset += n
212
295
break
213
296
}
214
297
val , n := protowire .ConsumeBytes (data [offset :])
215
298
if err := protowire .ParseError (n ); err != nil {
216
- return nil , nil , fmt .Errorf ("invalid bytes field at offset %d: %w" , offset , err )
299
+ return 0 , 0 , fmt .Errorf ("invalid bytes field at offset %d: %w" , offset , err )
217
300
}
218
301
offset += n
219
302
220
- switch num {
221
- case fieldObjectID :
222
- obj .ObjectId = new (refs.ObjectID )
223
- err := proto .Unmarshal (val , obj .ObjectId )
224
- if err != nil {
225
- return nil , nil , fmt .Errorf ("unmarshal object ID: %w" , err )
226
- }
227
- case fieldObjectSignature :
228
- obj .Signature = new (refs.Signature )
229
- err := proto .Unmarshal (val , obj .Signature )
303
+ if dataHandler != nil {
304
+ err := dataHandler (int (num ), val )
230
305
if err != nil {
231
- return nil , nil , fmt .Errorf ("unmarshal object signature : %w" , err )
306
+ return 0 , 0 , fmt .Errorf ("data handler error at offset %d : %w" , offset , err )
232
307
}
233
- case fieldObjectHeader :
234
- obj .Header = new (object.Header )
235
- err := proto .Unmarshal (val , obj .Header )
236
- if err != nil {
237
- return nil , nil , fmt .Errorf ("unmarshal object header: %w" , err )
238
- }
239
- default :
240
- return nil , nil , fmt .Errorf ("unknown field number: %d" , num )
241
308
}
242
309
}
243
310
244
- return & res , data [ offset :], res . FromProtoMessage ( & obj )
311
+ return headerEnd , offset , nil
245
312
}
246
313
247
314
type payloadReader struct {
0 commit comments