12453: Persist written data to keep.
[arvados.git] / sdk / go / arvados / collection_fs.go
index f0f8b0ced3505cc578068f4e38f5665babc87570..36b3bfd554ec937c53d2828760cee42e39eba18e 100644 (file)
@@ -5,7 +5,6 @@
 package arvados
 
 import (
-       "crypto/md5"
        "errors"
        "fmt"
        "io"
@@ -44,6 +43,7 @@ type File interface {
 
 type keepClient interface {
        ReadAt(locator string, p []byte, off int) (int, error)
+       PutB(p []byte) (string, int, error)
 }
 
 type fileinfo struct {
@@ -429,7 +429,6 @@ func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePt
 // FileSystem returns a CollectionFileSystem for the collection.
 func (c *Collection) FileSystem(client *Client, kc keepClient) CollectionFileSystem {
        fs := &fileSystem{dirnode: dirnode{
-               cache:    &keepBlockCache{kc: kc},
                client:   client,
                kc:       kc,
                fileinfo: fileinfo{name: ".", mode: os.ModeDir | 0755},
@@ -527,7 +526,6 @@ type dirnode struct {
        parent *dirnode
        client *Client
        kc     keepClient
-       cache  blockCache
        inodes map[string]inode
        sync.RWMutex
 }
@@ -545,24 +543,21 @@ func (dn *dirnode) sync() error {
                if len(sbs) == 0 {
                        return nil
                }
-               hash := md5.New()
-               size := 0
+               block := make([]byte, 0, maxBlockSize)
                for _, sb := range sbs {
-                       data := sb.fn.extents[sb.idx].(*memExtent).buf
-                       if _, err := hash.Write(data); err != nil {
-                               return err
-                       }
-                       size += len(data)
+                       block = append(block, sb.fn.extents[sb.idx].(*memExtent).buf...)
+               }
+               locator, _, err := dn.kc.PutB(block)
+               if err != nil {
+                       return err
                }
-               // FIXME: write to keep
-               locator := fmt.Sprintf("%x+%d", hash.Sum(nil), size)
                off := 0
                for _, sb := range sbs {
                        data := sb.fn.extents[sb.idx].(*memExtent).buf
                        sb.fn.extents[sb.idx] = storedExtent{
-                               cache:   dn.cache,
+                               kc:      dn.kc,
                                locator: locator,
-                               size:    size,
+                               size:    len(block),
                                offset:  off,
                                length:  len(data),
                        }
@@ -674,14 +669,14 @@ func (dn *dirnode) MarshalManifest(prefix string) (string, error) {
        }
        var filetokens []string
        for _, s := range segments {
-               filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, s.name))
+               filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
        }
        if len(filetokens) == 0 {
                return subdirs, nil
        } else if len(blocks) == 0 {
                blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
        }
-       return prefix + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
+       return manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
 }
 
 func (dn *dirnode) loadManifest(txt string) {
@@ -732,13 +727,13 @@ func (dn *dirnode) loadManifest(txt string) {
                        dn.makeParentDirs(name)
                        f, err := dn.OpenFile(name, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0700)
                        if err != nil {
-                               // FIXME: broken
-                               continue
+                               // FIXME: don't panic
+                               panic(fmt.Errorf("cannot append to %q: %s", name, err))
                        }
                        if f.inode.Stat().IsDir() {
                                f.Close()
-                               // FIXME: broken manifest
-                               continue
+                               // FIXME: don't panic
+                               panic(fmt.Errorf("cannot append to %q: is a directory", name))
                        }
                        // Map the stream offset/range coordinates to
                        // block/offset/range coordinates and add
@@ -762,7 +757,7 @@ func (dn *dirnode) loadManifest(txt string) {
                                        blkLen = int(offset + length - pos - int64(blkOff))
                                }
                                f.inode.(*filenode).appendExtent(storedExtent{
-                                       cache:   dn.cache,
+                                       kc:      dn.kc,
                                        locator: e.locator,
                                        size:    e.size,
                                        offset:  blkOff,
@@ -1015,7 +1010,7 @@ func (me *memExtent) ReadAt(p []byte, off int64) (n int, err error) {
 }
 
 type storedExtent struct {
-       cache   blockCache
+       kc      keepClient
        locator string
        size    int
        offset  int
@@ -1042,27 +1037,13 @@ func (se storedExtent) ReadAt(p []byte, off int64) (n int, err error) {
        maxlen := se.length - int(off)
        if len(p) > maxlen {
                p = p[:maxlen]
-               n, err = se.cache.ReadAt(se.locator, p, int(off)+se.offset)
+               n, err = se.kc.ReadAt(se.locator, p, int(off)+se.offset)
                if err == nil {
                        err = io.EOF
                }
                return
        }
-       return se.cache.ReadAt(se.locator, p, int(off)+se.offset)
-}
-
-type blockCache interface {
-       ReadAt(locator string, p []byte, off int) (n int, err error)
-}
-
-type keepBlockCache struct {
-       kc keepClient
-}
-
-var scratch = make([]byte, 2<<26)
-
-func (kbc *keepBlockCache) ReadAt(locator string, p []byte, off int) (int, error) {
-       return kbc.kc.ReadAt(locator, p, off)
+       return se.kc.ReadAt(se.locator, p, int(off)+se.offset)
 }
 
 func canonicalName(name string) string {
@@ -1077,7 +1058,7 @@ func canonicalName(name string) string {
 
 var manifestEscapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
 
-func manifestUnescapeSeq(seq string) string {
+func manifestUnescapeFunc(seq string) string {
        if seq == `\\` {
                return `\`
        }
@@ -1090,5 +1071,15 @@ func manifestUnescapeSeq(seq string) string {
 }
 
 func manifestUnescape(s string) string {
-       return manifestEscapeSeq.ReplaceAllStringFunc(s, manifestUnescapeSeq)
+       return manifestEscapeSeq.ReplaceAllStringFunc(s, manifestUnescapeFunc)
+}
+
+var manifestEscapedChar = regexp.MustCompile(`[^\.\w/]`)
+
+func manifestEscapeFunc(seq string) string {
+       return fmt.Sprintf("\\%03o", byte(seq[0]))
+}
+
+func manifestEscape(s string) string {
+       return manifestEscapedChar.ReplaceAllStringFunc(s, manifestEscapeFunc)
 }