X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/19cb98ad222177fb7dd3613282446060e74dd2ce..a8d59274b32098abcc6b36b46ae354de3dcd13ad:/services/crunch-run/upload.go?ds=sidebyside diff --git a/services/crunch-run/upload.go b/services/crunch-run/upload.go index a068a2a77b..bb2776a426 100644 --- a/services/crunch-run/upload.go +++ b/services/crunch-run/upload.go @@ -1,3 +1,7 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main // Originally based on sdk/go/crunchrunner/upload.go @@ -14,14 +18,15 @@ import ( "crypto/md5" "errors" "fmt" - "git.curoverse.com/arvados.git/sdk/go/keepclient" - "git.curoverse.com/arvados.git/sdk/go/manifest" "io" "log" "os" "path/filepath" "strings" "sync" + + "git.curoverse.com/arvados.git/sdk/go/keepclient" + "git.curoverse.com/arvados.git/sdk/go/manifest" ) // Block is a data block in a manifest stream @@ -87,27 +92,50 @@ func (m *CollectionFileWriter) NewFile(fn string) { m.fn = fn } -func (m *CollectionFileWriter) goUpload() { +func (m *CollectionFileWriter) goUpload(workers chan struct{}) { + var mtx sync.Mutex + var wg sync.WaitGroup + var errors []error uploader := m.uploader finish := m.finish for block := range uploader { - hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset])) - signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset]) - if err != nil { - errors = append(errors, err) - } else { - m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash) - } + mtx.Lock() + m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, "") + blockIndex := len(m.ManifestStream.Blocks) - 1 + mtx.Unlock() + + workers <- struct{}{} // wait for an available worker slot + wg.Add(1) + + go func(block *Block, blockIndex int) { + hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset])) + signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset]) + <-workers + + mtx.Lock() + if err != nil { + errors = append(errors, err) + } else { + m.ManifestStream.Blocks[blockIndex] = signedHash + } + mtx.Unlock() + + wg.Done() + }(block, blockIndex) } + wg.Wait() + finish <- errors } // CollectionWriter implements creating new Keep collections by opening files // and writing to them. type CollectionWriter struct { + MaxWriters int IKeepClient Streams []*CollectionFileWriter + workers chan struct{} mtx sync.Mutex } @@ -134,10 +162,18 @@ func (m *CollectionWriter) Open(path string) io.WriteCloser { make(chan *Block), make(chan []error), fn} - go fw.goUpload() m.mtx.Lock() defer m.mtx.Unlock() + if m.workers == nil { + if m.MaxWriters < 1 { + m.MaxWriters = 2 + } + m.workers = make(chan struct{}, m.MaxWriters) + } + + go fw.goUpload(m.workers) + m.Streams = append(m.Streams, fw) return fw @@ -218,17 +254,38 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) { } type WalkUpload struct { + MaxWriters int kc IKeepClient stripPrefix string streamMap map[string]*CollectionFileWriter status *log.Logger + workers chan struct{} + mtx sync.Mutex } // WalkFunc walks a directory tree, uploads each file found and adds it to the // CollectionWriter. func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } - if info.IsDir() { + targetPath, targetInfo := path, info + if info.Mode()&os.ModeSymlink != 0 { + // Update targetpath/info to reflect the symlink + // target, not the symlink itself + targetPath, err = filepath.EvalSymlinks(path) + if err != nil { + return err + } + targetInfo, err = os.Stat(targetPath) + if err != nil { + return fmt.Errorf("stat symlink %q target %q: %s", path, targetPath, err) + } + } + + if targetInfo.Mode()&os.ModeType != 0 { + // Skip directories, pipes, other non-regular files return nil } @@ -252,7 +309,17 @@ func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error { make(chan *Block), make(chan []error), ""} - go m.streamMap[dir].goUpload() + + m.mtx.Lock() + if m.workers == nil { + if m.MaxWriters < 1 { + m.MaxWriters = 2 + } + m.workers = make(chan struct{}, m.MaxWriters) + } + m.mtx.Unlock() + + go m.streamMap[dir].goUpload(m.workers) } fileWriter := m.streamMap[dir] @@ -281,7 +348,7 @@ func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error { func (cw *CollectionWriter) WriteTree(root string, status *log.Logger) (manifest string, err error) { streamMap := make(map[string]*CollectionFileWriter) - wu := &WalkUpload{cw.IKeepClient, root, streamMap, status} + wu := &WalkUpload{0, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}} err = filepath.Walk(root, wu.WalkFunc) if err != nil {