X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/df5c912a9eb5af7222e5446bc437ee97262542c8..1166aeb6033725709ded753a0c00f69320a9a873:/sdk/go/arvados/fs_collection_test.go diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go index a6d4ab1e5b..7fd03b120a 100644 --- a/sdk/go/arvados/fs_collection_test.go +++ b/sdk/go/arvados/fs_collection_test.go @@ -19,10 +19,10 @@ import ( "runtime" "strings" "sync" + "sync/atomic" "testing" "time" - "git.curoverse.com/arvados.git/sdk/go/arvadostest" check "gopkg.in/check.v1" ) @@ -31,6 +31,7 @@ var _ = check.Suite(&CollectionFSSuite{}) type keepClientStub struct { blocks map[string][]byte refreshable map[string]bool + onPut func(bufcopy []byte) // called from PutB, before acquiring lock sync.RWMutex } @@ -50,6 +51,9 @@ func (kcs *keepClientStub) PutB(p []byte) (string, int, error) { locator := fmt.Sprintf("%x+%d+A12345@abcde", md5.Sum(p), len(p)) buf := make([]byte, len(p)) copy(buf, p) + if kcs.onPut != nil { + kcs.onPut(buf) + } kcs.Lock() defer kcs.Unlock() kcs.blocks[locator[:32]] = buf @@ -82,7 +86,7 @@ type CollectionFSSuite struct { func (s *CollectionFSSuite) SetUpTest(c *check.C) { s.client = NewClientFromEnv() - err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+arvadostest.FooAndBarFilesInDirUUID, nil, nil) + err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+fixtureFooAndBarFilesInDirUUID, nil, nil) c.Assert(err, check.IsNil) s.kc = &keepClientStub{ blocks: map[string][]byte{ @@ -583,7 +587,7 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) { const ngoroutines = 256 var wg sync.WaitGroup - for n := 0; n < nfiles; n++ { + for n := 0; n < ngoroutines; n++ { wg.Add(1) go func(n int) { defer wg.Done() @@ -592,7 +596,7 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) { f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0) c.Assert(err, check.IsNil) defer f.Close() - for i := 0; i < ngoroutines; i++ { + for i := 0; i < nfiles; i++ { trunc := rand.Intn(65) woff := rand.Intn(trunc + 1) wbytes = wbytes[:rand.Intn(64-woff+1)] @@ -618,11 +622,18 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) { c.Check(string(buf), check.Equals, string(expect)) c.Check(err, check.IsNil) } - s.checkMemSize(c, f) }(n) } wg.Wait() + for n := 0; n < ngoroutines; n++ { + f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDONLY, 0) + c.Assert(err, check.IsNil) + f.(*filehandle).inode.(*filenode).waitPrune() + s.checkMemSize(c, f) + defer f.Close() + } + root, err := s.fs.Open("/") c.Assert(err, check.IsNil) defer root.Close() @@ -1029,8 +1040,37 @@ func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) { } func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) { + defer func(wab, mbs int) { + writeAheadBlocks = wab + maxBlockSize = mbs + }(writeAheadBlocks, maxBlockSize) + writeAheadBlocks = 2 maxBlockSize = 1024 - defer func() { maxBlockSize = 2 << 26 }() + + proceed := make(chan struct{}) + var started, concurrent int32 + blk2done := false + s.kc.onPut = func([]byte) { + atomic.AddInt32(&concurrent, 1) + switch atomic.AddInt32(&started, 1) { + case 1: + // Wait until block 2 starts and finishes, and block 3 starts + select { + case <-proceed: + c.Check(blk2done, check.Equals, true) + case <-time.After(time.Second): + c.Error("timed out") + } + case 2: + time.Sleep(time.Millisecond) + blk2done = true + case 3: + close(proceed) + default: + time.Sleep(time.Millisecond) + } + c.Check(atomic.AddInt32(&concurrent, -1) < int32(writeAheadBlocks), check.Equals, true) + } fs, err := (&Collection{}).FileSystem(s.client, s.kc) c.Assert(err, check.IsNil) @@ -1056,6 +1096,7 @@ func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) { } return } + f.(*filehandle).inode.(*filenode).waitPrune() c.Check(currentMemExtents(), check.HasLen, 1) m, err := fs.MarshalManifest(".")