14287: Add ensure_unique_name param to untrash options.
[arvados.git] / sdk / go / arvados / fs_collection.go
index 7ce37aa24e7b35bfbabec9508af3b2e308d4cc76..972b3979fcfa4dd7fdb3cde62a90eacd37b27c56 100644 (file)
@@ -5,10 +5,10 @@
 package arvados
 
 import (
+       "context"
        "encoding/json"
        "fmt"
        "io"
-       "log"
        "os"
        "path"
        "regexp"
@@ -19,7 +19,11 @@ import (
        "time"
 )
 
-var maxBlockSize = 1 << 26
+var (
+       maxBlockSize      = 1 << 26
+       concurrentWriters = 4 // max goroutines writing to Keep during sync()
+       writeAheadBlocks  = 1 // max background jobs flushing to Keep before blocking writes
+)
 
 // A CollectionFileSystem is a FileSystem that can be serialized as a
 // manifest and stored as a collection.
@@ -31,6 +35,9 @@ type CollectionFileSystem interface {
        // Prefix (normally ".") is a top level directory, effectively
        // prepended to all paths in the returned manifest.
        MarshalManifest(prefix string) (string, error)
+
+       // Total data bytes in all files.
+       Size() int64
 }
 
 type collectionFileSystem struct {
@@ -113,30 +120,37 @@ func (fs *collectionFileSystem) newNode(name string, perm os.FileMode, modTime t
 }
 
 func (fs *collectionFileSystem) Sync() error {
-       log.Printf("cfs.Sync()")
        if fs.uuid == "" {
                return nil
        }
        txt, err := fs.MarshalManifest(".")
        if err != nil {
-               log.Printf("WARNING: (collectionFileSystem)Sync() failed: %s", err)
-               return err
+               return fmt.Errorf("sync failed: %s", err)
        }
        coll := &Collection{
                UUID:         fs.uuid,
                ManifestText: txt,
        }
-       err = fs.RequestAndDecode(nil, "PUT", "arvados/v1/collections/"+fs.uuid, fs.UpdateBody(coll), map[string]interface{}{"select": []string{"uuid"}})
+       err = fs.RequestAndDecode(nil, "PUT", "arvados/v1/collections/"+fs.uuid, nil, map[string]interface{}{
+               "collection": map[string]string{
+                       "manifest_text": coll.ManifestText,
+               },
+               "select": []string{"uuid"},
+       })
        if err != nil {
-               log.Printf("WARNING: (collectionFileSystem)Sync() failed: %s", err)
+               return fmt.Errorf("sync failed: update %s: %s", fs.uuid, err)
        }
-       return err
+       return nil
 }
 
 func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
        fs.fileSystem.root.Lock()
        defer fs.fileSystem.root.Unlock()
-       return fs.fileSystem.root.(*dirnode).marshalManifest(prefix)
+       return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix, newThrottle(concurrentWriters))
+}
+
+func (fs *collectionFileSystem) Size() int64 {
+       return fs.fileSystem.root.(*dirnode).TreeSize()
 }
 
 // filenodePtr is an offset into a file that is (usually) efficient to
@@ -224,6 +238,7 @@ type filenode struct {
        memsize  int64 // bytes in memSegments
        sync.RWMutex
        nullnode
+       throttle *throttle
 }
 
 // caller must have lock
@@ -486,30 +501,75 @@ func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePt
 // Write some data out to disk to reduce memory use. Caller must have
 // write lock.
 func (fn *filenode) pruneMemSegments() {
-       // TODO: async (don't hold Lock() while waiting for Keep)
        // TODO: share code with (*dirnode)sync()
        // TODO: pack/flush small blocks too, when fragmented
+       if fn.throttle == nil {
+               // TODO: share a throttle with filesystem
+               fn.throttle = newThrottle(writeAheadBlocks)
+       }
        for idx, seg := range fn.segments {
                seg, ok := seg.(*memSegment)
-               if !ok || seg.Len() < maxBlockSize {
-                       continue
-               }
-               locator, _, err := fn.FS().PutB(seg.buf)
-               if err != nil {
-                       // TODO: stall (or return errors from)
-                       // subsequent writes until flushing
-                       // starts to succeed
+               if !ok || seg.Len() < maxBlockSize || seg.flushing != nil {
                        continue
                }
-               fn.memsize -= int64(seg.Len())
-               fn.segments[idx] = storedSegment{
-                       kc:      fn.FS(),
-                       locator: locator,
-                       size:    seg.Len(),
-                       offset:  0,
-                       length:  seg.Len(),
+               // Setting seg.flushing guarantees seg.buf will not be
+               // modified in place: WriteAt and Truncate will
+               // allocate a new buf instead, if necessary.
+               idx, buf := idx, seg.buf
+               done := make(chan struct{})
+               seg.flushing = done
+               // If lots of background writes are already in
+               // progress, block here until one finishes, rather
+               // than pile up an unlimited number of buffered writes
+               // and network flush operations.
+               fn.throttle.Acquire()
+               go func() {
+                       defer close(done)
+                       locator, _, err := fn.FS().PutB(buf)
+                       fn.throttle.Release()
+                       fn.Lock()
+                       defer fn.Unlock()
+                       if curbuf := seg.buf[:1]; &curbuf[0] != &buf[0] {
+                               // A new seg.buf has been allocated.
+                               return
+                       }
+                       seg.flushing = nil
+                       if err != nil {
+                               // TODO: stall (or return errors from)
+                               // subsequent writes until flushing
+                               // starts to succeed.
+                               return
+                       }
+                       if len(fn.segments) <= idx || fn.segments[idx] != seg || len(seg.buf) != len(buf) {
+                               // Segment has been dropped/moved/resized.
+                               return
+                       }
+                       fn.memsize -= int64(len(buf))
+                       fn.segments[idx] = storedSegment{
+                               kc:      fn.FS(),
+                               locator: locator,
+                               size:    len(buf),
+                               offset:  0,
+                               length:  len(buf),
+                       }
+               }()
+       }
+}
+
+// Block until all pending pruneMemSegments work is finished. Caller
+// must NOT have lock.
+func (fn *filenode) waitPrune() {
+       var pending []<-chan struct{}
+       fn.Lock()
+       for _, seg := range fn.segments {
+               if seg, ok := seg.(*memSegment); ok && seg.flushing != nil {
+                       pending = append(pending, seg.flushing)
                }
        }
+       fn.Unlock()
+       for _, p := range pending {
+               <-p
+       }
 }
 
 type dirnode struct {
@@ -542,118 +602,184 @@ func (dn *dirnode) Child(name string, replace func(inode) (inode, error)) (inode
        return dn.treenode.Child(name, replace)
 }
 
-// sync flushes in-memory data (for all files in the tree rooted at
-// dn) to persistent storage. Caller must hold dn.Lock().
-func (dn *dirnode) sync() error {
-       type shortBlock struct {
-               fn  *filenode
-               idx int
-       }
-       var pending []shortBlock
-       var pendingLen int
+type fnSegmentRef struct {
+       fn  *filenode
+       idx int
+}
 
-       flush := func(sbs []shortBlock) error {
-               if len(sbs) == 0 {
-                       return nil
-               }
-               block := make([]byte, 0, maxBlockSize)
-               for _, sb := range sbs {
-                       block = append(block, sb.fn.segments[sb.idx].(*memSegment).buf...)
-               }
-               locator, _, err := dn.fs.PutB(block)
-               if err != nil {
-                       return err
-               }
-               off := 0
-               for _, sb := range sbs {
-                       data := sb.fn.segments[sb.idx].(*memSegment).buf
-                       sb.fn.segments[sb.idx] = storedSegment{
-                               kc:      dn.fs,
-                               locator: locator,
-                               size:    len(block),
-                               offset:  off,
-                               length:  len(data),
-                       }
-                       off += len(data)
-                       sb.fn.memsize -= int64(len(data))
-               }
+// commitBlock concatenates the data from the given filenode segments
+// (which must be *memSegments), writes the data out to Keep as a
+// single block, and replaces the filenodes' *memSegments with
+// storedSegments that reference the relevant portions of the new
+// block.
+//
+// Caller must have write lock.
+func (dn *dirnode) commitBlock(ctx context.Context, throttle *throttle, refs []fnSegmentRef) error {
+       if len(refs) == 0 {
                return nil
        }
+       throttle.Acquire()
+       defer throttle.Release()
+       if err := ctx.Err(); err != nil {
+               return err
+       }
+       block := make([]byte, 0, maxBlockSize)
+       for _, ref := range refs {
+               block = append(block, ref.fn.segments[ref.idx].(*memSegment).buf...)
+       }
+       locator, _, err := dn.fs.PutB(block)
+       if err != nil {
+               return err
+       }
+       off := 0
+       for _, ref := range refs {
+               data := ref.fn.segments[ref.idx].(*memSegment).buf
+               ref.fn.segments[ref.idx] = storedSegment{
+                       kc:      dn.fs,
+                       locator: locator,
+                       size:    len(block),
+                       offset:  off,
+                       length:  len(data),
+               }
+               off += len(data)
+               ref.fn.memsize -= int64(len(data))
+       }
+       return nil
+}
 
-       names := make([]string, 0, len(dn.inodes))
-       for name := range dn.inodes {
-               names = append(names, name)
+// sync flushes in-memory data and remote block references (for the
+// children with the given names, which must be children of dn) to
+// local persistent storage. Caller must have write lock on dn and the
+// named children.
+func (dn *dirnode) sync(ctx context.Context, throttle *throttle, names []string) error {
+       cg := newContextGroup(ctx)
+       defer cg.Cancel()
+
+       goCommit := func(refs []fnSegmentRef) {
+               cg.Go(func() error {
+                       return dn.commitBlock(cg.Context(), throttle, refs)
+               })
        }
-       sort.Strings(names)
 
+       var pending []fnSegmentRef
+       var pendingLen int = 0
+       localLocator := map[string]string{}
        for _, name := range names {
                fn, ok := dn.inodes[name].(*filenode)
                if !ok {
                        continue
                }
-               fn.Lock()
-               defer fn.Unlock()
                for idx, seg := range fn.segments {
-                       seg, ok := seg.(*memSegment)
-                       if !ok {
-                               continue
-                       }
-                       if seg.Len() > maxBlockSize/2 {
-                               if err := flush([]shortBlock{{fn, idx}}); err != nil {
-                                       return err
+                       switch seg := seg.(type) {
+                       case storedSegment:
+                               loc, ok := localLocator[seg.locator]
+                               if !ok {
+                                       var err error
+                                       loc, err = dn.fs.LocalLocator(seg.locator)
+                                       if err != nil {
+                                               return err
+                                       }
+                                       localLocator[seg.locator] = loc
                                }
-                               continue
-                       }
-                       if pendingLen+seg.Len() > maxBlockSize {
-                               if err := flush(pending); err != nil {
-                                       return err
+                               seg.locator = loc
+                               fn.segments[idx] = seg
+                       case *memSegment:
+                               if seg.Len() > maxBlockSize/2 {
+                                       goCommit([]fnSegmentRef{{fn, idx}})
+                                       continue
+                               }
+                               if pendingLen+seg.Len() > maxBlockSize {
+                                       goCommit(pending)
+                                       pending = nil
+                                       pendingLen = 0
                                }
-                               pending = nil
-                               pendingLen = 0
+                               pending = append(pending, fnSegmentRef{fn, idx})
+                               pendingLen += seg.Len()
+                       default:
+                               panic(fmt.Sprintf("can't sync segment type %T", seg))
                        }
-                       pending = append(pending, shortBlock{fn, idx})
-                       pendingLen += seg.Len()
                }
        }
-       return flush(pending)
+       goCommit(pending)
+       return cg.Wait()
 }
 
-// caller must have read lock.
-func (dn *dirnode) marshalManifest(prefix string) (string, error) {
-       var streamLen int64
-       type filepart struct {
-               name   string
-               offset int64
-               length int64
-       }
-       var fileparts []filepart
-       var subdirs string
-       var blocks []string
+// caller must have write lock.
+func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle *throttle) (string, error) {
+       cg := newContextGroup(ctx)
+       defer cg.Cancel()
 
-       if err := dn.sync(); err != nil {
-               return "", err
+       if len(dn.inodes) == 0 {
+               if prefix == "." {
+                       return "", nil
+               }
+               // Express the existence of an empty directory by
+               // adding an empty file named `\056`, which (unlike
+               // the more obvious spelling `.`) is accepted by the
+               // API's manifest validator.
+               return manifestEscape(prefix) + " d41d8cd98f00b204e9800998ecf8427e+0 0:0:\\056\n", nil
        }
 
        names := make([]string, 0, len(dn.inodes))
-       for name, node := range dn.inodes {
+       for name := range dn.inodes {
                names = append(names, name)
-               node.Lock()
-               defer node.Unlock()
        }
        sort.Strings(names)
 
+       // Wait for children to finish any pending write operations
+       // before locking them.
        for _, name := range names {
-               switch node := dn.inodes[name].(type) {
+               node := dn.inodes[name]
+               if fn, ok := node.(*filenode); ok {
+                       fn.waitPrune()
+               }
+       }
+
+       var dirnames []string
+       var filenames []string
+       for _, name := range names {
+               node := dn.inodes[name]
+               node.Lock()
+               defer node.Unlock()
+               switch node := node.(type) {
                case *dirnode:
-                       subdir, err := node.marshalManifest(prefix + "/" + name)
-                       if err != nil {
-                               return "", err
-                       }
-                       subdirs = subdirs + subdir
+                       dirnames = append(dirnames, name)
                case *filenode:
+                       filenames = append(filenames, name)
+               default:
+                       panic(fmt.Sprintf("can't marshal inode type %T", node))
+               }
+       }
+
+       subdirs := make([]string, len(dirnames))
+       rootdir := ""
+       for i, name := range dirnames {
+               i, name := i, name
+               cg.Go(func() error {
+                       txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name, throttle)
+                       subdirs[i] = txt
+                       return err
+               })
+       }
+
+       cg.Go(func() error {
+               var streamLen int64
+               type filepart struct {
+                       name   string
+                       offset int64
+                       length int64
+               }
+
+               var fileparts []filepart
+               var blocks []string
+               if err := dn.sync(cg.Context(), throttle, names); err != nil {
+                       return err
+               }
+               for _, name := range filenames {
+                       node := dn.inodes[name].(*filenode)
                        if len(node.segments) == 0 {
                                fileparts = append(fileparts, filepart{name: name})
-                               break
+                               continue
                        }
                        for _, seg := range node.segments {
                                switch seg := seg.(type) {
@@ -683,20 +809,21 @@ func (dn *dirnode) marshalManifest(prefix string) (string, error) {
                                        panic(fmt.Sprintf("can't marshal segment type %T", seg))
                                }
                        }
-               default:
-                       panic(fmt.Sprintf("can't marshal inode type %T", node))
                }
-       }
-       var filetokens []string
-       for _, s := range fileparts {
-               filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
-       }
-       if len(filetokens) == 0 {
-               return subdirs, nil
-       } else if len(blocks) == 0 {
-               blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
-       }
-       return manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
+               var filetokens []string
+               for _, s := range fileparts {
+                       filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
+               }
+               if len(filetokens) == 0 {
+                       return nil
+               } else if len(blocks) == 0 {
+                       blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
+               }
+               rootdir = manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n"
+               return nil
+       })
+       err := cg.Wait()
+       return rootdir + strings.Join(subdirs, ""), err
 }
 
 func (dn *dirnode) loadManifest(txt string) error {
@@ -757,8 +884,14 @@ func (dn *dirnode) loadManifest(txt string) error {
                        }
                        name := dirname + "/" + manifestUnescape(toks[2])
                        fnode, err := dn.createFileAndParents(name)
-                       if err != nil {
-                               return fmt.Errorf("line %d: cannot use path %q: %s", lineno, name, err)
+                       if fnode == nil && err == nil && length == 0 {
+                               // Special case: an empty file used as
+                               // a marker to preserve an otherwise
+                               // empty directory in a manifest.
+                               continue
+                       }
+                       if err != nil || (fnode == nil && length != 0) {
+                               return fmt.Errorf("line %d: cannot use path %q with length %d: %s", lineno, name, length, err)
                        }
                        // Map the stream offset/range coordinates to
                        // block/offset/range coordinates and add
@@ -816,15 +949,14 @@ func (dn *dirnode) loadManifest(txt string) error {
        return nil
 }
 
-// only safe to call from loadManifest -- no locking
+// only safe to call from loadManifest -- no locking.
+//
+// If path is a "parent directory exists" marker (the last path
+// component is "."), the returned values are both nil.
 func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
        var node inode = dn
        names := strings.Split(path, "/")
        basename := names[len(names)-1]
-       if !permittedName(basename) {
-               err = fmt.Errorf("invalid file part %q in path %q", basename, path)
-               return
-       }
        for _, name := range names[:len(names)-1] {
                switch name {
                case "", ".":
@@ -855,6 +987,12 @@ func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
                        return
                }
        }
+       if basename == "." {
+               return
+       } else if !permittedName(basename) {
+               err = fmt.Errorf("invalid file part %q in path %q", basename, path)
+               return
+       }
        _, err = node.Child(basename, func(child inode) (inode, error) {
                switch child := child.(type) {
                case nil:
@@ -877,6 +1015,20 @@ func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
        return
 }
 
+func (dn *dirnode) TreeSize() (bytes int64) {
+       dn.RLock()
+       defer dn.RUnlock()
+       for _, i := range dn.inodes {
+               switch i := i.(type) {
+               case *filenode:
+                       bytes += i.Size()
+               case *dirnode:
+                       bytes += i.TreeSize()
+               }
+       }
+       return
+}
+
 type segment interface {
        io.ReaderAt
        Len() int
@@ -887,6 +1039,11 @@ type segment interface {
 
 type memSegment struct {
        buf []byte
+       // If flushing is not nil, then a) buf is being shared by a
+       // pruneMemSegments goroutine, and must be copied on write;
+       // and b) the flushing channel will close when the goroutine
+       // finishes, whether it succeeds or not.
+       flushing <-chan struct{}
 }
 
 func (me *memSegment) Len() int {
@@ -903,28 +1060,31 @@ func (me *memSegment) Slice(off, length int) segment {
 }
 
 func (me *memSegment) Truncate(n int) {
-       if n > cap(me.buf) {
+       if n > cap(me.buf) || (me.flushing != nil && n > len(me.buf)) {
                newsize := 1024
                for newsize < n {
                        newsize = newsize << 2
                }
                newbuf := make([]byte, n, newsize)
                copy(newbuf, me.buf)
-               me.buf = newbuf
+               me.buf, me.flushing = newbuf, nil
        } else {
-               // Zero unused part when shrinking, in case we grow
-               // and start using it again later.
-               for i := n; i < len(me.buf); i++ {
+               // reclaim existing capacity, and zero reclaimed part
+               oldlen := len(me.buf)
+               me.buf = me.buf[:n]
+               for i := oldlen; i < n; i++ {
                        me.buf[i] = 0
                }
        }
-       me.buf = me.buf[:n]
 }
 
 func (me *memSegment) WriteAt(p []byte, off int) {
        if off+len(p) > len(me.buf) {
                panic("overflowed segment")
        }
+       if me.flushing != nil {
+               me.buf, me.flushing = append([]byte(nil), me.buf...), nil
+       }
        copy(me.buf[off:], p)
 }