12483: Add MarshalManifest().
author Tom Clegg <tclegg@veritasgenetics.com>
Sun, 12 Nov 2017 08:31:16 +0000 (03:31 -0500)
committer Tom Clegg <tclegg@veritasgenetics.com>
Sun, 12 Nov 2017 08:42:44 +0000 (03:42 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

sdk/go/arvados/collection_fs.go
sdk/go/arvados/collection_fs_test.go

index 17740dbead058cd659442f3420e532f3f82f7fd9..2f1ec2fee43c1adea2ddc775e1884cac0583c6de 100644 (file)
@@ -5,12 +5,15 @@
 package arvados
 
 import (
 package arvados
 
 import (
+       "crypto/md5"
        "errors"
        "errors"
+       "fmt"
        "io"
        "net/http"
        "os"
        "path"
        "regexp"
        "io"
        "net/http"
        "os"
        "path"
        "regexp"
+       "sort"
        "strconv"
        "strings"
        "sync"
        "strconv"
        "strings"
        "sync"
@@ -90,6 +93,7 @@ type CollectionFileSystem interface {
        Stat(name string) (os.FileInfo, error)
        Create(name string) (File, error)
        OpenFile(name string, flag int, perm os.FileMode) (File, error)
        Stat(name string) (os.FileInfo, error)
        Create(name string) (File, error)
        OpenFile(name string, flag int, perm os.FileMode) (File, error)
+       MarshalManifest(string) (string, error)
 }
 
 type fileSystem struct {
 }
 
 type fileSystem struct {
@@ -534,6 +538,158 @@ type dirnode struct {
        sync.RWMutex
 }
 
        sync.RWMutex
 }
 
+// sync converts every memExtent of the files stored directly in dn into
+// a storedExtent that refers to a byte range of a block locator.
+// Inodes that are not *filenode (e.g. subdirectories) are skipped.
+//
+// Extents longer than maxBlockSize/2 each get a block of their own.
+// Shorter ones are packed, in sorted file-name order, into shared blocks
+// of at most maxBlockSize bytes. The data is not yet written to Keep
+// (see the FIXME in flush); only the "<md5 hex>+<size>" locator is
+// computed.
+//
+// caller must hold dn.Lock().
+func (dn *dirnode) sync() error {
+       type shortBlock struct {
+               fn  *filenode
+               idx int
+       }
+       var pending []shortBlock
+       var pendingLen int
+
+       // flush treats the buffers of sbs, concatenated in order, as one
+       // block: it computes that block's locator, then replaces each
+       // memExtent with a storedExtent covering its byte range within the
+       // block. An empty sbs is a no-op.
+       flush := func(sbs []shortBlock) error {
+               if len(sbs) == 0 {
+                       return nil
+               }
+               hash := md5.New()
+               size := 0
+               for _, sb := range sbs {
+                       data := sb.fn.extents[sb.idx].(*memExtent).buf
+                       if _, err := hash.Write(data); err != nil {
+                               return err
+                       }
+                       size += len(data)
+               }
+               // FIXME: write to keep
+               locator := fmt.Sprintf("%x+%d", hash.Sum(nil), size)
+               off := 0
+               for _, sb := range sbs {
+                       data := sb.fn.extents[sb.idx].(*memExtent).buf
+                       sb.fn.extents[sb.idx] = storedExtent{
+                               cache:   dn.cache,
+                               locator: locator,
+                               size:    size,
+                               offset:  off,
+                               length:  len(data),
+                       }
+                       off += len(data)
+               }
+               return nil
+       }
+
+       // Sort names so block packing (and therefore the resulting
+       // locators) is deterministic.
+       names := make([]string, 0, len(dn.inodes))
+       for name := range dn.inodes {
+               names = append(names, name)
+       }
+       sort.Strings(names)
+
+       for _, name := range names {
+               fn, ok := dn.inodes[name].(*filenode)
+               if !ok {
+                       continue
+               }
+               // The unlock is deliberately deferred to function return
+               // rather than released per iteration: this file's extents
+               // may stay in pending and be rewritten by a later flush,
+               // which must still hold fn's lock.
+               fn.Lock()
+               defer fn.Unlock()
+               for idx, ext := range fn.extents {
+                       // Only in-memory extents need converting.
+                       ext, ok := ext.(*memExtent)
+                       if !ok {
+                               continue
+                       }
+                       // Large extents get a dedicated block; pending is
+                       // left untouched.
+                       if ext.Len() > maxBlockSize/2 {
+                               if err := flush([]shortBlock{{fn, idx}}); err != nil {
+                                       return err
+                               }
+                               continue
+                       }
+                       // Adding ext would overflow the block being packed:
+                       // flush it and start a new one.
+                       if pendingLen+ext.Len() > maxBlockSize {
+                               if err := flush(pending); err != nil {
+                                       return err
+                               }
+                               pending = nil
+                               pendingLen = 0
+                       }
+                       pending = append(pending, shortBlock{fn, idx})
+                       pendingLen += ext.Len()
+               }
+       }
+       // Flush the last partially filled block (no-op if nothing is pending).
+       return flush(pending)
+}
+
+// MarshalManifest returns the manifest text for dn and, recursively, its
+// subdirectories.
+//
+// prefix is the stream name used for dn's own files (the tests pass "."
+// for the root); each subdirectory's stream is named
+// prefix + "/" + <escaped subdirectory name>. dn.sync() runs first so
+// that file data is represented by storedExtents. dn's files, in sorted
+// name order, form a single stream line, which is followed by the
+// subdirectories' lines. If dn holds no files it contributes no line of
+// its own. Empty files are emitted as 0-length segments.
+func (dn *dirnode) MarshalManifest(prefix string) (string, error) {
+       dn.Lock()
+       defer dn.Unlock()
+       if err := dn.sync(); err != nil {
+               return "", err
+       }
+
+       // Manifest tokens are separated by whitespace, so whitespace in a
+       // file or directory name must be written as an octal escape.
+       // NOTE(review): assumes loadManifest decodes these escapes; confirm.
+       escape := strings.NewReplacer(" ", `\040`, "\t", `\011`, "\n", `\012`).Replace
+
+       var streamLen int64
+       type m1segment struct {
+               name   string
+               offset int64
+               length int64
+       }
+       var segments []m1segment
+       var subdirs []string
+       var blocks []string
+
+       names := make([]string, 0, len(dn.inodes))
+       for name := range dn.inodes {
+               names = append(names, name)
+       }
+       sort.Strings(names)
+
+       for _, name := range names {
+               switch node := dn.inodes[name].(type) {
+               case *dirnode:
+                       subdir, err := node.MarshalManifest(prefix + "/" + escape(node.Name()))
+                       if err != nil {
+                               return "", err
+                       }
+                       subdirs = append(subdirs, subdir)
+               case *filenode:
+                       if len(node.extents) == 0 {
+                               // An empty file has no extents but must still
+                               // appear in the manifest as a 0-length segment.
+                               segments = append(segments, m1segment{
+                                       name:   node.Name(),
+                                       offset: streamLen,
+                               })
+                       }
+                       for _, e := range node.extents {
+                               switch e := e.(type) {
+                               case *memExtent:
+                                       // Not expected after dn.sync(), which converts every
+                                       // memExtent; placeholder until data is written to Keep.
+                                       blocks = append(blocks, fmt.Sprintf("FIXME+%d", e.Len()))
+                                       segments = append(segments, m1segment{
+                                               name:   node.Name(),
+                                               offset: streamLen,
+                                               length: int64(e.Len()),
+                                       })
+                                       streamLen += int64(e.Len())
+                               case storedExtent:
+                                       // Consecutive extents from the same block share one
+                                       // block token: rewind streamLen to that block's start.
+                                       if len(blocks) > 0 && blocks[len(blocks)-1] == e.locator {
+                                               streamLen -= int64(e.size)
+                                       } else {
+                                               blocks = append(blocks, e.locator)
+                                       }
+                                       segments = append(segments, m1segment{
+                                               name:   node.Name(),
+                                               offset: streamLen + int64(e.offset),
+                                               length: int64(e.length),
+                                       })
+                                       streamLen += int64(e.size)
+                               default:
+                                       panic(fmt.Sprintf("can't marshal extent type %T", e))
+                               }
+                       }
+               default:
+                       panic(fmt.Sprintf("can't marshal inode type %T", node))
+               }
+       }
+
+       filetokens := make([]string, 0, len(segments))
+       for _, s := range segments {
+               filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, escape(s.name)))
+       }
+       subdirText := strings.Join(subdirs, "")
+       if len(filetokens) == 0 {
+               return subdirText, nil
+       }
+       if len(blocks) == 0 {
+               // A stream line needs at least one block token; use the
+               // empty-block locator (md5 of zero bytes, size 0).
+               blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
+       }
+       return prefix + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirText, nil
+}
+
 func (dn *dirnode) loadManifest(txt string) {
        // FIXME: faster
        var dirname string
 func (dn *dirnode) loadManifest(txt string) {
        // FIXME: faster
        var dirname string
@@ -557,6 +713,7 @@ func (dn *dirnode) loadManifest(txt string) {
                                }
                                extents = append(extents, storedExtent{
                                        locator: token,
                                }
                                extents = append(extents, storedExtent{
                                        locator: token,
+                                       size:    int(length),
                                        offset:  0,
                                        length:  int(length),
                                })
                                        offset:  0,
                                        length:  int(length),
                                })
@@ -613,6 +770,7 @@ func (dn *dirnode) loadManifest(txt string) {
                                f.inode.(*filenode).appendExtent(storedExtent{
                                        cache:   dn.cache,
                                        locator: e.locator,
                                f.inode.(*filenode).appendExtent(storedExtent{
                                        cache:   dn.cache,
                                        locator: e.locator,
+                                       size:    e.size,
                                        offset:  blkOff,
                                        length:  blkLen,
                                })
                                        offset:  blkOff,
                                        length:  blkLen,
                                })
@@ -803,6 +961,7 @@ func (me *memExtent) ReadAt(p []byte, off int64) (n int, err error) {
 type storedExtent struct {
        cache   blockCache
        locator string
+// storedExtent is a file extent whose data lives in a (possibly shared)
+// block identified by locator, rather than in memory.
 type storedExtent struct {
+       // cache is the blockCache the filesystem was built with; presumably
+       // used to read the block's data (TODO confirm).
        cache   blockCache
+       // locator identifies the block; sync() builds it as "<md5 hex>+<size>".
        locator string
+       // size is the total length in bytes of the block named by locator.
+       size    int
+       // offset and length select the byte range of that block which
+       // belongs to this extent.
        offset  int
        length  int
 }
        offset  int
        length  int
 }
index 42080793183ebf372309024ec3b263fd1d4fa0fd..70d46e234c37b95477e530d704c9dde716a57ebc 100644 (file)
@@ -11,6 +11,7 @@ import (
        "math/rand"
        "net/http"
        "os"
        "math/rand"
        "net/http"
        "os"
+       "regexp"
        "sync"
        "testing"
 
        "sync"
        "testing"
 
@@ -299,6 +300,29 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
        c.Check(err, check.IsNil)
        c.Check(string(buf2), check.Equals, "123")
        c.Check(len(f.(*file).inode.(*filenode).extents), check.Equals, 1)
        c.Check(err, check.IsNil)
        c.Check(string(buf2), check.Equals, "123")
        c.Check(len(f.(*file).inode.(*filenode).extents), check.Equals, 1)
+
+       m, err := s.fs.MarshalManifest(".")
+       c.Check(err, check.IsNil)
+       m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
+       c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 202cb962ac59075b964b07152d234b70+3 3:3:bar 6:3:foo\n")
+}
+
+func (s *CollectionFSSuite) TestMarshalSmallBlocks(c *check.C) {
+       maxBlockSize = 8
+       defer func() { maxBlockSize = 2 << 26 }()
+
+       s.fs = (&Collection{}).FileSystem(s.client, s.kc)
+       for _, name := range []string{"foo", "bar", "baz"} {
+               f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
+               c.Assert(err, check.IsNil)
+               f.Write([]byte(name))
+               f.Close()
+       }
+
+       m, err := s.fs.MarshalManifest(".")
+       c.Check(err, check.IsNil)
+       m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
+       c.Check(m, check.Equals, ". c3c23db5285662ef7172373df0003206+6 acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:bar 3:3:baz 6:3:foo\n")
 }
 
 func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
 }
 
 func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
@@ -389,6 +413,10 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
        fi, err := root.Readdir(-1)
        c.Check(err, check.IsNil)
        c.Logf("Readdir(): %#v", fi)
        fi, err := root.Readdir(-1)
        c.Check(err, check.IsNil)
        c.Logf("Readdir(): %#v", fi)
+
+       m, err := s.fs.MarshalManifest(".")
+       c.Check(err, check.IsNil)
+       c.Logf("%s", m)
 }
 
 // Gocheck boilerplate
 }
 
 // Gocheck boilerplate