X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/50f8d8487ad5156058087438b670d7c6f8a8d718..5c1c5e34118a3867fca9e7f0150074ea18623939:/sdk/go/arvados/fs_collection_test.go diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go index 8567a830c5..7fd03b120a 100644 --- a/sdk/go/arvados/fs_collection_test.go +++ b/sdk/go/arvados/fs_collection_test.go @@ -19,10 +19,10 @@ import ( "runtime" "strings" "sync" + "sync/atomic" "testing" "time" - "git.curoverse.com/arvados.git/sdk/go/arvadostest" check "gopkg.in/check.v1" ) @@ -31,6 +31,7 @@ var _ = check.Suite(&CollectionFSSuite{}) type keepClientStub struct { blocks map[string][]byte refreshable map[string]bool + onPut func(bufcopy []byte) // called from PutB, before acquiring lock sync.RWMutex } @@ -50,6 +51,9 @@ func (kcs *keepClientStub) PutB(p []byte) (string, int, error) { locator := fmt.Sprintf("%x+%d+A12345@abcde", md5.Sum(p), len(p)) buf := make([]byte, len(p)) copy(buf, p) + if kcs.onPut != nil { + kcs.onPut(buf) + } kcs.Lock() defer kcs.Unlock() kcs.blocks[locator[:32]] = buf @@ -82,7 +86,7 @@ type CollectionFSSuite struct { func (s *CollectionFSSuite) SetUpTest(c *check.C) { s.client = NewClientFromEnv() - err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+arvadostest.FooAndBarFilesInDirUUID, nil, nil) + err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+fixtureFooAndBarFilesInDirUUID, nil, nil) c.Assert(err, check.IsNil) s.kc = &keepClientStub{ blocks: map[string][]byte{ @@ -618,11 +622,18 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) { c.Check(string(buf), check.Equals, string(expect)) c.Check(err, check.IsNil) } - s.checkMemSize(c, f) }(n) } wg.Wait() + for n := 0; n < ngoroutines; n++ { + f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDONLY, 0) + c.Assert(err, check.IsNil) + f.(*filehandle).inode.(*filenode).waitPrune() + s.checkMemSize(c, f) + defer f.Close() + } + root, err := s.fs.Open("/") c.Assert(err, check.IsNil) defer root.Close() @@ -1029,8 +1040,37 @@ func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) { } func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) { + defer func(wab, mbs int) { + writeAheadBlocks = wab + maxBlockSize = mbs + }(writeAheadBlocks, maxBlockSize) + writeAheadBlocks = 2 maxBlockSize = 1024 - defer func() { maxBlockSize = 2 << 26 }() + + proceed := make(chan struct{}) + var started, concurrent int32 + blk2done := false + s.kc.onPut = func([]byte) { + atomic.AddInt32(&concurrent, 1) + switch atomic.AddInt32(&started, 1) { + case 1: + // Wait until block 2 starts and finishes, and block 3 starts + select { + case <-proceed: + c.Check(blk2done, check.Equals, true) + case <-time.After(time.Second): + c.Error("timed out") + } + case 2: + time.Sleep(time.Millisecond) + blk2done = true + case 3: + close(proceed) + default: + time.Sleep(time.Millisecond) + } + c.Check(atomic.AddInt32(&concurrent, -1) < int32(writeAheadBlocks), check.Equals, true) + } fs, err := (&Collection{}).FileSystem(s.client, s.kc) c.Assert(err, check.IsNil) @@ -1056,8 +1096,6 @@ func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) { } return } - - c.Check(len(currentMemExtents()) <= writeAheadBlocks+1, check.Equals, true) f.(*filehandle).inode.(*filenode).waitPrune() c.Check(currentMemExtents(), check.HasLen, 1)