12483: Add MarshalManifest().
[arvados.git] / sdk / go / arvados / collection_fs.go
index 1669f888c91e1bb7627f17b241e63dc57a189d28..2f1ec2fee43c1adea2ddc775e1884cac0583c6de 100644 (file)
@@ -5,12 +5,15 @@
 package arvados
 
 import (
+       "crypto/md5"
        "errors"
+       "fmt"
        "io"
        "net/http"
        "os"
        "path"
        "regexp"
+       "sort"
        "strconv"
        "strings"
        "sync"
@@ -35,6 +38,7 @@ type File interface {
        Size() int64
        Readdir(int) ([]os.FileInfo, error)
        Stat() (os.FileInfo, error)
+       Truncate(int64) error
 }
 
 type keepClient interface {
@@ -89,6 +93,7 @@ type CollectionFileSystem interface {
        Stat(name string) (os.FileInfo, error)
        Create(name string) (File, error)
        OpenFile(name string, flag int, perm os.FileMode) (File, error)
+       MarshalManifest(string) (string, error)
 }
 
 type fileSystem struct {
@@ -122,6 +127,7 @@ type inode interface {
        Parent() inode
        Read([]byte, filenodePtr) (int, filenodePtr, error)
        Write([]byte, filenodePtr) (int, filenodePtr, error)
+       Truncate(int64) error
        Readdir() []os.FileInfo
        Stat() os.FileInfo
        sync.Locker
@@ -215,7 +221,6 @@ func (fn *filenode) appendExtent(e extent) {
        defer fn.Unlock()
        fn.extents = append(fn.extents, e)
        fn.fileinfo.size += int64(e.Len())
-       fn.repacked++
 }
 
 func (fn *filenode) OpenFile(string, int, os.FileMode) (*file, error) {
@@ -257,6 +262,48 @@ func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr
        return
 }
 
+func (fn *filenode) Truncate(size int64) error {
+       fn.Lock()
+       defer fn.Unlock()
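+       // Shrink: drop whole trailing extents, slicing the last remaining one
+       // if the new size falls inside it.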
+       if size < fn.fileinfo.size {
+               ptr := fn.seek(filenodePtr{off: size, repacked: fn.repacked - 1})
+               if ptr.extentOff == 0 {
+                       fn.extents = fn.extents[:ptr.extentIdx]
+               } else {
+                       fn.extents = fn.extents[:ptr.extentIdx+1]
+                       e := fn.extents[ptr.extentIdx]
+                       if e, ok := e.(writableExtent); ok {
+                               e.Truncate(ptr.extentOff)
+                       } else {
+                               fn.extents[ptr.extentIdx] = e.Slice(0, ptr.extentOff)
+                       }
+               }
+               fn.fileinfo.size = size
+               fn.repacked++
+               return nil
+       }
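+       // Grow: extend the last writable in-memory extent when possible,
+       // otherwise append new zero-filled extents, keeping each one within
+       // maxBlockSize.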
+       for size > fn.fileinfo.size {
+               grow := size - fn.fileinfo.size
+               var e writableExtent
+               var ok bool
+               if len(fn.extents) == 0 {
+                       e = &memExtent{}
+                       fn.extents = append(fn.extents, e)
+               } else if e, ok = fn.extents[len(fn.extents)-1].(writableExtent); !ok || e.Len() >= maxBlockSize {
+                       e = &memExtent{}
+                       fn.extents = append(fn.extents, e)
+               } else {
+                       fn.repacked++
+               }
+               if maxgrow := int64(maxBlockSize - e.Len()); maxgrow < grow {
+                       grow = maxgrow
+               }
+               e.Truncate(e.Len() + int(grow))
+               fn.fileinfo.size += grow
+       }
+       return nil
+}
+
 func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
        fn.Lock()
        defer fn.Unlock()
@@ -435,6 +482,10 @@ func (f *file) Seek(off int64, whence int) (pos int64, err error) {
        return f.ptr.off, nil
 }
 
+func (f *file) Truncate(size int64) error {
+       return f.inode.Truncate(size)
+}
+
 func (f *file) Write(p []byte) (n int, err error) {
        if !f.writable {
                return 0, ErrReadOnlyFile
@@ -487,6 +538,158 @@ type dirnode struct {
        sync.RWMutex
 }
 
+// sync converts this directory's in-memory file extents into storedExtents,
+// packing short extents together into full-size blocks where possible.
+// Caller must hold dn.Lock().
+func (dn *dirnode) sync() error {
+       type shortBlock struct {
+               fn  *filenode
+               idx int
+       }
+       var pending []shortBlock
+       var pendingLen int
+
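+       // flush concatenates the given in-memory extents into a single block,
+       // computes its locator, and replaces each memExtent with a
+       // storedExtent covering its byte range within that block. (The actual
+       // write to Keep is still a FIXME below.)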
+       flush := func(sbs []shortBlock) error {
+               if len(sbs) == 0 {
+                       return nil
+               }
+               hash := md5.New()
+               size := 0
+               for _, sb := range sbs {
+                       data := sb.fn.extents[sb.idx].(*memExtent).buf
+                       if _, err := hash.Write(data); err != nil {
+                               return err
+                       }
+                       size += len(data)
+               }
+               // FIXME: write to keep
+               locator := fmt.Sprintf("%x+%d", hash.Sum(nil), size)
+               off := 0
+               for _, sb := range sbs {
+                       data := sb.fn.extents[sb.idx].(*memExtent).buf
+                       sb.fn.extents[sb.idx] = storedExtent{
+                               cache:   dn.cache,
+                               locator: locator,
+                               size:    size,
+                               offset:  off,
+                               length:  len(data),
+                       }
+                       off += len(data)
+               }
+               return nil
+       }
+
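+       // Visit files in sorted name order so block packing is deterministic.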
+       names := make([]string, 0, len(dn.inodes))
+       for name := range dn.inodes {
+               names = append(names, name)
+       }
+       sort.Strings(names)
+
+       for _, name := range names {
+               fn, ok := dn.inodes[name].(*filenode)
+               if !ok {
+                       continue
+               }
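+               // Hold fn's lock until sync returns: its extents may still be
+               // referenced by the final flush(pending) call below.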
+               fn.Lock()
+               defer fn.Unlock()
+               for idx, ext := range fn.extents {
+                       ext, ok := ext.(*memExtent)
+                       if !ok {
+                               continue
+                       }
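+                       // Extents larger than half a block are flushed on
+                       // their own; shorter ones are batched until the batch
+                       // would exceed maxBlockSize.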
+                       if ext.Len() > maxBlockSize/2 {
+                               if err := flush([]shortBlock{{fn, idx}}); err != nil {
+                                       return err
+                               }
+                               continue
+                       }
+                       if pendingLen+ext.Len() > maxBlockSize {
+                               if err := flush(pending); err != nil {
+                                       return err
+                               }
+                               pending = nil
+                               pendingLen = 0
+                       }
+                       pending = append(pending, shortBlock{fn, idx})
+                       pendingLen += ext.Len()
+               }
+       }
+       return flush(pending)
+}
+
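+// MarshalManifest flushes in-memory data, then returns a manifest text
+// describing the tree rooted at dn, using prefix as the stream name.
+// Subdirectory streams are appended after this directory's stream.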
+func (dn *dirnode) MarshalManifest(prefix string) (string, error) {
+       dn.Lock()
+       defer dn.Unlock()
+       if err := dn.sync(); err != nil {
+               return "", err
+       }
+
+       var streamLen int64
+       type m1segment struct {
+               name   string
+               offset int64
+               length int64
+       }
+       var segments []m1segment
+       var subdirs string
+       var blocks []string
+
+       names := make([]string, 0, len(dn.inodes))
+       for name := range dn.inodes {
+               names = append(names, name)
+       }
+       sort.Strings(names)
+
+       for _, name := range names {
+               node := dn.inodes[name]
+               switch node := node.(type) {
+               case *dirnode:
+                       subdir, err := node.MarshalManifest(prefix + "/" + node.Name())
+                       if err != nil {
+                               return "", err
+                       }
+                       subdirs = subdirs + subdir
+               case *filenode:
+                       for _, e := range node.extents {
+                               switch e := e.(type) {
+                               case *memExtent:
+                                       blocks = append(blocks, fmt.Sprintf("FIXME+%d", e.Len()))
+                                       segments = append(segments, m1segment{
+                                               name:   node.Name(),
+                                               offset: streamLen,
+                                               length: int64(e.Len()),
+                                       })
+                                       streamLen += int64(e.Len())
+                               case storedExtent:
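+                                       // Reuse the previous block token when
+                                       // this extent lives in the same block,
+                                       // rewinding streamLen so the segment
+                                       // offset stays relative to that
+                                       // block's position in the stream.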
+                                       if len(blocks) > 0 && blocks[len(blocks)-1] == e.locator {
+                                               streamLen -= int64(e.size)
+                                       } else {
+                                               blocks = append(blocks, e.locator)
+                                       }
+                                       segments = append(segments, m1segment{
+                                               name:   node.Name(),
+                                               offset: streamLen + int64(e.offset),
+                                               length: int64(e.length),
+                                       })
+                                       streamLen += int64(e.size)
+                               default:
+                                       panic(fmt.Sprintf("can't marshal extent type %T", e))
+                               }
+                       }
+               default:
+                       panic(fmt.Sprintf("can't marshal inode type %T", node))
+               }
+       }
+       var filetokens []string
+       for _, s := range segments {
+               filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, s.name))
+       }
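+       // A directory with no file segments contributes no stream of its own.
+       // A stream whose files are all empty still needs one block token, so
+       // use the locator of the empty block.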
+       if len(filetokens) == 0 {
+               return subdirs, nil
+       } else if len(blocks) == 0 {
+               blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
+       }
+       return prefix + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
+}
+
 func (dn *dirnode) loadManifest(txt string) {
        // FIXME: faster
        var dirname string
@@ -510,6 +713,7 @@ func (dn *dirnode) loadManifest(txt string) {
                                }
                                extents = append(extents, storedExtent{
                                        locator: token,
+                                       size:    int(length),
                                        offset:  0,
                                        length:  int(length),
                                })
@@ -566,6 +770,7 @@ func (dn *dirnode) loadManifest(txt string) {
                                f.inode.(*filenode).appendExtent(storedExtent{
                                        cache:   dn.cache,
                                        locator: e.locator,
+                                       size:    e.size,
                                        offset:  blkOff,
                                        length:  blkLen,
                                })
@@ -628,6 +833,10 @@ func (dn *dirnode) Write(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
        return 0, ptr, ErrInvalidOperation
 }
 
+func (dn *dirnode) Truncate(int64) error {
+       return ErrInvalidOperation
+}
+
 func (dn *dirnode) OpenFile(name string, flag int, perm os.FileMode) (*file, error) {
        name = strings.TrimSuffix(name, "/")
        if name == "." || name == "" {
@@ -720,6 +929,12 @@ func (me *memExtent) Truncate(n int) {
                newbuf := make([]byte, n, newsize)
                copy(newbuf, me.buf)
                me.buf = newbuf
+       } else {
+               // Zero unused part when shrinking, in case we grow
+               // and start using it again later.
+               for i := n; i < len(me.buf); i++ {
+                       me.buf[i] = 0
+               }
        }
        me.buf = me.buf[:n]
 }
@@ -746,6 +961,7 @@ func (me *memExtent) ReadAt(p []byte, off int64) (n int, err error) {
 type storedExtent struct {
        cache   blockCache
        locator string
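+       // size is the total size of the stored block; this extent may cover
+       // only the byte range [offset, offset+length) within it.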
+       size    int
        offset  int
        length  int
 }
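
For illustration only, not part of this commit: a minimal sketch of how the new Truncate and MarshalManifest calls might be exercised. It assumes a CollectionFileSystem obtained elsewhere (for example from a collection's FileSystem constructor) and that File also satisfies io.Writer and io.Closer, as it does elsewhere in this file.

func truncateAndMarshalExample(fs CollectionFileSystem) (string, error) {
        // Create (or open) a file in the collection filesystem.
        f, err := fs.OpenFile("foo.txt", os.O_WRONLY|os.O_CREATE, 0)
        if err != nil {
                return "", err
        }
        defer f.Close()
        if _, err := f.Write([]byte("hello world\n")); err != nil {
                return "", err
        }
        // Keep only the first five bytes.
        if err := f.Truncate(5); err != nil {
                return "", err
        }
        // Serialize the whole tree, using "." as the top-level stream name.
        return fs.MarshalManifest(".")
}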