package arvados
import (
+ "context"
"encoding/json"
"fmt"
"io"
// MarshalManifest holds the lock on the filesystem root for the duration
// of the call and returns whatever the root dirnode's marshalManifest
// produces for the given prefix, using a new throttle built from
// concurrentWriters.
func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
fs.fileSystem.root.Lock()
defer fs.fileSystem.root.Unlock()
- return fs.fileSystem.root.(*dirnode).marshalManifest(prefix, newThrottle(concurrentWriters))
// This method's signature carries no context, so a placeholder is
// passed down to satisfy marshalManifest's ctx parameter.
+ return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix, newThrottle(concurrentWriters))
}
func (fs *collectionFileSystem) Size() int64 {
// sync flushes in-memory data (for the children with the given names,
// which must be children of dn) to local persistent storage. Caller
// must have write lock on dn and the named children.
//
// NOTE(review): the opening words of the sentence above were cut off
// in this view and have been reconstructed -- confirm against the full
// file.
//
// The value returned is whatever is received from the errors channel:
// an error that a flush managed to send, or nil once the channel has
// been closed without one. A flush that starts after ctx has ended
// returns without building its block.
-func (dn *dirnode) sync(names []string, throttle *throttle) error {
+func (dn *dirnode) sync(ctx context.Context, names []string, throttle *throttle) error {
// Derive a cancellable context so that a failing flush can stop
// flushes that have not started their work yet (see cancel() below).
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
type shortBlock struct {
fn *filenode
idx int
var pending []shortBlock
var pendingLen int
- var wg sync.WaitGroup
// Capacity 1: only one error is kept. The send in flush has a default
// case, so later errors are dropped rather than blocking.
errors := make(chan error, 1)
+ var wg sync.WaitGroup
+ defer wg.Wait() // we have locks: unsafe to return until all goroutines finish
+
// flush processes one batch of short blocks. Every call must be
// preceded by wg.Add(1); the deferred wg.Done() balances it.
flush := func(sbs []shortBlock) {
defer wg.Done()
if len(sbs) == 0 {
}
throttle.Acquire()
defer throttle.Release()
// Checked after acquiring the throttle: if the context ended while
// this flush was waiting, skip the work entirely.
+ if ctx.Err() != nil {
+ return
+ }
// Concatenate the in-memory buffers of the batch into one block.
block := make([]byte, 0, maxBlockSize)
for _, sb := range sbs {
block = append(block, sb.fn.segments[sb.idx].(*memSegment).buf...)
case errors <- err:
default:
}
+ cancel()
}
off := 0
for _, sb := range sbs {
}
// The final batch is flushed in the calling goroutine (no "go").
wg.Add(1)
flush(pending)
- wg.Wait()
- close(errors)
// Close errors only after every flush has finished. If no error was
// queued, the receive below then yields nil from the closed channel.
+ go func() {
+ wg.Wait()
+ close(errors)
+ }()
return <-errors
}
// marshalManifest returns manifest text for dn: a line for dn's own
// files (escaped prefix, block list, file tokens, newline) followed by
// the text returned for each subdirectory, in sorted name order. The
// line for dn is omitted when there are no file tokens.
//
// Subdirectories are marshalled in one goroutine each, and dn's own
// files in one further goroutine. If any of them reports an error, the
// shared context is cancelled and an error is returned with an empty
// string.
//
// caller must have write lock.
-func (dn *dirnode) marshalManifest(prefix string, throttle *throttle) (string, error) {
- var streamLen int64
- type filepart struct {
- name string
- offset int64
- length int64
- }
- var fileparts []filepart
- var subdirs string
- var blocks []string
+func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle *throttle) (string, error) {
// Derive a cancellable context so that the first failing goroutine
// can signal the others via cancel().
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
if len(dn.inodes) == 0 {
if prefix == "." {
names = append(names, name)
}
sort.Strings(names)
+ var dirnames []string
+ var filenames []string
// Lock every child and classify it as directory or file. The unlocks
// are deferred, so all children stay locked until this function
// returns. names is sorted, so dirnames and filenames are too.
for _, name := range names {
node := dn.inodes[name]
node.Lock()
defer node.Unlock()
- }
- if err := dn.sync(names, throttle); err != nil {
- return "", err
- }
- for _, name := range names {
- switch node := dn.inodes[name].(type) {
+ switch node := node.(type) {
case *dirnode:
- subdir, err := node.marshalManifest(prefix+"/"+name, throttle)
+ dirnames = append(dirnames, name)
+ case *filenode:
+ filenames = append(filenames, name)
+ default:
+ panic(fmt.Sprintf("can't marshal inode type %T", node))
+ }
+ }
+
+ var wg sync.WaitGroup
// Capacity is one slot per subdirectory goroutine plus one for the
// file goroutine. Each goroutine sends at most once, so sends never
// block.
+ errors := make(chan error, len(dirnames)+1)
// Each subdirectory goroutine writes only its own subdirs[i], which
// keeps the results in dirnames order.
+ subdirs := make([]string, len(dirnames))
+ rootdir := ""
+ for i, name := range dirnames {
+ wg.Add(1)
+ go func(i int, name string) {
+ defer wg.Done()
+ var err error
+ subdirs[i], err = dn.inodes[name].(*dirnode).marshalManifest(ctx, prefix+"/"+name, throttle)
if err != nil {
- return "", err
+ errors <- err
+ cancel()
}
- subdirs = subdirs + subdir
- case *filenode:
+ }(i, name)
+ }
+
// File goroutine: syncs, then builds the line for dn's own files.
// rootdir is written only here and read only after wg.Wait().
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+
+ var streamLen int64
+ type filepart struct {
+ name string
+ offset int64
+ length int64
+ }
+
+ var fileparts []filepart
+ var blocks []string
// NOTE(review): sync is given names (every child, directories
// included) rather than filenames -- confirm that is intended.
+ if err := dn.sync(ctx, names, throttle); err != nil {
+ errors <- err
+ cancel()
+ return
+ }
+ for _, name := range filenames {
+ node := dn.inodes[name].(*filenode)
// A file with no segments still gets a filepart (zero offset and
// length) so that it appears in the output.
if len(node.segments) == 0 {
fileparts = append(fileparts, filepart{name: name})
- break
+ continue
}
for _, seg := range node.segments {
switch seg := seg.(type) {
panic(fmt.Sprintf("can't marshal segment type %T", seg))
}
}
- default:
- panic(fmt.Sprintf("can't marshal inode type %T", node))
}
+ var filetokens []string
+ for _, s := range fileparts {
+ filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
+ }
// No files: leave rootdir empty. Files but no blocks: use a fixed
// placeholder block token (the MD5 digest of zero bytes with a "+0"
// suffix) so the block list is never empty.
+ if len(filetokens) == 0 {
+ return
+ } else if len(blocks) == 0 {
+ blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
+ }
+ rootdir = manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n"
+ }()
+
// All goroutines are finished after Wait, so a non-blocking receive is
// enough to tell whether any of them queued an error.
+ wg.Wait()
+ select {
+ case err := <-errors:
+ return "", err
+ default:
}
- var filetokens []string
- for _, s := range fileparts {
- filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
- }
- if len(filetokens) == 0 {
- return subdirs, nil
- } else if len(blocks) == 0 {
- blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
- }
- return manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
+ return rootdir + strings.Join(subdirs, ""), nil
}
func (dn *dirnode) loadManifest(txt string) error {