Skip to content

Commit 4ca3aba

Browse files
committed
wip
1 parent 35d1284 commit 4ca3aba

File tree

4 files changed

+121
-19
lines changed

4 files changed

+121
-19
lines changed

services/datamanager/builtin/config.go

+2
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ type Config struct {
4949
ScheduledSyncDisabled bool `json:"sync_disabled"`
5050
SelectiveSyncerName string `json:"selective_syncer_name"`
5151
SyncIntervalMins float64 `json:"sync_interval_mins"`
52+
Flag bool `json:"flag"`
5253
}
5354

5455
// Validate returns components which will be depended upon weakly due to the above matcher.
@@ -119,6 +120,7 @@ func (c *Config) syncConfig(syncSensor sensor.Sensor, syncSensorEnabled bool, lo
119120
}
120121

121122
return datasync.Config{
123+
Flag: c.Flag,
122124
AdditionalSyncPaths: c.AdditionalSyncPaths,
123125
Tags: c.Tags,
124126
CaptureDir: c.getCaptureDir(),

services/datamanager/builtin/sync/config.go

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
// Config is the sync config from builtin.
1212
type Config struct {
13+
Flag bool
1314
// AdditionalSyncPaths defines the file system paths
1415
// that should be synced in addition to the CaptureDir.
1516
// Generally 3rd party programs will write arbitrary

services/datamanager/builtin/sync/sync.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ func (s *Sync) syncDataCaptureFile(f *os.File, captureDir string, logger logging
388388
retry := newExponentialRetry(s.configCtx, s.clock, s.logger, f.Name(), func(ctx context.Context) (uint64, error) {
389389
msg := "error uploading data capture file %s, size: %s, md: %s"
390390
errMetadata := fmt.Sprintf(msg, captureFile.GetPath(), data.FormatBytesI64(captureFile.Size()), captureFile.ReadMetadata())
391-
bytesUploaded, err := uploadDataCaptureFile(ctx, captureFile, s.cloudConn, logger)
391+
bytesUploaded, err := uploadDataCaptureFile(ctx, captureFile, s.cloudConn, s.config.Flag, logger)
392392
if err != nil {
393393
return 0, errors.Wrap(err, errMetadata)
394394
}

services/datamanager/builtin/sync/upload_data_capture_file.go

+117-18
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package sync
33
import (
44
"context"
55
"fmt"
6+
"io"
67

78
"github.com/docker/go-units"
89
"github.com/go-viper/mapstructure/v2"
@@ -25,34 +26,32 @@ var MaxUnaryFileSize = int64(units.MB)
2526
// uses StreamingDataCaptureUpload API so as to not exceed the unary response size.
2627
// Otherwise, uploads data over DataCaptureUpload API.
2728
// Note: the bytes size returned is the size of the input file. It only returns a non 0 value in the success case.
28-
func uploadDataCaptureFile(ctx context.Context, f *data.CaptureFile, conn cloudConn, logger logging.Logger) (uint64, error) {
29+
func uploadDataCaptureFile(ctx context.Context, f *data.CaptureFile, conn cloudConn, flag bool, logger logging.Logger) (uint64, error) {
2930
logger.Debugf("preparing to upload data capture file: %s, size: %d", f.GetPath(), f.Size())
3031

3132
md := f.ReadMetadata()
33+
34+
// camera.GetImages is a special case. For that API we make 2 binary data upload requests
35+
if md.GetType() == v1.DataType_DATA_TYPE_BINARY_SENSOR && md.GetMethodName() == data.GetImages {
36+
return uint64(f.Size()), uploadGetImages(ctx, conn, md, f, logger)
37+
}
38+
39+
metaData := uploadMetadata(conn.partID, md, md.GetFileExtension())
40+
if md.GetType() == v1.DataType_DATA_TYPE_BINARY_SENSOR && flag {
41+
return uint64(f.Size()), uploadChunkedBinaryData(ctx, conn.client, metaData, f, logger)
42+
}
43+
3244
sensorData, err := data.SensorDataFromCaptureFile(f)
3345
if err != nil {
3446
return 0, errors.Wrap(err, "error reading sensor data")
3547
}
3648

37-
// Do not attempt to upload a file without any sensor readings.
3849
if len(sensorData) == 0 {
3950
logger.Warnf("ignoring and deleting empty capture file without syncing it: %s", f.GetPath())
4051
// log here as this will delete a .capture file without uploading it and without moving it to the failed directory
4152
return 0, nil
4253
}
4354

44-
if md.GetType() == v1.DataType_DATA_TYPE_BINARY_SENSOR && len(sensorData) > 1 {
45-
return 0, fmt.Errorf("binary sensor data file with more than one sensor reading is not supported: %s", f.GetPath())
46-
}
47-
48-
// camera.GetImages is a special case. For that API we make 2 binary data upload requests
49-
if md.GetType() == v1.DataType_DATA_TYPE_BINARY_SENSOR && md.GetMethodName() == data.GetImages {
50-
logger.Debugf("attemping to upload camera.GetImages data: %s", f.GetPath())
51-
52-
return uint64(f.Size()), uploadGetImages(ctx, conn, md, sensorData[0], f.Size(), f.GetPath(), logger)
53-
}
54-
55-
metaData := uploadMetadata(conn.partID, md, md.GetFileExtension())
5655
return uint64(f.Size()), uploadSensorData(ctx, conn.client, metaData, sensorData, f.Size(), f.GetPath(), logger)
5756
}
5857

@@ -73,11 +72,26 @@ func uploadGetImages(
7372
ctx context.Context,
7473
conn cloudConn,
7574
md *v1.DataCaptureMetadata,
76-
sd *v1.SensorData,
77-
size int64,
78-
path string,
75+
f *data.CaptureFile,
7976
logger logging.Logger,
8077
) error {
78+
logger.Debugf("attemping to upload camera.GetImages data: %s", f.GetPath())
79+
80+
sensorData, err := data.SensorDataFromCaptureFile(f)
81+
if err != nil {
82+
return errors.Wrap(err, "error reading sensor data")
83+
}
84+
85+
if len(sensorData) == 0 {
86+
logger.Warnf("ignoring and deleting empty capture file without syncing it: %s", f.GetPath())
87+
// log here as this will delete a .capture file without uploading it and without moving it to the failed directory
88+
return nil
89+
}
90+
91+
if len(sensorData) > 1 {
92+
return fmt.Errorf("binary sensor data file with more than one sensor reading is not supported: %s", f.GetPath())
93+
}
94+
sd := sensorData[0]
8195
var res pb.GetImagesResponse
8296
if err := mapstructure.Decode(sd.GetStruct().AsMap(), &res); err != nil {
8397
return errors.Wrap(err, "failed to decode camera.GetImagesResponse")
@@ -100,7 +114,7 @@ func uploadGetImages(
100114
metadata := uploadMetadata(conn.partID, md, getFileExtFromImageFormat(img.GetFormat()))
101115
// TODO: This is wrong as the size describes the size of the entire GetImages response, but we are only
102116
// uploading one of the 2 images in that response here.
103-
if err := uploadSensorData(ctx, conn.client, metadata, newSensorData, size, path, logger); err != nil {
117+
if err := uploadSensorData(ctx, conn.client, metadata, newSensorData, f.Size(), f.GetPath(), logger); err != nil {
104118
return errors.Wrapf(err, "failed uploading GetImages image index: %d", i)
105119
}
106120
}
@@ -123,6 +137,45 @@ func getImagesTimestamps(res *pb.GetImagesResponse, sensorData *v1.SensorData) (
123137
return timeRequested, timeReceived
124138
}
125139

140+
func uploadChunkedBinaryData(
141+
ctx context.Context,
142+
client v1.DataSyncServiceClient,
143+
uploadMD *v1.UploadMetadata,
144+
f *data.CaptureFile,
145+
logger logging.Logger,
146+
) error {
147+
// If it's a large binary file, we need to upload it in chunks.
148+
logger.Debugf("attempting to upload large binary file using StreamingDataCaptureUpload, file: %s", f.GetPath())
149+
var smd v1.SensorMetadata
150+
r, err := f.BinaryReader(&smd)
151+
if err != nil {
152+
return err
153+
}
154+
c, err := client.StreamingDataCaptureUpload(ctx)
155+
if err != nil {
156+
return errors.Wrap(err, "error creating StreamingDataCaptureUpload client")
157+
}
158+
159+
// First send metadata.
160+
streamMD := &v1.StreamingDataCaptureUploadRequest_Metadata{
161+
Metadata: &v1.DataCaptureUploadMetadata{
162+
UploadMetadata: uploadMD,
163+
SensorMetadata: &smd,
164+
},
165+
}
166+
if err := c.Send(&v1.StreamingDataCaptureUploadRequest{UploadPacket: streamMD}); err != nil {
167+
return errors.Wrap(err, "StreamingDataCaptureUpload failed sending metadata")
168+
}
169+
170+
// Then call the function to send the rest.
171+
if err := sendChunkedStreamingDCRequests(ctx, c, r, f.GetPath(), logger); err != nil {
172+
return errors.Wrap(err, "StreamingDataCaptureUpload failed to sync")
173+
}
174+
175+
_, err = c.CloseAndRecv()
176+
return errors.Wrap(err, "StreamingDataCaptureUpload CloseAndRecv failed")
177+
}
178+
126179
func uploadSensorData(
127180
ctx context.Context,
128181
client v1.DataSyncServiceClient,
@@ -171,6 +224,52 @@ func uploadSensorData(
171224
return errors.Wrap(err, "DataCaptureUpload failed")
172225
}
173226

227+
func sendChunkedStreamingDCRequests(
228+
ctx context.Context,
229+
stream v1.DataSyncService_StreamingDataCaptureUploadClient,
230+
r io.Reader,
231+
path string,
232+
logger logging.Logger,
233+
) error {
234+
chunk := make([]byte, UploadChunkSize)
235+
// Loop until there is no more content to send.
236+
chunkCount := 0
237+
for {
238+
select {
239+
case <-ctx.Done():
240+
return ctx.Err()
241+
default:
242+
n, errRead := r.Read(chunk)
243+
if n > 0 {
244+
// if there is data, send it
245+
// Build request with contents.
246+
uploadReq := &v1.StreamingDataCaptureUploadRequest{
247+
UploadPacket: &v1.StreamingDataCaptureUploadRequest_Data{
248+
Data: chunk[:n],
249+
},
250+
}
251+
252+
// Send request
253+
logger.Debugf("datasync.StreamingDataCaptureUpload sending chunk %d for file: %s", chunkCount, path)
254+
if errSend := stream.Send(uploadReq); errSend != nil {
255+
return errSend
256+
}
257+
}
258+
259+
// if we reached the end of the file return nil err (success)
260+
if errors.Is(errRead, io.EOF) {
261+
return nil
262+
}
263+
264+
// if Read hit an unexpected error, return the error
265+
if errRead != nil {
266+
return errRead
267+
}
268+
chunkCount++
269+
}
270+
}
271+
}
272+
174273
func sendStreamingDCRequests(
175274
ctx context.Context,
176275
stream v1.DataSyncService_StreamingDataCaptureUploadClient,

0 commit comments

Comments
 (0)