Merge branch '15061-fed-migrate' refs #15061
[arvados.git] / sdk / go / arvados / fs_collection.go
index cd3dcf053942a4df8c858c550828608081ab5cbf..6644f4cfb8e93ef7d601e667cee21a9dbce5d39b 100644 (file)
@@ -19,9 +19,11 @@ import (
        "time"
 )
 
-var maxBlockSize = 1 << 26
-
-var concurrentWriters = 4
+var (
+       maxBlockSize      = 1 << 26
+       concurrentWriters = 4 // max goroutines writing to Keep during sync()
+       writeAheadBlocks  = 1 // max background jobs flushing to Keep before blocking writes
+)
 
 // A CollectionFileSystem is a FileSystem that can be serialized as a
 // manifest and stored as a collection.
@@ -498,11 +500,11 @@ func (fn *filenode) pruneMemSegments() {
        // TODO: pack/flush small blocks too, when fragmented
        if fn.throttle == nil {
                // TODO: share a throttle with filesystem
-               fn.throttle = newThrottle(concurrentWriters)
+               fn.throttle = newThrottle(writeAheadBlocks)
        }
        for idx, seg := range fn.segments {
                seg, ok := seg.(*memSegment)
-               if !ok || seg.Len() < maxBlockSize || seg.Len() == 0 || seg.flushing != nil {
+               if !ok || seg.Len() < maxBlockSize || seg.flushing != nil {
                        continue
                }
                // Setting seg.flushing guarantees seg.buf will not be
@@ -595,60 +597,67 @@ func (dn *dirnode) Child(name string, replace func(inode) (inode, error)) (inode
        return dn.treenode.Child(name, replace)
 }
 
+type fnSegmentRef struct {
+       fn  *filenode
+       idx int
+}
+
+// commitBlock concatenates the data from the given filenode segments
+// (which must be *memSegments), writes the data out to Keep as a
+// single block, and replaces the filenodes' *memSegments with
+// storedSegments that reference the relevant portions of the new
+// block.
+//
+// Caller must have write lock.
+func (dn *dirnode) commitBlock(ctx context.Context, throttle *throttle, refs []fnSegmentRef) error {
+       if len(refs) == 0 {
+               return nil
+       }
+       throttle.Acquire()
+       defer throttle.Release()
+       if err := ctx.Err(); err != nil {
+               return err
+       }
+       block := make([]byte, 0, maxBlockSize)
+       for _, ref := range refs {
+               block = append(block, ref.fn.segments[ref.idx].(*memSegment).buf...)
+       }
+       locator, _, err := dn.fs.PutB(block)
+       if err != nil {
+               return err
+       }
+       off := 0
+       for _, ref := range refs {
+               data := ref.fn.segments[ref.idx].(*memSegment).buf
+               ref.fn.segments[ref.idx] = storedSegment{
+                       kc:      dn.fs,
+                       locator: locator,
+                       size:    len(block),
+                       offset:  off,
+                       length:  len(data),
+               }
+               off += len(data)
+               ref.fn.memsize -= int64(len(data))
+       }
+       return nil
+}
+
 // sync flushes in-memory data and remote block references (for the
 // children with the given names, which must be children of dn) to
 // local persistent storage. Caller must have write lock on dn and the
 // named children.
-func (dn *dirnode) sync(ctx context.Context, names []string, throttle *throttle) error {
+func (dn *dirnode) sync(ctx context.Context, throttle *throttle, names []string) error {
        cg := newContextGroup(ctx)
        defer cg.Cancel()
 
-       type shortBlock struct {
-               fn  *filenode
-               idx int
-       }
-
-       flush := func(sbs []shortBlock) error {
-               if len(sbs) == 0 {
-                       return nil
-               }
-               throttle.Acquire()
-               defer throttle.Release()
-               if err := cg.Context().Err(); err != nil {
-                       return err
-               }
-               block := make([]byte, 0, maxBlockSize)
-               for _, sb := range sbs {
-                       block = append(block, sb.fn.segments[sb.idx].(*memSegment).buf...)
-               }
-               locator, _, err := dn.fs.PutB(block)
-               if err != nil {
-                       return err
-               }
-               off := 0
-               for _, sb := range sbs {
-                       data := sb.fn.segments[sb.idx].(*memSegment).buf
-                       sb.fn.segments[sb.idx] = storedSegment{
-                               kc:      dn.fs,
-                               locator: locator,
-                               size:    len(block),
-                               offset:  off,
-                               length:  len(data),
-                       }
-                       off += len(data)
-                       sb.fn.memsize -= int64(len(data))
-               }
-               return nil
-       }
-
-       goFlush := func(sbs []shortBlock) {
+       goCommit := func(refs []fnSegmentRef) {
                cg.Go(func() error {
-                       return flush(sbs)
+                       return dn.commitBlock(cg.Context(), throttle, refs)
                })
        }
 
-       var pending []shortBlock
-       var pendingLen int
+       var pending []fnSegmentRef
+       var pendingLen int = 0
        localLocator := map[string]string{}
        for _, name := range names {
                fn, ok := dn.inodes[name].(*filenode)
@@ -671,22 +680,22 @@ func (dn *dirnode) sync(ctx context.Context, names []string, throttle *throttle)
                                fn.segments[idx] = seg
                        case *memSegment:
                                if seg.Len() > maxBlockSize/2 {
-                                       goFlush([]shortBlock{{fn, idx}})
+                                       goCommit([]fnSegmentRef{{fn, idx}})
                                        continue
                                }
                                if pendingLen+seg.Len() > maxBlockSize {
-                                       goFlush(pending)
+                                       goCommit(pending)
                                        pending = nil
                                        pendingLen = 0
                                }
-                               pending = append(pending, shortBlock{fn, idx})
+                               pending = append(pending, fnSegmentRef{fn, idx})
                                pendingLen += seg.Len()
                        default:
                                panic(fmt.Sprintf("can't sync segment type %T", seg))
                        }
                }
        }
-       goFlush(pending)
+       goCommit(pending)
        return cg.Wait()
 }
 
@@ -758,7 +767,7 @@ func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle
 
                var fileparts []filepart
                var blocks []string
-               if err := dn.sync(cg.Context(), names, throttle); err != nil {
+               if err := dn.sync(cg.Context(), throttle, names); err != nil {
                        return err
                }
                for _, name := range filenames {