X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/08d080d5a4edccaa579f9a5b5cfa18c9ac471430..10c8c1871bef6d88ae27797e4b389fbf8649f4b0:/services/crunch-run/upload.go diff --git a/services/crunch-run/upload.go b/services/crunch-run/upload.go index d50ea581b3..b54e336c2d 100644 --- a/services/crunch-run/upload.go +++ b/services/crunch-run/upload.go @@ -1,3 +1,7 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main // Originally based on sdk/go/crunchrunner/upload.go @@ -14,10 +18,15 @@ import ( "crypto/md5" "errors" "fmt" - "git.curoverse.com/arvados.git/sdk/go/keepclient" - "git.curoverse.com/arvados.git/sdk/go/manifest" "io" + "log" + "os" + "path/filepath" "strings" + "sync" + + "git.curoverse.com/arvados.git/sdk/go/keepclient" + "git.curoverse.com/arvados.git/sdk/go/manifest" ) // Block is a data block in a manifest stream @@ -77,27 +86,57 @@ func (m *CollectionFileWriter) Close() error { return nil } -func (m *CollectionFileWriter) goUpload() { +func (m *CollectionFileWriter) NewFile(fn string) { + m.offset += m.length + m.length = 0 + m.fn = fn +} + +func (m *CollectionFileWriter) goUpload(workers chan struct{}) { + var mtx sync.Mutex + var wg sync.WaitGroup + var errors []error uploader := m.uploader finish := m.finish for block := range uploader { - hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset])) - signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset]) - if err != nil { - errors = append(errors, err) - } else { - m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash) - } + mtx.Lock() + m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, "") + blockIndex := len(m.ManifestStream.Blocks) - 1 + mtx.Unlock() + + workers <- struct{}{} // wait for an available worker slot + wg.Add(1) + + go func(block *Block, blockIndex int) { + hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset])) + signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset]) + <-workers + + mtx.Lock() + if err != nil { + errors = append(errors, err) + } else { + m.ManifestStream.Blocks[blockIndex] = signedHash + } + mtx.Unlock() + + wg.Done() + }(block, blockIndex) } + wg.Wait() + finish <- errors } -// CollectionWriter makes implements creating new Keep collections by opening files +// CollectionWriter implements creating new Keep collections by opening files // and writing to them. type CollectionWriter struct { + MaxWriters int IKeepClient Streams []*CollectionFileWriter + workers chan struct{} + mtx sync.Mutex } // Open a new file for writing in the Keep collection. @@ -123,7 +162,17 @@ func (m *CollectionWriter) Open(path string) io.WriteCloser { make(chan *Block), make(chan []error), fn} - go fw.goUpload() + + m.mtx.Lock() + defer m.mtx.Unlock() + if m.workers == nil { + if m.MaxWriters < 1 { + m.MaxWriters = 2 + } + m.workers = make(chan struct{}, m.MaxWriters) + } + + go fw.goUpload(m.workers) m.Streams = append(m.Streams, fw) @@ -133,6 +182,9 @@ func (m *CollectionWriter) Open(path string) io.WriteCloser { // Finish writing the collection, wait for all blocks to complete uploading. func (m *CollectionWriter) Finish() error { var errstring string + m.mtx.Lock() + defer m.mtx.Unlock() + for _, stream := range m.Streams { if stream.uploader == nil { continue @@ -168,7 +220,12 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) { var buf bytes.Buffer + m.mtx.Lock() + defer m.mtx.Unlock() for _, v := range m.Streams { + if len(v.FileStreamSegments) == 0 { + continue + } k := v.StreamName if k == "." { buf.WriteString(".") @@ -177,9 +234,13 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) { k = strings.Replace(k, "\n", "", -1) buf.WriteString("./" + k) } - for _, b := range v.Blocks { - buf.WriteString(" ") - buf.WriteString(b) + if len(v.Blocks) > 0 { + for _, b := range v.Blocks { + buf.WriteString(" ") + buf.WriteString(b) + } + } else { + buf.WriteString(" d41d8cd98f00b204e9800998ecf8427e+0") } for _, f := range v.FileStreamSegments { buf.WriteString(" ") @@ -192,24 +253,41 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) { return buf.String(), nil } -func (m *CollectionWriter) WalkFunc(path string, info os.FileInfo, err error, stripPrefix string) error { - if info.IsDir() { - return nil - } +type WalkUpload struct { + MaxWriters int + kc IKeepClient + stripPrefix string + streamMap map[string]*CollectionFileWriter + status *log.Logger + workers chan struct{} + mtx sync.Mutex +} +func (m *WalkUpload) UploadFile(path string, sourcePath string) error { var dir string - if len(path) > (len(stripPrefix) + len(info.Name()) + 1) { - dir = path[len(stripPrefix)+1 : (len(path) - len(info.Name()) - 1)] + basename := filepath.Base(path) + if len(path) > (len(m.stripPrefix) + len(basename) + 1) { + dir = path[len(m.stripPrefix)+1 : (len(path) - len(basename) - 1)] } if dir == "" { dir = "." } - fn := path[(len(path) - len(info.Name())):] + fn := path[(len(path) - len(basename)):] - if m.Streams[dir] == nil { - m.Streams[dir] = &CollectionFileWriter{ - m.IKeepClient, + info, err := os.Stat(sourcePath) + if err != nil { + return err + } + file, err := os.Open(sourcePath) + if err != nil { + return err + } + defer file.Close() + + if m.streamMap[dir] == nil { + m.streamMap[dir] = &CollectionFileWriter{ + m.kc, &manifest.ManifestStream{StreamName: dir}, 0, 0, @@ -217,48 +295,47 @@ func (m *CollectionWriter) WalkFunc(path string, info os.FileInfo, err error, st make(chan *Block), make(chan []error), ""} - go m.Streams[dir].goUpload() - } - - stream := m.Streams[dir] - fileStart := stream.offset + m.mtx.Lock() + if m.workers == nil { + if m.MaxWriters < 1 { + m.MaxWriters = 2 + } + m.workers = make(chan struct{}, m.MaxWriters) + } + m.mtx.Unlock() - file, err := os.Open(path) - if err != nil { - return err + go m.streamMap[dir].goUpload(m.workers) } - log.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size()) + fileWriter := m.streamMap[dir] - var count int64 - count, err = io.Copy(stream, file) + // Reset the CollectionFileWriter for a new file + fileWriter.NewFile(fn) + + m.status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size()) + + _, err = io.Copy(fileWriter, file) if err != nil { + m.status.Printf("Uh oh") return err } - stream.offset += count - - stream.ManifestStream.FileStreamSegments = append(stream.ManifestStream.FileStreamSegments, - manifest.FileStreamSegment{uint64(fileStart), uint64(count), fn}) + // Commits the current file. Legal to call this repeatedly. + fileWriter.Close() return nil } -func WriteTree(kc IKeepClient, root string) (manifest string, err error) { - mw := CollectionWriter{kc, root, map[string]*ManifestStreamWriter{}} - err = filepath.Walk(root, func(path string, info os.FileInfo, err error) { - return mw.WalkFunc(path, info, err, root) - }) - - if err != nil { - return "", err - } +func (cw *CollectionWriter) BeginUpload(root string, status *log.Logger) *WalkUpload { + streamMap := make(map[string]*CollectionFileWriter) + return &WalkUpload{0, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}} +} - err = mw.Finish() - if err != nil { - return "", err +func (cw *CollectionWriter) EndUpload(wu *WalkUpload) { + cw.mtx.Lock() + for _, st := range wu.streamMap { + cw.Streams = append(cw.Streams, st) } - - return mw.ManifestText(), nil + cw.mtx.Unlock() }