import (
"bytes"
"crypto/md5"
+ "crypto/sha1"
"errors"
"fmt"
"io"
"os"
"regexp"
"runtime"
+ "strings"
"sync"
+ "sync/atomic"
"testing"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvadostest"
check "gopkg.in/check.v1"
)
var _ = check.Suite(&CollectionFSSuite{})
type keepClientStub struct {
- blocks map[string][]byte
+ blocks map[string][]byte // stored block data; PutB keys each entry by the first 32 characters of the locator
+ refreshable map[string]bool // keys (first 32 characters of a locator) for which LocalLocator accepts a +R locator even when no block is stored
+ onPut func(bufcopy []byte) // called from PutB, before acquiring lock
sync.RWMutex
}
locator := fmt.Sprintf("%x+%d+A12345@abcde", md5.Sum(p), len(p))
buf := make([]byte, len(p))
copy(buf, p)
+ if kcs.onPut != nil {
+ kcs.onPut(buf)
+ }
kcs.Lock()
defer kcs.Unlock()
kcs.blocks[locator[:32]] = buf
return locator, 1, nil
}
+// localOrRemoteSignature matches a "+A..." or "+R..." hint inside a
+// locator, extending up to (but not including) the next "+".
+var localOrRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`)
+
+// LocalLocator returns locator with every +A/+R hint replaced by a
+// freshly generated fake +A hint whose timestamp is two weeks from
+// now.
+//
+// If locator contains "+R", it must be at least 32 characters long,
+// and its first 32 characters must either be a key in kcs.blocks or
+// be marked true in kcs.refreshable; otherwise an error is returned.
+func (kcs *keepClientStub) LocalLocator(locator string) (string, error) {
+	kcs.Lock()
+	defer kcs.Unlock()
+	if isRemote := strings.Contains(locator, "+R"); isRemote {
+		if len(locator) < 32 {
+			return "", fmt.Errorf("bad locator: %q", locator)
+		}
+		hash := locator[:32]
+		_, stored := kcs.blocks[hash]
+		if !stored && !kcs.refreshable[hash] {
+			return "", fmt.Errorf("kcs.refreshable[%q]==false", locator)
+		}
+	}
+	expiry := time.Now().Add(14 * 24 * time.Hour).Unix()
+	fakeSig := fmt.Sprintf("+A%x@%x", sha1.Sum(nil), expiry)
+	return localOrRemoteSignature.ReplaceAllLiteralString(locator, fakeSig), nil
+}
+
type CollectionFSSuite struct {
client *Client
coll Collection
fs CollectionFileSystem
- kc keepClient
+ kc *keepClientStub // concrete stub type, so tests can set s.kc.onPut and s.kc.refreshable directly
}
func (s *CollectionFSSuite) SetUpTest(c *check.C) {
s.client = NewClientFromEnv()
- err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+arvadostest.FooAndBarFilesInDirUUID, nil, nil)
+ err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+fixtureFooAndBarFilesInDirUUID, nil, nil)
c.Assert(err, check.IsNil)
s.kc = &keepClientStub{
blocks: map[string][]byte{
c.Check(f.Close(), check.IsNil)
m, err := s.fs.MarshalManifest(".")
+ c.Assert(err, check.IsNil)
c.Check(m, check.Matches, `. 37b51d194a7513e45b56f6524f2d51f2\+3\+\S+ 0:3:new-file\\0401\n./dir1 .* 3:3:bar 0:3:foo\n`)
}
c.Check(err, check.IsNil)
pos, err = f.Seek(0, io.SeekCurrent)
c.Check(pos, check.Equals, int64(18))
+ c.Check(err, check.IsNil)
pos, err = f.Seek(-18, io.SeekCurrent)
+ c.Check(pos, check.Equals, int64(0))
c.Check(err, check.IsNil)
n, err = io.ReadFull(f, buf)
c.Check(n, check.Equals, 18)
// truncate to current size
err = f.Truncate(18)
+ c.Check(err, check.IsNil)
f2.Seek(0, io.SeekStart)
buf2, err = ioutil.ReadAll(f2)
c.Check(err, check.IsNil)
// shrink to block/extent boundary
err = f.Truncate(32)
+ c.Check(err, check.IsNil)
f2.Seek(0, io.SeekStart)
buf2, err = ioutil.ReadAll(f2)
c.Check(err, check.IsNil)
// shrink to partial block/extent
err = f.Truncate(15)
+ c.Check(err, check.IsNil)
f2.Seek(0, io.SeekStart)
buf2, err = ioutil.ReadAll(f2)
c.Check(err, check.IsNil)
c.Check(err, check.IsNil)
m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 25d55ad283aa400af464c76d713c07ad+8 3:3:bar 6:3:foo\n")
+ c.Check(s.fs.Size(), check.Equals, int64(6))
}
func (s *CollectionFSSuite) TestSeekSparse(c *check.C) {
checkSize := func(size int64) {
fi, err := f.Stat()
+ c.Assert(err, check.IsNil)
c.Check(fi.Size(), check.Equals, size)
f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
c.Assert(err, check.IsNil)
defer f.Close()
fi, err = f.Stat()
+ c.Check(err, check.IsNil)
c.Check(fi.Size(), check.Equals, size)
pos, err := f.Seek(0, io.SeekEnd)
+ c.Check(err, check.IsNil)
c.Check(pos, check.Equals, size)
}
checkSize(11)
}
+// TestMarshalCopiesRemoteBlocks checks MarshalManifest on manifests
+// that mix +R and +A signatures: the first manifest is expected to
+// fail with an empty result, and after the "bar" hash is marked
+// refreshable on the stub, the marshaled manifest is expected to
+// contain only +A signatures.
+func (s *CollectionFSSuite) TestMarshalCopiesRemoteBlocks(c *check.C) {
+	foo := "foo"
+	bar := "bar"
+	hash := map[string]string{}
+	for _, data := range []string{foo, bar} {
+		hash[data] = fmt.Sprintf("%x", md5.Sum([]byte(data)))
+	}
+
+	// The "foo" block carries a +R signature here; marshaling is
+	// expected to fail and return an empty manifest.
+	failingManifest := ". " + hash[foo] + "+3+Rzaaaa-foo@bab " + hash[bar] + "+3+A12345@ffffff 0:2:fo.txt 2:4:obar.txt\n"
+	fs, err := (&Collection{ManifestText: failingManifest}).FileSystem(s.client, s.kc)
+	c.Assert(err, check.IsNil)
+	manifest, err := fs.MarshalManifest(".")
+	c.Check(manifest, check.Equals, "")
+	c.Check(err, check.NotNil)
+
+	s.kc.refreshable = map[string]bool{hash[bar]: true}
+
+	// Try the "bar" block with a remote and then a local signature.
+	for _, sigIn := range []string{"Rzaaaa-foo@bab", "A12345@abcde"} {
+		manifestIn := ". " + hash[foo] + "+3+A12345@fffff " + hash[bar] + "+3+" + sigIn + " 0:2:fo.txt 2:4:obar.txt\n"
+		fs, err = (&Collection{ManifestText: manifestIn}).FileSystem(s.client, s.kc)
+		c.Assert(err, check.IsNil)
+		marshaled, err := fs.MarshalManifest(".")
+		c.Check(err, check.IsNil)
+		// Both blocks should now have +A signatures.
+		c.Check(marshaled, check.Matches, `.*\+A.* .*\+A.*\n`)
+		c.Check(marshaled, check.Not(check.Matches), `.*\+R.*\n`)
+	}
+}
+
func (s *CollectionFSSuite) TestMarshalSmallBlocks(c *check.C) {
maxBlockSize = 8
defer func() { maxBlockSize = 2 << 26 }()
err = s.fs.Remove("foo/bar")
c.Check(err, check.IsNil)
- // mkdir succeds after the file is deleted
+ // mkdir succeeds after the file is deleted
err = s.fs.Mkdir("foo/bar", 0755)
c.Check(err, check.IsNil)
f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
c.Assert(err, check.IsNil)
defer f.Close()
- for i := 0; i < 6502; i++ {
- switch rand.Int() & 3 {
- case 0:
+ for i := 0; i < 1024; i++ {
+ r := rand.Uint32()
+ switch {
+ case r%11 == 0:
+ _, err := s.fs.MarshalManifest(".")
+ c.Check(err, check.IsNil)
+ case r&3 == 0:
f.Truncate(int64(rand.Intn(64)))
- case 1:
+ case r&3 == 1:
f.Seek(int64(rand.Intn(64)), io.SeekStart)
- case 2:
+ case r&3 == 2:
_, err := f.Write([]byte("beep boop"))
c.Check(err, check.IsNil)
- case 3:
+ case r&3 == 3:
_, err := ioutil.ReadAll(f)
c.Check(err, check.IsNil)
}
const ngoroutines = 256
var wg sync.WaitGroup
- for n := 0; n < nfiles; n++ {
+ for n := 0; n < ngoroutines; n++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
c.Assert(err, check.IsNil)
defer f.Close()
- for i := 0; i < ngoroutines; i++ {
+ for i := 0; i < nfiles; i++ {
trunc := rand.Intn(65)
woff := rand.Intn(trunc + 1)
wbytes = wbytes[:rand.Intn(64-woff+1)]
c.Check(string(buf), check.Equals, string(expect))
c.Check(err, check.IsNil)
}
- s.checkMemSize(c, f)
}(n)
}
wg.Wait()
+ for n := 0; n < ngoroutines; n++ {
+ f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDONLY, 0)
+ c.Assert(err, check.IsNil)
+ f.(*filehandle).inode.(*filenode).waitPrune()
+ s.checkMemSize(c, f)
+ defer f.Close()
+ }
+
root, err := s.fs.Open("/")
c.Assert(err, check.IsNil)
defer root.Close()
c.Check(data, check.DeepEquals, []byte{1, 2, 3, 4, 5})
}
+// TestRenameDirectory creates directories "foo" and "bar" in an
+// empty collection filesystem and checks the outcome of a sequence
+// of directory renames, including renames onto "." which must be
+// rejected with ErrInvalidArgument.
+func (s *CollectionFSSuite) TestRenameDirectory(c *check.C) {
+	fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+	c.Assert(err, check.IsNil)
+	for _, dir := range []string{"foo", "bar"} {
+		c.Assert(fs.Mkdir(dir, 0755), check.IsNil)
+	}
+
+	// bar -> baz is expected to succeed.
+	c.Check(fs.Rename("bar", "baz"), check.IsNil)
+	// foo -> baz is expected to fail now that baz exists.
+	c.Check(fs.Rename("foo", "baz"), check.NotNil)
+	// foo -> baz/ (trailing slash) is expected to succeed.
+	c.Check(fs.Rename("foo", "baz/"), check.IsNil)
+	// Renaming onto "." is invalid, with or without a trailing
+	// slash on the source path.
+	for _, src := range []string{"baz/foo", "baz/foo/"} {
+		c.Check(fs.Rename(src, "."), check.Equals, ErrInvalidArgument)
+	}
+}
+
func (s *CollectionFSSuite) TestRename(c *check.C) {
fs, err := (&Collection{}).FileSystem(s.client, s.kc)
c.Assert(err, check.IsNil)
}
}
-func (s *CollectionFSSuite) TestPersistEmptyFiles(c *check.C) {
+func (s *CollectionFSSuite) TestPersistEmptyFilesAndDirs(c *check.C) {
var err error
s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
c.Assert(err, check.IsNil)
- for _, name := range []string{"dir", "dir/zerodir", "zero", "zero/zero"} {
+ for _, name := range []string{"dir", "dir/zerodir", "empty", "not empty", "not empty/empty", "zero", "zero/zero"} {
err = s.fs.Mkdir(name, 0755)
c.Assert(err, check.IsNil)
}
expect := map[string][]byte{
"0": nil,
- "00": []byte{},
- "one": []byte{1},
+ "00": {},
+ "one": {1},
"dir/0": nil,
- "dir/two": []byte{1, 2},
+ "dir/two": {1, 2},
"dir/zero": nil,
"dir/zerodir/zero": nil,
"zero/zero/zero": nil,
c.Assert(err, check.IsNil)
for name, data := range expect {
- f, err := persisted.Open("bogus-" + name)
+ _, err = persisted.Open("bogus-" + name)
c.Check(err, check.NotNil)
- f, err = persisted.Open(name)
+ f, err := persisted.Open(name)
c.Assert(err, check.IsNil)
if data == nil {
c.Check(err, check.IsNil)
c.Check(buf, check.DeepEquals, data)
}
+
+ expectDir := map[string]int{
+ "empty": 0,
+ "not empty": 1,
+ "not empty/empty": 0,
+ }
+ for name, expectLen := range expectDir {
+ _, err := persisted.Open(name + "/bogus")
+ c.Check(err, check.NotNil)
+
+ d, err := persisted.Open(name)
+ defer d.Close()
+ c.Check(err, check.IsNil)
+ fi, err := d.Readdir(-1)
+ c.Check(err, check.IsNil)
+ c.Check(fi, check.HasLen, expectLen)
+ }
}
func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
c.Check(n, check.Equals, 1)
c.Check(buf[:1], check.DeepEquals, []byte{1})
pos, err = f.Seek(0, io.SeekCurrent)
+ c.Assert(err, check.IsNil)
c.Check(pos, check.Equals, int64(1))
f.Write([]byte{4, 5, 6})
pos, err = f.Seek(0, io.SeekCurrent)
+ c.Assert(err, check.IsNil)
c.Check(pos, check.Equals, int64(6))
f.Seek(0, io.SeekStart)
n, err = f.Read(buf)
c.Check(pos, check.Equals, int64(3))
f.Write([]byte{7, 8, 9})
pos, err = f.Seek(0, io.SeekCurrent)
+ c.Check(err, check.IsNil)
c.Check(pos, check.Equals, int64(9))
f.Close()
}
func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
+ defer func(wab, mbs int) {
+ writeAheadBlocks = wab
+ maxBlockSize = mbs
+ }(writeAheadBlocks, maxBlockSize)
+ writeAheadBlocks = 2
maxBlockSize = 1024
- defer func() { maxBlockSize = 2 << 26 }()
+
+ proceed := make(chan struct{})
+ var started, concurrent int32
+ blk2done := false
+ s.kc.onPut = func([]byte) {
+ atomic.AddInt32(&concurrent, 1)
+ switch atomic.AddInt32(&started, 1) {
+ case 1:
+ // Wait until block 2 starts and finishes, and block 3 starts
+ select {
+ case <-proceed:
+ c.Check(blk2done, check.Equals, true)
+ case <-time.After(time.Second):
+ c.Error("timed out")
+ }
+ case 2:
+ time.Sleep(time.Millisecond)
+ blk2done = true
+ case 3:
+ close(proceed)
+ default:
+ time.Sleep(time.Millisecond)
+ }
+ c.Check(atomic.AddInt32(&concurrent, -1) < int32(writeAheadBlocks), check.Equals, true)
+ }
fs, err := (&Collection{}).FileSystem(s.client, s.kc)
c.Assert(err, check.IsNil)
}
return
}
+ f.(*filehandle).inode.(*filenode).waitPrune()
c.Check(currentMemExtents(), check.HasLen, 1)
m, err := fs.MarshalManifest(".")
". d41d8cd98f00b204e9800998ecf8427e+0 foo:0:foo\n",
". d41d8cd98f00b204e9800998ecf8427e+0 0:foo:foo\n",
". d41d8cd98f00b204e9800998ecf8427e+1 0:1:foo 1:1:bar\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\\057\\056\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:.\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:..\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:..\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/..\n",
". d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n",
"./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n. d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n",
} {
for _, txt := range []string{
"",
". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo\n",
- ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:...\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:. 0:0:. 0:0:\\056 0:0:\\056\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/. 0:0:. 0:0:foo\\057bar\\057\\056\n",
". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/bar\n./foo d41d8cd98f00b204e9800998ecf8427e+0 0:0:bar\n",
} {
f, err := coll.FileSystem(nil, nil)
c.Check(err, check.IsNil)
c.Logf("%s loaded", time.Now())
+ c.Check(f.Size(), check.Equals, int64(42*dirCount*fileCount))
for i := 0; i < dirCount; i++ {
for j := 0; j < fileCount; j++ {