X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/5fb633ee021c613d99280e8958a6598602041011..16f704326f44fd1e5e5e60b936c9b5895d6a6ff8:/sdk/go/crunchrunner/upload.go diff --git a/sdk/go/crunchrunner/upload.go b/sdk/go/crunchrunner/upload.go index 4feb1421f3..2848d1087c 100644 --- a/sdk/go/crunchrunner/upload.go +++ b/sdk/go/crunchrunner/upload.go @@ -1,3 +1,7 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: Apache-2.0 + package main import ( @@ -5,12 +9,15 @@ import ( "crypto/md5" "errors" "fmt" - "git.curoverse.com/arvados.git/sdk/go/keepclient" - "git.curoverse.com/arvados.git/sdk/go/manifest" "io" "log" "os" "path/filepath" + "sort" + "strings" + + "git.curoverse.com/arvados.git/sdk/go/keepclient" + "git.curoverse.com/arvados.git/sdk/go/manifest" ) type Block struct { @@ -31,10 +38,9 @@ type IKeepClient interface { PutHB(hash string, buf []byte) (string, int, error) } -func (m *ManifestStreamWriter) Write(p []byte) (n int, err error) { - // Needed to conform to Writer interface, but not implemented - // because io.Copy will actually use ReadFrom instead. - return 0, nil +func (m *ManifestStreamWriter) Write(p []byte) (int, error) { + n, err := m.ReadFrom(bytes.NewReader(p)) + return int(n), err } func (m *ManifestStreamWriter) ReadFrom(r io.Reader) (n int64, err error) { @@ -48,37 +54,34 @@ func (m *ManifestStreamWriter) ReadFrom(r io.Reader) (n int64, err error) { count, err = r.Read(m.Block.data[m.Block.offset:]) total += int64(count) m.Block.offset += int64(count) - if count > 0 { - if m.Block.offset == keepclient.BLOCKSIZE { - m.uploader <- m.Block - m.Block = nil - } + if m.Block.offset == keepclient.BLOCKSIZE { + m.uploader <- m.Block + m.Block = nil } } - return total, err + if err == io.EOF { + return total, nil + } else { + return total, err + } + } func (m *ManifestStreamWriter) goUpload() { var errors []error uploader := m.uploader finish := m.finish - for true { - select { - case block, valid := <-uploader: - if !valid { - finish <- errors - return - } - hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset])) - signedHash, _, err := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset]) - if err != nil { - errors = append(errors, err) - } else { - m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash) - } + for block := range uploader { + hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset])) + signedHash, _, err := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset]) + if err != nil { + errors = append(errors, err) + } else { + m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash) } } + finish <- errors } type ManifestWriter struct { @@ -88,7 +91,26 @@ type ManifestWriter struct { } func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) error { - if info.IsDir() { + if err != nil { + return err + } + + targetPath, targetInfo := path, info + if info.Mode()&os.ModeSymlink != 0 { + // Update targetpath/info to reflect the symlink + // target, not the symlink itself + targetPath, err = filepath.EvalSymlinks(path) + if err != nil { + return err + } + targetInfo, err = os.Stat(targetPath) + if err != nil { + return fmt.Errorf("stat symlink %q target %q: %s", path, targetPath, err) + } + } + + if targetInfo.Mode()&os.ModeType != 0 { + // Skip directories, pipes, other non-regular files return nil } @@ -126,37 +148,36 @@ func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) erro var count int64 count, err = io.Copy(stream, file) - if err != nil && err != io.EOF { + if err != nil { return err } stream.offset += count - stream.ManifestStream.Files = append(stream.ManifestStream.Files, - fmt.Sprintf("%v:%v:%v", fileStart, count, fn)) + stream.ManifestStream.FileStreamSegments = append(stream.ManifestStream.FileStreamSegments, + manifest.FileStreamSegment{uint64(fileStart), uint64(count), fn}) return nil } func (m *ManifestWriter) Finish() error { var errstring string - for _, v := range m.Streams { - if v.uploader != nil { - if v.Block != nil { - v.uploader <- v.Block - } - close(v.uploader) - v.uploader = nil - - errors := <-v.finish - close(v.finish) - v.finish = nil - - if errors != nil { - for _, r := range errors { - errstring = fmt.Sprintf("%v%v\n", errstring, r.Error()) - } - } + for _, stream := range m.Streams { + if stream.uploader == nil { + continue + } + if stream.Block != nil { + stream.uploader <- stream.Block + } + close(stream.uploader) + stream.uploader = nil + + errors := <-stream.finish + close(stream.finish) + stream.finish = nil + + for _, r := range errors { + errstring = fmt.Sprintf("%v%v\n", errstring, r.Error()) } } if errstring != "" { @@ -169,19 +190,34 @@ func (m *ManifestWriter) Finish() error { func (m *ManifestWriter) ManifestText() string { m.Finish() var buf bytes.Buffer - for k, v := range m.Streams { + + dirs := make([]string, len(m.Streams)) + i := 0 + for k := range m.Streams { + dirs[i] = k + i++ + } + sort.Strings(dirs) + + for _, k := range dirs { + v := m.Streams[k] + if k == "." { buf.WriteString(".") } else { + k = strings.Replace(k, " ", "\\040", -1) + k = strings.Replace(k, "\n", "", -1) buf.WriteString("./" + k) } for _, b := range v.Blocks { buf.WriteString(" ") buf.WriteString(b) } - for _, f := range v.Files { + for _, f := range v.FileStreamSegments { buf.WriteString(" ") - buf.WriteString(f) + name := strings.Replace(f.Name, " ", "\\040", -1) + name = strings.Replace(name, "\n", "", -1) + buf.WriteString(fmt.Sprintf("%d:%d:%s", f.SegPos, f.SegLen, name)) } buf.WriteString("\n") }