Skip to content

Commit a888d0a

Browse files
committed
wip
1 parent 36a4cca commit a888d0a

File tree

6 files changed

+251
-67
lines changed

6 files changed

+251
-67
lines changed

cli/data.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -602,7 +602,7 @@ func filenameForDownload(meta *datapb.BinaryMetadata) string {
602602
}
603603

604604
// Replace reserved characters.
605-
fileName = data.CaptureFilePathWithReplacedReservedChars(fileName)
605+
fileName = data.FilePathWithReplacedReservedChars(fileName)
606606

607607
return fileName
608608
}

data/capture_buffer.go

+10-9
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ type CaptureBufferedWriter interface {
1515

1616
// CaptureBuffer is a persistent queue of SensorData backed by a series of *data.CaptureFile.
1717
type CaptureBuffer struct {
18-
Directory string
19-
MetaData *v1.DataCaptureMetadata
18+
directory string
19+
metaData *v1.DataCaptureMetadata
2020
nextFile *ProgFile
2121
lock sync.Mutex
2222
maxCaptureFileSize int64
@@ -25,8 +25,8 @@ type CaptureBuffer struct {
2525
// NewCaptureBuffer returns a new Buffer.
2626
func NewCaptureBuffer(dir string, md *v1.DataCaptureMetadata, maxCaptureFileSize int64) *CaptureBuffer {
2727
return &CaptureBuffer{
28-
Directory: dir,
29-
MetaData: md,
28+
directory: dir,
29+
metaData: md,
3030
maxCaptureFileSize: maxCaptureFileSize,
3131
}
3232
}
@@ -47,12 +47,13 @@ func isBinary(item *v1.SensorData) bool {
4747
return false
4848
}
4949
}
50+
5051
func (b *CaptureBuffer) Write(item *v1.SensorData) error {
5152
b.lock.Lock()
5253
defer b.lock.Unlock()
5354

5455
if isBinary(item) {
55-
binFile, err := NewProgFile(b.Directory, b.MetaData)
56+
binFile, err := NewProgFile(b.directory, b.metaData)
5657
if err != nil {
5758
return err
5859
}
@@ -66,19 +67,19 @@ func (b *CaptureBuffer) Write(item *v1.SensorData) error {
6667
}
6768

6869
if b.nextFile == nil {
69-
nextFile, err := NewProgFile(b.Directory, b.MetaData)
70+
nextFile, err := NewProgFile(b.directory, b.metaData)
7071
if err != nil {
7172
return err
7273
}
7374
b.nextFile = nextFile
7475
// We want to special case on "CaptureAllFromCamera" because it is sensor data that contains images
7576
// and their corresponding annotations. We want each image and its annotations to be stored in a
7677
// separate file.
77-
} else if b.nextFile.Size() > b.maxCaptureFileSize || b.MetaData.MethodName == "CaptureAllFromCamera" {
78+
} else if b.nextFile.Size() > b.maxCaptureFileSize || b.metaData.MethodName == "CaptureAllFromCamera" {
7879
if err := b.nextFile.Close(); err != nil {
7980
return err
8081
}
81-
nextFile, err := NewProgFile(b.Directory, b.MetaData)
82+
nextFile, err := NewProgFile(b.directory, b.metaData)
8283
if err != nil {
8384
return err
8485
}
@@ -104,5 +105,5 @@ func (b *CaptureBuffer) Flush() error {
104105

105106
// Path returns the path to the directory containing the backing data capture files.
106107
func (b *CaptureBuffer) Path() string {
107-
return b.Directory
108+
return b.directory
108109
}

data/capture_buffer_test.go

+76-8
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ func TestCaptureBufferReader(t *testing.T) {
267267
test.That(t, err, test.ShouldBeNil)
268268
defer func() { utils.UncheckedError(f.Close()) }()
269269

270-
cf, err := ReadCaptureFile(f)
270+
cf, err := NewCaptureFile(f)
271271
test.That(t, err, test.ShouldBeNil)
272272
test.That(t, cf.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata)
273273

@@ -335,7 +335,7 @@ func TestCaptureBufferReader(t *testing.T) {
335335
test.That(t, err, test.ShouldBeNil)
336336
defer func() { utils.UncheckedError(f2.Close()) }()
337337

338-
cf2, err := ReadCaptureFile(f2)
338+
cf2, err := NewCaptureFile(f2)
339339
test.That(t, err, test.ShouldBeNil)
340340
test.That(t, cf2.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata)
341341

@@ -470,7 +470,7 @@ func TestCaptureBufferReader(t *testing.T) {
470470
test.That(t, err, test.ShouldBeNil)
471471
defer func() { utils.UncheckedError(f.Close()) }()
472472

473-
cf, err := ReadCaptureFile(f)
473+
cf, err := NewCaptureFile(f)
474474
test.That(t, err, test.ShouldBeNil)
475475
test.That(t, cf.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata)
476476

@@ -508,7 +508,7 @@ func TestCaptureBufferReader(t *testing.T) {
508508
test.That(t, err, test.ShouldBeNil)
509509
defer func() { utils.UncheckedError(f2.Close()) }()
510510

511-
cf2, err := ReadCaptureFile(f2)
511+
cf2, err := NewCaptureFile(f2)
512512
test.That(t, err, test.ShouldBeNil)
513513
test.That(t, cf2.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata)
514514

@@ -563,7 +563,7 @@ func TestCaptureBufferReader(t *testing.T) {
563563
f3, err := os.Open(filepath.Join(b.Path(), newFileNames[0]))
564564
test.That(t, err, test.ShouldBeNil)
565565
defer func() { utils.UncheckedError(f3.Close()) }()
566-
cf3, err := ReadCaptureFile(f3)
566+
cf3, err := NewCaptureFile(f3)
567567
test.That(t, err, test.ShouldBeNil)
568568
test.That(t, cf3.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata)
569569
sd3, err := cf3.ReadNext()
@@ -576,7 +576,7 @@ func TestCaptureBufferReader(t *testing.T) {
576576
f4, err := os.Open(filepath.Join(b.Path(), newFileNames[1]))
577577
test.That(t, err, test.ShouldBeNil)
578578
defer func() { utils.UncheckedError(f4.Close()) }()
579-
cf4, err := ReadCaptureFile(f4)
579+
cf4, err := NewCaptureFile(f4)
580580
test.That(t, err, test.ShouldBeNil)
581581
test.That(t, cf4.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata)
582582
sd4, err := cf4.ReadNext()
@@ -644,7 +644,7 @@ func TestCaptureBufferReader(t *testing.T) {
644644
test.That(t, err, test.ShouldBeNil)
645645
defer func() { utils.UncheckedError(f.Close()) }()
646646

647-
cf2, err := ReadCaptureFile(f)
647+
cf2, err := NewCaptureFile(f)
648648
test.That(t, err, test.ShouldBeNil)
649649
test.That(t, cf2.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata)
650650

@@ -657,7 +657,75 @@ func TestCaptureBufferReader(t *testing.T) {
657657
})
658658
}
659659

660-
//nolint
660+
func NickTest(t *testing.T) {
661+
tmpDir := t.TempDir()
662+
name := resource.NewName(resource.APINamespaceRDK.WithComponentType("camera"), "my-cam")
663+
method := readImage
664+
additionalParams := map[string]string{"mime_type": rutils.MimeTypeJPEG, "test": "1"}
665+
tags := []string{"my", "tags"}
666+
methodParams, err := rprotoutils.ConvertStringMapToAnyPBMap(additionalParams)
667+
test.That(t, err, test.ShouldBeNil)
668+
669+
readImageCaptureMetadata := BuildCaptureMetadata(
670+
name.API,
671+
name.ShortName(),
672+
method,
673+
additionalParams,
674+
methodParams,
675+
tags,
676+
)
677+
678+
test.That(t, readImageCaptureMetadata, test.ShouldResemble, &v1.DataCaptureMetadata{
679+
ComponentName: "my-cam",
680+
ComponentType: "rdk:component:camera",
681+
MethodName: readImage,
682+
MethodParameters: methodParams,
683+
Tags: tags,
684+
Type: v1.DataType_DATA_TYPE_BINARY_SENSOR,
685+
FileExtension: ".jpeg",
686+
})
687+
688+
b := NewCaptureBuffer(tmpDir, readImageCaptureMetadata, int64(4*1024))
689+
690+
// Path() is the same as the first paramenter passed to NewCaptureBuffer
691+
test.That(t, b.Path(), test.ShouldResemble, tmpDir)
692+
test.That(t, b.MetaData, test.ShouldResemble, readImageCaptureMetadata)
693+
694+
now := time.Now()
695+
timeRequested := timestamppb.New(now.UTC())
696+
timeReceived := timestamppb.New(now.Add(time.Millisecond).UTC())
697+
msg := &v1.SensorData{
698+
Metadata: &v1.SensorMetadata{
699+
TimeRequested: timeRequested,
700+
TimeReceived: timeReceived,
701+
},
702+
Data: &v1.SensorData_Binary{
703+
Binary: []byte("this is a fake image"),
704+
},
705+
}
706+
test.That(t, b.Write(msg), test.ShouldBeNil)
707+
test.That(t, b.Flush(), test.ShouldBeNil)
708+
dirEntries, err := os.ReadDir(b.Path())
709+
test.That(t, err, test.ShouldBeNil)
710+
test.That(t, len(dirEntries), test.ShouldEqual, 1)
711+
test.That(t, filepath.Ext(dirEntries[0].Name()), test.ShouldResemble, CompletedCaptureFileExt)
712+
f, err := os.Open(filepath.Join(b.Path(), dirEntries[0].Name()))
713+
test.That(t, err, test.ShouldBeNil)
714+
defer func() { test.That(t, f.Close(), test.ShouldBeNil) }()
715+
716+
cf2, err := NewCaptureFile(f)
717+
test.That(t, err, test.ShouldBeNil)
718+
test.That(t, cf2.ReadMetadata(), test.ShouldResemble, readImageCaptureMetadata)
719+
720+
sd2, err := cf2.ReadNext()
721+
test.That(t, err, test.ShouldBeNil)
722+
test.That(t, sd2, test.ShouldResemble, msg)
723+
724+
_, err = cf2.ReadNext()
725+
test.That(t, err, test.ShouldBeError, io.EOF)
726+
}
727+
728+
// nolint
661729
func getCaptureFiles(dir string) (dcFiles, progFiles []string) {
662730
_ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
663731
if err != nil {

data/capture_file.go

+46-48
Original file line numberDiff line numberDiff line change
@@ -39,18 +39,18 @@ const (
3939
// length delimited protobuf messages, where the first message is the CaptureMetadata for the file, and ensuing
4040
// messages contain the captured data.
4141
type CaptureFile struct {
42-
path string
43-
lock sync.Mutex
44-
file *os.File
45-
size int64
46-
metadata *v1.DataCaptureMetadata
47-
42+
Metadata *v1.DataCaptureMetadata
43+
path string
44+
size int64
4845
initialReadOffset int64
49-
readOffset int64
46+
47+
lock sync.Mutex
48+
file *os.File
49+
readOffset int64
5050
}
5151

52-
// ReadCaptureFile creates a File struct from a passed os.File previously constructed using NewFile.
53-
func ReadCaptureFile(f *os.File) (*CaptureFile, error) {
52+
// NewCaptureFile creates a File struct from a passed os.File previously constructed using NewFile.
53+
func NewCaptureFile(f *os.File) (*CaptureFile, error) {
5454
if !IsDataCaptureFile(f) {
5555
return nil, errors.Errorf("%s is not a data capture file", f.Name())
5656
}
@@ -69,7 +69,7 @@ func ReadCaptureFile(f *os.File) (*CaptureFile, error) {
6969
path: f.Name(),
7070
file: f,
7171
size: finfo.Size(),
72-
metadata: md,
72+
Metadata: md,
7373
initialReadOffset: int64(initOffset),
7474
readOffset: int64(initOffset),
7575
}
@@ -79,7 +79,7 @@ func ReadCaptureFile(f *os.File) (*CaptureFile, error) {
7979

8080
// ReadMetadata reads and returns the metadata in f.
8181
func (f *CaptureFile) ReadMetadata() *v1.DataCaptureMetadata {
82-
return f.metadata
82+
return f.Metadata
8383
}
8484

8585
// ReadNext returns the next SensorData reading.
@@ -109,8 +109,6 @@ func (f *CaptureFile) Reset() {
109109

110110
// Size returns the size of the file.
111111
func (f *CaptureFile) Size() int64 {
112-
f.lock.Lock()
113-
defer f.lock.Unlock()
114112
return f.size
115113
}
116114

@@ -139,22 +137,6 @@ func IsDataCaptureFile(f *os.File) bool {
139137
return filepath.Ext(f.Name()) == CompletedCaptureFileExt
140138
}
141139

142-
// Create a filename based on the current time.
143-
func getFileTimestampName() string {
144-
// RFC3339Nano is a standard time format e.g. 2006-01-02T15:04:05Z07:00.
145-
return time.Now().Format(time.RFC3339Nano)
146-
}
147-
148-
// TODO DATA-246: Implement this in some more robust, programmatic way.
149-
func getDataType(methodName string) v1.DataType {
150-
switch methodName {
151-
case nextPointCloud, readImage, pointCloudMap, GetImages:
152-
return v1.DataType_DATA_TYPE_BINARY_SENSOR
153-
default:
154-
return v1.DataType_DATA_TYPE_TABULAR_SENSOR
155-
}
156-
}
157-
158140
// GetFileExt gets the file extension for a capture file.
159141
func GetFileExt(dataType v1.DataType, methodName string, parameters map[string]string) string {
160142
defaultFileExt := ""
@@ -188,22 +170,44 @@ func GetFileExt(dataType v1.DataType, methodName string, parameters map[string]s
188170
return defaultFileExt
189171
}
190172

191-
// SensorDataFromCaptureFilePath returns all readings in the file at filePath.
192-
// NOTE: (Nick S) At time of writing this is only used in tests.
193-
func SensorDataFromCaptureFilePath(filePath string) ([]*v1.SensorData, error) {
194-
//nolint:gosec
195-
f, err := os.Open(filePath)
196-
if err != nil {
197-
return nil, err
198-
}
199-
dcFile, err := ReadCaptureFile(f)
200-
if err != nil {
201-
return nil, err
202-
}
173+
// FilePathWithReplacedReservedChars returns the filepath with substitutions
174+
// for reserved characters.
175+
func FilePathWithReplacedReservedChars(filepath string) string {
176+
return strings.ReplaceAll(filepath, filePathReservedChars, "_")
177+
}
203178

204-
return SensorDataFromCaptureFile(dcFile)
179+
// Create a filename based on the current time.
180+
func getFileTimestampName() string {
181+
// RFC3339Nano is a standard time format e.g. 2006-01-02T15:04:05Z07:00.
182+
return time.Now().Format(time.RFC3339Nano)
183+
}
184+
185+
// TODO DATA-246: Implement this in some more robust, programmatic way.
186+
func getDataType(methodName string) v1.DataType {
187+
switch methodName {
188+
case nextPointCloud, readImage, pointCloudMap, GetImages:
189+
return v1.DataType_DATA_TYPE_BINARY_SENSOR
190+
default:
191+
return v1.DataType_DATA_TYPE_TABULAR_SENSOR
192+
}
205193
}
206194

195+
// SensorDataFromCaptureFilePath returns all readings in the file at filePath.
196+
// NOTE: (Nick S) At time of writing this is only used in tests.
197+
//func SensorDataFromCaptureFilePath(filePath string) ([]*v1.SensorData, error) {
198+
// //nolint:gosec
199+
// f, err := os.Open(filePath)
200+
// if err != nil {
201+
// return nil, err
202+
// }
203+
// dcFile, err := NewCaptureFile(f)
204+
// if err != nil {
205+
// return nil, err
206+
// }
207+
208+
// return SensorDataFromCaptureFile(dcFile)
209+
//}
210+
207211
// SensorDataFromCaptureFile returns all readings in f.
208212
func SensorDataFromCaptureFile(f *CaptureFile) ([]*v1.SensorData, error) {
209213
f.Reset()
@@ -222,9 +226,3 @@ func SensorDataFromCaptureFile(f *CaptureFile) ([]*v1.SensorData, error) {
222226
}
223227
return ret, nil
224228
}
225-
226-
// CaptureFilePathWithReplacedReservedChars returns the filepath with substitutions
227-
// for reserved characters.
228-
func CaptureFilePathWithReplacedReservedChars(filepath string) string {
229-
return strings.ReplaceAll(filepath, filePathReservedChars, "_")
230-
}

data/capture_file_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ func TestReadCorruptedFile(t *testing.T) {
170170
test.That(t, f.Close(), test.ShouldBeNil)
171171

172172
// Should still be able to successfully read all the successfully written data.
173-
sd, err := SensorDataFromCaptureFilePath(captureFilePath(f.GetPath()))
173+
sd, err := SensorDataFromCaptureFilePath(captureFilePath(f.path))
174174
test.That(t, err, test.ShouldBeNil)
175175
test.That(t, len(sd), test.ShouldEqual, numReadings)
176176
}

0 commit comments

Comments
 (0)