12453: Persist written data to keep.
authorTom Clegg <tclegg@veritasgenetics.com>
Mon, 13 Nov 2017 21:50:22 +0000 (16:50 -0500)
committerTom Clegg <tclegg@veritasgenetics.com>
Tue, 14 Nov 2017 14:23:28 +0000 (09:23 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

sdk/go/arvados/collection_fs.go
sdk/go/arvados/collection_fs_test.go

index f0f8b0ced3505cc578068f4e38f5665babc87570..36b3bfd554ec937c53d2828760cee42e39eba18e 100644 (file)
@@ -5,7 +5,6 @@
 package arvados
 
 import (
-       "crypto/md5"
        "errors"
        "fmt"
        "io"
@@ -44,6 +43,7 @@ type File interface {
 
 type keepClient interface {
        ReadAt(locator string, p []byte, off int) (int, error)
+       PutB(p []byte) (string, int, error)
 }
 
 type fileinfo struct {
@@ -429,7 +429,6 @@ func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePt
 // FileSystem returns a CollectionFileSystem for the collection.
 func (c *Collection) FileSystem(client *Client, kc keepClient) CollectionFileSystem {
        fs := &fileSystem{dirnode: dirnode{
-               cache:    &keepBlockCache{kc: kc},
                client:   client,
                kc:       kc,
                fileinfo: fileinfo{name: ".", mode: os.ModeDir | 0755},
@@ -527,7 +526,6 @@ type dirnode struct {
        parent *dirnode
        client *Client
        kc     keepClient
-       cache  blockCache
        inodes map[string]inode
        sync.RWMutex
 }
@@ -545,24 +543,21 @@ func (dn *dirnode) sync() error {
                if len(sbs) == 0 {
                        return nil
                }
-               hash := md5.New()
-               size := 0
+               block := make([]byte, 0, maxBlockSize)
                for _, sb := range sbs {
-                       data := sb.fn.extents[sb.idx].(*memExtent).buf
-                       if _, err := hash.Write(data); err != nil {
-                               return err
-                       }
-                       size += len(data)
+                       block = append(block, sb.fn.extents[sb.idx].(*memExtent).buf...)
+               }
+               locator, _, err := dn.kc.PutB(block)
+               if err != nil {
+                       return err
                }
-               // FIXME: write to keep
-               locator := fmt.Sprintf("%x+%d", hash.Sum(nil), size)
                off := 0
                for _, sb := range sbs {
                        data := sb.fn.extents[sb.idx].(*memExtent).buf
                        sb.fn.extents[sb.idx] = storedExtent{
-                               cache:   dn.cache,
+                               kc:      dn.kc,
                                locator: locator,
-                               size:    size,
+                               size:    len(block),
                                offset:  off,
                                length:  len(data),
                        }
@@ -674,14 +669,14 @@ func (dn *dirnode) MarshalManifest(prefix string) (string, error) {
        }
        var filetokens []string
        for _, s := range segments {
-               filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, s.name))
+               filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
        }
        if len(filetokens) == 0 {
                return subdirs, nil
        } else if len(blocks) == 0 {
                blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
        }
-       return prefix + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
+       return manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
 }
 
 func (dn *dirnode) loadManifest(txt string) {
@@ -732,13 +727,13 @@ func (dn *dirnode) loadManifest(txt string) {
                        dn.makeParentDirs(name)
                        f, err := dn.OpenFile(name, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0700)
                        if err != nil {
-                               // FIXME: broken
-                               continue
+                               // FIXME: don't panic
+                               panic(fmt.Errorf("cannot append to %q: %s", name, err))
                        }
                        if f.inode.Stat().IsDir() {
                                f.Close()
-                               // FIXME: broken manifest
-                               continue
+                               // FIXME: don't panic
+                               panic(fmt.Errorf("cannot append to %q: is a directory", name))
                        }
                        // Map the stream offset/range coordinates to
                        // block/offset/range coordinates and add
@@ -762,7 +757,7 @@ func (dn *dirnode) loadManifest(txt string) {
                                        blkLen = int(offset + length - pos - int64(blkOff))
                                }
                                f.inode.(*filenode).appendExtent(storedExtent{
-                                       cache:   dn.cache,
+                                       kc:      dn.kc,
                                        locator: e.locator,
                                        size:    e.size,
                                        offset:  blkOff,
@@ -1015,7 +1010,7 @@ func (me *memExtent) ReadAt(p []byte, off int64) (n int, err error) {
 }
 
 type storedExtent struct {
-       cache   blockCache
+       kc      keepClient
        locator string
        size    int
        offset  int
@@ -1042,27 +1037,13 @@ func (se storedExtent) ReadAt(p []byte, off int64) (n int, err error) {
        maxlen := se.length - int(off)
        if len(p) > maxlen {
                p = p[:maxlen]
-               n, err = se.cache.ReadAt(se.locator, p, int(off)+se.offset)
+               n, err = se.kc.ReadAt(se.locator, p, int(off)+se.offset)
                if err == nil {
                        err = io.EOF
                }
                return
        }
-       return se.cache.ReadAt(se.locator, p, int(off)+se.offset)
-}
-
-type blockCache interface {
-       ReadAt(locator string, p []byte, off int) (n int, err error)
-}
-
-type keepBlockCache struct {
-       kc keepClient
-}
-
-var scratch = make([]byte, 2<<26)
-
-func (kbc *keepBlockCache) ReadAt(locator string, p []byte, off int) (int, error) {
-       return kbc.kc.ReadAt(locator, p, off)
+       return se.kc.ReadAt(se.locator, p, int(off)+se.offset)
 }
 
 func canonicalName(name string) string {
@@ -1077,7 +1058,7 @@ func canonicalName(name string) string {
 
 var manifestEscapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
 
-func manifestUnescapeSeq(seq string) string {
+func manifestUnescapeFunc(seq string) string {
        if seq == `\\` {
                return `\`
        }
@@ -1090,5 +1071,15 @@ func manifestUnescapeSeq(seq string) string {
 }
 
 func manifestUnescape(s string) string {
-       return manifestEscapeSeq.ReplaceAllStringFunc(s, manifestUnescapeSeq)
+       return manifestEscapeSeq.ReplaceAllStringFunc(s, manifestUnescapeFunc)
+}
+
+var manifestEscapedChar = regexp.MustCompile(`[^\.\w/]`)
+
+func manifestEscapeFunc(seq string) string {
+       return fmt.Sprintf("\\%03o", byte(seq[0]))
+}
+
+func manifestEscape(s string) string {
+       return manifestEscapedChar.ReplaceAllStringFunc(s, manifestEscapeFunc)
 }
index ce3f85d42c9cecd83da4d3813bb262cbdf5d4c63..5d49ba3da7785e915c2b4597a576a6d52ac4f928 100644 (file)
@@ -5,6 +5,8 @@
 package arvados
 
 import (
+       "crypto/md5"
+       "errors"
        "fmt"
        "io"
        "io/ioutil"
@@ -23,16 +25,31 @@ var _ = check.Suite(&CollectionFSSuite{})
 
 type keepClientStub struct {
        blocks map[string][]byte
+       sync.RWMutex
 }
 
+var errStub404 = errors.New("404 block not found")
+
 func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int) (int, error) {
+       kcs.RLock()
+       defer kcs.RUnlock()
        buf := kcs.blocks[locator[:32]]
        if buf == nil {
-               return 0, os.ErrNotExist
+               return 0, errStub404
        }
        return copy(p, buf[off:]), nil
 }
 
+func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
+       locator := fmt.Sprintf("%x+%d+A12345@abcde", md5.Sum(p), len(p))
+       buf := make([]byte, len(p))
+       copy(buf, p)
+       kcs.Lock()
+       defer kcs.Unlock()
+       kcs.blocks[locator[:32]] = buf
+       return locator, 1, nil
+}
+
 type CollectionFSSuite struct {
        client *Client
        coll   Collection
@@ -371,7 +388,7 @@ func (s *CollectionFSSuite) TestMkdir(c *check.C) {
        // creating foo/bar as a directory should fail
        f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, os.ModeDir)
        c.Check(err, check.NotNil)
-       err = s.fs.Mkdir("foo/bar")
+       err = s.fs.Mkdir("foo/bar", 0755)
        c.Check(err, check.NotNil)
 
        m, err := s.fs.MarshalManifest(".")
@@ -422,6 +439,8 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
        maxBlockSize = 8
        defer func() { maxBlockSize = 2 << 26 }()
 
+       s.fs = (&Collection{}).FileSystem(s.client, s.kc)
+
        var wg sync.WaitGroup
        for n := 0; n < 128; n++ {
                wg.Add(1)
@@ -467,11 +486,75 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
        defer root.Close()
        fi, err := root.Readdir(-1)
        c.Check(err, check.IsNil)
-       c.Logf("Readdir(): %#v", fi)
+       c.Check(len(fi), check.Equals, 128)
+
+       _, err = s.fs.MarshalManifest(".")
+       c.Check(err, check.IsNil)
+       // TODO: check manifest content
+}
+
+func (s *CollectionFSSuite) TestPersist(c *check.C) {
+       maxBlockSize = 1024
+       defer func() { maxBlockSize = 2 << 26 }()
+
+       s.fs = (&Collection{}).FileSystem(s.client, s.kc)
+       err := s.fs.Mkdir("d:r", 0755)
+       c.Assert(err, check.IsNil)
+
+       expect := map[string][]byte{}
+
+       var wg sync.WaitGroup
+       for _, name := range []string{"random 1", "random:2", "random\\3", "d:r/random4"} {
+               buf := make([]byte, 500)
+               rand.Read(buf)
+               expect[name] = buf
+
+               f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
+               c.Assert(err, check.IsNil)
+               // Note: we don't close the file until after the test
+               // is done. Writes to unclosed files should persist.
+               defer f.Close()
+
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       for i := 0; i < len(buf); i += 5 {
+                               _, err := f.Write(buf[i : i+5])
+                               c.Assert(err, check.IsNil)
+                       }
+               }()
+       }
+       wg.Wait()
 
        m, err := s.fs.MarshalManifest(".")
        c.Check(err, check.IsNil)
-       c.Logf("%s", m)
+       c.Logf("%q", m)
+
+       root, err := s.fs.Open("/")
+       c.Assert(err, check.IsNil)
+       defer root.Close()
+       fi, err := root.Readdir(-1)
+       c.Check(err, check.IsNil)
+       c.Check(len(fi), check.Equals, 4)
+
+       persisted := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc)
+
+       root, err = persisted.Open("/")
+       c.Assert(err, check.IsNil)
+       defer root.Close()
+       fi, err = root.Readdir(-1)
+       c.Check(err, check.IsNil)
+       c.Check(len(fi), check.Equals, 4)
+
+       for name, content := range expect {
+               c.Logf("read %q", name)
+               f, err := persisted.Open(name)
+               c.Assert(err, check.IsNil)
+               defer f.Close()
+               buf, err := ioutil.ReadAll(f)
+               c.Check(err, check.IsNil)
+               c.Check(buf, check.DeepEquals, content)
+       }
 }
 
 // Gocheck boilerplate