package arvados
import (
+ "crypto/md5"
"errors"
+ "fmt"
"io"
"net/http"
"os"
"path"
"regexp"
+ "sort"
"strconv"
"strings"
"sync"
Stat(name string) (os.FileInfo, error)
Create(name string) (File, error)
OpenFile(name string, flag int, perm os.FileMode) (File, error)
+ MarshalManifest(string) (string, error)
}
type fileSystem struct {
sync.RWMutex
}
+// sync replaces the in-memory extents of the files directly inside dn
+// with stored extents. Files are visited in sorted name order. Extents
+// larger than half of maxBlockSize each get a block of their own;
+// smaller ones are packed together into blocks that do not exceed
+// maxBlockSize.
+//
+// Block data is not written to Keep yet (see FIXME below): only the
+// locator is computed. Inodes that are not files (e.g. subdirectories)
+// are skipped.
+//
+// Caller must hold dn.Lock().
+func (dn *dirnode) sync() error {
+	// extentRef identifies one extent by its file and its position
+	// in that file's extent list.
+	type extentRef struct {
+		file *filenode
+		pos  int
+	}
+	var (
+		batch      []extentRef
+		batchBytes int
+	)
+
+	// pack turns all extents listed in refs into storedExtents that
+	// point at consecutive ranges of one block made of their
+	// concatenated data.
+	pack := func(refs []extentRef) error {
+		if len(refs) == 0 {
+			return nil
+		}
+		digest := md5.New()
+		total := 0
+		for _, ref := range refs {
+			buf := ref.file.extents[ref.pos].(*memExtent).buf
+			if _, err := digest.Write(buf); err != nil {
+				return err
+			}
+			total += len(buf)
+		}
+		// FIXME: write to keep
+		loc := fmt.Sprintf("%x+%d", digest.Sum(nil), total)
+		start := 0
+		for _, ref := range refs {
+			n := len(ref.file.extents[ref.pos].(*memExtent).buf)
+			ref.file.extents[ref.pos] = storedExtent{
+				cache:   dn.cache,
+				locator: loc,
+				size:    total,
+				offset:  start,
+				length:  n,
+			}
+			start += n
+		}
+		return nil
+	}
+
+	sorted := make([]string, 0, len(dn.inodes))
+	for n := range dn.inodes {
+		sorted = append(sorted, n)
+	}
+	sort.Strings(sorted)
+
+	for _, n := range sorted {
+		file, isFile := dn.inodes[n].(*filenode)
+		if !isFile {
+			continue
+		}
+		// The deferred unlock runs only when sync returns, so extents
+		// queued in batch from earlier files stay locked until they
+		// have been packed.
+		file.Lock()
+		defer file.Unlock()
+		for pos := range file.extents {
+			mem, isMem := file.extents[pos].(*memExtent)
+			if !isMem {
+				continue
+			}
+			size := mem.Len()
+			if size > maxBlockSize/2 {
+				// Big enough to be a block by itself.
+				if err := pack([]extentRef{{file: file, pos: pos}}); err != nil {
+					return err
+				}
+				continue
+			}
+			if batchBytes+size > maxBlockSize {
+				// No room left in the current batch: pack it
+				// and start a new one with this extent.
+				if err := pack(batch); err != nil {
+					return err
+				}
+				batch, batchBytes = nil, 0
+			}
+			batch = append(batch, extentRef{file: file, pos: pos})
+			batchBytes += size
+		}
+	}
+	return pack(batch)
+}
+
+// MarshalManifest returns manifest text describing dn and, recursively,
+// its subdirectories.
+//
+// The files directly inside dn are written as one stream line of the
+// form "prefix block... offset:length:name...\n", followed by the
+// lines produced by each subdirectory (marshalled with
+// prefix+"/"+name). Entries are processed in sorted name order. If dn
+// contains no files, only the subdirectories' lines are returned.
+//
+// dn is locked for the duration of the call, and dn.sync() is called
+// first so that in-memory extents are converted to stored extents.
+// A file with no extents is written as a zero-length "0:0:name" token
+// so that empty files are not lost.
+//
+// It panics if it encounters an inode or extent type it does not know
+// how to marshal.
+//
+// NOTE(review): file names are written verbatim; a name containing a
+// space would break the token syntax -- confirm whether escaping is
+// handled elsewhere.
+// NOTE(review): unlike sync(), this reads each file's extents without
+// taking the filenode lock -- confirm that holding dn.Lock() suffices.
+func (dn *dirnode) MarshalManifest(prefix string) (string, error) {
+	dn.Lock()
+	defer dn.Unlock()
+	if err := dn.sync(); err != nil {
+		return "", err
+	}
+
+	// streamLen is the total size of the blocks listed so far, i.e.
+	// the stream offset at which the next new block would start.
+	var streamLen int64
+	type m1segment struct {
+		name   string
+		offset int64
+		length int64
+	}
+	var segments []m1segment
+	var subdirs string
+	var blocks []string
+
+	names := make([]string, 0, len(dn.inodes))
+	for name := range dn.inodes {
+		names = append(names, name)
+	}
+	sort.Strings(names)
+
+	for _, name := range names {
+		switch node := dn.inodes[name].(type) {
+		case *dirnode:
+			subdir, err := node.MarshalManifest(prefix + "/" + node.Name())
+			if err != nil {
+				return "", err
+			}
+			subdirs = subdirs + subdir
+		case *filenode:
+			if len(node.extents) == 0 {
+				// Empty file: it has no data to point at, but it
+				// still needs a token or it would disappear from
+				// the manifest.
+				segments = append(segments, m1segment{name: node.Name()})
+				continue
+			}
+			for _, e := range node.extents {
+				switch e := e.(type) {
+				case *memExtent:
+					// Placeholder locator for data that has not
+					// been turned into a stored extent.
+					blocks = append(blocks, fmt.Sprintf("FIXME+%d", e.Len()))
+					segments = append(segments, m1segment{
+						name:   node.Name(),
+						offset: streamLen,
+						length: int64(e.Len()),
+					})
+					streamLen += int64(e.Len())
+				case storedExtent:
+					if len(blocks) > 0 && blocks[len(blocks)-1] == e.locator {
+						// Same block as the previous extent: list
+						// it only once, and rewind streamLen to the
+						// start of that block so e.offset is applied
+						// from the right base.
+						streamLen -= int64(e.size)
+					} else {
+						blocks = append(blocks, e.locator)
+					}
+					segments = append(segments, m1segment{
+						name:   node.Name(),
+						offset: streamLen + int64(e.offset),
+						length: int64(e.length),
+					})
+					streamLen += int64(e.size)
+				default:
+					panic(fmt.Sprintf("can't marshal extent type %T", e))
+				}
+			}
+		default:
+			panic(fmt.Sprintf("can't marshal inode type %T", node))
+		}
+	}
+	if len(segments) == 0 {
+		return subdirs, nil
+	}
+	filetokens := make([]string, 0, len(segments))
+	for _, s := range segments {
+		filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, s.name))
+	}
+	if len(blocks) == 0 {
+		// Only zero-length files in this stream: use the locator of
+		// the empty block (MD5 of no data, size 0) so the line still
+		// carries a block token.
+		blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
+	}
+	return prefix + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
+}
+
func (dn *dirnode) loadManifest(txt string) {
// FIXME: faster
var dirname string
}
extents = append(extents, storedExtent{
locator: token,
+ size: int(length),
offset: 0,
length: int(length),
})
f.inode.(*filenode).appendExtent(storedExtent{
cache: dn.cache,
locator: e.locator,
+ size: e.size,
offset: blkOff,
length: blkLen,
})
// storedExtent describes a range of bytes inside a block that is
// identified by a locator, as opposed to data held in a memExtent.
type storedExtent struct {
cache blockCache
locator string // block locator; dirnode.sync builds it as "<md5 hex>+<block size>"
+ size int // size of the whole block, in bytes
offset int // where this extent's data starts within the block
length int // number of bytes of the block that belong to this extent
}
"math/rand"
"net/http"
"os"
+ "regexp"
"sync"
"testing"
c.Check(err, check.IsNil)
c.Check(string(buf2), check.Equals, "123")
c.Check(len(f.(*file).inode.(*filenode).extents), check.Equals, 1)
+
+ m, err := s.fs.MarshalManifest(".")
+ c.Check(err, check.IsNil)
+ m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
+ c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 202cb962ac59075b964b07152d234b70+3 3:3:bar 6:3:foo\n")
+}
+
+// TestMarshalSmallBlocks writes three 3-byte files with maxBlockSize
+// lowered to 8 and checks how MarshalManifest packs them: "bar" and
+// "baz" (sorted first) share one 6-byte block, and "foo", which would
+// push that block past 8 bytes, gets a block of its own.
+func (s *CollectionFSSuite) TestMarshalSmallBlocks(c *check.C) {
+	// Restore the value that was actually in effect, rather than a
+	// hard-coded constant that may not match the package default.
+	origMaxBlockSize := maxBlockSize
+	defer func() { maxBlockSize = origMaxBlockSize }()
+	maxBlockSize = 8
+
+	s.fs = (&Collection{}).FileSystem(s.client, s.kc)
+	for _, name := range []string{"foo", "bar", "baz"} {
+		f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
+		c.Assert(err, check.IsNil)
+		_, err = f.Write([]byte(name))
+		c.Check(err, check.IsNil)
+		c.Check(f.Close(), check.IsNil)
+	}
+
+	m, err := s.fs.MarshalManifest(".")
+	c.Check(err, check.IsNil)
+	// Strip permission hints ("+A...") so locators compare equal.
+	m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
+	c.Check(m, check.Equals, ". c3c23db5285662ef7172373df0003206+6 acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:bar 3:3:baz 6:3:foo\n")
}
func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
fi, err := root.Readdir(-1)
c.Check(err, check.IsNil)
c.Logf("Readdir(): %#v", fi)
+
+ m, err := s.fs.MarshalManifest(".")
+ c.Check(err, check.IsNil)
+ c.Logf("%s", m)
}
// Gocheck boilerplate