Merge branch 'master' into 11015-crunch-run-output-upload
author radhika <radhika@curoverse.com>
Tue, 28 Feb 2017 18:23:45 +0000 (13:23 -0500)
committer radhika <radhika@curoverse.com>
Tue, 28 Feb 2017 18:23:45 +0000 (13:23 -0500)
services/crunch-run/crunchrun.go
services/crunch-run/upload.go
services/crunch-run/upload_test.go

index 0b59f7df91d12781b80659db7ea0be3c162a2703..561f423eaa1378c7ba09b30e29b96afe07c95311 100644 (file)
@@ -649,7 +649,7 @@ func (runner *ContainerRunner) CaptureOutput() error {
        _, err = os.Stat(collectionMetafile)
        if err != nil {
                // Regular directory
-               cw := CollectionWriter{runner.Kc, nil, sync.Mutex{}}
+               cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}}
                manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
                if err != nil {
                        return fmt.Errorf("While uploading output files: %v", err)
@@ -1002,7 +1002,7 @@ func NewContainerRunner(api IArvadosClient,
        cr.NewLogWriter = cr.NewArvLogWriter
        cr.RunArvMount = cr.ArvMountCmd
        cr.MkTempDir = ioutil.TempDir
-       cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}}
+       cr.LogCollection = &CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
        cr.Container.UUID = containerUUID
        cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
        cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
index a068a2a77b3c6805b1eecdfc73d1df2eefd50ed0..7802fedb6c84c19abe224db26b586b412d067cdc 100644 (file)
@@ -87,27 +87,50 @@ func (m *CollectionFileWriter) NewFile(fn string) {
        m.fn = fn
 }
 
-func (m *CollectionFileWriter) goUpload() {
+func (m *CollectionFileWriter) goUpload(workers chan struct{}) {
+       var mtx sync.Mutex
+       var wg sync.WaitGroup
+
        var errors []error
        uploader := m.uploader
        finish := m.finish
        for block := range uploader {
-               hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
-               signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset])
-               if err != nil {
-                       errors = append(errors, err)
-               } else {
-                       m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
-               }
+               mtx.Lock()
+               m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, "")
+               blockIndex := len(m.ManifestStream.Blocks) - 1
+               mtx.Unlock()
+
+               workers <- struct{}{} // wait for an available worker slot
+               wg.Add(1)
+
+               go func(block *Block, blockIndex int) {
+                       hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
+                       signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset])
+                       <-workers
+
+                       mtx.Lock()
+                       if err != nil {
+                               errors = append(errors, err)
+                       } else {
+                               m.ManifestStream.Blocks[blockIndex] = signedHash
+                       }
+                       mtx.Unlock()
+
+                       wg.Done()
+               }(block, blockIndex)
        }
+       wg.Wait()
+
        finish <- errors
 }
 
 // CollectionWriter implements creating new Keep collections by opening files
 // and writing to them.
 type CollectionWriter struct {
+       MaxWriters int
        IKeepClient
        Streams []*CollectionFileWriter
+       workers chan struct{}
        mtx     sync.Mutex
 }
 
@@ -134,10 +157,18 @@ func (m *CollectionWriter) Open(path string) io.WriteCloser {
                make(chan *Block),
                make(chan []error),
                fn}
-       go fw.goUpload()
 
        m.mtx.Lock()
        defer m.mtx.Unlock()
+       if m.workers == nil {
+               if m.MaxWriters < 1 {
+                       m.MaxWriters = 2
+               }
+               m.workers = make(chan struct{}, m.MaxWriters)
+       }
+
+       go fw.goUpload(m.workers)
+
        m.Streams = append(m.Streams, fw)
 
        return fw
@@ -218,10 +249,13 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) {
 }
 
 type WalkUpload struct {
+       MaxWriters  int
        kc          IKeepClient
        stripPrefix string
        streamMap   map[string]*CollectionFileWriter
        status      *log.Logger
+       workers     chan struct{}
+       mtx         sync.Mutex
 }
 
 // WalkFunc walks a directory tree, uploads each file found and adds it to the
@@ -252,7 +286,17 @@ func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
                        make(chan *Block),
                        make(chan []error),
                        ""}
-               go m.streamMap[dir].goUpload()
+
+               m.mtx.Lock()
+               if m.workers == nil {
+                       if m.MaxWriters < 1 {
+                               m.MaxWriters = 2
+                       }
+                       m.workers = make(chan struct{}, m.MaxWriters)
+               }
+               m.mtx.Unlock()
+
+               go m.streamMap[dir].goUpload(m.workers)
        }
 
        fileWriter := m.streamMap[dir]
@@ -281,7 +325,7 @@ func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
 
 func (cw *CollectionWriter) WriteTree(root string, status *log.Logger) (manifest string, err error) {
        streamMap := make(map[string]*CollectionFileWriter)
-       wu := &WalkUpload{cw.IKeepClient, root, streamMap, status}
+       wu := &WalkUpload{0, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}}
        err = filepath.Walk(root, wu.WalkFunc)
 
        if err != nil {
index b4b1efd1079ce60b827c4c23797b4804872c05eb..8d4ea079718573814a5d5035124389167469fa49 100644 (file)
@@ -21,7 +21,7 @@ func (s *TestSuite) TestSimpleUpload(c *C) {
 
        ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
 
-       cw := CollectionWriter{&KeepTestClient{}, nil, sync.Mutex{}}
+       cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
        str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
        c.Check(err, IsNil)
        c.Check(str, Equals, ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt\n")
@@ -36,7 +36,7 @@ func (s *TestSuite) TestSimpleUploadTwofiles(c *C) {
        ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
        ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
 
-       cw := CollectionWriter{&KeepTestClient{}, nil, sync.Mutex{}}
+       cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
        str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
 
        c.Check(err, IsNil)
@@ -54,7 +54,7 @@ func (s *TestSuite) TestSimpleUploadSubdir(c *C) {
        ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
        ioutil.WriteFile(tmpdir+"/subdir/file2.txt", []byte("bar"), 0600)
 
-       cw := CollectionWriter{&KeepTestClient{}, nil, sync.Mutex{}}
+       cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
        str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
 
        c.Check(err, IsNil)
@@ -88,7 +88,7 @@ func (s *TestSuite) TestSimpleUploadLarge(c *C) {
 
        ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
 
-       cw := CollectionWriter{&KeepTestClient{}, nil, sync.Mutex{}}
+       cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
        str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
 
        c.Check(err, IsNil)
@@ -105,7 +105,7 @@ func (s *TestSuite) TestUploadEmptySubdir(c *C) {
 
        ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
 
-       cw := CollectionWriter{&KeepTestClient{}, nil, sync.Mutex{}}
+       cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
        str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
 
        c.Check(err, IsNil)
@@ -121,7 +121,7 @@ func (s *TestSuite) TestUploadEmptyFile(c *C) {
 
        ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte(""), 0600)
 
-       cw := CollectionWriter{&KeepTestClient{}, nil, sync.Mutex{}}
+       cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
        str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
 
        c.Check(err, IsNil)
@@ -137,7 +137,7 @@ func (s *TestSuite) TestUploadError(c *C) {
 
        ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
 
-       cw := CollectionWriter{&KeepErrorTestClient{}, nil, sync.Mutex{}}
+       cw := CollectionWriter{0, &KeepErrorTestClient{}, nil, nil, sync.Mutex{}}
        str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
 
        c.Check(err, NotNil)