12483: Add MarshalManifest().
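For reviewers, a rough caller-side sketch of the new API (not part of this commit): fs is assumed to be a CollectionFileSystem built from a collection elsewhere (e.g. in a test for this package), "example.txt" and its contents are made up, and File is assumed to expose Write the way the *file implementation in this diff does.

    // Sketch only, as it might appear in a test within this package.
    func snapshotManifest(fs CollectionFileSystem) (string, error) {
            f, err := fs.Create("example.txt") // hypothetical file name
            if err != nil {
                    return "", err
            }
            if _, err := f.Write([]byte("hello world\n")); err != nil {
                    return "", err
            }
            // Exercise the new Truncate(): keep only the first 5 bytes.
            if err := f.Truncate(5); err != nil {
                    return "", err
            }
            // "." is the stream-name prefix for the collection root; subdirectories
            // are marshaled recursively with "./<name>" prefixes.
            return fs.MarshalManifest(".")
    }

MarshalManifest(".") is expected to return the collection's manifest text, one stream line per directory.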
[arvados.git] / sdk / go / arvados / collection_fs.go
index 1669f888c91e1bb7627f17b241e63dc57a189d28..2f1ec2fee43c1adea2ddc775e1884cac0583c6de 100644 (file)
@@ -5,12 +5,15 @@
 package arvados
 
 import (
+       "crypto/md5"
        "errors"
        "errors"
+       "fmt"
        "io"
        "net/http"
        "os"
        "path"
        "regexp"
        "io"
        "net/http"
        "os"
        "path"
        "regexp"
+       "sort"
        "strconv"
        "strings"
        "sync"
        "strconv"
        "strings"
        "sync"
@@ -35,6 +38,7 @@ type File interface {
        Size() int64
        Readdir(int) ([]os.FileInfo, error)
        Stat() (os.FileInfo, error)
+       Truncate(int64) error
 }
 
 type keepClient interface {
@@ -89,6 +93,7 @@ type CollectionFileSystem interface {
        Stat(name string) (os.FileInfo, error)
        Create(name string) (File, error)
        OpenFile(name string, flag int, perm os.FileMode) (File, error)
+       MarshalManifest(string) (string, error)
 }
 
 type fileSystem struct {
@@ -122,6 +127,7 @@ type inode interface {
        Parent() inode
        Read([]byte, filenodePtr) (int, filenodePtr, error)
        Write([]byte, filenodePtr) (int, filenodePtr, error)
+       Truncate(int64) error
        Readdir() []os.FileInfo
        Stat() os.FileInfo
        sync.Locker
@@ -215,7 +221,6 @@ func (fn *filenode) appendExtent(e extent) {
        defer fn.Unlock()
        fn.extents = append(fn.extents, e)
        fn.fileinfo.size += int64(e.Len())
-       fn.repacked++
 }
 
 func (fn *filenode) OpenFile(string, int, os.FileMode) (*file, error) {
@@ -257,6 +262,48 @@ func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr
        return
 }
 
+func (fn *filenode) Truncate(size int64) error {
+       fn.Lock()
+       defer fn.Unlock()
+       if size < fn.fileinfo.size {
+               ptr := fn.seek(filenodePtr{off: size, repacked: fn.repacked - 1})
+               if ptr.extentOff == 0 {
+                       fn.extents = fn.extents[:ptr.extentIdx]
+               } else {
+                       fn.extents = fn.extents[:ptr.extentIdx+1]
+                       e := fn.extents[ptr.extentIdx]
+                       if e, ok := e.(writableExtent); ok {
+                               e.Truncate(ptr.extentOff)
+                       } else {
+                               fn.extents[ptr.extentIdx] = e.Slice(0, ptr.extentOff)
+                       }
+               }
+               fn.fileinfo.size = size
+               fn.repacked++
+               return nil
+       }
+       for size > fn.fileinfo.size {
+               grow := size - fn.fileinfo.size
+               var e writableExtent
+               var ok bool
+               if len(fn.extents) == 0 {
+                       e = &memExtent{}
+                       fn.extents = append(fn.extents, e)
+               } else if e, ok = fn.extents[len(fn.extents)-1].(writableExtent); !ok || e.Len() >= maxBlockSize {
+                       e = &memExtent{}
+                       fn.extents = append(fn.extents, e)
+               } else {
+                       fn.repacked++
+               }
+               if maxgrow := int64(maxBlockSize - e.Len()); maxgrow < grow {
+                       grow = maxgrow
+               }
+               e.Truncate(e.Len() + int(grow))
+               fn.fileinfo.size += grow
+       }
+       return nil
+}
+
 func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
        fn.Lock()
        defer fn.Unlock()
@@ -435,6 +482,10 @@ func (f *file) Seek(off int64, whence int) (pos int64, err error) {
        return f.ptr.off, nil
 }
 
+func (f *file) Truncate(size int64) error {
+       return f.inode.Truncate(size)
+}
+
 func (f *file) Write(p []byte) (n int, err error) {
        if !f.writable {
                return 0, ErrReadOnlyFile
@@ -487,6 +538,158 @@ type dirnode struct {
        sync.RWMutex
 }
 
+// caller must hold dn.Lock().
+func (dn *dirnode) sync() error {
+       type shortBlock struct {
+               fn  *filenode
+               idx int
+       }
+       var pending []shortBlock
+       var pendingLen int
+
+       flush := func(sbs []shortBlock) error {
+               if len(sbs) == 0 {
+                       return nil
+               }
+               hash := md5.New()
+               size := 0
+               for _, sb := range sbs {
+                       data := sb.fn.extents[sb.idx].(*memExtent).buf
+                       if _, err := hash.Write(data); err != nil {
+                               return err
+                       }
+                       size += len(data)
+               }
+               // FIXME: write to keep
+               locator := fmt.Sprintf("%x+%d", hash.Sum(nil), size)
+               off := 0
+               for _, sb := range sbs {
+                       data := sb.fn.extents[sb.idx].(*memExtent).buf
+                       sb.fn.extents[sb.idx] = storedExtent{
+                               cache:   dn.cache,
+                               locator: locator,
+                               size:    size,
+                               offset:  off,
+                               length:  len(data),
+                       }
+                       off += len(data)
+               }
+               return nil
+       }
+
+       names := make([]string, 0, len(dn.inodes))
+       for name := range dn.inodes {
+               names = append(names, name)
+       }
+       sort.Strings(names)
+
+       for _, name := range names {
+               fn, ok := dn.inodes[name].(*filenode)
+               if !ok {
+                       continue
+               }
+               fn.Lock()
+               defer fn.Unlock()
+               for idx, ext := range fn.extents {
+                       ext, ok := ext.(*memExtent)
+                       if !ok {
+                               continue
+                       }
+                       if ext.Len() > maxBlockSize/2 {
+                               if err := flush([]shortBlock{{fn, idx}}); err != nil {
+                                       return err
+                               }
+                               continue
+                       }
+                       if pendingLen+ext.Len() > maxBlockSize {
+                               if err := flush(pending); err != nil {
+                                       return err
+                               }
+                               pending = nil
+                               pendingLen = 0
+                       }
+                       pending = append(pending, shortBlock{fn, idx})
+                       pendingLen += ext.Len()
+               }
+       }
+       return flush(pending)
+}
+
+func (dn *dirnode) MarshalManifest(prefix string) (string, error) {
+       dn.Lock()
+       defer dn.Unlock()
+       if err := dn.sync(); err != nil {
+               return "", err
+       }
+
+       var streamLen int64
+       type m1segment struct {
+               name   string
+               offset int64
+               length int64
+       }
+       var segments []m1segment
+       var subdirs string
+       var blocks []string
+
+       names := make([]string, 0, len(dn.inodes))
+       for name := range dn.inodes {
+               names = append(names, name)
+       }
+       sort.Strings(names)
+
+       for _, name := range names {
+               node := dn.inodes[name]
+               switch node := node.(type) {
+               case *dirnode:
+                       subdir, err := node.MarshalManifest(prefix + "/" + node.Name())
+                       if err != nil {
+                               return "", err
+                       }
+                       subdirs = subdirs + subdir
+               case *filenode:
+                       for _, e := range node.extents {
+                               switch e := e.(type) {
+                               case *memExtent:
+                                       blocks = append(blocks, fmt.Sprintf("FIXME+%d", e.Len()))
+                                       segments = append(segments, m1segment{
+                                               name:   node.Name(),
+                                               offset: streamLen,
+                                               length: int64(e.Len()),
+                                       })
+                                       streamLen += int64(e.Len())
+                               case storedExtent:
+                                       if len(blocks) > 0 && blocks[len(blocks)-1] == e.locator {
+                                               streamLen -= int64(e.size)
+                                       } else {
+                                               blocks = append(blocks, e.locator)
+                                       }
+                                       segments = append(segments, m1segment{
+                                               name:   node.Name(),
+                                               offset: streamLen + int64(e.offset),
+                                               length: int64(e.length),
+                                       })
+                                       streamLen += int64(e.size)
+                               default:
+                                       panic(fmt.Sprintf("can't marshal extent type %T", e))
+                               }
+                       }
+               default:
+                       panic(fmt.Sprintf("can't marshal inode type %T", node))
+               }
+       }
+       var filetokens []string
+       for _, s := range segments {
+               filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, s.name))
+       }
+       if len(filetokens) == 0 {
+               return subdirs, nil
+       } else if len(blocks) == 0 {
+               blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
+       }
+       return prefix + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
+}
+
 func (dn *dirnode) loadManifest(txt string) {
        // FIXME: faster
        var dirname string
@@ -510,6 +713,7 @@ func (dn *dirnode) loadManifest(txt string) {
                                }
                                extents = append(extents, storedExtent{
                                        locator: token,
+                                       size:    int(length),
                                        offset:  0,
                                        length:  int(length),
                                })
@@ -566,6 +770,7 @@ func (dn *dirnode) loadManifest(txt string) {
                                f.inode.(*filenode).appendExtent(storedExtent{
                                        cache:   dn.cache,
                                        locator: e.locator,
+                                       size:    e.size,
                                        offset:  blkOff,
                                        length:  blkLen,
                                })
@@ -628,6 +833,10 @@ func (dn *dirnode) Write(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
        return 0, ptr, ErrInvalidOperation
 }
 
+func (dn *dirnode) Truncate(int64) error {
+       return ErrInvalidOperation
+}
+
 func (dn *dirnode) OpenFile(name string, flag int, perm os.FileMode) (*file, error) {
        name = strings.TrimSuffix(name, "/")
        if name == "." || name == "" {
@@ -720,6 +929,12 @@ func (me *memExtent) Truncate(n int) {
                newbuf := make([]byte, n, newsize)
                copy(newbuf, me.buf)
                me.buf = newbuf
+       } else {
+               // Zero unused part when shrinking, in case we grow
+               // and start using it again later.
+               for i := n; i < len(me.buf); i++ {
+                       me.buf[i] = 0
+               }
        }
        me.buf = me.buf[:n]
 }
@@ -746,6 +961,7 @@ func (me *memExtent) ReadAt(p []byte, off int64) (n int, err error) {
 type storedExtent struct {
        cache   blockCache
        locator string
+       size    int
        offset  int
        length  int
 }