12483: Add MarshalManifest().
author Tom Clegg <tclegg@veritasgenetics.com>
Sun, 12 Nov 2017 08:31:16 +0000 (03:31 -0500)
committer Tom Clegg <tclegg@veritasgenetics.com>
Sun, 12 Nov 2017 08:42:44 +0000 (03:42 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

sdk/go/arvados/collection_fs.go
sdk/go/arvados/collection_fs_test.go

index 17740dbead058cd659442f3420e532f3f82f7fd9..2f1ec2fee43c1adea2ddc775e1884cac0583c6de 100644 (file)
@@ -5,12 +5,15 @@
 package arvados
 
 import (
+       "crypto/md5"
        "errors"
+       "fmt"
        "io"
        "net/http"
        "os"
        "path"
        "regexp"
+       "sort"
        "strconv"
        "strings"
        "sync"
@@ -90,6 +93,7 @@ type CollectionFileSystem interface {
        Stat(name string) (os.FileInfo, error)
        Create(name string) (File, error)
        OpenFile(name string, flag int, perm os.FileMode) (File, error)
+       MarshalManifest(string) (string, error)
 }
 
 type fileSystem struct {
@@ -534,6 +538,158 @@ type dirnode struct {
        sync.RWMutex
 }
 
+// sync packs the data buffered in memory (memExtents) in this directory's
+// files into blocks, and replaces each memExtent with a storedExtent that
+// refers to the same bytes' position within the resulting block.
+//
+// Files are visited in sorted name order. Entries that are not *filenode
+// (e.g. subdirectories) are skipped, so sync does not recurse. A memExtent
+// longer than maxBlockSize/2 gets a block of its own; shorter ones are
+// accumulated, across files, into shared blocks of at most maxBlockSize
+// bytes.
+//
+// Each file's lock is taken when the file is reached and released (via
+// defer) only when sync returns, because a pending shared block may still
+// refer to extents of earlier files until the final flush.
+//
+// caller must hold dn.Lock().
+func (dn *dirnode) sync() error {
+       // shortBlock identifies one memExtent awaiting a shared block:
+       // fn.extents[idx].
+       type shortBlock struct {
+               fn  *filenode
+               idx int
+       }
+       // pending holds short extents not yet flushed; pendingLen is the sum
+       // of their Len() values.
+       var pending []shortBlock
+       var pendingLen int
+
+       // flush packs the listed memExtents, in order, into one block and
+       // replaces each of them with a storedExtent at its offset within
+       // that block. An empty list is a no-op.
+       flush := func(sbs []shortBlock) error {
+               if len(sbs) == 0 {
+                       return nil
+               }
+               hash := md5.New()
+               size := 0
+               for _, sb := range sbs {
+                       data := sb.fn.extents[sb.idx].(*memExtent).buf
+                       if _, err := hash.Write(data); err != nil {
+                               return err
+                       }
+                       size += len(data)
+               }
+               // FIXME: write to keep
+               // NOTE(review): the block data is not stored anywhere yet; the
+               // locator below ("<md5 hex>+<size>") is only computed locally,
+               // and once the extents are replaced fn.extents no longer refers
+               // to the buffered bytes. Reading them back presumably depends
+               // on dn.cache being able to fetch this block — TODO confirm.
+               locator := fmt.Sprintf("%x+%d", hash.Sum(nil), size)
+               off := 0
+               for _, sb := range sbs {
+                       data := sb.fn.extents[sb.idx].(*memExtent).buf
+                       sb.fn.extents[sb.idx] = storedExtent{
+                               cache:   dn.cache,
+                               locator: locator,
+                               size:    size,
+                               offset:  off,
+                               length:  len(data),
+                       }
+                       off += len(data)
+               }
+               return nil
+       }
+
+       // Sort names so the packing order (and therefore the resulting
+       // block contents and locators) does not depend on map iteration.
+       names := make([]string, 0, len(dn.inodes))
+       for name := range dn.inodes {
+               names = append(names, name)
+       }
+       sort.Strings(names)
+
+       for _, name := range names {
+               fn, ok := dn.inodes[name].(*filenode)
+               if !ok {
+                       continue
+               }
+               fn.Lock()
+               // Deliberately deferred instead of unlocking per iteration:
+               // pending may still reference this file's extents until the
+               // final flush below.
+               defer fn.Unlock()
+               for idx, ext := range fn.extents {
+                       ext, ok := ext.(*memExtent)
+                       if !ok {
+                               continue
+                       }
+                       if ext.Len() > maxBlockSize/2 {
+                               // Large extent: give it a block of its own.
+                               if err := flush([]shortBlock{{fn, idx}}); err != nil {
+                                       return err
+                               }
+                               continue
+                       }
+                       if pendingLen+ext.Len() > maxBlockSize {
+                               // Adding this extent would overflow the shared
+                               // block, so flush what has accumulated first.
+                               if err := flush(pending); err != nil {
+                                       return err
+                               }
+                               pending = nil
+                               pendingLen = 0
+                       }
+                       pending = append(pending, shortBlock{fn, idx})
+                       pendingLen += ext.Len()
+               }
+       }
+       // Flush whatever short extents are still pending.
+       return flush(pending)
+}
+
+// MarshalManifest returns manifest text for this directory followed by
+// that of its subdirectories (recursively). prefix is used as this
+// directory's stream name (e.g. "."); each subdirectory's stream is named
+// prefix + "/" + its name.
+//
+// It calls sync() first, so data buffered in this directory's files is
+// packed into blocks before being listed. Entries are visited in sorted
+// name order. Every file gets at least one file token: a file with no
+// extents is listed as "0:0:name" instead of being dropped. If this
+// directory contains no files, only the subdirectories' text is returned.
+//
+// Locking: dn.Lock() is held throughout, and each file's lock is held
+// while its extents are read (same dn-then-file order as sync()).
+func (dn *dirnode) MarshalManifest(prefix string) (string, error) {
+       dn.Lock()
+       defer dn.Unlock()
+       if err := dn.sync(); err != nil {
+               return "", err
+       }
+
+       type m1segment struct {
+               name   string
+               offset int64
+               length int64
+       }
+       // streamLen is the total size of the blocks listed so far; segment
+       // offsets are relative to the start of this stream.
+       var streamLen int64
+       var segments []m1segment
+       var blocks []string
+       var subdirs strings.Builder
+
+       // appendFile adds fn's extents to blocks and segments under the
+       // given name. fn is locked only for the duration of the call, and
+       // the deferred unlock also runs if an unknown extent type panics.
+       appendFile := func(name string, fn *filenode) {
+               fn.Lock()
+               defer fn.Unlock()
+               if len(fn.extents) == 0 {
+                       // Empty file: emit a zero-length segment so it is not
+                       // lost from the manifest.
+                       segments = append(segments, m1segment{name: name})
+                       return
+               }
+               for _, e := range fn.extents {
+                       switch e := e.(type) {
+                       case *memExtent:
+                               // sync() converts every memExtent it sees, so this
+                               // is expected only if a write landed after sync()
+                               // released the file locks. The locator is a
+                               // placeholder.
+                               blocks = append(blocks, fmt.Sprintf("FIXME+%d", e.Len()))
+                               segments = append(segments, m1segment{
+                                       name:   name,
+                                       offset: streamLen,
+                                       length: int64(e.Len()),
+                               })
+                               streamLen += int64(e.Len())
+                       case storedExtent:
+                               if len(blocks) > 0 && blocks[len(blocks)-1] == e.locator {
+                                       // Same block as the previous extent (e.g. short
+                                       // extents packed together by sync): point back
+                                       // into it instead of listing it again.
+                                       streamLen -= int64(e.size)
+                               } else {
+                                       blocks = append(blocks, e.locator)
+                               }
+                               segments = append(segments, m1segment{
+                                       name:   name,
+                                       offset: streamLen + int64(e.offset),
+                                       length: int64(e.length),
+                               })
+                               streamLen += int64(e.size)
+                       default:
+                               panic(fmt.Sprintf("can't marshal extent type %T", e))
+                       }
+               }
+       }
+
+       names := make([]string, 0, len(dn.inodes))
+       for name := range dn.inodes {
+               names = append(names, name)
+       }
+       sort.Strings(names)
+
+       for _, name := range names {
+               switch node := dn.inodes[name].(type) {
+               case *dirnode:
+                       subdir, err := node.MarshalManifest(prefix + "/" + node.Name())
+                       if err != nil {
+                               return "", err
+                       }
+                       subdirs.WriteString(subdir)
+               case *filenode:
+                       // node.Name() is evaluated before appendFile locks the
+                       // file, in case Name() itself takes the file's lock.
+                       appendFile(node.Name(), node)
+               default:
+                       panic(fmt.Sprintf("can't marshal inode type %T", node))
+               }
+       }
+
+       if len(segments) == 0 {
+               return subdirs.String(), nil
+       }
+       if len(blocks) == 0 {
+               // Only empty files: give the stream the empty block's locator
+               // so the line still has a block token.
+               blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
+       }
+       filetokens := make([]string, 0, len(segments))
+       for _, s := range segments {
+               filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, s.name))
+       }
+       return prefix + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs.String(), nil
+}
+
 func (dn *dirnode) loadManifest(txt string) {
        // FIXME: faster
        var dirname string
@@ -557,6 +713,7 @@ func (dn *dirnode) loadManifest(txt string) {
                                }
                                extents = append(extents, storedExtent{
                                        locator: token,
+                                       size:    int(length),
                                        offset:  0,
                                        length:  int(length),
                                })
@@ -613,6 +770,7 @@ func (dn *dirnode) loadManifest(txt string) {
                                f.inode.(*filenode).appendExtent(storedExtent{
                                        cache:   dn.cache,
                                        locator: e.locator,
+                                       size:    e.size,
                                        offset:  blkOff,
                                        length:  blkLen,
                                })
@@ -803,6 +961,7 @@ func (me *memExtent) ReadAt(p []byte, off int64) (n int, err error) {
 type storedExtent struct {
        cache   blockCache
        locator string
+       size    int
        offset  int
        length  int
 }
index 42080793183ebf372309024ec3b263fd1d4fa0fd..70d46e234c37b95477e530d704c9dde716a57ebc 100644 (file)
@@ -11,6 +11,7 @@ import (
        "math/rand"
        "net/http"
        "os"
+       "regexp"
        "sync"
        "testing"
 
@@ -299,6 +300,29 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
        c.Check(err, check.IsNil)
        c.Check(string(buf2), check.Equals, "123")
        c.Check(len(f.(*file).inode.(*filenode).extents), check.Equals, 1)
+
+       m, err := s.fs.MarshalManifest(".")
+       c.Check(err, check.IsNil)
+       m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
+       c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 202cb962ac59075b964b07152d234b70+3 3:3:bar 6:3:foo\n")
+}
+
+func (s *CollectionFSSuite) TestMarshalSmallBlocks(c *check.C) {
+       maxBlockSize = 8
+       defer func() { maxBlockSize = 2 << 26 }()
+
+       s.fs = (&Collection{}).FileSystem(s.client, s.kc)
+       for _, name := range []string{"foo", "bar", "baz"} {
+               f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
+               c.Assert(err, check.IsNil)
+               f.Write([]byte(name))
+               f.Close()
+       }
+
+       m, err := s.fs.MarshalManifest(".")
+       c.Check(err, check.IsNil)
+       m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
+       c.Check(m, check.Equals, ". c3c23db5285662ef7172373df0003206+6 acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:bar 3:3:baz 6:3:foo\n")
 }
 
 func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
@@ -389,6 +413,10 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
        fi, err := root.Readdir(-1)
        c.Check(err, check.IsNil)
        c.Logf("Readdir(): %#v", fi)
+
+       m, err := s.fs.MarshalManifest(".")
+       c.Check(err, check.IsNil)
+       c.Logf("%s", m)
 }
 
 // Gocheck boilerplate