From 236ede1415d7a09f97a05c5ab63e2cedd4d19d48 Mon Sep 17 00:00:00 2001
From: Tom Clegg
Date: Sun, 12 Nov 2017 03:31:16 -0500
Subject: [PATCH] 12483: Add MarshalManifest().

Arvados-DCO-1.1-Signed-off-by: Tom Clegg
---
 sdk/go/arvados/collection_fs.go      | 159 +++++++++++++++++++++++++++
 sdk/go/arvados/collection_fs_test.go |  28 +++++
 2 files changed, 187 insertions(+)

diff --git a/sdk/go/arvados/collection_fs.go b/sdk/go/arvados/collection_fs.go
index 17740dbead..2f1ec2fee4 100644
--- a/sdk/go/arvados/collection_fs.go
+++ b/sdk/go/arvados/collection_fs.go
@@ -5,12 +5,15 @@
 package arvados
 
 import (
+	"crypto/md5"
 	"errors"
+	"fmt"
 	"io"
 	"net/http"
 	"os"
 	"path"
 	"regexp"
+	"sort"
 	"strconv"
 	"strings"
 	"sync"
@@ -90,6 +93,7 @@ type CollectionFileSystem interface {
 	Stat(name string) (os.FileInfo, error)
 	Create(name string) (File, error)
 	OpenFile(name string, flag int, perm os.FileMode) (File, error)
+	MarshalManifest(string) (string, error)
 }
 
 type fileSystem struct {
@@ -534,6 +538,158 @@ type dirnode struct {
 	sync.RWMutex
 }
 
+// sync replaces every *memExtent held by the files directly inside dn with storedExtent references, packing small extents together into shared blocks. Caller must hold dn.Lock().
+func (dn *dirnode) sync() error {
+	type shortBlock struct {
+		fn  *filenode
+		idx int
+	}
+	var pending []shortBlock
+	var pendingLen int
+
+	flush := func(sbs []shortBlock) error { // hashes sbs as one block, then points each extent at its slice of that block
+		if len(sbs) == 0 {
+			return nil
+		}
+		hash := md5.New()
+		size := 0
+		for _, sb := range sbs {
+			data := sb.fn.extents[sb.idx].(*memExtent).buf
+			if _, err := hash.Write(data); err != nil {
+				return err
+			}
+			size += len(data)
+		}
+		// FIXME: write to keep; only the locator is computed here, nothing in this function stores the data
+		locator := fmt.Sprintf("%x+%d", hash.Sum(nil), size)
+		off := 0
+		for _, sb := range sbs {
+			data := sb.fn.extents[sb.idx].(*memExtent).buf
+			sb.fn.extents[sb.idx] = storedExtent{
+				cache:   dn.cache,
+				locator: locator,
+				size:    size,
+				offset:  off,
+				length:  len(data),
+			}
+			off += len(data)
+		}
+		return nil
+	}
+
+	names := make([]string, 0, len(dn.inodes))
+	for name := range dn.inodes {
+		names = append(names, name)
+	}
+	sort.Strings(names) // visit entries in name order rather than map order
+
+	for _, name := range names {
+		fn, ok := dn.inodes[name].(*filenode)
+		if !ok {
+			continue
+		}
+		fn.Lock()
+		defer fn.Unlock() // deferred inside the loop: every visited file stays locked until sync returns
+		for idx, ext := range fn.extents {
+			ext, ok := ext.(*memExtent)
+			if !ok {
+				continue
+			}
+			if ext.Len() > maxBlockSize/2 { // more than half a block: flush it as a block of its own
+				if err := flush([]shortBlock{{fn, idx}}); err != nil {
+					return err
+				}
+				continue
+			}
+			if pendingLen+ext.Len() > maxBlockSize { // would overflow the pending block: flush that first
+				if err := flush(pending); err != nil {
+					return err
+				}
+				pending = nil
+				pendingLen = 0
+			}
+			pending = append(pending, shortBlock{fn, idx})
+			pendingLen += ext.Len()
+		}
+	}
+	return flush(pending)
+}
+// MarshalManifest returns manifest text for dn (one stream line named prefix) followed by the text for its subdirectories, each marshaled with prefix + "/" + its name. It calls sync() first.
+func (dn *dirnode) MarshalManifest(prefix string) (string, error) {
+	dn.Lock()
+	defer dn.Unlock()
+	if err := dn.sync(); err != nil {
+		return "", err
+	}
+
+	var streamLen int64
+	type m1segment struct {
+		name   string
+		offset int64
+		length int64
+	}
+	var segments []m1segment
+	var subdirs string
+	var blocks []string
+
+	names := make([]string, 0, len(dn.inodes))
+	for name := range dn.inodes {
+		names = append(names, name)
+	}
+	sort.Strings(names) // sorted so the output order does not depend on map iteration
+
+	for _, name := range names {
+		node := dn.inodes[name]
+		switch node := node.(type) {
+		case *dirnode:
+			subdir, err := node.MarshalManifest(prefix + "/" + node.Name())
+			if err != nil {
+				return "", err
+			}
+			subdirs = subdirs + subdir
+		case *filenode:
+			for _, e := range node.extents { // NOTE(review): a file with no extents adds no segment, so it gets no file token — confirm intended
+				switch e := e.(type) {
+				case *memExtent: // NOTE(review): placeholder locator; sync() above converts memExtents, so presumably unreachable — confirm
+					blocks = append(blocks, fmt.Sprintf("FIXME+%d", e.Len()))
+					segments = append(segments, m1segment{
+						name:   node.Name(),
+						offset: streamLen,
+						length: int64(e.Len()),
+					})
+					streamLen += int64(e.Len())
+				case storedExtent:
+					if len(blocks) > 0 && blocks[len(blocks)-1] == e.locator { // same block as the previous extent: rewind so the offset lands in the block already listed
+						streamLen -= int64(e.size)
+					} else {
+						blocks = append(blocks, e.locator)
+					}
+					segments = append(segments, m1segment{
+						name:   node.Name(),
+						offset: streamLen + int64(e.offset),
+						length: int64(e.length),
+					})
+					streamLen += int64(e.size)
+				default:
+					panic(fmt.Sprintf("can't marshal extent type %T", e))
+				}
+			}
+		default:
+			panic(fmt.Sprintf("can't marshal inode type %T", node))
+		}
+	}
+	var filetokens []string
+	for _, s := range segments {
+		filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, s.name))
+	}
+	if len(filetokens) == 0 { // no file data in this directory: emit only the subdirectory streams
+		return subdirs, nil
+	} else if len(blocks) == 0 { // no block locators: use the zero-length block (md5 of empty input, +0)
+		blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
+	}
+	return prefix + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
+}
+
 func (dn *dirnode) loadManifest(txt string) {
 	// FIXME: faster
 	var dirname string
@@ -557,6 +713,7 @@ func (dn *dirnode) loadManifest(txt string) {
 				}
 				extents = append(extents, storedExtent{
 					locator: token,
+					size:    int(length), // the whole block's size, taken from the locator's length field
 					offset:  0,
 					length:  int(length),
 				})
@@ -613,6 +770,7 @@ func (dn *dirnode) loadManifest(txt string) {
 					f.inode.(*filenode).appendExtent(storedExtent{
 						cache:   dn.cache,
 						locator: e.locator,
+						size:    e.size,
 						offset:  blkOff,
 						length:  blkLen,
 					})
@@ -803,6 +961,7 @@ func (me *memExtent) ReadAt(p []byte, off int64) (n int, err error) {
 type storedExtent struct {
 	cache   blockCache
 	locator string
+	size    int // size of the whole block named by locator; offset/length select a range inside it
 	offset  int
 	length  int
 }
diff --git a/sdk/go/arvados/collection_fs_test.go b/sdk/go/arvados/collection_fs_test.go
index 4208079318..70d46e234c 100644
--- a/sdk/go/arvados/collection_fs_test.go
+++ b/sdk/go/arvados/collection_fs_test.go
@@ -11,6 +11,7 @@ import (
 	"math/rand"
 	"net/http"
 	"os"
+	"regexp"
 	"sync"
 	"testing"
 
@@ -299,6 +300,29 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
 	c.Check(err, check.IsNil)
 	c.Check(string(buf2), check.Equals, "123")
 	c.Check(len(f.(*file).inode.(*filenode).extents), check.Equals, 1)
+
+	m, err := s.fs.MarshalManifest(".")
+	c.Check(err, check.IsNil)
+	m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "") // drop any "+A..." hint from each locator before comparing
+	c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 202cb962ac59075b964b07152d234b70+3 3:3:bar 6:3:foo\n")
+}
+
+func (s *CollectionFSSuite) TestMarshalSmallBlocks(c *check.C) {
+	maxBlockSize = 8 // small limit: "bar"+"baz" share one 6-byte block and "foo" spills into a second
+	defer func() { maxBlockSize = 2 << 26 }() // NOTE(review): restores a hard-coded value, not the previous one — confirm 2<<26 is the package default
+
+	s.fs = (&Collection{}).FileSystem(s.client, s.kc)
+	for _, name := range []string{"foo", "bar", "baz"} {
+		f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
+		c.Assert(err, check.IsNil)
+		f.Write([]byte(name)) // each file's content is its own name; NOTE(review): Write/Close errors are not checked
+		f.Close()
+	}
+
+	m, err := s.fs.MarshalManifest(".")
+	c.Check(err, check.IsNil)
+	m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "") // drop any "+A..." hint from each locator before comparing
+	c.Check(m, check.Equals, ". c3c23db5285662ef7172373df0003206+6 acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:bar 3:3:baz 6:3:foo\n")
 }
 
 func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
@@ -389,6 +413,10 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
 	fi, err := root.Readdir(-1)
 	c.Check(err, check.IsNil)
 	c.Logf("Readdir(): %#v", fi)
+
+	m, err := s.fs.MarshalManifest(".")
+	c.Check(err, check.IsNil)
+	c.Logf("%s", m) // logged only; the manifest content is not asserted here
 }
 
 // Gocheck boilerplate
-- 
2.30.2