gem 'safe_yaml'
gem 'npm-rails'
+
+# arvados-google-api-client and googleauth (and thus arvados) gems
+# depend on signet, but signet 0.12 is incompatible with ruby 2.3.
+gem 'signet', '< 0.12'
flamegraph (0.9.5)
globalid (0.4.2)
activesupport (>= 4.2.0)
- googleauth (0.10.0)
+ googleauth (0.9.0)
faraday (~> 0.12)
jwt (>= 1.4, < 3.0)
memoist (~> 0.16)
multi_json (~> 1.11)
os (>= 0.9, < 2.0)
- signet (~> 0.12)
+ signet (~> 0.7)
headless (1.0.2)
highline (2.0.2)
httpclient (2.8.3)
selenium-webdriver (3.141.0)
childprocess (~> 0.5)
rubyzip (~> 1.2, >= 1.2.2)
- signet (0.12.0)
+ signet (0.11.0)
addressable (~> 2.3)
faraday (~> 0.9)
jwt (>= 1.5, < 3.0)
sass
sassc-rails
selenium-webdriver (~> 3)
+ signet (< 0.12)
simplecov (~> 0.7)
simplecov-rcov
sshkey
// preparing to run a container that
// has already been unlocked/requeued.
go sch.kill(uuid, fmt.Sprintf("state=%s", ent.Container.State))
+ } else if ent.Container.Priority == 0 {
+ sch.logger.WithFields(logrus.Fields{
+ "ContainerUUID": uuid,
+ "State": ent.Container.State,
+ "Priority": ent.Container.Priority,
+ }).Info("container on hold")
+ sch.queue.Forget(uuid)
}
case arvados.ContainerStateLocked:
if running && !exited.IsZero() && qUpdated.After(exited) {
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package scheduler
+
+import (
+ "context"
+ "time"
+
+ "git.curoverse.com/arvados.git/lib/dispatchcloud/test"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
+ check "gopkg.in/check.v1"
+)
+
+// Ensure the scheduler expunges containers from the queue when they
+// are no longer relevant (completed and not running, queued with
+// priority 0, etc).
+func (*SchedulerSuite) TestForgetIrrelevantContainers(c *check.C) {
+ ctx := ctxlog.Context(context.Background(), ctxlog.TestLogger(c))
+ pool := stubPool{} // zero-value stub pool
+ queue := test.Queue{
+ ChooseType: chooseType,
+ Containers: []arvados.Container{
+ { // container 1: still Queued, but with priority 0
+ UUID: test.ContainerUUID(1),
+ Priority: 0,
+ State: arvados.ContainerStateQueued,
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ VCPUs: 1,
+ RAM: 1 << 30,
+ },
+ },
+ { // container 2: nonzero priority, but already Complete
+ UUID: test.ContainerUUID(2),
+ Priority: 12345,
+ State: arvados.ContainerStateComplete,
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ VCPUs: 1,
+ RAM: 1 << 30,
+ },
+ },
+ },
+ }
+ queue.Update()
+
+ ents, _ := queue.Entries()
+ c.Check(ents, check.HasLen, 1) // one entry expected before the scheduler runs -- presumably container 1; verify against test.Queue
+
+ sch := New(ctx, &queue, &pool, time.Millisecond, time.Millisecond)
+ sch.sync() // call sync() directly instead of starting the scheduler
+
+ ents, _ = queue.Entries()
+ c.Check(ents, check.HasLen, 0) // sync() is expected to have removed the remaining entry from the queue
+}
// while locking multiple inodes.
locker() sync.Locker
+ // throttle for limiting concurrent background writers
+ throttle() *throttle
+
// create a new node with nil parent.
newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error)
Remove(name string) error
RemoveAll(name string) error
Rename(oldname, newname string) error
+
+ // Write buffered data from memory to storage, returning when
+ // all updates have been saved to persistent storage.
Sync() error
+
+ // Write buffered data from memory to storage, but don't wait
+ // for all writes to finish before returning. If shortBlocks
+ // is true, flush everything; otherwise, if there's less than
+ // a full block of buffered data at the end of a stream, leave
+ // it buffered in memory in case more data can be appended. If
+ // path is "", flush all dirs/streams; otherwise, flush only
+ // the specified dir/stream.
+ Flush(path string, shortBlocks bool) error
}
type inode interface {
root inode
fsBackend
mutex sync.Mutex
+ thr *throttle
}
func (fs *fileSystem) rootnode() inode {
return fs.root
}
+func (fs *fileSystem) throttle() *throttle { // accessor: exposes the throttle stored in fs.thr
+ return fs.thr
+}
+
func (fs *fileSystem) locker() sync.Locker {
return &fs.mutex
}
return ErrInvalidOperation
}
+func (fs *fileSystem) Flush(string, bool) error { // both arguments are unnamed and unused
+ log.Printf("TODO: flush fileSystem") // placeholder: only logs a message
+ return ErrInvalidOperation // always returns this error
+}
+
// rlookup (recursive lookup) returns the inode for the file/directory
// with the given name (which may contain "/" separators). If no such
// file/directory exists, the returned node is nil.
var (
maxBlockSize = 1 << 26
- concurrentWriters = 4 // max goroutines writing to Keep during sync()
- writeAheadBlocks = 1 // max background jobs flushing to Keep before blocking writes
+ concurrentWriters = 4 // max goroutines writing to Keep in background and during flush()
)
// A CollectionFileSystem is a FileSystem that can be serialized as a
// Total data bytes in all files.
Size() int64
+
+ // Memory consumed by buffered file data.
+ memorySize() int64
}
type collectionFileSystem struct {
uuid: c.UUID,
fileSystem: fileSystem{
fsBackend: keepBackend{apiClient: client, keepClient: kc},
+ thr: newThrottle(concurrentWriters),
},
}
root := &dirnode{
return nil
}
+func (fs *collectionFileSystem) Flush(path string, shortBlocks bool) error { // starts a non-waiting flush (sync: false) of the directory at path
+ node, err := rlookup(fs.fileSystem.root, path) // resolve path starting from the filesystem root
+ if err != nil {
+ return err
+ }
+ dn, ok := node.(*dirnode)
+ if !ok { // target is not a *dirnode (this also covers a nil node)
+ return ErrNotADirectory
+ }
+ dn.Lock()
+ defer dn.Unlock() // dn stays locked until Flush returns
+ names := dn.sortedNames()
+ if path != "" { // any non-empty path (including "/") takes the non-recursive branch
+ // Caller only wants to flush the specified dir,
+ // non-recursively. Drop subdirs from the list of
+ // names.
+ var filenames []string
+ for _, name := range names {
+ if _, ok := dn.inodes[name].(*filenode); ok {
+ filenames = append(filenames, name)
+ }
+ }
+ names = filenames
+ }
+ for _, name := range names {
+ child := dn.inodes[name]
+ child.Lock()
+ defer child.Unlock() // deferred to function return, so every named child stays locked during dn.flush below
+ }
+ return dn.flush(context.TODO(), names, flushOpts{sync: false, shortBlocks: shortBlocks}) // sync: false -- flush is asked not to wait for the writes
+}
+
+func (fs *collectionFileSystem) memorySize() int64 { // returns the total reported by the root dirnode's memorySize()
+ fs.fileSystem.root.Lock() // root stays locked for the whole traversal
+ defer fs.fileSystem.root.Unlock()
+ return fs.fileSystem.root.(*dirnode).memorySize() // unchecked type assertion: panics if root is not a *dirnode
+}
+
func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
fs.fileSystem.root.Lock()
defer fs.fileSystem.root.Unlock()
- return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix, newThrottle(concurrentWriters))
+ return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix) // no throttle argument any more (compare the removed line above)
}
func (fs *collectionFileSystem) Size() int64 {
memsize int64 // bytes in memSegments
sync.RWMutex
nullnode
- throttle *throttle
}
// caller must have lock
// Write some data out to disk to reduce memory use. Caller must have
// write lock.
func (fn *filenode) pruneMemSegments() {
- // TODO: share code with (*dirnode)sync()
+ // TODO: share code with (*dirnode)flush()
// TODO: pack/flush small blocks too, when fragmented
- if fn.throttle == nil {
- // TODO: share a throttle with filesystem
- fn.throttle = newThrottle(writeAheadBlocks)
- }
for idx, seg := range fn.segments {
seg, ok := seg.(*memSegment)
if !ok || seg.Len() < maxBlockSize || seg.flushing != nil {
// progress, block here until one finishes, rather
// than pile up an unlimited number of buffered writes
// and network flush operations.
- fn.throttle.Acquire()
+ fn.fs.throttle().Acquire()
go func() {
defer close(done)
locator, _, err := fn.FS().PutB(buf)
- fn.throttle.Release()
+ fn.fs.throttle().Release()
fn.Lock()
defer fn.Unlock()
- if curbuf := seg.buf[:1]; &curbuf[0] != &buf[0] {
+ if seg.flushing != done {
// A new seg.buf has been allocated.
return
}
}
}
-// Block until all pending pruneMemSegments work is finished. Caller
-// must NOT have lock.
+// Block until all pending pruneMemSegments/flush work is
+// finished. Caller must NOT have lock.
func (fn *filenode) waitPrune() {
var pending []<-chan struct{}
fn.Lock()
// storedSegments that reference the relevant portions of the new
// block.
//
+// bufsize is the total data size in refs. It is used to preallocate
+// the correct amount of memory when len(refs)>1.
+//
+// If sync is false, commitBlock returns right away, after starting a
+// goroutine to do the writes, reacquire the filenodes' locks, and
+// swap out the *memSegments. Some filenodes' segments might get
+// modified/rearranged in the meantime, in which case commitBlock
+// won't replace them.
+//
// Caller must have write lock.
-func (dn *dirnode) commitBlock(ctx context.Context, throttle *throttle, refs []fnSegmentRef) error {
+func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize int, sync bool) error { // throttle now comes from dn.fs.throttle() instead of a parameter
if len(refs) == 0 {
return nil
}
- throttle.Acquire()
- defer throttle.Release()
if err := ctx.Err(); err != nil {
return err
}
- block := make([]byte, 0, maxBlockSize)
+ done := make(chan struct{}) // stored in each seg.flushing below; closed when the writer goroutine returns
+ var block []byte // data to pass to PutB
+ segs := make([]*memSegment, 0, len(refs)) // segs[i] is the memSegment that refs[i] pointed to when we started
+ offsets := make([]int, 0, len(refs)) // location of segment's data within block
for _, ref := range refs {
- block = append(block, ref.fn.segments[ref.idx].(*memSegment).buf...)
- }
- locator, _, err := dn.fs.PutB(block)
- if err != nil {
- return err
+ seg := ref.fn.segments[ref.idx].(*memSegment)
+ if seg.flushing != nil && !sync {
+ // Let the other flushing goroutine finish. If
+ // it fails, we'll try again next time.
+ return nil // NOTE(review): segments marked on earlier iterations keep flushing == done, and done is never closed on this path -- confirm nothing waits on it
+ } else {
+ // In sync mode, we proceed regardless of
+ // whether another flush is in progress: It
+ // can't finish before we do, because we hold
+ // fn's lock until we finish our own writes.
+ }
+ seg.flushing = done // mark the segment as being flushed by this call
+ offsets = append(offsets, len(block))
+ if len(refs) == 1 {
+ block = seg.buf // single segment: use its buffer directly, no copy
+ } else if block == nil {
+ block = append(make([]byte, 0, bufsize), seg.buf...) // first of several: allocate bufsize capacity once
+ } else {
+ block = append(block, seg.buf...)
+ }
+ segs = append(segs, seg)
}
- off := 0
- for _, ref := range refs {
- data := ref.fn.segments[ref.idx].(*memSegment).buf
- ref.fn.segments[ref.idx] = storedSegment{
- kc: dn.fs,
- locator: locator,
- size: len(block),
- offset: off,
- length: len(data),
+ dn.fs.throttle().Acquire() // acquired before the goroutine starts; released right after PutB returns
+ errs := make(chan error, 1) // buffered, so the goroutine can report an error even if nobody receives (async mode)
+ go func() {
+ defer close(done)
+ defer close(errs) // on success nothing is sent, so a receive on the closed channel yields nil
+ locked := map[*filenode]bool{} // filenodes this goroutine has locked (async mode only)
+ locator, _, err := dn.fs.PutB(block)
+ dn.fs.throttle().Release()
+ {
+ if !sync {
+ for _, name := range dn.sortedNames() { // NOTE(review): reads dn.inodes; this block does not lock dn in async mode -- confirm that is safe
+ if fn, ok := dn.inodes[name].(*filenode); ok {
+ fn.Lock()
+ defer fn.Unlock() // runs when the goroutine returns
+ locked[fn] = true
+ }
+ }
+ }
+ defer func() { // registered last, so it runs first (LIFO): markers are cleared before the unlocks above
+ for _, seg := range segs {
+ if seg.flushing == done {
+ seg.flushing = nil
+ }
+ }
+ }()
+ }
+ if err != nil {
+ errs <- err // segments are left as memSegments
+ return
}
- off += len(data)
- ref.fn.memsize -= int64(len(data))
+ for idx, ref := range refs {
+ if !sync {
+ // In async mode, fn's lock was
+ // released while we were waiting for
+ // PutB(); lots of things might have
+ // changed.
+ if len(ref.fn.segments) <= ref.idx {
+ // file segments have
+ // rearranged or changed in
+ // some way
+ continue
+ } else if seg, ok := ref.fn.segments[ref.idx].(*memSegment); !ok || seg != segs[idx] {
+ // segment has been replaced
+ continue
+ } else if seg.flushing != done {
+ // seg.buf has been replaced
+ continue
+ } else if !locked[ref.fn] {
+ // file was renamed, moved, or
+ // deleted since we called
+ // PutB
+ continue
+ }
+ }
+ data := ref.fn.segments[ref.idx].(*memSegment).buf
+ ref.fn.segments[ref.idx] = storedSegment{ // swap the in-memory segment for a reference into the stored block
+ kc: dn.fs,
+ locator: locator,
+ size: len(block),
+ offset: offsets[idx], // where this segment's data starts within block
+ length: len(data),
+ }
+ ref.fn.memsize -= int64(len(data)) // this data no longer counts toward the file's memsize
+ }
+ }()
+ if sync {
+ return <-errs // wait for the goroutine: its error, or nil once errs is closed
+ } else {
+ return nil // async: return without waiting for the goroutine
}
- return nil
}
-// sync flushes in-memory data and remote block references (for the
+type flushOpts struct { // options accepted by (*dirnode).flush
+ sync bool // forwarded to commitBlock as its sync argument
+ shortBlocks bool // when true, flush also commits the leftover pending (not yet full) segments
+}
+
+// flush in-memory data and remote-cluster block references (for the
// children with the given names, which must be children of dn) to
-// local persistent storage. Caller must have write lock on dn and the
-// named children.
-func (dn *dirnode) sync(ctx context.Context, throttle *throttle, names []string) error {
+// local-cluster persistent storage.
+//
+// Caller must have write lock on dn and the named children.
+//
+// If any children are dirs, they will be flushed recursively.
+func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) error { // replaces sync(); the throttle parameter is gone
cg := newContextGroup(ctx)
defer cg.Cancel()
- goCommit := func(refs []fnSegmentRef) {
+ goCommit := func(refs []fnSegmentRef, bufsize int) { // bufsize: total data size in refs, passed through to commitBlock
cg.Go(func() error {
- return dn.commitBlock(cg.Context(), throttle, refs)
+ return dn.commitBlock(cg.Context(), refs, bufsize, opts.sync) // with opts.sync false, commitBlock returns nil without waiting for the write
})
}
var pendingLen int = 0
localLocator := map[string]string{}
for _, name := range names {
- fn, ok := dn.inodes[name].(*filenode)
- if !ok {
- continue
- }
- for idx, seg := range fn.segments {
- switch seg := seg.(type) {
- case storedSegment:
- loc, ok := localLocator[seg.locator]
- if !ok {
- var err error
- loc, err = dn.fs.LocalLocator(seg.locator)
- if err != nil {
- return err
+ switch node := dn.inodes[name].(type) {
+ case *dirnode: // subdirectory: flush it recursively
+ grandchildNames := node.sortedNames()
+ for _, grandchildName := range grandchildNames {
+ grandchild := node.inodes[grandchildName]
+ grandchild.Lock()
+ defer grandchild.Unlock() // held until this flush call returns, i.e. after cg.Wait()
+ }
+ cg.Go(func() error { return node.flush(cg.Context(), grandchildNames, opts) }) // recursive flush runs as part of the same context group
+ case *filenode:
+ for idx, seg := range node.segments {
+ switch seg := seg.(type) {
+ case storedSegment:
+ loc, ok := localLocator[seg.locator] // cache: call LocalLocator at most once per distinct locator
+ if !ok {
+ var err error
+ loc, err = dn.fs.LocalLocator(seg.locator)
+ if err != nil {
+ return err
+ }
+ localLocator[seg.locator] = loc
}
- localLocator[seg.locator] = loc
- }
- seg.locator = loc
- fn.segments[idx] = seg
- case *memSegment:
- if seg.Len() > maxBlockSize/2 {
- goCommit([]fnSegmentRef{{fn, idx}})
- continue
- }
- if pendingLen+seg.Len() > maxBlockSize {
- goCommit(pending)
- pending = nil
- pendingLen = 0
+ seg.locator = loc
+ node.segments[idx] = seg // seg is a copy, so write the updated value back
+ case *memSegment:
+ if seg.Len() > maxBlockSize/2 { // larger than half a block: commit it as a block of its own
+ goCommit([]fnSegmentRef{{node, idx}}, seg.Len())
+ continue
+ }
+ if pendingLen+seg.Len() > maxBlockSize { // adding seg would exceed maxBlockSize: commit what is pending first
+ goCommit(pending, pendingLen)
+ pending = nil
+ pendingLen = 0
+ }
+ pending = append(pending, fnSegmentRef{node, idx})
+ pendingLen += seg.Len()
+ default:
+ panic(fmt.Sprintf("can't sync segment type %T", seg))
}
- pending = append(pending, fnSegmentRef{fn, idx})
- pendingLen += seg.Len()
- default:
- panic(fmt.Sprintf("can't sync segment type %T", seg))
}
}
}
- goCommit(pending)
+ if opts.shortBlocks { // otherwise the leftover pending segments stay in memory
+ goCommit(pending, pendingLen)
+ }
return cg.Wait()
}
// caller must have write lock.
-func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle *throttle) (string, error) {
+func (dn *dirnode) memorySize() (size int64) { // sums the length of every *memSegment below dn, recursively
+ for _, name := range dn.sortedNames() {
+ node := dn.inodes[name]
+ node.Lock()
+ defer node.Unlock() // deferred: each child stays locked until this call returns
+ switch node := node.(type) {
+ case *dirnode:
+ size += node.memorySize() // recurse into the subdirectory
+ case *filenode:
+ for _, seg := range node.segments {
+ switch seg := seg.(type) {
+ case *memSegment: // other segment types are not counted
+ size += int64(seg.Len())
+ }
+ }
+ }
+ }
+ return // named result: returns size
+}
+
+// caller must have write lock.
+func (dn *dirnode) sortedNames() []string { // returns the keys of dn.inodes in sorted order
+ names := make([]string, 0, len(dn.inodes)) // capacity is known up front
+ for name := range dn.inodes {
+ names = append(names, name)
+ }
+ sort.Strings(names) // map iteration order is random; sorting makes the result deterministic
+ return names
+}
+
+// caller must have write lock.
+func (dn *dirnode) marshalManifest(ctx context.Context, prefix string) (string, error) {
cg := newContextGroup(ctx)
defer cg.Cancel()
return manifestEscape(prefix) + " d41d8cd98f00b204e9800998ecf8427e+0 0:0:\\056\n", nil
}
- names := make([]string, 0, len(dn.inodes))
- for name := range dn.inodes {
- names = append(names, name)
- }
- sort.Strings(names)
+ names := dn.sortedNames()
// Wait for children to finish any pending write operations
// before locking them.
for i, name := range dirnames {
i, name := i, name
cg.Go(func() error {
- txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name, throttle)
+ txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name)
subdirs[i] = txt
return err
})
var fileparts []filepart
var blocks []string
- if err := dn.sync(cg.Context(), throttle, names); err != nil {
+ if err := dn.flush(cg.Context(), filenames, flushOpts{sync: true, shortBlocks: true}); err != nil {
return err
}
for _, name := range filenames {
default:
// This can't happen: we
// haven't unlocked since
- // calling sync().
+ // calling flush(sync=true).
panic(fmt.Sprintf("can't marshal segment type %T", seg))
}
}
}
maxBlockSize = 8
- defer func() { maxBlockSize = 2 << 26 }()
+ defer func() { maxBlockSize = 1 << 26 }()
var wg sync.WaitGroup
for n := 0; n < 128; n++ {
c.Check(err, check.ErrorMatches, `invalid flag.*`)
}
-func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
- defer func(wab, mbs int) {
- writeAheadBlocks = wab
+func (s *CollectionFSSuite) TestFlushFullBlocksWritingLongFile(c *check.C) {
+ defer func(cw, mbs int) {
+ concurrentWriters = cw
maxBlockSize = mbs
- }(writeAheadBlocks, maxBlockSize)
- writeAheadBlocks = 2
+ }(concurrentWriters, maxBlockSize)
+ concurrentWriters = 2
maxBlockSize = 1024
proceed := make(chan struct{})
default:
time.Sleep(time.Millisecond)
}
- c.Check(atomic.AddInt32(&concurrent, -1) < int32(writeAheadBlocks), check.Equals, true)
+ c.Check(atomic.AddInt32(&concurrent, -1) < int32(concurrentWriters), check.Equals, true)
}
fs, err := (&Collection{}).FileSystem(s.client, s.kc)
c.Check(currentMemExtents(), check.HasLen, 0)
}
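+// Ensure blocks get flushed to disk if a lot of data is written to
+// small files/directories without calling sync().
+//
+// Create two files in each of 256 top-level dirs, copying a buffer
+// of slightly more than 512KiB into them, and call Flush("", true)
+// after every 8th dir. Ensure memory usage, as reported by
+// memorySize(), never exceeds 16MiB (1<<24 bytes).
+//
+// (This comment previously described four 512KiB files per dir,
+// 512MiB in total, and a 24MiB bound: 4 concurrentWriters * 2MiB + 8
+// unflushed dirs * 2MiB. The check in the test is the stricter
+// 16MiB.)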
+func (s *CollectionFSSuite) TestFlushAll(c *check.C) {
+ fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+
+ s.kc.onPut = func([]byte) {
+ // discard flushed data -- otherwise the stub will use
+ // unlimited memory
+ time.Sleep(time.Millisecond)
+ s.kc.Lock()
+ defer s.kc.Unlock()
+ s.kc.blocks = map[string][]byte{}
+ }
+ for i := 0; i < 256; i++ {
+ buf := bytes.NewBuffer(make([]byte, 524288)) // starts with 512KiB of zero bytes
+ fmt.Fprintf(buf, "test file in dir%d", i) // appends a short label after the zero bytes
+
+ dir := fmt.Sprintf("dir%d", i)
+ fs.Mkdir(dir, 0755) // return value is not checked
+ for j := 0; j < 2; j++ {
+ f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
+ c.Assert(err, check.IsNil)
+ defer f.Close() // deferred inside the loop: runs only when the test function returns
+ _, err = io.Copy(f, buf) // NOTE(review): both files read from the same buf; the first copy reads it to EOF, so the second file likely gets no data -- confirm intent
+ c.Assert(err, check.IsNil)
+ }
+
+ if i%8 == 0 { // i = 0, 8, 16, ...
+ fs.Flush("", true) // flush everything, including short blocks; return value is not checked
+ }
+
+ size := fs.memorySize()
+ if !c.Check(size <= 1<<24, check.Equals, true) { // 1<<24 bytes = 16MiB
+ c.Logf("at dir%d fs.memorySize()=%d", i, size)
+ return // stop at the first violation
+ }
+ }
+}
+
+// Ensure short blocks at the end of a stream don't get flushed by
+// Flush(false).
+//
+// Write 67x 1MiB files to each of 8 dirs, and check that 8 full 64MiB
+// blocks have been flushed while 8x 3MiB is still buffered in memory.
+func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
+ fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+
+ var flushed int64 // total bytes passed to onPut so far
+ s.kc.onPut = func(p []byte) {
+ atomic.AddInt64(&flushed, int64(len(p)))
+ }
+
+ nDirs := int64(8)
+ megabyte := make([]byte, 1<<20) // 1MiB of zero bytes, reused for every file
+ for i := int64(0); i < nDirs; i++ {
+ dir := fmt.Sprintf("dir%d", i)
+ fs.Mkdir(dir, 0755) // return value is not checked
+ for j := 0; j < 67; j++ { // 67 files of 1MiB each per dir
+ f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
+ c.Assert(err, check.IsNil)
+ defer f.Close() // deferred inside the loop: runs only when the test function returns
+ _, err = f.Write(megabyte)
+ c.Assert(err, check.IsNil)
+ }
+ }
+ c.Check(fs.memorySize(), check.Equals, int64(nDirs*67<<20)) // everything written so far is expected to still be in memory
+ c.Check(flushed, check.Equals, int64(0))
+
+ waitForFlush := func(expectUnflushed, expectFlushed int64) { // polls every 10ms, for up to 5s, until memorySize() drops to expectUnflushed; then checks both counters
+ for deadline := time.Now().Add(5 * time.Second); fs.memorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
+ }
+ c.Check(fs.memorySize(), check.Equals, expectUnflushed)
+ c.Check(flushed, check.Equals, expectFlushed) // NOTE(review): plain read of a counter that onPut updates atomically -- confirm no PutB can still be in flight here
+ }
+
+ // Nothing flushed yet
+ waitForFlush((nDirs*67)<<20, 0)
+
+ // Flushing a non-empty dir "/" is non-recursive and there are
+ // no top-level files, so this has no effect
+ fs.Flush("/", false)
+ waitForFlush((nDirs*67)<<20, 0)
+
+ // Flush the full block in dir0
+ fs.Flush("dir0", false)
+ waitForFlush((nDirs*67-64)<<20, 64<<20) // one 64MiB block flushed; 3MiB of dir0 expected to remain buffered
+
+ err = fs.Flush("dir-does-not-exist", false)
+ c.Check(err, check.NotNil) // flushing a nonexistent path must return an error
+
+ // Flush full blocks in all dirs
+ fs.Flush("", false)
+ waitForFlush(nDirs*3<<20, nDirs*64<<20) // 3MiB per dir expected to remain buffered
+
+ // Flush non-full blocks, too
+ fs.Flush("", true)
+ waitForFlush(0, nDirs*67<<20) // nothing left in memory
+}
+
+// Even when writing lots of files/dirs from different goroutines, as
+// long as Flush(dir,false) is called after writing each file,
+// unflushed data should be limited to one full block per
+// concurrentWriter, plus one nearly-full block at the end of each
+// dir/stream.
+func (s *CollectionFSSuite) TestMaxUnflushed(c *check.C) {
+ nDirs := int64(8)
+ maxUnflushed := (int64(concurrentWriters) + nDirs) << 26 // (concurrentWriters + nDirs) * 64MiB
+
+ fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+
+ release := make(chan struct{}) // closed by the concurrentWriters-th PUT to unblock the earlier ones
+ timeout := make(chan struct{}) // closed after 10 seconds
+ time.AfterFunc(10*time.Second, func() { close(timeout) })
+ var putCount, concurrency int64 // PUTs started so far / PUTs currently inside onPut
+ var unflushed int64 // bytes written to files minus bytes whose onPut call has returned
+ s.kc.onPut = func(p []byte) {
+ defer atomic.AddInt64(&unflushed, -int64(len(p))) // block counts as flushed once onPut returns
+ cur := atomic.AddInt64(&concurrency, 1)
+ defer atomic.AddInt64(&concurrency, -1)
+ pc := atomic.AddInt64(&putCount, 1)
+ if pc < int64(concurrentWriters) {
+ // Block until we reach concurrentWriters, to
+ // make sure we're really accepting concurrent
+ // writes.
+ select {
+ case <-release:
+ case <-timeout:
+ c.Error("timeout")
+ }
+ } else if pc == int64(concurrentWriters) {
+ // Unblock the first N-1 PUT reqs.
+ close(release)
+ }
+ c.Assert(cur <= int64(concurrentWriters), check.Equals, true) // never more than concurrentWriters PUTs at once
+ c.Assert(atomic.LoadInt64(&unflushed) <= maxUnflushed, check.Equals, true) // unflushed data stays within the bound
+ }
+
+ var owg sync.WaitGroup // one count per directory goroutine
+ megabyte := make([]byte, 1<<20) // 1MiB of zero bytes, shared by all writers
+ for i := int64(0); i < nDirs; i++ {
+ dir := fmt.Sprintf("dir%d", i)
+ fs.Mkdir(dir, 0755) // return value is not checked
+ owg.Add(1)
+ go func() {
+ defer owg.Done()
+ defer fs.Flush(dir, true) // deferred calls run last-in-first-out: iwg.Wait(), then this Flush, then owg.Done()
+ var iwg sync.WaitGroup // one count per file-writer goroutine in this dir
+ defer iwg.Wait()
+ for j := 0; j < 67; j++ { // 67 files of 1MiB each, written concurrently
+ iwg.Add(1)
+ go func(j int) {
+ defer iwg.Done()
+ f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+ n, err := f.Write(megabyte)
+ c.Assert(err, check.IsNil)
+ atomic.AddInt64(&unflushed, int64(n))
+ fs.Flush(dir, false) // flush full blocks only, after every file
+ }(j)
+ }
+ }()
+ }
+ owg.Wait()
+ fs.Flush("", true) // final flush of everything, including short blocks
+}
+
func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
for _, txt := range []string{
"\n",
type customFileSystem struct {
fileSystem
root *vdirnode
+ thr *throttle
staleThreshold time.Time
staleLock sync.Mutex
fileSystem: fileSystem{
fsBackend: keepBackend{apiClient: c, keepClient: kc},
root: root,
+ thr: newThrottle(concurrentWriters),
},
}
root.inode = &treenode{
public class FileToken {
private int filePosition;
- private int fileSize;
+ private long fileSize;
private String fileName;
private String path;
private void splitFileTokenInfo(String fileTokenInfo) {
String[] tokenPieces = fileTokenInfo.split(":");
this.filePosition = Integer.parseInt(tokenPieces[0]);
- this.fileSize = Integer.parseInt(tokenPieces[1]);
+ this.fileSize = Long.parseLong(tokenPieces[1]);
this.fileName = tokenPieces[2].replace(Characters.SPACE, " ");
}
return this.filePosition;
}
- public int getFileSize() {
+ public long getFileSize() {
return this.fileSize;
}
// values for tracking file output streams and matching data chunks with initial files
int currentDataChunkNumber;
int bytesDownloadedFromChunk;
- int bytesToDownload;
+ long bytesToDownload;
byte[] currentDataChunk;
boolean remainingDataInChunk;
final List<KeepLocator> keepLocators;
this.keepLocators = keepLocators;
}
- private int getBytesToDownload() {
+ private long getBytesToDownload() {
return bytesToDownload;
}
- private void setBytesToDownload(int bytesToDownload) {
+ private void setBytesToDownload(long bytesToDownload) {
this.bytesToDownload = bytesToDownload;
}
private void writeDownDataChunkPartially(FileOutputStream fos) throws IOException {
//write all remaining bytes for this file from current chunk
- fos.write(currentDataChunk, bytesDownloadedFromChunk, bytesToDownload);
+ fos.write(currentDataChunk, bytesDownloadedFromChunk, (int) bytesToDownload);
// update number of bytes downloaded from this chunk
bytesDownloadedFromChunk += bytesToDownload;
// set remaining data in chunk to true
public static final String FILE_TOKEN_INFO = "0:1024:test-file1";
public static final int FILE_POSITION = 0;
- public static final int FILE_LENGTH = 1024;
+ public static final long FILE_LENGTH = 1024L;
public static final String FILE_NAME = "test-file1";
public static final String FILE_PATH = "c" + Characters.SLASH;
gem 'sass-rails'
+# arvados-google-api-client and googleauth depend on signet, but
+# signet 0.12 is incompatible with ruby 2.3.
+gem 'signet', '< 0.12'
+
# Install any plugin gems
Dir.glob(File.join(File.dirname(__FILE__), 'lib', '**', "Gemfile")) do |f|
eval(IO.read(f), binding)
ffi (1.9.25)
globalid (0.4.2)
activesupport (>= 4.2.0)
- googleauth (0.10.0)
+ googleauth (0.9.0)
faraday (~> 0.12)
jwt (>= 1.4, < 3.0)
memoist (~> 0.16)
multi_json (~> 1.11)
os (>= 0.9, < 2.0)
- signet (~> 0.12)
+ signet (~> 0.7)
hashie (3.6.0)
highline (2.0.1)
httpclient (2.8.3)
sprockets (>= 2.8, < 4.0)
sprockets-rails (>= 2.0, < 4.0)
tilt (>= 1.1, < 3)
- signet (0.12.0)
+ signet (0.11.0)
addressable (~> 2.3)
faraday (~> 0.9)
jwt (>= 1.5, < 3.0)
rvm-capistrano
safe_yaml
sass-rails
+ signet (< 0.12)
simplecov (~> 0.7.1)
simplecov-rcov
sshkey
"strings"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/manifest"
)
return "", fmt.Errorf("error making directory %q in output collection: %v", d, err)
}
}
+ var unflushed int64
+ var lastparentdir string
for _, f := range cp.files {
- err = cp.copyFile(fs, f)
+ // If a dir has just had its last file added, do a
+ // full Flush. Otherwise, do a partial Flush (write
+ // full-size blocks, but leave the last short block
+ // open so f's data can be packed with it).
+ dir, _ := filepath.Split(f.dst)
+ if dir != lastparentdir || unflushed > keepclient.BLOCKSIZE {
+ if err := fs.Flush("/"+lastparentdir, dir != lastparentdir); err != nil {
+ return "", fmt.Errorf("error flushing output collection file data: %v", err)
+ }
+ unflushed = 0
+ }
+ lastparentdir = dir
+
+ n, err := cp.copyFile(fs, f)
if err != nil {
return "", fmt.Errorf("error copying file %q into output collection: %v", f, err)
}
+ unflushed += n
}
return fs.MarshalManifest(".")
}
-func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) error {
+func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) (int64, error) { // now also returns the number of bytes copied
cp.logger.Printf("copying %q (%d bytes)", f.dst, f.size)
dst, err := fs.OpenFile(f.dst, os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
- return err
+ return 0, err // nothing copied: destination could not be opened
}
src, err := os.Open(f.src)
if err != nil {
dst.Close()
- return err
+ return 0, err // nothing copied: source could not be opened
}
defer src.Close()
- _, err = io.Copy(dst, src)
+ n, err := io.Copy(dst, src) // n is the byte count reported by io.Copy
if err != nil {
dst.Close()
- return err
+ return n, err // byte count is returned alongside the copy error
}
- return dst.Close()
+ return n, dst.Close() // the error from closing dst becomes the result
}
// Append to cp.manifest, cp.files, and cp.dirs so as to copy src (an