package arvados
import (
+ "context"
"encoding/json"
"fmt"
"io"
- "log"
"os"
"path"
"regexp"
"strconv"
"strings"
"sync"
+ "sync/atomic"
"time"
)
-var maxBlockSize = 1 << 26
+var (
+ maxBlockSize = 1 << 26
+ concurrentWriters = 4 // max goroutines writing to Keep in background and during flush()
+)
// A CollectionFileSystem is a FileSystem that can be serialized as a
// manifest and stored as a collection.
// Total data bytes in all files.
Size() int64
+
+ // Memory consumed by buffered file data.
+ memorySize() int64
}
type collectionFileSystem struct {
// FileSystem returns a CollectionFileSystem for the collection.
func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFileSystem, error) {
- var modTime time.Time
- if c.ModifiedAt == nil {
+ modTime := c.ModifiedAt
+ if modTime.IsZero() {
modTime = time.Now()
- } else {
- modTime = *c.ModifiedAt
}
fs := &collectionFileSystem{
uuid: c.UUID,
fileSystem: fileSystem{
fsBackend: keepBackend{apiClient: client, keepClient: kc},
+ thr: newThrottle(concurrentWriters),
},
}
root := &dirnode{
}
func (fs *collectionFileSystem) Sync() error {
- log.Printf("cfs.Sync()")
if fs.uuid == "" {
return nil
}
txt, err := fs.MarshalManifest(".")
if err != nil {
- log.Printf("WARNING: (collectionFileSystem)Sync() failed: %s", err)
- return err
+ return fmt.Errorf("sync failed: %s", err)
}
coll := &Collection{
UUID: fs.uuid,
ManifestText: txt,
}
- err = fs.RequestAndDecode(nil, "PUT", "arvados/v1/collections/"+fs.uuid, fs.UpdateBody(coll), map[string]interface{}{"select": []string{"uuid"}})
+ err = fs.RequestAndDecode(nil, "PUT", "arvados/v1/collections/"+fs.uuid, nil, map[string]interface{}{
+ "collection": map[string]string{
+ "manifest_text": coll.ManifestText,
+ },
+ "select": []string{"uuid"},
+ })
+ if err != nil {
+ return fmt.Errorf("sync failed: update %s: %s", fs.uuid, err)
+ }
+ return nil
+}
+
+func (fs *collectionFileSystem) Flush(path string, shortBlocks bool) error {
+ node, err := rlookup(fs.fileSystem.root, path)
if err != nil {
- log.Printf("WARNING: (collectionFileSystem)Sync() failed: %s", err)
+ return err
+ }
+ dn, ok := node.(*dirnode)
+ if !ok {
+ return ErrNotADirectory
+ }
+ dn.Lock()
+ defer dn.Unlock()
+ names := dn.sortedNames()
+ if path != "" {
+ // Caller only wants to flush the specified dir,
+ // non-recursively. Drop subdirs from the list of
+ // names.
+ var filenames []string
+ for _, name := range names {
+ if _, ok := dn.inodes[name].(*filenode); ok {
+ filenames = append(filenames, name)
+ }
+ }
+ names = filenames
+ }
+ for _, name := range names {
+ child := dn.inodes[name]
+ child.Lock()
+ defer child.Unlock()
}
- return err
+ return dn.flush(context.TODO(), names, flushOpts{sync: false, shortBlocks: shortBlocks})
+}
+
+func (fs *collectionFileSystem) memorySize() int64 {
+ fs.fileSystem.root.Lock()
+ defer fs.fileSystem.root.Unlock()
+ return fs.fileSystem.root.(*dirnode).memorySize()
}
func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
fs.fileSystem.root.Lock()
defer fs.fileSystem.root.Unlock()
- return fs.fileSystem.root.(*dirnode).marshalManifest(prefix)
+ return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix)
}
func (fs *collectionFileSystem) Size() int64 {
// Write some data out to disk to reduce memory use. Caller must have
// write lock.
func (fn *filenode) pruneMemSegments() {
- // TODO: async (don't hold Lock() while waiting for Keep)
- // TODO: share code with (*dirnode)sync()
+ // TODO: share code with (*dirnode)flush()
// TODO: pack/flush small blocks too, when fragmented
for idx, seg := range fn.segments {
seg, ok := seg.(*memSegment)
- if !ok || seg.Len() < maxBlockSize {
- continue
- }
- locator, _, err := fn.FS().PutB(seg.buf)
- if err != nil {
- // TODO: stall (or return errors from)
- // subsequent writes until flushing
- // starts to succeed
+ if !ok || seg.Len() < maxBlockSize || seg.flushing != nil {
continue
}
- fn.memsize -= int64(seg.Len())
- fn.segments[idx] = storedSegment{
- kc: fn.FS(),
- locator: locator,
- size: seg.Len(),
- offset: 0,
- length: seg.Len(),
+ // Setting seg.flushing guarantees seg.buf will not be
+ // modified in place: WriteAt and Truncate will
+ // allocate a new buf instead, if necessary.
+ idx, buf := idx, seg.buf
+ done := make(chan struct{})
+ seg.flushing = done
+ // If lots of background writes are already in
+ // progress, block here until one finishes, rather
+ // than pile up an unlimited number of buffered writes
+ // and network flush operations.
+ fn.fs.throttle().Acquire()
+ go func() {
+ defer close(done)
+ locator, _, err := fn.FS().PutB(buf)
+ fn.fs.throttle().Release()
+ fn.Lock()
+ defer fn.Unlock()
+ if seg.flushing != done {
+ // A new seg.buf has been allocated.
+ return
+ }
+ seg.flushing = nil
+ if err != nil {
+ // TODO: stall (or return errors from)
+ // subsequent writes until flushing
+ // starts to succeed.
+ return
+ }
+ if len(fn.segments) <= idx || fn.segments[idx] != seg || len(seg.buf) != len(buf) {
+ // Segment has been dropped/moved/resized.
+ return
+ }
+ fn.memsize -= int64(len(buf))
+ fn.segments[idx] = storedSegment{
+ kc: fn.FS(),
+ locator: locator,
+ size: len(buf),
+ offset: 0,
+ length: len(buf),
+ }
+ }()
+ }
+}
+
+// Block until all pending pruneMemSegments/flush work is
+// finished. Caller must NOT have lock.
+func (fn *filenode) waitPrune() {
+ var pending []<-chan struct{}
+ fn.Lock()
+ for _, seg := range fn.segments {
+ if seg, ok := seg.(*memSegment); ok && seg.flushing != nil {
+ pending = append(pending, seg.flushing)
}
}
+ fn.Unlock()
+ for _, p := range pending {
+ <-p
+ }
}
type dirnode struct {
return dn.treenode.Child(name, replace)
}
-// sync flushes in-memory data (for the children with the given names,
-// which must be children of dn) to persistent storage. Caller must
-// have write lock on dn and the named children.
-func (dn *dirnode) sync(names []string) error {
- type shortBlock struct {
- fn *filenode
- idx int
- }
- var pending []shortBlock
- var pendingLen int
+type fnSegmentRef struct {
+ fn *filenode
+ idx int
+}
- flush := func(sbs []shortBlock) error {
- if len(sbs) == 0 {
+// commitBlock concatenates the data from the given filenode segments
+// (which must be *memSegments), writes the data out to Keep as a
+// single block, and replaces the filenodes' *memSegments with
+// storedSegments that reference the relevant portions of the new
+// block.
+//
+// bufsize is the total data size in refs. It is used to preallocate
+// the correct amount of memory when len(refs)>1.
+//
+// If sync is false, commitBlock returns right away, after starting a
+// goroutine to do the writes, reacquire the filenodes' locks, and
+// swap out the *memSegments. Some filenodes' segments might get
+// modified/rearranged in the meantime, in which case commitBlock
+// won't replace them.
+//
+// Caller must have write lock.
+func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize int, sync bool) error {
+ if len(refs) == 0 {
+ return nil
+ }
+ if err := ctx.Err(); err != nil {
+ return err
+ }
+ done := make(chan struct{})
+ var block []byte
+ segs := make([]*memSegment, 0, len(refs))
+ offsets := make([]int, 0, len(refs)) // location of segment's data within block
+ for _, ref := range refs {
+ seg := ref.fn.segments[ref.idx].(*memSegment)
+ if seg.flushing != nil && !sync {
+ // Let the other flushing goroutine finish. If
+ // it fails, we'll try again next time.
return nil
+ } else {
+ // In sync mode, we proceed regardless of
+ // whether another flush is in progress: It
+ // can't finish before we do, because we hold
+ // fn's lock until we finish our own writes.
}
- block := make([]byte, 0, maxBlockSize)
- for _, sb := range sbs {
- block = append(block, sb.fn.segments[sb.idx].(*memSegment).buf...)
+ seg.flushing = done
+ offsets = append(offsets, len(block))
+ if len(refs) == 1 {
+ block = seg.buf
+ } else if block == nil {
+ block = append(make([]byte, 0, bufsize), seg.buf...)
+ } else {
+ block = append(block, seg.buf...)
}
+ segs = append(segs, seg)
+ }
+ blocksize := len(block)
+ dn.fs.throttle().Acquire()
+ errs := make(chan error, 1)
+ go func() {
+ defer close(done)
+ defer close(errs)
locator, _, err := dn.fs.PutB(block)
+ dn.fs.throttle().Release()
+ {
+ defer func() {
+ for _, seg := range segs {
+ if seg.flushing == done {
+ seg.flushing = nil
+ }
+ }
+ }()
+ }
if err != nil {
- return err
+ errs <- err
+ return
}
- off := 0
- for _, sb := range sbs {
- data := sb.fn.segments[sb.idx].(*memSegment).buf
- sb.fn.segments[sb.idx] = storedSegment{
+ for idx, ref := range refs {
+ if !sync {
+ ref.fn.Lock()
+ // In async mode, fn's lock was
+ // released while we were waiting for
+ // PutB(); lots of things might have
+ // changed.
+ if len(ref.fn.segments) <= ref.idx {
+ // file segments have
+ // rearranged or changed in
+ // some way
+ ref.fn.Unlock()
+ continue
+ } else if seg, ok := ref.fn.segments[ref.idx].(*memSegment); !ok || seg != segs[idx] {
+ // segment has been replaced
+ ref.fn.Unlock()
+ continue
+ } else if seg.flushing != done {
+ // seg.buf has been replaced
+ ref.fn.Unlock()
+ continue
+ }
+ }
+ data := ref.fn.segments[ref.idx].(*memSegment).buf
+ ref.fn.segments[ref.idx] = storedSegment{
kc: dn.fs,
locator: locator,
- size: len(block),
- offset: off,
+ size: blocksize,
+ offset: offsets[idx],
length: len(data),
}
- off += len(data)
- sb.fn.memsize -= int64(len(data))
+ // atomic is needed here despite caller having
+ // lock: caller might be running concurrent
+ // commitBlock() goroutines using the same
+ // lock, writing different segments from the
+ // same file.
+ atomic.AddInt64(&ref.fn.memsize, -int64(len(data)))
+ if !sync {
+ ref.fn.Unlock()
+ }
}
+ }()
+ if sync {
+ return <-errs
+ } else {
return nil
}
+}
+
+type flushOpts struct {
+ sync bool
+ shortBlocks bool
+}
+// flush in-memory data and remote-cluster block references (for the
+// children with the given names, which must be children of dn) to
+// local-cluster persistent storage.
+//
+// Caller must have write lock on dn and the named children.
+//
+// If any children are dirs, they will be flushed recursively.
+func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) error {
+ cg := newContextGroup(ctx)
+ defer cg.Cancel()
+
+ goCommit := func(refs []fnSegmentRef, bufsize int) {
+ cg.Go(func() error {
+ return dn.commitBlock(cg.Context(), refs, bufsize, opts.sync)
+ })
+ }
+
+ var pending []fnSegmentRef
+ var pendingLen int = 0
+ localLocator := map[string]string{}
for _, name := range names {
- fn, ok := dn.inodes[name].(*filenode)
- if !ok {
- continue
- }
- for idx, seg := range fn.segments {
- seg, ok := seg.(*memSegment)
- if !ok {
- continue
- }
- if seg.Len() > maxBlockSize/2 {
- if err := flush([]shortBlock{{fn, idx}}); err != nil {
- return err
- }
- continue
+ switch node := dn.inodes[name].(type) {
+ case *dirnode:
+ grandchildNames := node.sortedNames()
+ for _, grandchildName := range grandchildNames {
+ grandchild := node.inodes[grandchildName]
+ grandchild.Lock()
+ defer grandchild.Unlock()
}
- if pendingLen+seg.Len() > maxBlockSize {
- if err := flush(pending); err != nil {
- return err
+ cg.Go(func() error { return node.flush(cg.Context(), grandchildNames, opts) })
+ case *filenode:
+ for idx, seg := range node.segments {
+ switch seg := seg.(type) {
+ case storedSegment:
+ loc, ok := localLocator[seg.locator]
+ if !ok {
+ var err error
+ loc, err = dn.fs.LocalLocator(seg.locator)
+ if err != nil {
+ return err
+ }
+ localLocator[seg.locator] = loc
+ }
+ seg.locator = loc
+ node.segments[idx] = seg
+ case *memSegment:
+ if seg.Len() > maxBlockSize/2 {
+ goCommit([]fnSegmentRef{{node, idx}}, seg.Len())
+ continue
+ }
+ if pendingLen+seg.Len() > maxBlockSize {
+ goCommit(pending, pendingLen)
+ pending = nil
+ pendingLen = 0
+ }
+ pending = append(pending, fnSegmentRef{node, idx})
+ pendingLen += seg.Len()
+ default:
+ panic(fmt.Sprintf("can't sync segment type %T", seg))
}
- pending = nil
- pendingLen = 0
}
- pending = append(pending, shortBlock{fn, idx})
- pendingLen += seg.Len()
}
}
- return flush(pending)
+ if opts.shortBlocks {
+ goCommit(pending, pendingLen)
+ }
+ return cg.Wait()
}
// caller must have write lock.
-func (dn *dirnode) marshalManifest(prefix string) (string, error) {
- var streamLen int64
- type filepart struct {
- name string
- offset int64
- length int64
+func (dn *dirnode) memorySize() (size int64) {
+ for _, name := range dn.sortedNames() {
+ node := dn.inodes[name]
+ node.Lock()
+ defer node.Unlock()
+ switch node := node.(type) {
+ case *dirnode:
+ size += node.memorySize()
+ case *filenode:
+ for _, seg := range node.segments {
+ switch seg := seg.(type) {
+ case *memSegment:
+ size += int64(seg.Len())
+ }
+ }
+ }
}
- var fileparts []filepart
- var subdirs string
- var blocks []string
+ return
+}
+// caller must have write lock.
+func (dn *dirnode) sortedNames() []string {
names := make([]string, 0, len(dn.inodes))
for name := range dn.inodes {
names = append(names, name)
}
sort.Strings(names)
+ return names
+}
+
+// caller must have write lock.
+func (dn *dirnode) marshalManifest(ctx context.Context, prefix string) (string, error) {
+ cg := newContextGroup(ctx)
+ defer cg.Cancel()
+
+ if len(dn.inodes) == 0 {
+ if prefix == "." {
+ return "", nil
+ }
+ // Express the existence of an empty directory by
+ // adding an empty file named `\056`, which (unlike
+ // the more obvious spelling `.`) is accepted by the
+ // API's manifest validator.
+ return manifestEscape(prefix) + " d41d8cd98f00b204e9800998ecf8427e+0 0:0:\\056\n", nil
+ }
+
+ names := dn.sortedNames()
+
+ // Wait for children to finish any pending write operations
+ // before locking them.
for _, name := range names {
node := dn.inodes[name]
- node.Lock()
- defer node.Unlock()
- }
- if err := dn.sync(names); err != nil {
- return "", err
+ if fn, ok := node.(*filenode); ok {
+ fn.waitPrune()
+ }
}
+
+ var dirnames []string
+ var filenames []string
for _, name := range names {
- switch node := dn.inodes[name].(type) {
+ node := dn.inodes[name]
+ node.Lock()
+ defer node.Unlock()
+ switch node := node.(type) {
case *dirnode:
- subdir, err := node.marshalManifest(prefix + "/" + name)
- if err != nil {
- return "", err
- }
- subdirs = subdirs + subdir
+ dirnames = append(dirnames, name)
case *filenode:
+ filenames = append(filenames, name)
+ default:
+ panic(fmt.Sprintf("can't marshal inode type %T", node))
+ }
+ }
+
+ subdirs := make([]string, len(dirnames))
+ rootdir := ""
+ for i, name := range dirnames {
+ i, name := i, name
+ cg.Go(func() error {
+ txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name)
+ subdirs[i] = txt
+ return err
+ })
+ }
+
+ cg.Go(func() error {
+ var streamLen int64
+ type filepart struct {
+ name string
+ offset int64
+ length int64
+ }
+
+ var fileparts []filepart
+ var blocks []string
+ if err := dn.flush(cg.Context(), filenames, flushOpts{sync: true, shortBlocks: true}); err != nil {
+ return err
+ }
+ for _, name := range filenames {
+ node := dn.inodes[name].(*filenode)
if len(node.segments) == 0 {
fileparts = append(fileparts, filepart{name: name})
- break
+ continue
}
for _, seg := range node.segments {
switch seg := seg.(type) {
default:
// This can't happen: we
// haven't unlocked since
- // calling sync().
+ // calling flush(sync=true).
panic(fmt.Sprintf("can't marshal segment type %T", seg))
}
}
- default:
- panic(fmt.Sprintf("can't marshal inode type %T", node))
}
- }
- var filetokens []string
- for _, s := range fileparts {
- filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
- }
- if len(filetokens) == 0 {
- return subdirs, nil
- } else if len(blocks) == 0 {
- blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
- }
- return manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
+ var filetokens []string
+ for _, s := range fileparts {
+ filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
+ }
+ if len(filetokens) == 0 {
+ return nil
+ } else if len(blocks) == 0 {
+ blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
+ }
+ rootdir = manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n"
+ return nil
+ })
+ err := cg.Wait()
+ return rootdir + strings.Join(subdirs, ""), err
}
func (dn *dirnode) loadManifest(txt string) error {
}
name := dirname + "/" + manifestUnescape(toks[2])
fnode, err := dn.createFileAndParents(name)
- if err != nil {
- return fmt.Errorf("line %d: cannot use path %q: %s", lineno, name, err)
+ if fnode == nil && err == nil && length == 0 {
+ // Special case: an empty file used as
+ // a marker to preserve an otherwise
+ // empty directory in a manifest.
+ continue
+ }
+ if err != nil || (fnode == nil && length != 0) {
+ return fmt.Errorf("line %d: cannot use path %q with length %d: %s", lineno, name, length, err)
}
// Map the stream offset/range coordinates to
// block/offset/range coordinates and add
return nil
}
-// only safe to call from loadManifest -- no locking
+// only safe to call from loadManifest -- no locking.
+//
+// If path is a "parent directory exists" marker (the last path
+// component is "."), the returned values are both nil.
func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
var node inode = dn
names := strings.Split(path, "/")
basename := names[len(names)-1]
- if !permittedName(basename) {
- err = fmt.Errorf("invalid file part %q in path %q", basename, path)
- return
- }
for _, name := range names[:len(names)-1] {
switch name {
case "", ".":
return
}
}
+ if basename == "." {
+ return
+ } else if !permittedName(basename) {
+ err = fmt.Errorf("invalid file part %q in path %q", basename, path)
+ return
+ }
_, err = node.Child(basename, func(child inode) (inode, error) {
switch child := child.(type) {
case nil:
type memSegment struct {
buf []byte
+ // If flushing is not nil, then a) buf is being shared by a
+ // pruneMemSegments goroutine, and must be copied on write;
+ // and b) the flushing channel will close when the goroutine
+ // finishes, whether it succeeds or not.
+ flushing <-chan struct{}
}
func (me *memSegment) Len() int {
}
func (me *memSegment) Truncate(n int) {
- if n > cap(me.buf) {
+ if n > cap(me.buf) || (me.flushing != nil && n > len(me.buf)) {
newsize := 1024
for newsize < n {
newsize = newsize << 2
}
newbuf := make([]byte, n, newsize)
copy(newbuf, me.buf)
- me.buf = newbuf
+ me.buf, me.flushing = newbuf, nil
} else {
- // Zero unused part when shrinking, in case we grow
- // and start using it again later.
- for i := n; i < len(me.buf); i++ {
+ // reclaim existing capacity, and zero reclaimed part
+ oldlen := len(me.buf)
+ me.buf = me.buf[:n]
+ for i := oldlen; i < n; i++ {
me.buf[i] = 0
}
}
- me.buf = me.buf[:n]
}
func (me *memSegment) WriteAt(p []byte, off int) {
if off+len(p) > len(me.buf) {
panic("overflowed segment")
}
+ if me.flushing != nil {
+ me.buf, me.flushing = append([]byte(nil), me.buf...), nil
+ }
copy(me.buf[off:], p)
}