8784: Fix test for latest firefox.
[arvados.git] / sdk / go / crunchrunner / upload.go
index 7f1fd8a82307396d6163e3a903f23232b8739bce..a3dc3d52a8d1dfe4ce184d98263c3db658043ba5 100644 (file)
@@ -8,8 +8,11 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
        "io"
+       "log"
        "os"
        "path/filepath"
+       "sort"
+       "strings"
 )
 
 type Block struct {
@@ -30,10 +33,9 @@ type IKeepClient interface {
        PutHB(hash string, buf []byte) (string, int, error)
 }
 
-func (m *ManifestStreamWriter) Write(p []byte) (n int, err error) {
-       // Needed to conform to Writer interface, but not implemented
-       // because io.Copy will actually use ReadFrom instead.
-       return 0, nil
+func (m *ManifestStreamWriter) Write(p []byte) (int, error) {
+       n, err := m.ReadFrom(bytes.NewReader(p))
+       return int(n), err
 }
 
 func (m *ManifestStreamWriter) ReadFrom(r io.Reader) (n int64, err error) {
@@ -47,37 +49,34 @@ func (m *ManifestStreamWriter) ReadFrom(r io.Reader) (n int64, err error) {
                count, err = r.Read(m.Block.data[m.Block.offset:])
                total += int64(count)
                m.Block.offset += int64(count)
-               if count > 0 {
-                       if m.Block.offset == keepclient.BLOCKSIZE {
-                               m.uploader <- m.Block
-                               m.Block = nil
-                       }
+               if m.Block.offset == keepclient.BLOCKSIZE {
+                       m.uploader <- m.Block
+                       m.Block = nil
                }
        }
 
-       return total, err
+       if err == io.EOF {
+               return total, nil
+       } else {
+               return total, err
+       }
+
 }
 
 func (m *ManifestStreamWriter) goUpload() {
        var errors []error
        uploader := m.uploader
        finish := m.finish
-       for true {
-               select {
-               case block, valid := <-uploader:
-                       if !valid {
-                               finish <- errors
-                               return
-                       }
-                       hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
-                       signedHash, _, err := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset])
-                       if err != nil {
-                               errors = append(errors, err)
-                       } else {
-                               m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
-                       }
+       for block := range uploader {
+               hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
+               signedHash, _, err := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset])
+               if err != nil {
+                       errors = append(errors, err)
+               } else {
+                       m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
                }
        }
+       finish <- errors
 }
 
 type ManifestWriter struct {
@@ -95,6 +94,9 @@ func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) erro
        if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
                dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
        }
+       if dir == "" {
+               dir = "."
+       }
 
        fn := path[(len(path) - len(info.Name())):]
 
@@ -118,39 +120,40 @@ func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) erro
                return err
        }
 
+       log.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
+
        var count int64
        count, err = io.Copy(stream, file)
-       if err != nil && err != io.EOF {
+       if err != nil {
                return err
        }
 
        stream.offset += count
 
-       stream.ManifestStream.Files = append(stream.ManifestStream.Files,
-               fmt.Sprintf("%v:%v:%v", fileStart, count, fn))
+       stream.ManifestStream.FileStreamSegments = append(stream.ManifestStream.FileStreamSegments,
+               manifest.FileStreamSegment{uint64(fileStart), uint64(count), fn})
 
        return nil
 }
 
 func (m *ManifestWriter) Finish() error {
        var errstring string
-       for _, v := range m.Streams {
-               if v.uploader != nil {
-                       if v.Block != nil {
-                               v.uploader <- v.Block
-                       }
-                       close(v.uploader)
-                       v.uploader = nil
-
-                       errors := <-v.finish
-                       close(v.finish)
-                       v.finish = nil
-
-                       if errors != nil {
-                               for _, r := range errors {
-                                       errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
-                               }
-                       }
+       for _, stream := range m.Streams {
+               if stream.uploader == nil {
+                       continue
+               }
+               if stream.Block != nil {
+                       stream.uploader <- stream.Block
+               }
+               close(stream.uploader)
+               stream.uploader = nil
+
+               errors := <-stream.finish
+               close(stream.finish)
+               stream.finish = nil
+
+               for _, r := range errors {
+                       errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
                }
        }
        if errstring != "" {
@@ -163,19 +166,34 @@ func (m *ManifestWriter) Finish() error {
 func (m *ManifestWriter) ManifestText() string {
        m.Finish()
        var buf bytes.Buffer
-       for k, v := range m.Streams {
-               if k == "" {
+
+       dirs := make([]string, len(m.Streams))
+       i := 0
+       for k := range m.Streams {
+               dirs[i] = k
+               i++
+       }
+       sort.Strings(dirs)
+
+       for _, k := range dirs {
+               v := m.Streams[k]
+
+               if k == "." {
                        buf.WriteString(".")
                } else {
+                       k = strings.Replace(k, " ", "\\040", -1)
+                       k = strings.Replace(k, "\n", "", -1)
                        buf.WriteString("./" + k)
                }
                for _, b := range v.Blocks {
                        buf.WriteString(" ")
                        buf.WriteString(b)
                }
-               for _, f := range v.Files {
+               for _, f := range v.FileStreamSegments {
                        buf.WriteString(" ")
-                       buf.WriteString(f)
+                       name := strings.Replace(f.Name, " ", "\\040", -1)
+                       name = strings.Replace(name, "\n", "", -1)
+                       buf.WriteString(fmt.Sprintf("%d:%d:%s", f.SegPos, f.SegLen, name))
                }
                buf.WriteString("\n")
        }