From: Peter Amstutz Date: Thu, 22 Oct 2015 13:20:13 +0000 (-0400) Subject: 7582: Uploader passes tests X-Git-Tag: 1.1.0~1276^2~15 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/30bfda443a98efc1a717f35258e3c3ffd7369d7c?hp=7600537f3f34ee88a76688dbb0e1d73723905fa7 7582: Uploader passes tests --- diff --git a/sdk/go/crunchrunner/upload.go b/sdk/go/crunchrunner/upload.go index 2196a9d7df..7f1fd8a823 100644 --- a/sdk/go/crunchrunner/upload.go +++ b/sdk/go/crunchrunner/upload.go @@ -3,11 +3,11 @@ package main import ( "bytes" "crypto/md5" + "errors" "fmt" "git.curoverse.com/arvados.git/sdk/go/keepclient" "git.curoverse.com/arvados.git/sdk/go/manifest" "io" - "log" "os" "path/filepath" ) @@ -23,6 +23,7 @@ type ManifestStreamWriter struct { offset int64 *Block uploader chan *Block + finish chan []error } type IKeepClient interface { @@ -58,16 +59,25 @@ func (m *ManifestStreamWriter) ReadFrom(r io.Reader) (n int64, err error) { } func (m *ManifestStreamWriter) goUpload() { - select { - case block, valid := <-m.uploader: - if !valid { - return + var errors []error + uploader := m.uploader + finish := m.finish + for true { + select { + case block, valid := <-uploader: + if !valid { + finish <- errors + return + } + hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset])) + signedHash, _, err := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset]) + if err != nil { + errors = append(errors, err) + } else { + m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash) + } } - hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset])) - signedHash, _, _ := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset]) - m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash) } - } type ManifestWriter struct { @@ -76,23 +86,16 @@ type ManifestWriter struct { Streams map[string]*ManifestStreamWriter } -type walker struct { - currentDir string - m *ManifestWriter -} - -func (w walker) WalkFunc(path string, info os.FileInfo, err error) error { - log.Print("path ", path, " ", info.Name(), " ", info.IsDir()) - +func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) error { if info.IsDir() { - if path == w.currentDir { - return nil - } - return filepath.Walk(path, walker{path, w.m}.WalkFunc) + return nil + } + + var dir string + if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) { + dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)] } - m := w.m - dir := path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()))] fn := path[(len(path) - len(info.Name())):] if m.Streams[dir] == nil { @@ -101,7 +104,8 @@ func (w walker) WalkFunc(path string, info os.FileInfo, err error) error { &manifest.ManifestStream{StreamName: dir}, 0, nil, - make(chan *Block)} + make(chan *Block), + make(chan []error)} go m.Streams[dir].goUpload() } @@ -128,7 +132,8 @@ func (w walker) WalkFunc(path string, info os.FileInfo, err error) error { return nil } -func (m *ManifestWriter) Finish() { +func (m *ManifestWriter) Finish() error { + var errstring string for _, v := range m.Streams { if v.uploader != nil { if v.Block != nil { @@ -136,8 +141,23 @@ func (m *ManifestWriter) Finish() { } close(v.uploader) v.uploader = nil + + errors := <-v.finish + close(v.finish) + v.finish = nil + + if errors != nil { + for _, r := range errors { + errstring = fmt.Sprintf("%v%v\n", errstring, r.Error()) + } + } } } + if errstring != "" { + return errors.New(errstring) + } else { + return nil + } } func (m *ManifestWriter) ManifestText() string { @@ -164,13 +184,16 @@ func (m *ManifestWriter) ManifestText() string { func WriteTree(kc IKeepClient, root string) (manifest string, err error) { mw := ManifestWriter{kc, root, map[string]*ManifestStreamWriter{}} - err = filepath.Walk(root, walker{root, &mw}.WalkFunc) - mw.Finish() + err = filepath.Walk(root, mw.WalkFunc) if err != nil { return "", err - } else { - return mw.ManifestText(), nil } + err = mw.Finish() + if err != nil { + return "", err + } + + return mw.ManifestText(), nil } diff --git a/sdk/go/crunchrunner/upload_test.go b/sdk/go/crunchrunner/upload_test.go index 6e0e103e88..e337b76a53 100644 --- a/sdk/go/crunchrunner/upload_test.go +++ b/sdk/go/crunchrunner/upload_test.go @@ -2,9 +2,11 @@ package main import ( "crypto/md5" + "errors" "fmt" . "gopkg.in/check.v1" "io/ioutil" + "log" "os" ) @@ -21,6 +23,8 @@ func (k KeepTestClient) PutHB(hash string, buf []byte) (string, int, error) { } func (s *TestSuite) TestSimpleUpload(c *C) { + log.Print("--TestSimpleUpload--") + tmpdir, _ := ioutil.TempDir("", "") defer func() { os.RemoveAll(tmpdir) @@ -34,6 +38,8 @@ func (s *TestSuite) TestSimpleUpload(c *C) { } func (s *TestSuite) TestSimpleUploadTwofiles(c *C) { + log.Print("--TestSimpleUploadTwofiles--") + tmpdir, _ := ioutil.TempDir("", "") defer func() { os.RemoveAll(tmpdir) @@ -48,19 +54,102 @@ func (s *TestSuite) TestSimpleUploadTwofiles(c *C) { } func (s *TestSuite) TestSimpleUploadSubdir(c *C) { + log.Print("--TestSimpleUploadSubdir--") + + tmpdir, _ := ioutil.TempDir("", "") + defer func() { + os.RemoveAll(tmpdir) + }() + + os.Mkdir(tmpdir+"/subdir", 0700) + + ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600) + ioutil.WriteFile(tmpdir+"/subdir/file2.txt", []byte("bar"), 0600) + + str, err := WriteTree(KeepTestClient{}, tmpdir) + c.Check(err, IsNil) + c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt +./subdir 37b51d194a7513e45b56f6524f2d51f2+3 0:3:file2.txt +`) +} + +func (s *TestSuite) TestSimpleUploadLarge(c *C) { + log.Print("--TestSimpleUploadLarge--") + + tmpdir, _ := ioutil.TempDir("", "") + defer func() { + os.RemoveAll(tmpdir) + }() + + file, _ := os.Create(tmpdir + "/" + "file1.txt") + data := make([]byte, 1024*1024-1) + for i := 0; i < 1024*1024-1; i++ { + data[i] = byte(i % 10) + } + for i := 0; i < 65; i++ { + file.Write(data) + } + file.Close() + + ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600) + + str, err := WriteTree(KeepTestClient{}, tmpdir) + c.Check(err, IsNil) + c.Check(str, Equals, ". 00ecf01e0d93385115c9f8bed757425d+67108864 485cd630387b6b1846fe429f261ea05f+1048514 0:68157375:file1.txt 68157375:3:file2.txt\n") +} + +func (s *TestSuite) TestUploadEmptySubdir(c *C) { + log.Print("--TestUploadEmptySubdir--") + tmpdir, _ := ioutil.TempDir("", "") defer func() { os.RemoveAll(tmpdir) }() - os.Mkdir(tmpdir+"/"+"subdir", 0600) + os.Mkdir(tmpdir+"/subdir", 0700) ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600) - ioutil.WriteFile(tmpdir+"/"+"subdir/file2.txt", []byte("bar"), 0600) str, err := WriteTree(KeepTestClient{}, tmpdir) c.Check(err, IsNil) - c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+6 0:3:file1.txt -./subdir acbd18db4cc2f85cedef654fccc4a4d8+6 0:3:file2.txt + c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt +`) +} + +func (s *TestSuite) TestUploadEmptyFile(c *C) { + log.Print("--TestUploadEmptyFile--") + + tmpdir, _ := ioutil.TempDir("", "") + defer func() { + os.RemoveAll(tmpdir) + }() + + ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte(""), 0600) + + str, err := WriteTree(KeepTestClient{}, tmpdir) + c.Check(err, IsNil) + c.Check(str, Equals, `. d41d8cd98f00b204e9800998ecf8427e+0 0:0:file1.txt `) } + +type KeepErrorTestClient struct { +} + +func (k KeepErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) { + return "", 0, errors.New("Failed!") +} + +func (s *TestSuite) TestUploadError(c *C) { + log.Print("--TestSimpleUpload--") + + tmpdir, _ := ioutil.TempDir("", "") + defer func() { + os.RemoveAll(tmpdir) + }() + + ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600) + + str, err := WriteTree(KeepErrorTestClient{}, tmpdir) + c.Check(err, NotNil) + c.Check(str, Equals, "") +}