import (
"bytes"
+ "context"
"crypto/md5"
"errors"
"fmt"
type keepClientStub struct {
blocks map[string][]byte
refreshable map[string]bool
- onPut func(bufcopy []byte) // called from PutB, before acquiring lock
+ reads []string // locators from ReadAt() calls
+ onWrite func(bufcopy []byte) // called from BlockWrite, before acquiring lock
authToken string // client's auth token (used for signing locators)
sigkey string // blob signing key
sigttl time.Duration // blob signing ttl
var errStub404 = errors.New("404 block not found")
func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int) (int, error) {
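+ // Record every locator passed to ReadAt so tests (e.g.
+ // TestRefreshSignatures) can inspect which block signatures were
+ // actually used for reads.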
+ kcs.Lock()
+ kcs.reads = append(kcs.reads, locator)
+ kcs.Unlock()
kcs.RLock()
defer kcs.RUnlock()
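+ // Reads fail unless the locator carries a +A signature that is
+ // valid for the stub's auth token, signing key, and TTL.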
+ if err := VerifySignature(locator, kcs.authToken, kcs.sigttl, []byte(kcs.sigkey)); err != nil {
+ return 0, err
+ }
buf := kcs.blocks[locator[:32]]
if buf == nil {
return 0, errStub404
return copy(p, buf[off:]), nil
}
-func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
- locator := SignLocator(fmt.Sprintf("%x+%d", md5.Sum(p), len(p)), kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
- buf := make([]byte, len(p))
- copy(buf, p)
- if kcs.onPut != nil {
- kcs.onPut(buf)
+func (kcs *keepClientStub) BlockWrite(_ context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
+ if opts.Data == nil {
+ panic("oops, stub is not made for this")
+ }
+ locator := SignLocator(fmt.Sprintf("%x+%d", md5.Sum(opts.Data), len(opts.Data)), kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
+ buf := make([]byte, len(opts.Data))
+ copy(buf, opts.Data)
+ if kcs.onWrite != nil {
+ kcs.onWrite(buf)
+ }
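+ // The stub only stores the "default" storage class; any other
+ // requested class is rejected so tests can exercise unattainable
+ // storage classes.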
+ for _, sc := range opts.StorageClasses {
+ if sc != "default" {
+ return BlockWriteResponse{}, fmt.Errorf("stub does not write storage class %q", sc)
+ }
}
kcs.Lock()
defer kcs.Unlock()
kcs.blocks[locator[:32]] = buf
- return locator, 1, nil
+ return BlockWriteResponse{Locator: locator, Replicas: 1}, nil
}
var reRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`)
func (s *CollectionFSSuite) SetUpTest(c *check.C) {
s.client = NewClientFromEnv()
+ s.client.AuthToken = fixtureActiveToken
err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+fixtureFooAndBarFilesInDirUUID, nil, nil)
c.Assert(err, check.IsNil)
s.kc = &keepClientStub{
c.Assert(err, check.IsNil)
}
+func (s *CollectionFSSuite) TestSyncNonCanonicalManifest(c *check.C) {
+ var coll Collection
+ err := s.client.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+fixtureFooAndBarFilesInDirUUID, nil, nil)
+ c.Assert(err, check.IsNil)
+ mtxt := strings.Replace(coll.ManifestText, "3:3:bar 0:3:foo", "0:3:foo 3:3:bar", -1)
+ c.Assert(mtxt, check.Not(check.Equals), coll.ManifestText)
+ err = s.client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
+ "collection": map[string]interface{}{
+ "manifest_text": mtxt}})
+ c.Assert(err, check.IsNil)
+ // In order for the rest of the test to work as intended, the API server
+ // needs to retain the file ordering we set manually. We check that here.
+ // We can't check `mtxt == coll.ManifestText` because the API server
+ // might've returned new block signatures if the GET and POST happened in
+ // different seconds.
+ expectPattern := `\./dir1 \S+ 0:3:foo 3:3:bar\n`
+ c.Assert(coll.ManifestText, check.Matches, expectPattern)
+
+ fs, err := coll.FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+ err = fs.Sync()
+ c.Check(err, check.IsNil)
+
+ // fs had no local changes, so Sync should not have saved
+ // anything back to the API/database. (If it did, we would see
+ // the manifest rewritten in canonical order.)
+ var saved Collection
+ err = s.client.RequestAndDecode(&saved, "GET", "arvados/v1/collections/"+coll.UUID, nil, nil)
+ c.Assert(err, check.IsNil)
+ c.Check(saved.ManifestText, check.Matches, expectPattern)
+}
+
func (s *CollectionFSSuite) TestHttpFileSystemInterface(c *check.C) {
_, ok := s.fs.(http.FileSystem)
c.Check(ok, check.Equals, true)
}
+func (s *CollectionFSSuite) TestUnattainableStorageClasses(c *check.C) {
+ fs, err := (&Collection{
+ StorageClassesDesired: []string{"unobtainium"},
+ }).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+
+ f, err := fs.OpenFile("/foo", os.O_CREATE|os.O_WRONLY, 0777)
+ c.Assert(err, check.IsNil)
+ _, err = f.Write([]byte("food"))
+ c.Assert(err, check.IsNil)
+ err = f.Close()
+ c.Assert(err, check.IsNil)
+ _, err = fs.MarshalManifest(".")
+ c.Assert(err, check.ErrorMatches, `.*stub does not write storage class \"unobtainium\"`)
+}
+
func (s *CollectionFSSuite) TestColonInFilename(c *check.C) {
fs, err := (&Collection{
ManifestText: "./foo:foo 3858f62230ac3c915f300c664312c63f+3 0:3:bar:bar\n",
proceed := make(chan struct{})
var started, concurrent int32
blk2done := false
- s.kc.onPut = func([]byte) {
+ s.kc.onWrite = func([]byte) {
atomic.AddInt32(&concurrent, 1)
switch atomic.AddInt32(&started, 1) {
case 1:
fs, err := (&Collection{}).FileSystem(s.client, s.kc)
c.Assert(err, check.IsNil)
- s.kc.onPut = func([]byte) {
+ s.kc.onWrite = func([]byte) {
// discard flushed data -- otherwise the stub will use
// unlimited memory
time.Sleep(time.Millisecond)
fs.Flush("", true)
}
- size := fs.memorySize()
+ size := fs.MemorySize()
if !c.Check(size <= 1<<24, check.Equals, true) {
- c.Logf("at dir%d fs.memorySize()=%d", i, size)
+ c.Logf("at dir%d fs.MemorySize()=%d", i, size)
return
}
}
c.Assert(err, check.IsNil)
var flushed int64
- s.kc.onPut = func(p []byte) {
+ s.kc.onWrite = func(p []byte) {
atomic.AddInt64(&flushed, int64(len(p)))
}
nDirs := int64(8)
+ nFiles := int64(67)
megabyte := make([]byte, 1<<20)
for i := int64(0); i < nDirs; i++ {
dir := fmt.Sprintf("dir%d", i)
fs.Mkdir(dir, 0755)
- for j := 0; j < 67; j++ {
+ for j := int64(0); j < nFiles; j++ {
f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
c.Assert(err, check.IsNil)
defer f.Close()
c.Assert(err, check.IsNil)
}
}
- c.Check(fs.memorySize(), check.Equals, int64(nDirs*67<<20))
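+ // Expected in-memory size: each file holds one unflushed 1 MiB
+ // segment (1<<20 data bytes plus 64 bytes of segment overhead),
+ // plus 64 bytes for every inode (the root, nDirs dirs, and
+ // nDirs*nFiles files).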
+ inodebytes := int64((nDirs*(nFiles+1) + 1) * 64)
+ c.Check(fs.MemorySize(), check.Equals, nDirs*nFiles*(1<<20+64)+inodebytes)
c.Check(flushed, check.Equals, int64(0))
waitForFlush := func(expectUnflushed, expectFlushed int64) {
- for deadline := time.Now().Add(5 * time.Second); fs.memorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
+ for deadline := time.Now().Add(5 * time.Second); fs.MemorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
}
- c.Check(fs.memorySize(), check.Equals, expectUnflushed)
+ c.Check(fs.MemorySize(), check.Equals, expectUnflushed)
c.Check(flushed, check.Equals, expectFlushed)
}
// Nothing flushed yet
- waitForFlush((nDirs*67)<<20, 0)
+ waitForFlush(nDirs*nFiles*(1<<20+64)+inodebytes, 0)
// Flushing a non-empty dir "/" is non-recursive and there are
// no top-level files, so this has no effect
fs.Flush("/", false)
- waitForFlush((nDirs*67)<<20, 0)
+ waitForFlush(nDirs*nFiles*(1<<20+64)+inodebytes, 0)
// Flush the full block in dir0
fs.Flush("dir0", false)
- waitForFlush((nDirs*67-64)<<20, 64<<20)
+ bigloclen := int64(32 + 9 + 51 + 64) // md5 (32) + "+67108864" (9) + "+A...@..." signature (51) + 64 (see (storedSegment)memorySize)
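+ // Flushing dir0 packs 64 of its 1 MiB segments into one full
+ // 64 MiB block; those segments now hold only a signed locator
+ // (bigloclen each) while the other nDirs*nFiles-64 segments
+ // still keep their data in memory.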
+ waitForFlush((nDirs*nFiles-64)*(1<<20+64)+inodebytes+bigloclen*64, 64<<20)
err = fs.Flush("dir-does-not-exist", false)
c.Check(err, check.NotNil)
// Flush full blocks in all dirs
fs.Flush("", false)
- waitForFlush(nDirs*3<<20, nDirs*64<<20)
+ waitForFlush(nDirs*3*(1<<20+64)+inodebytes+bigloclen*64*nDirs, nDirs*64<<20)
// Flush non-full blocks, too
fs.Flush("", true)
- waitForFlush(0, nDirs*67<<20)
+ smallloclen := int64(32 + 8 + 51 + 64) // md5 (32) + "+3145728" (8) + "+A...@..." signature (51) + 64 (see (storedSegment)memorySize)
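+ // After flushing short blocks too, only metadata remains:
+ // inodes plus one locator per segment (64 big-block references
+ // and 3 small-block references per dir).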
+ waitForFlush(inodebytes+bigloclen*64*nDirs+smallloclen*3*nDirs, nDirs*67<<20)
}
// Even when writing lots of files/dirs from different goroutines, as
time.AfterFunc(10*time.Second, func() { close(timeout) })
var putCount, concurrency int64
var unflushed int64
- s.kc.onPut = func(p []byte) {
+ s.kc.onWrite = func(p []byte) {
defer atomic.AddInt64(&unflushed, -int64(len(p)))
cur := atomic.AddInt64(&concurrency, 1)
defer atomic.AddInt64(&concurrency, -1)
})
wrote := 0
- s.kc.onPut = func(p []byte) {
+ s.kc.onWrite = func(p []byte) {
s.kc.Lock()
s.kc.blocks = map[string][]byte{}
wrote++
}
func (s *CollectionFSSuite) TestFlushShort(c *check.C) {
- s.kc.onPut = func([]byte) {
+ s.kc.onWrite = func([]byte) {
s.kc.Lock()
s.kc.blocks = map[string][]byte{}
s.kc.Unlock()
}
}
+func (s *CollectionFSSuite) TestSnapshotSplice(c *check.C) {
+ filedata1 := "hello snapshot+splice world\n"
+ fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+ {
+ f, err := fs.OpenFile("file1", os.O_CREATE|os.O_RDWR, 0700)
+ c.Assert(err, check.IsNil)
+ _, err = f.Write([]byte(filedata1))
+ c.Assert(err, check.IsNil)
+ err = f.Close()
+ c.Assert(err, check.IsNil)
+ }
+
+ snap, err := Snapshot(fs, "/")
+ c.Assert(err, check.IsNil)
+ err = Splice(fs, "dir1", snap)
+ c.Assert(err, check.IsNil)
+ f, err := fs.Open("dir1/file1")
+ c.Assert(err, check.IsNil)
+ buf, err := io.ReadAll(f)
+ c.Assert(err, check.IsNil)
+ c.Check(string(buf), check.Equals, filedata1)
+}
+
+func (s *CollectionFSSuite) TestRefreshSignatures(c *check.C) {
+ filedata1 := "hello refresh signatures world\n"
+ fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+ fs.Mkdir("d1", 0700)
+ f, err := fs.OpenFile("d1/file1", os.O_CREATE|os.O_RDWR, 0700)
+ c.Assert(err, check.IsNil)
+ _, err = f.Write([]byte(filedata1))
+ c.Assert(err, check.IsNil)
+ err = f.Close()
+ c.Assert(err, check.IsNil)
+
+ filedata2 := "hello refresh signatures universe\n"
+ fs.Mkdir("d2", 0700)
+ f, err = fs.OpenFile("d2/file2", os.O_CREATE|os.O_RDWR, 0700)
+ c.Assert(err, check.IsNil)
+ _, err = f.Write([]byte(filedata2))
+ c.Assert(err, check.IsNil)
+ err = f.Close()
+ c.Assert(err, check.IsNil)
+ txt, err := fs.MarshalManifest(".")
+ c.Assert(err, check.IsNil)
+ var saved Collection
+ err = s.client.RequestAndDecode(&saved, "POST", "arvados/v1/collections", nil, map[string]interface{}{
+ "select": []string{"manifest_text", "uuid", "portable_data_hash"},
+ "collection": map[string]interface{}{
+ "manifest_text": txt,
+ },
+ })
+ c.Assert(err, check.IsNil)
+
+ // Update signatures synchronously if they are already expired
+ // when Read() is called.
+ {
+ saved.ManifestText = SignManifest(saved.ManifestText, s.kc.authToken, time.Now().Add(-2*time.Second), s.kc.sigttl, []byte(s.kc.sigkey))
+ fs, err := saved.FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+ f, err := fs.OpenFile("d1/file1", os.O_RDONLY, 0)
+ c.Assert(err, check.IsNil)
+ buf, err := ioutil.ReadAll(f)
+ c.Check(err, check.IsNil)
+ c.Check(string(buf), check.Equals, filedata1)
+ }
+
+ // Update signatures asynchronously if we're more than half
+ // way to TTL when Read() is called.
+ {
+ exp := time.Now().Add(2 * time.Minute)
+ saved.ManifestText = SignManifest(saved.ManifestText, s.kc.authToken, exp, s.kc.sigttl, []byte(s.kc.sigkey))
+ fs, err := saved.FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+ f1, err := fs.OpenFile("d1/file1", os.O_RDONLY, 0)
+ c.Assert(err, check.IsNil)
+ f2, err := fs.OpenFile("d2/file2", os.O_RDONLY, 0)
+ c.Assert(err, check.IsNil)
+ buf, err := ioutil.ReadAll(f1)
+ c.Check(err, check.IsNil)
+ c.Check(string(buf), check.Equals, filedata1)
+
+ // Force fs to treat the 2-minute expiry as less than half of
+ // the server's signing TTL. Otherwise collectionfs would guess
+ // the signing TTL is only 2 minutes, consider the signatures
+ // still fresh, and skip the async refresh.
+ fs.(*collectionFileSystem).guessSignatureTTL = time.Hour
+
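+ // Poll until a read of f1 returns a locator whose signature
+ // expires more than an hour from now, i.e. the async refresh
+ // has replaced the short-lived signatures.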
+ refreshed := false
+ for deadline := time.Now().Add(time.Second * 10); time.Now().Before(deadline) && !refreshed; time.Sleep(time.Second / 10) {
+ _, err = f1.Seek(0, io.SeekStart)
+ c.Assert(err, check.IsNil)
+ buf, err = ioutil.ReadAll(f1)
+ c.Assert(err, check.IsNil)
+ c.Assert(string(buf), check.Equals, filedata1)
+ loc := s.kc.reads[len(s.kc.reads)-1]
+ t, err := signatureExpiryTime(loc)
+ c.Assert(err, check.IsNil)
+ c.Logf("last read block %s had signature expiry time %v", loc, t)
+ if t.Sub(time.Now()) > time.Hour {
+ refreshed = true
+ }
+ }
+ c.Check(refreshed, check.Equals, true)
+
+ // Second locator should have been updated at the same
+ // time.
+ buf, err = ioutil.ReadAll(f2)
+ c.Assert(err, check.IsNil)
+ c.Assert(string(buf), check.Equals, filedata2)
+ loc := s.kc.reads[len(s.kc.reads)-1]
+ c.Check(loc, check.Not(check.Equals), s.kc.reads[len(s.kc.reads)-2])
+ t, err := signatureExpiryTime(s.kc.reads[len(s.kc.reads)-1])
+ c.Assert(err, check.IsNil)
+ c.Logf("last read block %s had signature expiry time %v", loc, t)
+ c.Check(t.Sub(time.Now()) > time.Hour, check.Equals, true)
+ }
+}
+
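+// bigmanifest is a 2000-stream manifest (100 block locators and 2000
+// file entries per stream) used by BenchmarkParseManifest.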
+var bigmanifest = func() string {
+ var buf bytes.Buffer
+ for i := 0; i < 2000; i++ {
+ fmt.Fprintf(&buf, "./dir%d", i)
+ for i := 0; i < 100; i++ {
+ fmt.Fprintf(&buf, " d41d8cd98f00b204e9800998ecf8427e+99999")
+ }
+ for i := 0; i < 2000; i++ {
+ fmt.Fprintf(&buf, " 1200000:300000:file%d", i)
+ }
+ fmt.Fprintf(&buf, "\n")
+ }
+ return buf.String()
+}()
+
+func (s *CollectionFSSuite) BenchmarkParseManifest(c *check.C) {
+ DebugLocksPanicMode = false
+ c.Logf("test manifest is %d bytes", len(bigmanifest))
+ for i := 0; i < c.N; i++ {
+ fs, err := (&Collection{ManifestText: bigmanifest}).FileSystem(s.client, s.kc)
+ c.Check(err, check.IsNil)
+ c.Check(fs, check.NotNil)
+ }
+}
+
func (s *CollectionFSSuite) checkMemSize(c *check.C, f File) {
fn := f.(*filehandle).inode.(*filenode)
var memsize int64
runtime.ReadMemStats(&memstats)
c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
- f, err := coll.FileSystem(nil, nil)
+ f, err := coll.FileSystem(NewClientFromEnv(), &keepClientStub{})
c.Check(err, check.IsNil)
c.Logf("%s loaded", time.Now())
c.Check(f.Size(), check.Equals, int64(42*dirCount*fileCount))