_, err = os.Stat(collectionMetafile)
if err != nil {
// Regular directory
- cw := CollectionWriter{runner.Kc, nil, sync.Mutex{}}
+ cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}}
manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
if err != nil {
return fmt.Errorf("While uploading output files: %v", err)
cr.NewLogWriter = cr.NewArvLogWriter
cr.RunArvMount = cr.ArvMountCmd
cr.MkTempDir = ioutil.TempDir
- cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}}
+ cr.LogCollection = &CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
cr.Container.UUID = containerUUID
cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
m.fn = fn
}
-func (m *CollectionFileWriter) goUpload() {
+func (m *CollectionFileWriter) goUpload(workers chan struct{}) {
+ var mtx sync.Mutex
+ var wg sync.WaitGroup
+
var errors []error
uploader := m.uploader
finish := m.finish
for block := range uploader {
- hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
- signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset])
- if err != nil {
- errors = append(errors, err)
- } else {
- m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
- }
+ mtx.Lock()
+ m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, "")
+ blockIndex := len(m.ManifestStream.Blocks) - 1
+ mtx.Unlock()
+
+ workers <- struct{}{} // wait for an available worker slot
+ wg.Add(1)
+
+ go func(block *Block, blockIndex int) {
+ hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
+ signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset])
+ <-workers
+
+ mtx.Lock()
+ if err != nil {
+ errors = append(errors, err)
+ } else {
+ m.ManifestStream.Blocks[blockIndex] = signedHash
+ }
+ mtx.Unlock()
+
+ wg.Done()
+ }(block, blockIndex)
}
+ wg.Wait()
+
finish <- errors
}
// CollectionWriter implements creating new Keep collections by opening files
// and writing to them.
type CollectionWriter struct {
+ MaxWriters int
IKeepClient
Streams []*CollectionFileWriter
+ workers chan struct{}
mtx sync.Mutex
}
make(chan *Block),
make(chan []error),
fn}
- go fw.goUpload()
m.mtx.Lock()
defer m.mtx.Unlock()
+ if m.workers == nil {
+ if m.MaxWriters < 1 {
+ m.MaxWriters = 2
+ }
+ m.workers = make(chan struct{}, m.MaxWriters)
+ }
+
+ go fw.goUpload(m.workers)
+
m.Streams = append(m.Streams, fw)
return fw
}
type WalkUpload struct {
+ MaxWriters int
kc IKeepClient
stripPrefix string
streamMap map[string]*CollectionFileWriter
status *log.Logger
+ workers chan struct{}
+ mtx sync.Mutex
}
// WalkFunc walks a directory tree, uploads each file found and adds it to the
make(chan *Block),
make(chan []error),
""}
- go m.streamMap[dir].goUpload()
+
+ m.mtx.Lock()
+ if m.workers == nil {
+ if m.MaxWriters < 1 {
+ m.MaxWriters = 2
+ }
+ m.workers = make(chan struct{}, m.MaxWriters)
+ }
+ m.mtx.Unlock()
+
+ go m.streamMap[dir].goUpload(m.workers)
}
fileWriter := m.streamMap[dir]
func (cw *CollectionWriter) WriteTree(root string, status *log.Logger) (manifest string, err error) {
streamMap := make(map[string]*CollectionFileWriter)
- wu := &WalkUpload{cw.IKeepClient, root, streamMap, status}
+ wu := &WalkUpload{0, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}}
err = filepath.Walk(root, wu.WalkFunc)
if err != nil {
ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
- cw := CollectionWriter{&KeepTestClient{}, nil, sync.Mutex{}}
+ cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
c.Check(err, IsNil)
c.Check(str, Equals, ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt\n")
ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
- cw := CollectionWriter{&KeepTestClient{}, nil, sync.Mutex{}}
+ cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
c.Check(err, IsNil)
ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
ioutil.WriteFile(tmpdir+"/subdir/file2.txt", []byte("bar"), 0600)
- cw := CollectionWriter{&KeepTestClient{}, nil, sync.Mutex{}}
+ cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
c.Check(err, IsNil)
ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
- cw := CollectionWriter{&KeepTestClient{}, nil, sync.Mutex{}}
+ cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
c.Check(err, IsNil)
ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
- cw := CollectionWriter{&KeepTestClient{}, nil, sync.Mutex{}}
+ cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
c.Check(err, IsNil)
ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte(""), 0600)
- cw := CollectionWriter{&KeepTestClient{}, nil, sync.Mutex{}}
+ cw := CollectionWriter{0, &KeepTestClient{}, nil, nil, sync.Mutex{}}
str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
c.Check(err, IsNil)
ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
- cw := CollectionWriter{&KeepErrorTestClient{}, nil, sync.Mutex{}}
+ cw := CollectionWriter{0, &KeepErrorTestClient{}, nil, nil, sync.Mutex{}}
str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
c.Check(err, NotNil)