14538: Use concurrent writers to sync a directory.
[arvados.git] / sdk / go / arvados / fs_collection_test.go
index d2f55d0e37d80502919f6385c2a0d457374a26c2..a6d4ab1e5b71baccabafdbdf810db0ee264420a5 100644 (file)
@@ -7,6 +7,7 @@ package arvados
 import (
        "bytes"
        "crypto/md5"
+       "crypto/sha1"
        "errors"
        "fmt"
        "io"
@@ -16,6 +17,7 @@ import (
        "os"
        "regexp"
        "runtime"
+       "strings"
        "sync"
        "testing"
        "time"
@@ -27,7 +29,8 @@ import (
 var _ = check.Suite(&CollectionFSSuite{})
 
 type keepClientStub struct {
-       blocks map[string][]byte
+       // blocks is looked up by the first 32 characters of a
+       // locator (see LocalLocator).
+       blocks      map[string][]byte
+       // refreshable is keyed the same way as blocks. LocalLocator
+       // accepts a +R locator whose key is missing from blocks only
+       // if the corresponding entry here is true.
+       refreshable map[string]bool
        sync.RWMutex
 }
 
@@ -53,11 +56,28 @@ func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
        return locator, 1, nil
 }
 
+var localOrRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`)
+
+func (kcs *keepClientStub) LocalLocator(locator string) (string, error) {
+       kcs.Lock()
+       defer kcs.Unlock()
+       if strings.Contains(locator, "+R") {
+               if len(locator) < 32 {
+                       return "", fmt.Errorf("bad locator: %q", locator)
+               }
+               if _, ok := kcs.blocks[locator[:32]]; !ok && !kcs.refreshable[locator[:32]] {
+                       return "", fmt.Errorf("kcs.refreshable[%q]==false", locator)
+               }
+       }
+       fakeSig := fmt.Sprintf("+A%x@%x", sha1.Sum(nil), time.Now().Add(time.Hour*24*14).Unix())
+       return localOrRemoteSignature.ReplaceAllLiteralString(locator, fakeSig), nil
+}
+
 type CollectionFSSuite struct {
        client *Client
        coll   Collection
        fs     CollectionFileSystem
-       kc     keepClient
+       // kc is held as the concrete stub type so that tests can set
+       // its fields directly (e.g. s.kc.refreshable).
+       kc     *keepClientStub
 }
 
 func (s *CollectionFSSuite) SetUpTest(c *check.C) {
@@ -353,6 +373,7 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
        c.Check(err, check.IsNil)
        m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
        c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 25d55ad283aa400af464c76d713c07ad+8 3:3:bar 6:3:foo\n")
+       c.Check(s.fs.Size(), check.Equals, int64(6))
 }
 
 func (s *CollectionFSSuite) TestSeekSparse(c *check.C) {
@@ -398,6 +419,37 @@ func (s *CollectionFSSuite) TestSeekSparse(c *check.C) {
        checkSize(11)
 }
 
+func (s *CollectionFSSuite) TestMarshalCopiesRemoteBlocks(c *check.C) {
+       foo := "foo"
+       bar := "bar"
+       hash := map[string]string{
+               foo: fmt.Sprintf("%x", md5.Sum([]byte(foo))),
+               bar: fmt.Sprintf("%x", md5.Sum([]byte(bar))),
+       }
+
+       fs, err := (&Collection{
+               ManifestText: ". " + hash[foo] + "+3+Rzaaaa-foo@bab " + hash[bar] + "+3+A12345@ffffff 0:2:fo.txt 2:4:obar.txt\n",
+       }).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+       manifest, err := fs.MarshalManifest(".")
+       c.Check(manifest, check.Equals, "")
+       c.Check(err, check.NotNil)
+
+       s.kc.refreshable = map[string]bool{hash[bar]: true}
+
+       for _, sigIn := range []string{"Rzaaaa-foo@bab", "A12345@abcde"} {
+               fs, err = (&Collection{
+                       ManifestText: ". " + hash[foo] + "+3+A12345@fffff " + hash[bar] + "+3+" + sigIn + " 0:2:fo.txt 2:4:obar.txt\n",
+               }).FileSystem(s.client, s.kc)
+               c.Assert(err, check.IsNil)
+               manifest, err := fs.MarshalManifest(".")
+               c.Check(err, check.IsNil)
+               // Both blocks should now have +A signatures.
+               c.Check(manifest, check.Matches, `.*\+A.* .*\+A.*\n`)
+               c.Check(manifest, check.Not(check.Matches), `.*\+R.*\n`)
+       }
+}
+
 func (s *CollectionFSSuite) TestMarshalSmallBlocks(c *check.C) {
        maxBlockSize = 8
        defer func() { maxBlockSize = 2 << 26 }()
@@ -489,16 +541,20 @@ func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
                        f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
                        c.Assert(err, check.IsNil)
                        defer f.Close()
-                       for i := 0; i < 6502; i++ {
-                               switch rand.Int() & 3 {
-                               case 0:
+                       for i := 0; i < 1024; i++ {
+                               r := rand.Uint32()
+                               switch {
+                               case r%11 == 0:
+                                       _, err := s.fs.MarshalManifest(".")
+                                       c.Check(err, check.IsNil)
+                               case r&3 == 0:
                                        f.Truncate(int64(rand.Intn(64)))
-                               case 1:
+                               case r&3 == 1:
                                        f.Seek(int64(rand.Intn(64)), io.SeekStart)
-                               case 2:
+                               case r&3 == 2:
                                        _, err := f.Write([]byte("beep boop"))
                                        c.Check(err, check.IsNil)
-                               case 3:
+                               case r&3 == 3:
                                        _, err := ioutil.ReadAll(f)
                                        c.Check(err, check.IsNil)
                                }
@@ -636,6 +692,25 @@ func (s *CollectionFSSuite) TestRenameError(c *check.C) {
        c.Check(data, check.DeepEquals, []byte{1, 2, 3, 4, 5})
 }
 
+func (s *CollectionFSSuite) TestRenameDirectory(c *check.C) {
+       fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+       err = fs.Mkdir("foo", 0755)
+       c.Assert(err, check.IsNil)
+       err = fs.Mkdir("bar", 0755)
+       c.Assert(err, check.IsNil)
+       err = fs.Rename("bar", "baz")
+       c.Check(err, check.IsNil)
+       err = fs.Rename("foo", "baz")
+       c.Check(err, check.NotNil)
+       err = fs.Rename("foo", "baz/")
+       c.Check(err, check.IsNil)
+       err = fs.Rename("baz/foo", ".")
+       c.Check(err, check.Equals, ErrInvalidArgument)
+       err = fs.Rename("baz/foo/", ".")
+       c.Check(err, check.Equals, ErrInvalidArgument)
+}
+
 func (s *CollectionFSSuite) TestRename(c *check.C) {
        fs, err := (&Collection{}).FileSystem(s.client, s.kc)
        c.Assert(err, check.IsNil)
@@ -786,11 +861,11 @@ func (s *CollectionFSSuite) TestPersist(c *check.C) {
        }
 }
 
-func (s *CollectionFSSuite) TestPersistEmptyFiles(c *check.C) {
+func (s *CollectionFSSuite) TestPersistEmptyFilesAndDirs(c *check.C) {
        var err error
        s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
        c.Assert(err, check.IsNil)
-       for _, name := range []string{"dir", "dir/zerodir", "zero", "zero/zero"} {
+       for _, name := range []string{"dir", "dir/zerodir", "empty", "not empty", "not empty/empty", "zero", "zero/zero"} {
                err = s.fs.Mkdir(name, 0755)
                c.Assert(err, check.IsNil)
        }
@@ -836,6 +911,23 @@ func (s *CollectionFSSuite) TestPersistEmptyFiles(c *check.C) {
                c.Check(err, check.IsNil)
                c.Check(buf, check.DeepEquals, data)
        }
+
+       expectDir := map[string]int{
+               "empty":           0,
+               "not empty":       1,
+               "not empty/empty": 0,
+       }
+       for name, expectLen := range expectDir {
+               _, err := persisted.Open(name + "/bogus")
+               c.Check(err, check.NotNil)
+
+               d, err := persisted.Open(name)
+               defer d.Close()
+               c.Check(err, check.IsNil)
+               fi, err := d.Readdir(-1)
+               c.Check(err, check.IsNil)
+               c.Check(fi, check.HasLen, expectLen)
+       }
 }
 
 func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
@@ -987,6 +1079,12 @@ func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
                ". d41d8cd98f00b204e9800998ecf8427e+0 foo:0:foo\n",
                ". d41d8cd98f00b204e9800998ecf8427e+0 0:foo:foo\n",
                ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:foo 1:1:bar\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\\057\\056\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:.\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:..\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:..\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/..\n",
                ". d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n",
                "./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n. d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n",
        } {
@@ -1002,7 +1100,9 @@ func (s *CollectionFSSuite) TestEdgeCaseManifests(c *check.C) {
        for _, txt := range []string{
                "",
                ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo\n",
-               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:...\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:. 0:0:. 0:0:\\056 0:0:\\056\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/. 0:0:. 0:0:foo\\057bar\\057\\056\n",
                ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
                ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/bar\n./foo d41d8cd98f00b204e9800998ecf8427e+0 0:0:bar\n",
        } {
@@ -1060,6 +1160,7 @@ func (s *CollectionFSUnitSuite) TestLargeManifest(c *check.C) {
        f, err := coll.FileSystem(nil, nil)
        c.Check(err, check.IsNil)
        c.Logf("%s loaded", time.Now())
+       c.Check(f.Size(), check.Equals, int64(42*dirCount*fileCount))
 
        for i := 0; i < dirCount; i++ {
                for j := 0; j < fileCount; j++ {