14715: keepproxy.service checks for cluster config
[arvados.git] / sdk / go / arvados / fs_collection_test.go
index a6d4ab1e5b71baccabafdbdf810db0ee264420a5..7fd03b120a7f34240393f884f88992b885499e1f 100644 (file)
@@ -19,10 +19,10 @@ import (
        "runtime"
        "strings"
        "sync"
+       "sync/atomic"
        "testing"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
        check "gopkg.in/check.v1"
 )
 
@@ -31,6 +31,7 @@ var _ = check.Suite(&CollectionFSSuite{})
 type keepClientStub struct {
        blocks      map[string][]byte
        refreshable map[string]bool
+       onPut       func(bufcopy []byte) // called from PutB, before acquiring lock
        sync.RWMutex
 }
 
@@ -50,6 +51,9 @@ func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
        locator := fmt.Sprintf("%x+%d+A12345@abcde", md5.Sum(p), len(p))
        buf := make([]byte, len(p))
        copy(buf, p)
+       if kcs.onPut != nil {
+               kcs.onPut(buf)
+       }
        kcs.Lock()
        defer kcs.Unlock()
        kcs.blocks[locator[:32]] = buf
@@ -82,7 +86,7 @@ type CollectionFSSuite struct {
 
 func (s *CollectionFSSuite) SetUpTest(c *check.C) {
        s.client = NewClientFromEnv()
-       err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+arvadostest.FooAndBarFilesInDirUUID, nil, nil)
+       err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+fixtureFooAndBarFilesInDirUUID, nil, nil)
        c.Assert(err, check.IsNil)
        s.kc = &keepClientStub{
                blocks: map[string][]byte{
@@ -583,7 +587,7 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
        const ngoroutines = 256
 
        var wg sync.WaitGroup
-       for n := 0; n < nfiles; n++ {
+       for n := 0; n < ngoroutines; n++ {
                wg.Add(1)
                go func(n int) {
                        defer wg.Done()
@@ -592,7 +596,7 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
                        f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
                        c.Assert(err, check.IsNil)
                        defer f.Close()
-                       for i := 0; i < ngoroutines; i++ {
+                       for i := 0; i < nfiles; i++ {
                                trunc := rand.Intn(65)
                                woff := rand.Intn(trunc + 1)
                                wbytes = wbytes[:rand.Intn(64-woff+1)]
@@ -618,11 +622,18 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
                                c.Check(string(buf), check.Equals, string(expect))
                                c.Check(err, check.IsNil)
                        }
-                       s.checkMemSize(c, f)
                }(n)
        }
        wg.Wait()
 
+       for n := 0; n < ngoroutines; n++ {
+               f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDONLY, 0)
+               c.Assert(err, check.IsNil)
+               f.(*filehandle).inode.(*filenode).waitPrune()
+               s.checkMemSize(c, f)
+               defer f.Close()
+       }
+
        root, err := s.fs.Open("/")
        c.Assert(err, check.IsNil)
        defer root.Close()
@@ -1029,8 +1040,37 @@ func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
 }
 
 func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
+       defer func(wab, mbs int) {
+               writeAheadBlocks = wab
+               maxBlockSize = mbs
+       }(writeAheadBlocks, maxBlockSize)
+       writeAheadBlocks = 2
        maxBlockSize = 1024
-       defer func() { maxBlockSize = 2 << 26 }()
+
+       proceed := make(chan struct{})
+       var started, concurrent int32
+       blk2done := false
+       s.kc.onPut = func([]byte) {
+               atomic.AddInt32(&concurrent, 1)
+               switch atomic.AddInt32(&started, 1) {
+               case 1:
+                       // Wait until block 2 starts and finishes, and block 3 starts
+                       select {
+                       case <-proceed:
+                               c.Check(blk2done, check.Equals, true)
+                       case <-time.After(time.Second):
+                               c.Error("timed out")
+                       }
+               case 2:
+                       time.Sleep(time.Millisecond)
+                       blk2done = true
+               case 3:
+                       close(proceed)
+               default:
+                       time.Sleep(time.Millisecond)
+               }
+               c.Check(atomic.AddInt32(&concurrent, -1) < int32(writeAheadBlocks), check.Equals, true)
+       }
 
        fs, err := (&Collection{}).FileSystem(s.client, s.kc)
        c.Assert(err, check.IsNil)
@@ -1056,6 +1096,7 @@ func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
                }
                return
        }
+       f.(*filehandle).inode.(*filenode).waitPrune()
        c.Check(currentMemExtents(), check.HasLen, 1)
 
        m, err := fs.MarshalManifest(".")