@@ -38,11 +38,11 @@ import (
38
38
39
39
"github.com/cenkalti/backoff"
40
40
"github.com/dustin/go-humanize"
41
+ "github.com/k0kubun/go-ansi"
41
42
"github.com/miolini/datacounter"
42
43
"github.com/nightlyone/lockfile"
43
- "golang.org/x/sync/errgroup"
44
44
"github.com/schollz/progressbar/v3"
45
- "github.com/k0kubun/go-ansi "
45
+ "golang.org/x/sync/errgroup "
46
46
47
47
"github.com/someone1/zfsbackup-go/backends"
48
48
"github.com/someone1/zfsbackup-go/config"
@@ -419,16 +419,19 @@ func Backup(pctx context.Context, jobInfo *files.JobInfo) error {
419
419
} else {
420
420
fmt .Fprintf (
421
421
config .Stdout ,
422
- "Done.\n \t Total ZFS Stream Bytes: %d (%s)\n \t Total Bytes Written: %d (%s)\n \t Elapsed Time: %v\n \t Total Files Uploaded: %d\n " ,
422
+ "Done.\n \t Total ZFS Stream Bytes: %d (%s)\n \t Total Bytes Written: %d (%s)\n \t Elapsed Time: %v\n \t Total Files Uploaded: %d\n \t Average Upload Rate: %s/TB \n " ,
423
423
jobInfo .ZFSStreamBytes ,
424
424
humanize .IBytes (jobInfo .ZFSStreamBytes ),
425
425
totalWrittenBytes ,
426
426
humanize .IBytes (totalWrittenBytes ),
427
427
time .Since (jobInfo .StartTime ),
428
428
len (jobInfo .Volumes )+ 1 ,
429
+ humanize .IBytes (uint64 (float64 (totalWrittenBytes )/ time .Since (jobInfo .StartTime ).Hours ()/ 1024 )),
429
430
)
430
431
}
431
432
433
+ fmt .Printf ("Backup of %s completed successfully.\n " , jobInfo .VolumeName )
434
+
432
435
log .AppLogger .Debugf ("Cleaning up resources..." )
433
436
434
437
for _ , backend := range usedBackends {
@@ -483,28 +486,28 @@ func saveManifest(ctx context.Context, j *files.JobInfo, final bool) (*files.Vol
483
486
// nolint:funlen,gocyclo // Difficult to break this apart
484
487
485
488
func sendStream (ctx context.Context , j * files.JobInfo , c chan <- * files.VolumeInfo , buffer <- chan bool ) error {
486
- var group * errgroup.Group
487
- group , ctx = errgroup .WithContext (ctx )
488
-
489
- buf := bytes .NewBuffer (nil )
490
- cmd := zfs .GetZFSSendCommand (ctx , j )
491
- cin , cout := io .Pipe ()
492
- cmd .Stdout = cout
493
- cmd .Stderr = buf
494
- counter := datacounter .NewReaderCounter (cin )
495
- usingPipe := false
496
- if j .MaxFileBuffer == 0 {
497
- usingPipe = true
498
- }
499
-
500
- // Get total dataset size for progress tracking
501
- totalSize , err := zfs .GetDatasetSize (ctx , j .VolumeName )
502
- if err != nil {
503
- return err
504
- }
505
-
506
- // Initialize progress bar
507
- bar := progressbar .NewOptions64 (int64 (totalSize ),
489
+ var group * errgroup.Group
490
+ group , ctx = errgroup .WithContext (ctx )
491
+
492
+ buf := bytes .NewBuffer (nil )
493
+ cmd := zfs .GetZFSSendCommand (ctx , j )
494
+ cin , cout := io .Pipe ()
495
+ cmd .Stdout = cout
496
+ cmd .Stderr = buf
497
+ counter := datacounter .NewReaderCounter (cin )
498
+ usingPipe := false
499
+ if j .MaxFileBuffer == 0 {
500
+ usingPipe = true
501
+ }
502
+
503
+ // Get total dataset size for progress tracking
504
+ totalSize , err := zfs .GetDatasetSize (ctx , j .VolumeName )
505
+ if err != nil {
506
+ return err
507
+ }
508
+
509
+ // Initialize progress bar
510
+ bar := progressbar .NewOptions64 (int64 (totalSize ),
508
511
progressbar .OptionSetWriter (ansi .NewAnsiStdout ()),
509
512
progressbar .OptionEnableColorCodes (true ),
510
513
progressbar .OptionShowBytes (true ),
@@ -526,128 +529,128 @@ func sendStream(ctx context.Context, j *files.JobInfo, c chan<- *files.VolumeInf
526
529
}),
527
530
)
528
531
529
- // Initialize chunk tracking variables
530
- totalChunks := int (totalSize / (j .VolumeSize * humanize .MiByte ))
531
- var processedChunks int
532
-
533
- group .Go (func () error {
534
- var lastTotalBytes uint64
535
- defer close (c )
536
- var err error
537
- var volume * files.VolumeInfo
538
- skipBytes , volNum := j .TotalBytesStreamedAndVols ()
539
- lastTotalBytes = skipBytes
540
- for {
541
- // Skip bytes if we are resuming
542
- if skipBytes > 0 {
543
- log .AppLogger .Debugf ("Want to skip %d bytes." , skipBytes )
544
- written , serr := io .CopyN (ioutil .Discard , counter , int64 (skipBytes ))
545
- if serr != nil && serr != io .EOF {
546
- log .AppLogger .Errorf ("Error while trying to read from the zfs stream to skip %d bytes - %v" , skipBytes , serr )
547
- return serr
548
- }
549
- skipBytes -= uint64 (written )
550
- log .AppLogger .Debugf ("Skipped %d bytes of the ZFS send stream." , written )
551
- continue
552
- }
553
-
554
- // Setup next Volume
555
- if volume == nil || volume .Counter () >= (j .VolumeSize * humanize .MiByte )- 50 * humanize .KiByte {
556
- if volume != nil {
557
- log .AppLogger .Debugf ("Finished creating volume %s" , volume .ObjectName )
558
- volume .ZFSStreamBytes = counter .Count () - lastTotalBytes
559
- lastTotalBytes = counter .Count ()
560
- if err = volume .Close (); err != nil {
561
- log .AppLogger .Errorf ("Error while trying to close volume %s - %v" , volume .ObjectName , err )
562
- return err
563
- }
564
- if ! usingPipe {
565
- c <- volume
566
- }
567
- processedChunks ++
568
- bar .Describe (fmt .Sprintf ("Backing up... (%d/%d chunks)" , processedChunks , totalChunks ))
569
- }
570
- <- buffer
571
- volume , err = files .CreateBackupVolume (ctx , j , volNum )
572
- if err != nil {
573
- log .AppLogger .Errorf ("Error while creating volume %d - %v" , volNum , err )
574
- return err
575
- }
576
- log .AppLogger .Debugf ("Starting volume %s" , volume .ObjectName )
577
- volNum ++
578
- if usingPipe {
579
- c <- volume
580
- }
581
- }
582
-
583
- // Write a little at a time and break the output between volumes as needed
584
- bytesWritten , ierr := io .CopyN (volume , counter , files .BufferSize * 2 )
585
- if ierr == io .EOF {
586
- // We are done!
587
- log .AppLogger .Debugf ("Finished creating volume %s" , volume .ObjectName )
588
- volume .ZFSStreamBytes = counter .Count () - lastTotalBytes
589
- if err = volume .Close (); err != nil {
590
- log .AppLogger .Errorf ("Error while trying to close volume %s - %v" , volume .ObjectName , err )
591
- return err
592
- }
593
- if ! usingPipe {
594
- c <- volume
595
- }
596
- processedChunks ++
597
- bar .Describe (fmt .Sprintf ("Backing up... (%d/%d chunks)" , processedChunks , totalChunks ))
598
- return nil
599
- } else if ierr != nil {
600
- log .AppLogger .Errorf ("Error while trying to read from the zfs stream for volume %s - %v" , volume .ObjectName , ierr )
601
- return ierr
602
- }
603
- // Update progress bar
604
- bar .Add64 (int64 (bytesWritten ))
605
- }
606
- })
607
-
608
- // Start the zfs send command
609
- log .AppLogger .Infof ("Starting zfs send command: %s" , strings .Join (cmd .Args , " " ))
610
- err = cmd .Start ()
611
- if err != nil {
612
- log .AppLogger .Errorf ("Error starting zfs command - %v" , err )
613
- return err
614
- }
615
-
616
- group .Go (func () error {
617
- defer cout .Close ()
618
- return cmd .Wait ()
619
- })
620
-
621
- defer func () {
622
- if cmd .ProcessState == nil || ! cmd .ProcessState .Exited () {
623
- err = cmd .Process .Kill ()
624
- if err != nil {
625
- log .AppLogger .Errorf ("Could not kill zfs send command due to error - %v" , err )
626
- return
627
- }
628
- err = cmd .Process .Release ()
629
- if err != nil {
630
- log .AppLogger .Errorf ("Could not release resources from zfs send command due to error - %v" , err )
631
- return
632
- }
633
- }
634
- }()
635
-
636
- manifestmutex .Lock ()
637
- j .ZFSCommandLine = strings .Join (cmd .Args , " " )
638
- manifestmutex .Unlock ()
639
- // Wait for the command to finish
640
-
641
- err = group .Wait ()
642
- if err != nil {
643
- log .AppLogger .Errorf ("Error waiting for zfs command to finish - %v: %s" , err , buf .String ())
644
- return err
645
- }
646
- log .AppLogger .Infof ("zfs send completed without error" )
647
- manifestmutex .Lock ()
648
- j .ZFSStreamBytes = counter .Count ()
649
- manifestmutex .Unlock ()
650
- return nil
532
+ // Initialize chunk tracking variables
533
+ totalChunks := int (totalSize / (j .VolumeSize * humanize .MiByte ))
534
+ var processedChunks int
535
+
536
+ group .Go (func () error {
537
+ var lastTotalBytes uint64
538
+ defer close (c )
539
+ var err error
540
+ var volume * files.VolumeInfo
541
+ skipBytes , volNum := j .TotalBytesStreamedAndVols ()
542
+ lastTotalBytes = skipBytes
543
+ for {
544
+ // Skip bytes if we are resuming
545
+ if skipBytes > 0 {
546
+ log .AppLogger .Debugf ("Want to skip %d bytes." , skipBytes )
547
+ written , serr := io .CopyN (ioutil .Discard , counter , int64 (skipBytes ))
548
+ if serr != nil && serr != io .EOF {
549
+ log .AppLogger .Errorf ("Error while trying to read from the zfs stream to skip %d bytes - %v" , skipBytes , serr )
550
+ return serr
551
+ }
552
+ skipBytes -= uint64 (written )
553
+ log .AppLogger .Debugf ("Skipped %d bytes of the ZFS send stream." , written )
554
+ continue
555
+ }
556
+
557
+ // Setup next Volume
558
+ if volume == nil || volume .Counter () >= (j .VolumeSize * humanize .MiByte )- 50 * humanize .KiByte {
559
+ if volume != nil {
560
+ log .AppLogger .Debugf ("Finished creating volume %s" , volume .ObjectName )
561
+ volume .ZFSStreamBytes = counter .Count () - lastTotalBytes
562
+ lastTotalBytes = counter .Count ()
563
+ if err = volume .Close (); err != nil {
564
+ log .AppLogger .Errorf ("Error while trying to close volume %s - %v" , volume .ObjectName , err )
565
+ return err
566
+ }
567
+ if ! usingPipe {
568
+ c <- volume
569
+ }
570
+ processedChunks ++
571
+ bar .Describe (fmt .Sprintf ("Backing up... (%d/%d chunks)" , processedChunks , totalChunks ))
572
+ }
573
+ <- buffer
574
+ volume , err = files .CreateBackupVolume (ctx , j , volNum )
575
+ if err != nil {
576
+ log .AppLogger .Errorf ("Error while creating volume %d - %v" , volNum , err )
577
+ return err
578
+ }
579
+ log .AppLogger .Debugf ("Starting volume %s" , volume .ObjectName )
580
+ volNum ++
581
+ if usingPipe {
582
+ c <- volume
583
+ }
584
+ }
585
+
586
+ // Write a little at a time and break the output between volumes as needed
587
+ bytesWritten , ierr := io .CopyN (volume , counter , files .BufferSize * 2 )
588
+ if ierr == io .EOF {
589
+ // We are done!
590
+ log .AppLogger .Debugf ("Finished creating volume %s" , volume .ObjectName )
591
+ volume .ZFSStreamBytes = counter .Count () - lastTotalBytes
592
+ if err = volume .Close (); err != nil {
593
+ log .AppLogger .Errorf ("Error while trying to close volume %s - %v" , volume .ObjectName , err )
594
+ return err
595
+ }
596
+ if ! usingPipe {
597
+ c <- volume
598
+ }
599
+ processedChunks ++
600
+ bar .Describe (fmt .Sprintf ("Backing up... (%d/%d chunks)" , processedChunks , totalChunks ))
601
+ return nil
602
+ } else if ierr != nil {
603
+ log .AppLogger .Errorf ("Error while trying to read from the zfs stream for volume %s - %v" , volume .ObjectName , ierr )
604
+ return ierr
605
+ }
606
+ // Update progress bar
607
+ bar .Add64 (int64 (bytesWritten ))
608
+ }
609
+ })
610
+
611
+ // Start the zfs send command
612
+ log .AppLogger .Infof ("Starting zfs send command: %s" , strings .Join (cmd .Args , " " ))
613
+ err = cmd .Start ()
614
+ if err != nil {
615
+ log .AppLogger .Errorf ("Error starting zfs command - %v" , err )
616
+ return err
617
+ }
618
+
619
+ group .Go (func () error {
620
+ defer cout .Close ()
621
+ return cmd .Wait ()
622
+ })
623
+
624
+ defer func () {
625
+ if cmd .ProcessState == nil || ! cmd .ProcessState .Exited () {
626
+ err = cmd .Process .Kill ()
627
+ if err != nil {
628
+ log .AppLogger .Errorf ("Could not kill zfs send command due to error - %v" , err )
629
+ return
630
+ }
631
+ err = cmd .Process .Release ()
632
+ if err != nil {
633
+ log .AppLogger .Errorf ("Could not release resources from zfs send command due to error - %v" , err )
634
+ return
635
+ }
636
+ }
637
+ }()
638
+
639
+ manifestmutex .Lock ()
640
+ j .ZFSCommandLine = strings .Join (cmd .Args , " " )
641
+ manifestmutex .Unlock ()
642
+ // Wait for the command to finish
643
+
644
+ err = group .Wait ()
645
+ if err != nil {
646
+ log .AppLogger .Errorf ("Error waiting for zfs command to finish - %v: %s" , err , buf .String ())
647
+ return err
648
+ }
649
+ log .AppLogger .Infof ("zfs send completed without error" )
650
+ manifestmutex .Lock ()
651
+ j .ZFSStreamBytes = counter .Count ()
652
+ manifestmutex .Unlock ()
653
+ return nil
651
654
}
652
655
653
656
func tryResume (ctx context.Context , j * files.JobInfo ) error {
0 commit comments