@@ -74,6 +74,7 @@ func (sr *snapshotReader) Read(p []byte) (n int, err error) {
7474 return n , err
7575 }
7676 return n , io .EOF
77+ default :
7778 }
7879 }
7980}
@@ -380,10 +381,46 @@ var cmdCreate = &cli.Command{
380381 }
381382 }()
382383
384+ rrPath := filepath .Join (flagExportDir , flagFileName )
385+ for {
386+ info , err := os .Stat (rrPath )
387+ if os .IsNotExist (err ) {
388+ logger .Infow ("waiting for snapshot car file to begin writing" )
389+ time .Sleep (time .Second * 15 )
390+ continue
391+ } else if info .IsDir () {
392+ return xerrors .Errorf ("trying to open directory instead of car file" )
393+ }
394+ break
395+ }
396+
397+ rr := newSnapshotReader (rrPath , errCh )
398+
399+ go func () {
400+ var lastSize int64
401+ for {
402+ select {
403+ case <- time .After (flagProgressUpdate ):
404+ size := e .Progress (rrPath )
405+ if size == 0 {
406+ continue
407+ }
408+ logger .Infow ("update" , "total" , size , "speed" , (size - lastSize )/ int64 (flagProgressUpdate / time .Second ))
409+ lastSize = size
410+ case err := <- errCh :
411+ if err != nil {
412+ break
413+ }
414+ }
415+ }
416+ }()
417+
383418 if flagDiscard {
384419 logger .Infow ("discarding output" )
385- g , _ := errgroup .WithContext (ctx )
386-
420+ g , ctxGroup := errgroup .WithContext (ctx )
421+ g .Go (func () error {
422+ return runWriteCompressed (ctxGroup , rrPath + ".zstd" , rr )
423+ })
387424 if err := g .Wait (); err != nil {
388425 return err
389426 }
@@ -392,36 +429,6 @@ var cmdCreate = &cli.Command{
392429 return err
393430 }
394431 } else {
395- rrPath := filepath .Join (flagExportDir , flagFileName )
396- for {
397- info , err := os .Stat (rrPath )
398- if os .IsNotExist (err ) {
399- logger .Infow ("waiting for snapshot car file to begin writing" )
400- time .Sleep (time .Second * 15 )
401- continue
402- } else if info .IsDir () {
403- return xerrors .Errorf ("trying to open directory instead of car file" )
404- }
405- break
406- }
407-
408- rr := newSnapshotReader (rrPath , errCh )
409-
410- go func () {
411- var lastSize int64
412- for {
413- select {
414- case <- time .After (flagProgressUpdate ):
415- size := e .Progress (rrPath )
416- if size == 0 {
417- continue
418- }
419- logger .Infow ("update" , "total" , size , "speed" , (size - lastSize )/ int64 (flagProgressUpdate / time .Second ))
420- lastSize = size
421- }
422- }
423- }()
424-
425432 host := u .Hostname ()
426433 port := u .Port ()
427434 if port == "" {
@@ -503,13 +510,32 @@ var cmdCreate = &cli.Command{
503510 },
504511}
505512
506- func runUploadCompressed (ctx context.Context , minioClient * minio.Client , flagBucket , flagNamePrefix , flagRetrievalEndpointPrefix , name , peerID string , bt time.Time , source io.Reader ) (* snapshotInfo , error ) {
507-
508- r1 , w1 := io .Pipe ()
513+ func compress (source io.Reader ) io.Reader {
514+ r , w := io .Pipe ()
509515 go func () {
510- Compress (source , w1 )
511- w1 .Close ()
516+ Compress (source , w )
517+ w .Close ()
512518 }()
519+ return r
520+ }
521+
522+ func runWriteCompressed (ctx context.Context , path string , source io.Reader ) error {
523+ file , err := os .Create (path )
524+ if err != nil {
525+ return err
526+ }
527+ r := compress (source )
528+ n , err := io .Copy (file , r )
529+ if err != nil {
530+ return err
531+ }
532+ logger .Infow ("data copied to file:" , n )
533+ return nil
534+ }
535+
536+ func runUploadCompressed (ctx context.Context , minioClient * minio.Client , flagBucket , flagNamePrefix , flagRetrievalEndpointPrefix , name , peerID string , bt time.Time , source io.Reader ) (* snapshotInfo , error ) {
537+ r1 := compress (source )
538+
513539 h := sha256 .New ()
514540 r := io .TeeReader (r1 , h )
515541
0 commit comments