7582: Uploader passes tests
authorPeter Amstutz <peter.amstutz@curoverse.com>
Thu, 22 Oct 2015 13:20:13 +0000 (09:20 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Thu, 22 Oct 2015 13:20:13 +0000 (09:20 -0400)
sdk/go/crunchrunner/upload.go
sdk/go/crunchrunner/upload_test.go

index 2196a9d7dfd4a2cc528142ec81a4b23ce8d44bab..7f1fd8a82307396d6163e3a903f23232b8739bce 100644 (file)
@@ -3,11 +3,11 @@ package main
 import (
        "bytes"
        "crypto/md5"
+       "errors"
        "fmt"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
        "io"
-       "log"
        "os"
        "path/filepath"
 )
@@ -23,6 +23,7 @@ type ManifestStreamWriter struct {
        offset int64
        *Block
        uploader chan *Block
+       finish   chan []error
 }
 
 type IKeepClient interface {
@@ -58,16 +59,25 @@ func (m *ManifestStreamWriter) ReadFrom(r io.Reader) (n int64, err error) {
 }
 
 func (m *ManifestStreamWriter) goUpload() {
-       select {
-       case block, valid := <-m.uploader:
-               if !valid {
-                       return
+       var errors []error
+       uploader := m.uploader
+       finish := m.finish
+       for true {
+               select {
+               case block, valid := <-uploader:
+                       if !valid {
+                               finish <- errors
+                               return
+                       }
+                       hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
+                       signedHash, _, err := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset])
+                       if err != nil {
+                               errors = append(errors, err)
+                       } else {
+                               m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
+                       }
                }
-               hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
-               signedHash, _, _ := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset])
-               m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
        }
-
 }
 
 type ManifestWriter struct {
@@ -76,23 +86,16 @@ type ManifestWriter struct {
        Streams     map[string]*ManifestStreamWriter
 }
 
-type walker struct {
-       currentDir string
-       m          *ManifestWriter
-}
-
-func (w walker) WalkFunc(path string, info os.FileInfo, err error) error {
-       log.Print("path ", path, " ", info.Name(), " ", info.IsDir())
-
+func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) error {
        if info.IsDir() {
-               if path == w.currentDir {
-                       return nil
-               }
-               return filepath.Walk(path, walker{path, w.m}.WalkFunc)
+               return nil
+       }
+
+       var dir string
+       if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
+               dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
        }
-       m := w.m
 
-       dir := path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()))]
        fn := path[(len(path) - len(info.Name())):]
 
        if m.Streams[dir] == nil {
@@ -101,7 +104,8 @@ func (w walker) WalkFunc(path string, info os.FileInfo, err error) error {
                        &manifest.ManifestStream{StreamName: dir},
                        0,
                        nil,
-                       make(chan *Block)}
+                       make(chan *Block),
+                       make(chan []error)}
                go m.Streams[dir].goUpload()
        }
 
@@ -128,7 +132,8 @@ func (w walker) WalkFunc(path string, info os.FileInfo, err error) error {
        return nil
 }
 
-func (m *ManifestWriter) Finish() {
+func (m *ManifestWriter) Finish() error {
+       var errstring string
        for _, v := range m.Streams {
                if v.uploader != nil {
                        if v.Block != nil {
@@ -136,8 +141,23 @@ func (m *ManifestWriter) Finish() {
                        }
                        close(v.uploader)
                        v.uploader = nil
+
+                       errors := <-v.finish
+                       close(v.finish)
+                       v.finish = nil
+
+                       if errors != nil {
+                               for _, r := range errors {
+                                       errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
+                               }
+                       }
                }
        }
+       if errstring != "" {
+               return errors.New(errstring)
+       } else {
+               return nil
+       }
 }
 
 func (m *ManifestWriter) ManifestText() string {
@@ -164,13 +184,16 @@ func (m *ManifestWriter) ManifestText() string {
 
 func WriteTree(kc IKeepClient, root string) (manifest string, err error) {
        mw := ManifestWriter{kc, root, map[string]*ManifestStreamWriter{}}
-       err = filepath.Walk(root, walker{root, &mw}.WalkFunc)
-       mw.Finish()
+       err = filepath.Walk(root, mw.WalkFunc)
 
        if err != nil {
                return "", err
-       } else {
-               return mw.ManifestText(), nil
        }
 
+       err = mw.Finish()
+       if err != nil {
+               return "", err
+       }
+
+       return mw.ManifestText(), nil
 }
index 6e0e103e8833c25d55e627d67580b0b3aef112f2..e337b76a53febe7daccb739bf2e1af67ba9615ac 100644 (file)
@@ -2,9 +2,11 @@ package main
 
 import (
        "crypto/md5"
+       "errors"
        "fmt"
        . "gopkg.in/check.v1"
        "io/ioutil"
+       "log"
        "os"
 )
 
@@ -21,6 +23,8 @@ func (k KeepTestClient) PutHB(hash string, buf []byte) (string, int, error) {
 }
 
 func (s *TestSuite) TestSimpleUpload(c *C) {
+       log.Print("--TestSimpleUpload--")
+
        tmpdir, _ := ioutil.TempDir("", "")
        defer func() {
                os.RemoveAll(tmpdir)
@@ -34,6 +38,8 @@ func (s *TestSuite) TestSimpleUpload(c *C) {
 }
 
 func (s *TestSuite) TestSimpleUploadTwofiles(c *C) {
+       log.Print("--TestSimpleUploadTwofiles--")
+
        tmpdir, _ := ioutil.TempDir("", "")
        defer func() {
                os.RemoveAll(tmpdir)
@@ -48,19 +54,102 @@ func (s *TestSuite) TestSimpleUploadTwofiles(c *C) {
 }
 
 func (s *TestSuite) TestSimpleUploadSubdir(c *C) {
+       log.Print("--TestSimpleUploadSubdir--")
+
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       os.Mkdir(tmpdir+"/subdir", 0700)
+
+       ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+       ioutil.WriteFile(tmpdir+"/subdir/file2.txt", []byte("bar"), 0600)
+
+       str, err := WriteTree(KeepTestClient{}, tmpdir)
+       c.Check(err, IsNil)
+       c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
+./subdir 37b51d194a7513e45b56f6524f2d51f2+3 0:3:file2.txt
+`)
+}
+
+func (s *TestSuite) TestSimpleUploadLarge(c *C) {
+       log.Print("--TestSimpleUploadLarge--")
+
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       file, _ := os.Create(tmpdir + "/" + "file1.txt")
+       data := make([]byte, 1024*1024-1)
+       for i := 0; i < 1024*1024-1; i++ {
+               data[i] = byte(i % 10)
+       }
+       for i := 0; i < 65; i++ {
+               file.Write(data)
+       }
+       file.Close()
+
+       ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
+
+       str, err := WriteTree(KeepTestClient{}, tmpdir)
+       c.Check(err, IsNil)
+       c.Check(str, Equals, ". 00ecf01e0d93385115c9f8bed757425d+67108864 485cd630387b6b1846fe429f261ea05f+1048514 0:68157375:file1.txt 68157375:3:file2.txt\n")
+}
+
+func (s *TestSuite) TestUploadEmptySubdir(c *C) {
+       log.Print("--TestUploadEmptySubdir--")
+
        tmpdir, _ := ioutil.TempDir("", "")
        defer func() {
                os.RemoveAll(tmpdir)
        }()
 
-       os.Mkdir(tmpdir+"/"+"subdir", 0600)
+       os.Mkdir(tmpdir+"/subdir", 0700)
 
        ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
-       ioutil.WriteFile(tmpdir+"/"+"subdir/file2.txt", []byte("bar"), 0600)
 
        str, err := WriteTree(KeepTestClient{}, tmpdir)
        c.Check(err, IsNil)
-       c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+6 0:3:file1.txt
-./subdir acbd18db4cc2f85cedef654fccc4a4d8+6 0:3:file2.txt
+       c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
+`)
+}
+
+func (s *TestSuite) TestUploadEmptyFile(c *C) {
+       log.Print("--TestUploadEmptyFile--")
+
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte(""), 0600)
+
+       str, err := WriteTree(KeepTestClient{}, tmpdir)
+       c.Check(err, IsNil)
+       c.Check(str, Equals, `. d41d8cd98f00b204e9800998ecf8427e+0 0:0:file1.txt
 `)
 }
+
+type KeepErrorTestClient struct {
+}
+
+func (k KeepErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+       return "", 0, errors.New("Failed!")
+}
+
+func (s *TestSuite) TestUploadError(c *C) {
+       log.Print("--TestSimpleUpload--")
+
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+
+       str, err := WriteTree(KeepErrorTestClient{}, tmpdir)
+       c.Check(err, NotNil)
+       c.Check(str, Equals, "")
+}