14715: keepproxy.service checks for cluster config
[arvados.git] / sdk / go / arvados / fs_collection_test.go
index 96347737f8f24dce8c56518cf28a106069ef4a28..7fd03b120a7f34240393f884f88992b885499e1f 100644 (file)
@@ -7,6 +7,7 @@ package arvados
 import (
        "bytes"
        "crypto/md5"
+       "crypto/sha1"
        "errors"
        "fmt"
        "io"
@@ -16,18 +17,21 @@ import (
        "os"
        "regexp"
        "runtime"
+       "strings"
        "sync"
+       "sync/atomic"
        "testing"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
        check "gopkg.in/check.v1"
 )
 
 var _ = check.Suite(&CollectionFSSuite{})
 
 type keepClientStub struct { // in-memory keep client stub used by the tests in this file
-       blocks map[string][]byte
+       blocks      map[string][]byte    // stored block data, keyed by the first 32 characters of the locator
+       refreshable map[string]bool      // 32-character keys for which LocalLocator accepts a +R locator even when blocks has no entry
+       onPut       func(bufcopy []byte) // called from PutB, before acquiring lock
        sync.RWMutex // taken by PutB and LocalLocator while they access the maps above
 }
 
@@ -47,22 +51,42 @@ func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
        locator := fmt.Sprintf("%x+%d+A12345@abcde", md5.Sum(p), len(p))
        buf := make([]byte, len(p))
        copy(buf, p)
+       if kcs.onPut != nil {
+               kcs.onPut(buf)
+       }
        kcs.Lock()
        defer kcs.Unlock()
        kcs.blocks[locator[:32]] = buf
        return locator, 1, nil
 }
 
+var localOrRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`) // matches a "+A" or "+R" hint plus everything up to the next "+"
+
+// LocalLocator returns locator with every +A or +R signature hint
+// replaced by a fake +A signature stamped with a time two weeks in
+// the future.
+//
+// A locator containing "+R" is rejected with an error if it is
+// shorter than 32 characters, or if its 32-character prefix is
+// neither a key of kcs.blocks nor marked true in kcs.refreshable.
+func (kcs *keepClientStub) LocalLocator(locator string) (string, error) {
+       // This method only reads the stub's maps, so a shared (read)
+       // lock is sufficient and does not serialize concurrent callers.
+       kcs.RLock()
+       defer kcs.RUnlock()
+       if strings.Contains(locator, "+R") {
+               if len(locator) < 32 {
+                       return "", fmt.Errorf("bad locator: %q", locator)
+               }
+               hash := locator[:32]
+               if _, stored := kcs.blocks[hash]; !stored && !kcs.refreshable[hash] {
+                       return "", fmt.Errorf("kcs.refreshable[%q]==false", locator)
+               }
+       }
+       stamp := time.Now().Add(time.Hour * 24 * 14).Unix()
+       fakeSig := fmt.Sprintf("+A%x@%x", sha1.Sum(nil), stamp)
+       return localOrRemoteSignature.ReplaceAllLiteralString(locator, fakeSig), nil
+}
+
 type CollectionFSSuite struct { // fixture state shared by the test methods; SetUpTest assigns client, coll and kc
        client *Client              // created in SetUpTest with NewClientFromEnv
        coll   Collection           // collection record SetUpTest loads with a GET request
        fs     CollectionFileSystem // collection filesystem the tests operate on
-       kc     keepClient
+       kc     *keepClientStub      // concrete stub type, so tests can set its onPut and refreshable fields
 }
 
 func (s *CollectionFSSuite) SetUpTest(c *check.C) {
        s.client = NewClientFromEnv()
-       err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+arvadostest.FooAndBarFilesInDirUUID, nil, nil)
+       err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+fixtureFooAndBarFilesInDirUUID, nil, nil)
        c.Assert(err, check.IsNil)
        s.kc = &keepClientStub{
                blocks: map[string][]byte{
@@ -399,6 +423,37 @@ func (s *CollectionFSSuite) TestSeekSparse(c *check.C) {
        checkSize(11)
 }
 
+// TestMarshalCopiesRemoteBlocks checks how MarshalManifest handles
+// locators carrying a remote (+R) signature: it must fail (returning
+// an empty manifest) in the first scenario, and once a block has been
+// marked refreshable on the stub the marshalled manifest must contain
+// +A signatures only.
+func (s *CollectionFSSuite) TestMarshalCopiesRemoteBlocks(c *check.C) {
+       md5hex := func(data string) string {
+               return fmt.Sprintf("%x", md5.Sum([]byte(data)))
+       }
+       fooHash, barHash := md5hex("foo"), md5hex("bar")
+
+       // First scenario: the "foo" block has a +R signature and this
+       // test has not yet marked anything refreshable.
+       remoteFoo := &Collection{
+               ManifestText: ". " + fooHash + "+3+Rzaaaa-foo@bab " + barHash + "+3+A12345@ffffff 0:2:fo.txt 2:4:obar.txt\n",
+       }
+       fs, err := remoteFoo.FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+       manifest, err := fs.MarshalManifest(".")
+       c.Check(manifest, check.Equals, "")
+       c.Check(err, check.NotNil)
+
+       s.kc.refreshable = map[string]bool{barHash: true}
+
+       // Second scenario: the "bar" block is refreshable; try it with
+       // a remote signature and with a local one.
+       for _, barSig := range []string{"Rzaaaa-foo@bab", "A12345@abcde"} {
+               coll := &Collection{
+                       ManifestText: ". " + fooHash + "+3+A12345@fffff " + barHash + "+3+" + barSig + " 0:2:fo.txt 2:4:obar.txt\n",
+               }
+               fs, err = coll.FileSystem(s.client, s.kc)
+               c.Assert(err, check.IsNil)
+               marshalled, err := fs.MarshalManifest(".")
+               c.Check(err, check.IsNil)
+               // Both blocks should now have +A signatures.
+               c.Check(marshalled, check.Matches, `.*\+A.* .*\+A.*\n`)
+               c.Check(marshalled, check.Not(check.Matches), `.*\+R.*\n`)
+       }
+}
+
 func (s *CollectionFSSuite) TestMarshalSmallBlocks(c *check.C) {
        maxBlockSize = 8
        defer func() { maxBlockSize = 2 << 26 }()
@@ -490,7 +545,7 @@ func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
                        f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
                        c.Assert(err, check.IsNil)
                        defer f.Close()
-                       for i := 0; i < 6502; i++ {
+                       for i := 0; i < 1024; i++ {
                                r := rand.Uint32()
                                switch {
                                case r%11 == 0:
@@ -532,7 +587,7 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
        const ngoroutines = 256
 
        var wg sync.WaitGroup
-       for n := 0; n < nfiles; n++ {
+       for n := 0; n < ngoroutines; n++ {
                wg.Add(1)
                go func(n int) {
                        defer wg.Done()
@@ -541,7 +596,7 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
                        f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
                        c.Assert(err, check.IsNil)
                        defer f.Close()
-                       for i := 0; i < ngoroutines; i++ {
+                       for i := 0; i < nfiles; i++ {
                                trunc := rand.Intn(65)
                                woff := rand.Intn(trunc + 1)
                                wbytes = wbytes[:rand.Intn(64-woff+1)]
@@ -567,11 +622,18 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
                                c.Check(string(buf), check.Equals, string(expect))
                                c.Check(err, check.IsNil)
                        }
-                       s.checkMemSize(c, f)
                }(n)
        }
        wg.Wait()
 
+       for n := 0; n < ngoroutines; n++ {
+               f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDONLY, 0)
+               c.Assert(err, check.IsNil)
+               f.(*filehandle).inode.(*filenode).waitPrune()
+               s.checkMemSize(c, f)
+               defer f.Close()
+       }
+
        root, err := s.fs.Open("/")
        c.Assert(err, check.IsNil)
        defer root.Close()
@@ -641,6 +703,25 @@ func (s *CollectionFSSuite) TestRenameError(c *check.C) {
        c.Check(data, check.DeepEquals, []byte{1, 2, 3, 4, 5})
 }
 
+// TestRenameDirectory exercises Rename on directories of an empty
+// collection filesystem: renaming to an unused name, onto an existing
+// directory's name, to a target with a trailing slash, and onto ".".
+func (s *CollectionFSSuite) TestRenameDirectory(c *check.C) {
+       fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+       for _, dir := range []string{"foo", "bar"} {
+               c.Assert(fs.Mkdir(dir, 0755), check.IsNil)
+       }
+
+       // "baz" has not been created, so this rename must succeed.
+       c.Check(fs.Rename("bar", "baz"), check.IsNil)
+       // "baz" now exists: the bare name must be refused...
+       c.Check(fs.Rename("foo", "baz"), check.NotNil)
+       // ...while the same target with a trailing slash must be accepted.
+       c.Check(fs.Rename("foo", "baz/"), check.IsNil)
+       // Renaming onto "." must fail with ErrInvalidArgument, with or
+       // without a trailing slash on the source path.
+       c.Check(fs.Rename("baz/foo", "."), check.Equals, ErrInvalidArgument)
+       c.Check(fs.Rename("baz/foo/", "."), check.Equals, ErrInvalidArgument)
+}
+
 func (s *CollectionFSSuite) TestRename(c *check.C) {
        fs, err := (&Collection{}).FileSystem(s.client, s.kc)
        c.Assert(err, check.IsNil)
@@ -791,11 +872,11 @@ func (s *CollectionFSSuite) TestPersist(c *check.C) {
        }
 }
 
-func (s *CollectionFSSuite) TestPersistEmptyFiles(c *check.C) {
+func (s *CollectionFSSuite) TestPersistEmptyFilesAndDirs(c *check.C) {
        var err error
        s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
        c.Assert(err, check.IsNil)
-       for _, name := range []string{"dir", "dir/zerodir", "zero", "zero/zero"} {
+       for _, name := range []string{"dir", "dir/zerodir", "empty", "not empty", "not empty/empty", "zero", "zero/zero"} {
                err = s.fs.Mkdir(name, 0755)
                c.Assert(err, check.IsNil)
        }
@@ -841,6 +922,23 @@ func (s *CollectionFSSuite) TestPersistEmptyFiles(c *check.C) {
                c.Check(err, check.IsNil)
                c.Check(buf, check.DeepEquals, data)
        }
+
+       expectDir := map[string]int{
+               "empty":           0,
+               "not empty":       1,
+               "not empty/empty": 0,
+       }
+       for name, expectLen := range expectDir {
+               _, err := persisted.Open(name + "/bogus")
+               c.Check(err, check.NotNil)
+
+               d, err := persisted.Open(name)
+               defer d.Close()
+               c.Check(err, check.IsNil)
+               fi, err := d.Readdir(-1)
+               c.Check(err, check.IsNil)
+               c.Check(fi, check.HasLen, expectLen)
+       }
 }
 
 func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
@@ -942,8 +1040,37 @@ func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
 }
 
 func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
+       defer func(wab, mbs int) {
+               writeAheadBlocks = wab
+               maxBlockSize = mbs
+       }(writeAheadBlocks, maxBlockSize)
+       writeAheadBlocks = 2
        maxBlockSize = 1024
-       defer func() { maxBlockSize = 2 << 26 }()
+
+       proceed := make(chan struct{})
+       var started, concurrent int32
+       blk2done := false
+       s.kc.onPut = func([]byte) {
+               atomic.AddInt32(&concurrent, 1)
+               switch atomic.AddInt32(&started, 1) {
+               case 1:
+                       // Wait until block 2 starts and finishes, and block 3 starts
+                       select {
+                       case <-proceed:
+                               c.Check(blk2done, check.Equals, true)
+                       case <-time.After(time.Second):
+                               c.Error("timed out")
+                       }
+               case 2:
+                       time.Sleep(time.Millisecond)
+                       blk2done = true
+               case 3:
+                       close(proceed)
+               default:
+                       time.Sleep(time.Millisecond)
+               }
+               c.Check(atomic.AddInt32(&concurrent, -1) < int32(writeAheadBlocks), check.Equals, true)
+       }
 
        fs, err := (&Collection{}).FileSystem(s.client, s.kc)
        c.Assert(err, check.IsNil)
@@ -969,6 +1096,7 @@ func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
                }
                return
        }
+       f.(*filehandle).inode.(*filenode).waitPrune()
        c.Check(currentMemExtents(), check.HasLen, 1)
 
        m, err := fs.MarshalManifest(".")
@@ -992,6 +1120,12 @@ func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
                ". d41d8cd98f00b204e9800998ecf8427e+0 foo:0:foo\n",
                ". d41d8cd98f00b204e9800998ecf8427e+0 0:foo:foo\n",
                ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:foo 1:1:bar\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\\057\\056\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:.\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:..\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:..\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/..\n",
                ". d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n",
                "./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n. d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n",
        } {
@@ -1007,7 +1141,9 @@ func (s *CollectionFSSuite) TestEdgeCaseManifests(c *check.C) {
        for _, txt := range []string{
                "",
                ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo\n",
-               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:...\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:. 0:0:. 0:0:\\056 0:0:\\056\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/. 0:0:. 0:0:foo\\057bar\\057\\056\n",
                ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
                ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/bar\n./foo d41d8cd98f00b204e9800998ecf8427e+0 0:0:bar\n",
        } {