X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/174d343cf86e02e083293746f86366e1d0a95786..6247858b8041caf4899da501456661d25dd5491b:/services/crunch-run/upload.go diff --git a/services/crunch-run/upload.go b/services/crunch-run/upload.go index faf20b4f8b..b54e336c2d 100644 --- a/services/crunch-run/upload.go +++ b/services/crunch-run/upload.go @@ -19,7 +19,6 @@ import ( "errors" "fmt" "io" - "io/ioutil" "log" "os" "path/filepath" @@ -264,55 +263,27 @@ type WalkUpload struct { mtx sync.Mutex } -// WalkFunc walks a directory tree, uploads each file found and adds it to the -// CollectionWriter. -func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - - targetPath, targetInfo := path, info - if info.Mode()&os.ModeSymlink != 0 { - // Update targetpath/info to reflect the symlink - // target, not the symlink itself - targetPath, err = filepath.EvalSymlinks(path) - if err != nil { - return err - } - targetInfo, err = os.Stat(targetPath) - if err != nil { - return fmt.Errorf("stat symlink %q target %q: %s", path, targetPath, err) - } - if targetInfo.Mode()&os.ModeDir != 0 { - // Symlinks to directories don't get walked, so do it - // here. We've previously checked that they stay in - // the output directory and don't result in an endless - // loop. - var rd []os.FileInfo - rd, err = ioutil.ReadDir(path) - if err != nil { - return err - } - for _, ent := range rd { - err = filepath.Walk(filepath.Join(path, ent.Name()), m.WalkFunc) - } - } - } - - if targetInfo.Mode()&os.ModeType != 0 { - // Skip directories, pipes, other non-regular files - return nil - } - +func (m *WalkUpload) UploadFile(path string, sourcePath string) error { var dir string - if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) { - dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)] + basename := filepath.Base(path) + if len(path) > (len(m.stripPrefix) + len(basename) + 1) { + dir = path[len(m.stripPrefix)+1 : (len(path) - len(basename) - 1)] } if dir == "" { dir = "." } - fn := path[(len(path) - len(info.Name())):] + fn := path[(len(path) - len(basename)):] + + info, err := os.Stat(sourcePath) + if err != nil { + return err + } + file, err := os.Open(sourcePath) + if err != nil { + return err + } + defer file.Close() if m.streamMap[dir] == nil { m.streamMap[dir] = &CollectionFileWriter{ @@ -342,16 +313,11 @@ func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error { // Reset the CollectionFileWriter for a new file fileWriter.NewFile(fn) - file, err := os.Open(path) - if err != nil { - return err - } - defer file.Close() - m.status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size()) _, err = io.Copy(fileWriter, file) if err != nil { + m.status.Printf("Uh oh") return err } @@ -361,20 +327,15 @@ func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error { return nil } -func (cw *CollectionWriter) WriteTree(root string, status *log.Logger) (manifest string, err error) { +func (cw *CollectionWriter) BeginUpload(root string, status *log.Logger) *WalkUpload { streamMap := make(map[string]*CollectionFileWriter) - wu := &WalkUpload{0, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}} - err = filepath.Walk(root, wu.WalkFunc) - - if err != nil { - return "", err - } + return &WalkUpload{0, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}} +} +func (cw *CollectionWriter) EndUpload(wu *WalkUpload) { cw.mtx.Lock() - for _, st := range streamMap { + for _, st := range wu.streamMap { cw.Streams = append(cw.Streams, st) } cw.mtx.Unlock() - - return cw.ManifestText() }