12519: Simplified code by nesting an additional MergingLoader
diff --git a/services/crunch-run/upload.go b/services/crunch-run/upload.go
index 3285fccb7685097bda7b7ecfcd800444a0ad6edc..b54e336c2d99c50d472cc71b668e1c169ad9c2e4 100644
--- a/services/crunch-run/upload.go
+++ b/services/crunch-run/upload.go
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 // Originally based on sdk/go/crunchrunner/upload.go
@@ -14,14 +18,15 @@ import (
        "crypto/md5"
        "errors"
        "fmt"
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "git.curoverse.com/arvados.git/sdk/go/manifest"
        "io"
        "log"
        "os"
        "path/filepath"
        "strings"
        "sync"
+
+       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "git.curoverse.com/arvados.git/sdk/go/manifest"
 )
 
 // Block is a data block in a manifest stream
@@ -87,27 +92,50 @@ func (m *CollectionFileWriter) NewFile(fn string) {
        m.fn = fn
 }
 
-func (m *CollectionFileWriter) goUpload() {
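+// goUpload drains the uploader channel, writing each block to Keep in its own
+// goroutine; the workers channel acts as a semaphore bounding the number of
+// concurrent PutHB calls.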
+func (m *CollectionFileWriter) goUpload(workers chan struct{}) {
+       var mtx sync.Mutex
+       var wg sync.WaitGroup
+
        var errors []error
        uploader := m.uploader
        finish := m.finish
        for block := range uploader {
-               hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
-               signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset])
-               if err != nil {
-                       errors = append(errors, err)
-               } else {
-                       m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
-               }
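+               // Reserve this block's slot in the manifest now, so block order
+               // is preserved even when uploads finish out of order.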
+               mtx.Lock()
+               m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, "")
+               blockIndex := len(m.ManifestStream.Blocks) - 1
+               mtx.Unlock()
+
+               workers <- struct{}{} // wait for an available worker slot
+               wg.Add(1)
+
+               go func(block *Block, blockIndex int) {
+                       hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
+                       signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset])
+                       <-workers
+
+                       mtx.Lock()
+                       if err != nil {
+                               errors = append(errors, err)
+                       } else {
+                               m.ManifestStream.Blocks[blockIndex] = signedHash
+                       }
+                       mtx.Unlock()
+
+                       wg.Done()
+               }(block, blockIndex)
        }
+       wg.Wait()
+
        finish <- errors
 }
 
-// CollectionWriter makes implements creating new Keep collections by opening files
+// CollectionWriter implements creating new Keep collections by opening files
 // and writing to them.
 type CollectionWriter struct {
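+       // MaxWriters bounds the number of concurrent block uploads; values
+       // less than 1 fall back to a default of 2.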
+       MaxWriters int
        IKeepClient
        Streams []*CollectionFileWriter
+       workers chan struct{}
        mtx     sync.Mutex
 }
 
@@ -134,10 +162,18 @@ func (m *CollectionWriter) Open(path string) io.WriteCloser {
                make(chan *Block),
                make(chan []error),
                fn}
-       go fw.goUpload()
 
        m.mtx.Lock()
        defer m.mtx.Unlock()
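+       // Lazily create the worker-slot channel shared by all streams of this writer.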
+       if m.workers == nil {
+               if m.MaxWriters < 1 {
+                       m.MaxWriters = 2
+               }
+               m.workers = make(chan struct{}, m.MaxWriters)
+       }
+
+       go fw.goUpload(m.workers)
+
        m.Streams = append(m.Streams, fw)
 
        return fw
@@ -187,6 +223,9 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) {
        m.mtx.Lock()
        defer m.mtx.Unlock()
        for _, v := range m.Streams {
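+               // A stream with no file segments would produce an invalid
+               // manifest line, so skip it.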
+               if len(v.FileStreamSegments) == 0 {
+                       continue
+               }
                k := v.StreamName
                if k == "." {
                        buf.WriteString(".")
@@ -195,9 +234,13 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) {
                        k = strings.Replace(k, "\n", "", -1)
                        buf.WriteString("./" + k)
                }
-               for _, b := range v.Blocks {
-                       buf.WriteString(" ")
-                       buf.WriteString(b)
+               if len(v.Blocks) > 0 {
+                       for _, b := range v.Blocks {
+                               buf.WriteString(" ")
+                               buf.WriteString(b)
+                       }
+               } else {
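+                       // A stream line needs at least one locator; use the
+                       // well-known empty block (md5 of "") when no data
+                       // blocks were written.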
+                       buf.WriteString(" d41d8cd98f00b204e9800998ecf8427e+0")
                }
                for _, f := range v.FileStreamSegments {
                        buf.WriteString(" ")
@@ -211,29 +254,36 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) {
 }
 
 type WalkUpload struct {
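+       // MaxWriters bounds concurrent block uploads, defaulting to 2 when unset.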
+       MaxWriters  int
        kc          IKeepClient
        stripPrefix string
        streamMap   map[string]*CollectionFileWriter
        status      *log.Logger
+       workers     chan struct{}
+       mtx         sync.Mutex
 }
 
-// WalkFunc walks a directory tree, uploads each file found and adds it to the
-// CollectionWriter.
-func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
-
-       if info.IsDir() {
-               return nil
-       }
-
+func (m *WalkUpload) UploadFile(path string, sourcePath string) error {
        var dir string
-       if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
-               dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
+       basename := filepath.Base(path)
+       if len(path) > (len(m.stripPrefix) + len(basename) + 1) {
+               dir = path[len(m.stripPrefix)+1 : (len(path) - len(basename) - 1)]
        }
        if dir == "" {
                dir = "."
        }
 
-       fn := path[(len(path) - len(info.Name())):]
+       fn := path[(len(path) - len(basename)):]
+
+       info, err := os.Stat(sourcePath)
+       if err != nil {
+               return err
+       }
+       file, err := os.Open(sourcePath)
+       if err != nil {
+               return err
+       }
+       defer file.Close()
 
        if m.streamMap[dir] == nil {
                m.streamMap[dir] = &CollectionFileWriter{
@@ -245,7 +295,17 @@ func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
                        make(chan *Block),
                        make(chan []error),
                        ""}
-               go m.streamMap[dir].goUpload()
+
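+               // Lazily create the worker-slot channel shared by all streams of this upload.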
+               m.mtx.Lock()
+               if m.workers == nil {
+                       if m.MaxWriters < 1 {
+                               m.MaxWriters = 2
+                       }
+                       m.workers = make(chan struct{}, m.MaxWriters)
+               }
+               m.mtx.Unlock()
+
+               go m.streamMap[dir].goUpload(m.workers)
        }
 
        fileWriter := m.streamMap[dir]
@@ -253,16 +313,11 @@ func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
        // Reset the CollectionFileWriter for a new file
        fileWriter.NewFile(fn)
 
-       file, err := os.Open(path)
-       if err != nil {
-               return err
-       }
-       defer file.Close()
-
        m.status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
 
        _, err = io.Copy(fileWriter, file)
        if err != nil {
+               m.status.Printf("Error uploading %v/%v: %v", dir, fn, err)
                return err
        }
 
@@ -272,20 +327,15 @@ func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
        return nil
 }
 
-func (cw *CollectionWriter) WriteTree(root string, status *log.Logger) (manifest string, err error) {
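+// BeginUpload prepares a WalkUpload for adding the files under root to this
+// CollectionWriter.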
+func (cw *CollectionWriter) BeginUpload(root string, status *log.Logger) *WalkUpload {
        streamMap := make(map[string]*CollectionFileWriter)
-       wu := &WalkUpload{cw.IKeepClient, root, streamMap, status}
-       err = filepath.Walk(root, wu.WalkFunc)
-
-       if err != nil {
-               return "", err
-       }
+       return &WalkUpload{0, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}}
+}
 
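+// EndUpload merges the streams accumulated by wu into the CollectionWriter.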
+func (cw *CollectionWriter) EndUpload(wu *WalkUpload) {
        cw.mtx.Lock()
-       for _, st := range streamMap {
+       for _, st := range wu.streamMap {
                cw.Streams = append(cw.Streams, st)
        }
        cw.mtx.Unlock()
-
-       return cw.ManifestText()
 }