+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
// Originally based on sdk/go/crunchrunner/upload.go
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/manifest"
"io"
+ "log"
+ "os"
+ "path/filepath"
"strings"
+ "sync"
)
// Block is a data block in a manifest stream
return nil
}
-func (m *CollectionFileWriter) goUpload() {
+// NewFile switches the writer over to a new file named fn: the current length
+// is folded into offset, length restarts at zero, and fn becomes the current
+// file name.
+func (m *CollectionFileWriter) NewFile(fn string) {
+ m.fn = fn
+ m.offset, m.length = m.offset+m.length, 0
+}
+// goUpload receives blocks from m.uploader, stores each one with PutHB in its own goroutine (the workers channel limits how many run at once), and finally sends the collected errors on m.finish.
+func (m *CollectionFileWriter) goUpload(workers chan struct{}) {
+ var mtx sync.Mutex // guards m.ManifestStream.Blocks and errors for the goroutines started below
+ var wg sync.WaitGroup // counts upload goroutines that have not finished yet
+ // The receive loop below ends once m.uploader is closed.
var errors []error
uploader := m.uploader
finish := m.finish
for block := range uploader {
- hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
- signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset])
- if err != nil {
- errors = append(errors, err)
- } else {
- m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
- }
+ mtx.Lock()
+ m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, "") // reserve this block's slot now so locators keep the order in which blocks were received
+ blockIndex := len(m.ManifestStream.Blocks) - 1
+ mtx.Unlock()
+
+ workers <- struct{}{} // wait for an available worker slot
+ wg.Add(1)
+ // block and blockIndex are passed as arguments so every goroutine works on its own copies.
+ go func(block *Block, blockIndex int) {
+ hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset])) // hex MD5 of the bytes data[0:offset]
+ signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset])
+ <-workers // give the worker slot back as soon as PutHB has returned
+ // Record the outcome under the lock; on failure the reserved slot keeps its "" placeholder.
+ mtx.Lock()
+ if err != nil {
+ errors = append(errors, err)
+ } else {
+ m.ManifestStream.Blocks[blockIndex] = signedHash
+ }
+ mtx.Unlock()
+
+ wg.Done()
+ }(block, blockIndex)
}
+ wg.Wait()
+ // Every started upload has finished; report the collected errors (nil when there were none).
finish <- errors
}
-// CollectionWriter makes implements creating new Keep collections by opening files
+// CollectionWriter implements creating new Keep collections by opening files
// and writing to them.
type CollectionWriter struct {
+ MaxWriters int // capacity used when the workers channel is first created; values below 1 are replaced by 2
IKeepClient
Streams []*CollectionFileWriter
+ workers chan struct{} // created lazily with capacity MaxWriters and handed to each stream's goUpload
+ mtx sync.Mutex // held while Streams is read or appended to and while workers is created
}
// Open a new file for writing in the Keep collection.
make(chan *Block),
make(chan []error),
fn}
- go fw.goUpload()
+
+ m.mtx.Lock()
+ defer m.mtx.Unlock()
+ if m.workers == nil {
+ if m.MaxWriters < 1 {
+ m.MaxWriters = 2
+ }
+ m.workers = make(chan struct{}, m.MaxWriters)
+ }
+
+ go fw.goUpload(m.workers)
m.Streams = append(m.Streams, fw)
// Finish writing the collection, wait for all blocks to complete uploading.
func (m *CollectionWriter) Finish() error {
var errstring string
+ m.mtx.Lock()
+ defer m.mtx.Unlock()
+
for _, stream := range m.Streams {
if stream.uploader == nil {
continue
var buf bytes.Buffer
+ m.mtx.Lock()
+ defer m.mtx.Unlock()
for _, v := range m.Streams {
+ if len(v.FileStreamSegments) == 0 {
+ continue
+ }
k := v.StreamName
if k == "." {
buf.WriteString(".")
k = strings.Replace(k, "\n", "", -1)
buf.WriteString("./" + k)
}
- for _, b := range v.Blocks {
- buf.WriteString(" ")
- buf.WriteString(b)
+ if len(v.Blocks) > 0 {
+ for _, b := range v.Blocks {
+ buf.WriteString(" ")
+ buf.WriteString(b)
+ }
+ } else {
+ buf.WriteString(" d41d8cd98f00b204e9800998ecf8427e+0")
}
for _, f := range v.FileStreamSegments {
buf.WriteString(" ")
return buf.String(), nil
}
-func (m *CollectionWriter) WalkFunc(path string, info os.FileInfo, err error, stripPrefix string) error {
+// WalkUpload carries the state shared by successive WalkFunc calls while a
+// directory tree is uploaded.
+type WalkUpload struct {
+ MaxWriters int // capacity of the workers channel; values below 1 are replaced by 2
+ kc IKeepClient // client given to every CollectionFileWriter created here
+ stripPrefix string // leading part of each path that is left out of the stream name
+ streamMap map[string]*CollectionFileWriter // one writer per directory, keyed by stream name
+ status *log.Logger // receives one "Uploading ..." line per file
+ workers chan struct{} // created on first use and passed to each writer's goUpload
+ mtx sync.Mutex // held while workers is created
+}
+
+// WalkFunc has the signature filepath.Walk expects for its callback. It is
+// called once per entry: an error reported by the walk is returned as is,
+// directories are skipped, and any other entry is opened and copied into the
+// CollectionFileWriter for its directory. That writer (and its goUpload
+// goroutine) is created the first time a directory is seen.
+func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
+ // filepath.Walk reports a failure to stat or read an entry through err, and
+ // info may be nil in that case, so bail out before dereferencing it.
+ if err != nil {
+ return err
+ }
+
if info.IsDir() {
return nil
}
var dir string
- if len(path) > (len(stripPrefix) + len(info.Name()) + 1) {
- dir = path[len(stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
+ if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
+ dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
}
if dir == "" {
dir = "."
fn := path[(len(path) - len(info.Name())):]
- if m.Streams[dir] == nil {
- m.Streams[dir] = &CollectionFileWriter{
- m.IKeepClient,
+ if m.streamMap[dir] == nil {
+ m.streamMap[dir] = &CollectionFileWriter{
+ m.kc,
&manifest.ManifestStream{StreamName: dir},
0,
0,
make(chan *Block),
make(chan []error),
""}
- go m.Streams[dir].goUpload()
+
+ m.mtx.Lock()
+ if m.workers == nil {
+ if m.MaxWriters < 1 {
+ m.MaxWriters = 2
+ }
+ m.workers = make(chan struct{}, m.MaxWriters)
+ }
+ m.mtx.Unlock()
+
+ go m.streamMap[dir].goUpload(m.workers)
}
- stream := m.Streams[dir]
+ fileWriter := m.streamMap[dir]
- fileStart := stream.offset
+ // Reset the CollectionFileWriter for a new file
+ fileWriter.NewFile(fn)
file, err := os.Open(path)
if err != nil {
return err
}
+ defer file.Close()
- log.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
+ m.status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
- var count int64
- count, err = io.Copy(stream, file)
+ _, err = io.Copy(fileWriter, file)
if err != nil {
return err
}
- stream.offset += count
-
- stream.ManifestStream.FileStreamSegments = append(stream.ManifestStream.FileStreamSegments,
- manifest.FileStreamSegment{uint64(fileStart), uint64(count), fn})
+ // Commits the current file. Legal to call this repeatedly.
+ fileWriter.Close()
return nil
}
-func WriteTree(kc IKeepClient, root string) (manifest string, err error) {
- mw := CollectionWriter{kc, root, map[string]*ManifestStreamWriter{}}
- err = filepath.Walk(root, func(path string, info os.FileInfo, err error) {
- return mw.WalkFunc(path, info, err, root)
- })
+// WriteTree walks the directory tree at root with filepath.Walk, uploading
+// every file through a WalkUpload, then appends the resulting streams to
+// cw.Streams and returns whatever cw.ManifestText returns. Each file is
+// announced on status. If the walk fails, the error is returned with an empty
+// manifest and cw.Streams is left untouched.
+func (cw *CollectionWriter) WriteTree(root string, status *log.Logger) (manifest string, err error) {
+ streamMap := make(map[string]*CollectionFileWriter)
+ // Pass the caller's MaxWriters on to the walker instead of a fixed 0, so
+ // the configured limit also applies to tree uploads. WalkFunc still falls
+ // back to 2 when the value is below 1.
+ wu := &WalkUpload{cw.MaxWriters, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}}
+ err = filepath.Walk(root, wu.WalkFunc)
if err != nil {
return "", err
}
- err = mw.Finish()
- if err != nil {
- return "", err
+ // NOTE(review): map iteration order is unspecified, so the order in which
+ // streams are appended here can differ from run to run.
+ cw.mtx.Lock()
+ for _, st := range streamMap {
+ cw.Streams = append(cw.Streams, st)
}
+ cw.mtx.Unlock()
- return mw.ManifestText(), nil
+ return cw.ManifestText()
}