package arvados
import (
- "crypto/md5"
"errors"
"fmt"
"io"
// keepClient is the block-storage interface used by this file: stored
// extents read their data through ReadAt, and the code that flushes
// in-memory extents writes the combined block through PutB.
type keepClient interface {
// ReadAt is called with a block locator, a destination buffer p and
// a byte offset into the block; it returns a byte count and an error.
ReadAt(locator string, p []byte, off int) (int, error)
// PutB is called with the data for one block. The first return value
// is used by callers as the locator of the stored block; the second
// is ignored by the caller visible here (presumably a replica count —
// TODO confirm against the real keep client).
+ PutB(p []byte) (string, int, error)
}
type fileinfo struct {
// FileSystem returns a CollectionFileSystem for the collection.
func (c *Collection) FileSystem(client *Client, kc keepClient) CollectionFileSystem {
fs := &fileSystem{dirnode: dirnode{
- cache: &keepBlockCache{kc: kc},
client: client,
kc: kc,
fileinfo: fileinfo{name: ".", mode: os.ModeDir | 0755},
parent *dirnode
client *Client
kc keepClient
- cache blockCache
inodes map[string]inode
sync.RWMutex
}
if len(sbs) == 0 {
return nil
}
- hash := md5.New()
- size := 0
+ block := make([]byte, 0, maxBlockSize)
for _, sb := range sbs {
- data := sb.fn.extents[sb.idx].(*memExtent).buf
- if _, err := hash.Write(data); err != nil {
- return err
- }
- size += len(data)
+ block = append(block, sb.fn.extents[sb.idx].(*memExtent).buf...)
+ }
+ locator, _, err := dn.kc.PutB(block)
+ if err != nil {
+ return err
}
- // FIXME: write to keep
- locator := fmt.Sprintf("%x+%d", hash.Sum(nil), size)
off := 0
for _, sb := range sbs {
data := sb.fn.extents[sb.idx].(*memExtent).buf
sb.fn.extents[sb.idx] = storedExtent{
- cache: dn.cache,
+ kc: dn.kc,
locator: locator,
- size: size,
+ size: len(block),
offset: off,
length: len(data),
}
}
var filetokens []string
for _, s := range segments {
- filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, s.name))
+ filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
}
if len(filetokens) == 0 {
return subdirs, nil
} else if len(blocks) == 0 {
blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
}
- return prefix + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
+ return manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
}
func (dn *dirnode) loadManifest(txt string) {
dn.makeParentDirs(name)
f, err := dn.OpenFile(name, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0700)
if err != nil {
- // FIXME: broken
- continue
+ // FIXME: don't panic
+ panic(fmt.Errorf("cannot append to %q: %s", name, err))
}
if f.inode.Stat().IsDir() {
f.Close()
- // FIXME: broken manifest
- continue
+ // FIXME: don't panic
+ panic(fmt.Errorf("cannot append to %q: is a directory", name))
}
// Map the stream offset/range coordinates to
// block/offset/range coordinates and add
blkLen = int(offset + length - pos - int64(blkOff))
}
f.inode.(*filenode).appendExtent(storedExtent{
- cache: dn.cache,
+ kc: dn.kc,
locator: e.locator,
size: e.size,
offset: blkOff,
}
type storedExtent struct {
- cache blockCache
+ kc keepClient
locator string
size int
offset int
maxlen := se.length - int(off)
if len(p) > maxlen {
p = p[:maxlen]
- n, err = se.cache.ReadAt(se.locator, p, int(off)+se.offset)
+ n, err = se.kc.ReadAt(se.locator, p, int(off)+se.offset)
if err == nil {
err = io.EOF
}
return
}
- return se.cache.ReadAt(se.locator, p, int(off)+se.offset)
-}
-
-type blockCache interface {
- ReadAt(locator string, p []byte, off int) (n int, err error)
-}
-
-type keepBlockCache struct {
- kc keepClient
-}
-
-var scratch = make([]byte, 2<<26)
-
-func (kbc *keepBlockCache) ReadAt(locator string, p []byte, off int) (int, error) {
- return kbc.kc.ReadAt(locator, p, off)
+ return se.kc.ReadAt(se.locator, p, int(off)+se.offset)
}
func canonicalName(name string) string {
var manifestEscapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
-func manifestUnescapeSeq(seq string) string {
+func manifestUnescapeFunc(seq string) string {
if seq == `\\` {
return `\`
}
}
// manifestUnescape returns s with every substring matched by
// manifestEscapeSeq (a backslash followed by three digits, or a
// doubled backslash) replaced by the result of the unescape callback.
func manifestUnescape(s string) string {
- return manifestEscapeSeq.ReplaceAllStringFunc(s, manifestUnescapeSeq)
+ return manifestEscapeSeq.ReplaceAllStringFunc(s, manifestUnescapeFunc)
+}
+
// manifestEscapedChar matches every character that must be escaped
// before a name is written into a manifest: anything other than a
// period, a slash, or an ASCII word character ([0-9A-Za-z_]). The
// regexp engine matches whole runes, so a single match can be several
// bytes long when the character is not ASCII.
var manifestEscapedChar = regexp.MustCompile(`[^\.\w/]`)

// manifestEscapeFunc converts one match of manifestEscapedChar into
// its escaped form: a backslash followed by a three-digit octal value
// for each byte of the match.
//
// Every byte is escaped, not just the first, so multi-byte UTF-8
// characters survive a round trip instead of being truncated to their
// leading byte.
func manifestEscapeFunc(seq string) string {
	escaped := make([]byte, 0, 4*len(seq))
	for i := 0; i < len(seq); i++ {
		escaped = append(escaped, fmt.Sprintf("\\%03o", seq[i])...)
	}
	return string(escaped)
}

// manifestEscape returns s with every character matched by
// manifestEscapedChar replaced by backslash-octal escape sequences.
// Periods, slashes and ASCII word characters are passed through
// unchanged.
func manifestEscape(s string) string {
	return manifestEscapedChar.ReplaceAllStringFunc(s, manifestEscapeFunc)
}
package arvados
import (
+ "crypto/md5"
+ "errors"
"fmt"
"io"
"io/ioutil"
// keepClientStub is an in-memory stand-in for a keep client used by
// the tests in this file. blocks is keyed by the first 32 characters
// of a locator; the embedded RWMutex is taken by ReadAt (read lock)
// and PutB (write lock) around access to the map.
type keepClientStub struct {
blocks map[string][]byte
+ sync.RWMutex
}
+var errStub404 = errors.New("404 block not found")
+
// ReadAt copies stored block data, starting at byte offset off, into p
// and returns the number of bytes copied. The block is looked up by
// the first 32 characters of locator; when no such block is stored an
// error is returned with a zero count.
//
// Note that a locator shorter than 32 bytes, or an off beyond the end
// of the stored block, makes the slice expressions below panic.
func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int) (int, error) {
+ kcs.RLock()
+ defer kcs.RUnlock()
// Anything after the first 32 characters of the locator is ignored
// for lookup purposes.
buf := kcs.blocks[locator[:32]]
if buf == nil {
- return 0, os.ErrNotExist
+ return 0, errStub404
}
return copy(p, buf[off:]), nil
}
+func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
+ locator := fmt.Sprintf("%x+%d+A12345@abcde", md5.Sum(p), len(p))
+ buf := make([]byte, len(p))
+ copy(buf, p)
+ kcs.Lock()
+ defer kcs.Unlock()
+ kcs.blocks[locator[:32]] = buf
+ return locator, 1, nil
+}
+
type CollectionFSSuite struct {
client *Client
coll Collection
// creating foo/bar as a directory should fail
f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, os.ModeDir)
c.Check(err, check.NotNil)
- err = s.fs.Mkdir("foo/bar")
+ err = s.fs.Mkdir("foo/bar", 0755)
c.Check(err, check.NotNil)
m, err := s.fs.MarshalManifest(".")
maxBlockSize = 8
defer func() { maxBlockSize = 2 << 26 }()
+ s.fs = (&Collection{}).FileSystem(s.client, s.kc)
+
var wg sync.WaitGroup
for n := 0; n < 128; n++ {
wg.Add(1)
defer root.Close()
fi, err := root.Readdir(-1)
c.Check(err, check.IsNil)
- c.Logf("Readdir(): %#v", fi)
+ c.Check(len(fi), check.Equals, 128)
+
+ _, err = s.fs.MarshalManifest(".")
+ c.Check(err, check.IsNil)
+ // TODO: check manifest content
+}
+
// TestPersist writes four 500-byte files concurrently (three in the
// root, with names containing a space, a colon and a backslash, and
// one inside the "d:r" directory) using 5-byte writes with
// maxBlockSize lowered to 1024. It then marshals the manifest, loads
// it into a second filesystem backed by the same s.kc, and checks that
// the directory listing and every file's content match what was
// written.
+func (s *CollectionFSSuite) TestPersist(c *check.C) {
+ maxBlockSize = 1024
+ defer func() { maxBlockSize = 2 << 26 }()
+
+ s.fs = (&Collection{}).FileSystem(s.client, s.kc)
+ err := s.fs.Mkdir("d:r", 0755)
+ c.Assert(err, check.IsNil)
+
+ expect := map[string][]byte{}
+
+ var wg sync.WaitGroup
+ for _, name := range []string{"random 1", "random:2", "random\\3", "d:r/random4"} {
+ buf := make([]byte, 500)
// NOTE(review): the result of rand.Read is not checked here.
+ rand.Read(buf)
+ expect[name] = buf
+
+ f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
+ c.Assert(err, check.IsNil)
+ // Note: we don't close the file until after the test
+ // is done. Writes to unclosed files should persist.
+ defer f.Close()
+
+ wg.Add(1)
// One writer goroutine per file; buf and f are declared inside the
// loop body, so each goroutine captures its own pair.
// NOTE(review): c.Assert is called from a non-test goroutine below;
// presumably a failure stops only that goroutine rather than the
// test — confirm, and consider c.Check instead.
+ go func() {
+ defer wg.Done()
+ for i := 0; i < len(buf); i += 5 {
+ _, err := f.Write(buf[i : i+5])
+ c.Assert(err, check.IsNil)
+ }
+ }()
+ }
+ wg.Wait()
m, err := s.fs.MarshalManifest(".")
c.Check(err, check.IsNil)
- c.Logf("%s", m)
+ c.Logf("%q", m)
+
// The root should hold four entries: the three root-level files plus
// the "d:r" directory.
+ root, err := s.fs.Open("/")
+ c.Assert(err, check.IsNil)
+ defer root.Close()
+ fi, err := root.Readdir(-1)
+ c.Check(err, check.IsNil)
+ c.Check(len(fi), check.Equals, 4)
+
// Rebuild a filesystem from the marshalled manifest text only.
+ persisted := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc)
+
+ root, err = persisted.Open("/")
+ c.Assert(err, check.IsNil)
+ defer root.Close()
+ fi, err = root.Readdir(-1)
+ c.Check(err, check.IsNil)
+ c.Check(len(fi), check.Equals, 4)
+
+ for name, content := range expect {
+ c.Logf("read %q", name)
+ f, err := persisted.Open(name)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+ buf, err := ioutil.ReadAll(f)
+ c.Check(err, check.IsNil)
+ c.Check(buf, check.DeepEquals, content)
+ }
}
// Gocheck boilerplate