package arvados
import (
+ "crypto/md5"
"errors"
+ "fmt"
"io"
"net/http"
"os"
"path"
"regexp"
+ "sort"
"strconv"
"strings"
"sync"
Size() int64
Readdir(int) ([]os.FileInfo, error)
Stat() (os.FileInfo, error)
+ Truncate(int64) error
}
type keepClient interface {
Stat(name string) (os.FileInfo, error)
Create(name string) (File, error)
OpenFile(name string, flag int, perm os.FileMode) (File, error)
+ MarshalManifest(string) (string, error)
}
type fileSystem struct {
Parent() inode
Read([]byte, filenodePtr) (int, filenodePtr, error)
Write([]byte, filenodePtr) (int, filenodePtr, error)
+ Truncate(int64) error
Readdir() []os.FileInfo
Stat() os.FileInfo
sync.Locker
defer fn.Unlock()
fn.extents = append(fn.extents, e)
fn.fileinfo.size += int64(e.Len())
- fn.repacked++
}
func (fn *filenode) OpenFile(string, int, os.FileMode) (*file, error) {
return
}
+// Truncate sets the length of the file to size while holding fn's
+// lock. It always returns nil.
+//
+// Shrinking drops every extent that starts at or after the new end
+// and cuts the extent that contains the new end down to the offset
+// reported by fn.seek. Growing extends the last extent when it is
+// writable and shorter than maxBlockSize; otherwise it appends new
+// memExtents, each grown to at most maxBlockSize, until the file
+// reaches size.
+func (fn *filenode) Truncate(size int64) error {
+	fn.Lock()
+	defer fn.Unlock()
+	if size < fn.fileinfo.size {
+		// NOTE(review): passing repacked-1 presumably makes seek
+		// treat the pointer as stale and recompute extentIdx and
+		// extentOff from off -- confirm against seek.
+		ptr := fn.seek(filenodePtr{off: size, repacked: fn.repacked - 1})
+		if ptr.extentOff == 0 {
+			// The new end falls on an extent boundary: drop the
+			// extent at extentIdx and everything after it.
+			fn.extents = fn.extents[:ptr.extentIdx]
+		} else {
+			fn.extents = fn.extents[:ptr.extentIdx+1]
+			// Use a type switch rather than
+			// "if e, ok := e.(writableExtent); ok {...} else {...}":
+			// in that form the else branch sees the shadowing
+			// variable, which is a nil writableExtent after a
+			// failed assertion, so calling Slice on it panics.
+			switch e := fn.extents[ptr.extentIdx].(type) {
+			case writableExtent:
+				e.Truncate(ptr.extentOff)
+			default:
+				fn.extents[ptr.extentIdx] = e.Slice(0, ptr.extentOff)
+			}
+		}
+		fn.fileinfo.size = size
+		fn.repacked++
+		return nil
+	}
+	for size > fn.fileinfo.size {
+		grow := size - fn.fileinfo.size
+		var e writableExtent
+		var ok bool
+		if len(fn.extents) == 0 {
+			e = &memExtent{}
+			fn.extents = append(fn.extents, e)
+		} else if e, ok = fn.extents[len(fn.extents)-1].(writableExtent); !ok || e.Len() >= maxBlockSize {
+			// The last extent is read-only or already full, so
+			// start a fresh in-memory extent.
+			e = &memExtent{}
+			fn.extents = append(fn.extents, e)
+		} else {
+			// An existing extent is about to change length.
+			fn.repacked++
+		}
+		// Never let a single extent exceed maxBlockSize; any
+		// remainder is handled by the next loop iteration.
+		if maxgrow := int64(maxBlockSize - e.Len()); maxgrow < grow {
+			grow = maxgrow
+		}
+		e.Truncate(e.Len() + int(grow))
+		fn.fileinfo.size += grow
+	}
+	return nil
+}
+
func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
fn.Lock()
defer fn.Unlock()
return f.ptr.off, nil
}
+// Truncate forwards the request to f.inode.Truncate and returns its
+// result unchanged.
+func (f *file) Truncate(size int64) error {
+	err := f.inode.Truncate(size)
+	return err
+}
+
func (f *file) Write(p []byte) (n int, err error) {
if !f.writable {
return 0, ErrReadOnlyFile
sync.RWMutex
}
+// sync replaces the in-memory extents of the files directly inside dn
+// with storedExtents. Files are visited in sorted name order. An
+// extent longer than maxBlockSize/2 becomes a block of its own;
+// shorter extents are queued and packed together into blocks of at
+// most maxBlockSize bytes.
+//
+// caller must hold dn.Lock().
+func (dn *dirnode) sync() error {
+	type blockRef struct {
+		node *filenode
+		pos  int
+	}
+
+	// pack turns the referenced memExtents into storedExtents that
+	// share one locator, each recording its own offset and length
+	// within the combined block.
+	pack := func(refs []blockRef) error {
+		if len(refs) == 0 {
+			return nil
+		}
+		digest := md5.New()
+		total := 0
+		for _, ref := range refs {
+			buf := ref.node.extents[ref.pos].(*memExtent).buf
+			_, err := digest.Write(buf)
+			if err != nil {
+				return err
+			}
+			total += len(buf)
+		}
+		// FIXME: write to keep
+		loc := fmt.Sprintf("%x+%d", digest.Sum(nil), total)
+		start := 0
+		for _, ref := range refs {
+			n := len(ref.node.extents[ref.pos].(*memExtent).buf)
+			ref.node.extents[ref.pos] = storedExtent{
+				cache:   dn.cache,
+				locator: loc,
+				size:    total,
+				offset:  start,
+				length:  n,
+			}
+			start += n
+		}
+		return nil
+	}
+
+	sorted := make([]string, 0, len(dn.inodes))
+	for name := range dn.inodes {
+		sorted = append(sorted, name)
+	}
+	sort.Strings(sorted)
+
+	var (
+		queued    []blockRef
+		queuedLen int
+	)
+	for _, name := range sorted {
+		fn, isFile := dn.inodes[name].(*filenode)
+		if !isFile {
+			continue
+		}
+		// The unlock is deferred to function return rather than the
+		// end of the iteration: queued entries keep pointing at this
+		// filenode and a later pack call still writes to its extents.
+		fn.Lock()
+		defer fn.Unlock()
+		for i := range fn.extents {
+			mem, isMem := fn.extents[i].(*memExtent)
+			if !isMem {
+				continue
+			}
+			n := mem.Len()
+			switch {
+			case n > maxBlockSize/2:
+				// Large enough to stand alone as a block.
+				if err := pack([]blockRef{{node: fn, pos: i}}); err != nil {
+					return err
+				}
+				continue
+			case queuedLen+n > maxBlockSize:
+				// Adding this extent would overflow the block
+				// being assembled; emit what is queued first.
+				if err := pack(queued); err != nil {
+					return err
+				}
+				queued = nil
+				queuedLen = 0
+			}
+			queued = append(queued, blockRef{node: fn, pos: i})
+			queuedLen += n
+		}
+	}
+	return pack(queued)
+}
+
+// MarshalManifest returns manifest text describing dn and, through
+// recursion, all of its subdirectories. prefix is used as the stream
+// name for dn itself; each subdirectory is marshalled with the stream
+// name prefix + "/" + its own name.
+//
+// The method locks dn and calls dn.sync before building the output.
+// If dn contains at least one file, the result starts with one line
+// of the form
+//
+//	prefix block... offset:length:name...\n
+//
+// followed by the text for the subdirectories. If dn contains no
+// files, only the subdirectory text is returned. Entries are emitted
+// in sorted name order.
+//
+// It returns an error if sync or a recursive call fails, and panics
+// on an inode or extent type it does not know how to marshal.
+func (dn *dirnode) MarshalManifest(prefix string) (string, error) {
+	dn.Lock()
+	defer dn.Unlock()
+	if err := dn.sync(); err != nil {
+		return "", err
+	}
+
+	var streamLen int64
+	type m1segment struct {
+		name   string
+		offset int64
+		length int64
+	}
+	var segments []m1segment
+	var subdirs strings.Builder
+	var blocks []string
+
+	names := make([]string, 0, len(dn.inodes))
+	for name := range dn.inodes {
+		names = append(names, name)
+	}
+	sort.Strings(names)
+
+	for _, name := range names {
+		node := dn.inodes[name]
+		switch node := node.(type) {
+		case *dirnode:
+			subdir, err := node.MarshalManifest(prefix + "/" + node.Name())
+			if err != nil {
+				return "", err
+			}
+			subdirs.WriteString(subdir)
+		case *filenode:
+			if len(node.extents) == 0 {
+				// A file with no extents would otherwise add no
+				// token at all and vanish from the manifest.
+				// Record it as a zero-length segment; if the
+				// stream ends up with no blocks, the empty-block
+				// locator is supplied below.
+				segments = append(segments, m1segment{name: node.Name()})
+				continue
+			}
+			for _, e := range node.extents {
+				switch e := e.(type) {
+				case *memExtent:
+					blocks = append(blocks, fmt.Sprintf("FIXME+%d", e.Len()))
+					segments = append(segments, m1segment{
+						name:   node.Name(),
+						offset: streamLen,
+						length: int64(e.Len()),
+					})
+					streamLen += int64(e.Len())
+				case storedExtent:
+					if len(blocks) > 0 && blocks[len(blocks)-1] == e.locator {
+						// Same block as the previous extent:
+						// rewind to the start of that block
+						// instead of listing it again.
+						streamLen -= int64(e.size)
+					} else {
+						blocks = append(blocks, e.locator)
+					}
+					segments = append(segments, m1segment{
+						name:   node.Name(),
+						offset: streamLen + int64(e.offset),
+						length: int64(e.length),
+					})
+					streamLen += int64(e.size)
+				default:
+					panic(fmt.Sprintf("can't marshal extent type %T", e))
+				}
+			}
+		default:
+			panic(fmt.Sprintf("can't marshal inode type %T", node))
+		}
+	}
+	filetokens := make([]string, 0, len(segments))
+	for _, s := range segments {
+		filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, s.name))
+	}
+	if len(filetokens) == 0 {
+		return subdirs.String(), nil
+	}
+	if len(blocks) == 0 {
+		// Only empty files in this stream: use the locator of the
+		// zero-length block so the line still has a block token.
+		blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
+	}
+	return prefix + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs.String(), nil
+}
+
func (dn *dirnode) loadManifest(txt string) {
// FIXME: faster
var dirname string
}
extents = append(extents, storedExtent{
locator: token,
+ size: int(length),
offset: 0,
length: int(length),
})
f.inode.(*filenode).appendExtent(storedExtent{
cache: dn.cache,
locator: e.locator,
+ size: e.size,
offset: blkOff,
length: blkLen,
})
return 0, ptr, ErrInvalidOperation
}
+// Truncate ignores the requested size and always returns
+// ErrInvalidOperation.
+func (dn *dirnode) Truncate(_ int64) error {
+	return ErrInvalidOperation
+}
+
func (dn *dirnode) OpenFile(name string, flag int, perm os.FileMode) (*file, error) {
name = strings.TrimSuffix(name, "/")
if name == "." || name == "" {
newbuf := make([]byte, n, newsize)
copy(newbuf, me.buf)
me.buf = newbuf
+ } else {
+ // Zero unused part when shrinking, in case we grow
+ // and start using it again later.
+ for i := n; i < len(me.buf); i++ {
+ me.buf[i] = 0
+ }
}
me.buf = me.buf[:n]
}
// storedExtent describes a range of bytes inside a block that is
// identified by a locator string rather than held in a memory buffer.
type storedExtent struct {
cache blockCache // NOTE(review): presumably used to fetch the block's data -- confirm against the read path
locator string // block identifier; dirnode.sync builds it as "<md5 hex>+<size>"
+ size int // total size in bytes of the block named by locator
offset int // where this extent starts within the block
length int // number of bytes in this extent
}