import (
"bytes"
+ "context"
"crypto/md5"
- "crypto/sha1"
"errors"
"fmt"
"io"
// keepClientStub is an in-memory stand-in for a keep client, storing
// written blocks in a map keyed by the MD5 portion of the locator.
type keepClientStub struct {
	blocks      map[string][]byte
	refreshable map[string]bool
	onWrite     func(bufcopy []byte) // called from BlockWrite, before acquiring lock
	authToken   string               // client's auth token (used for signing locators)
	sigkey      string               // blob signing key
	sigttl      time.Duration        // blob signing ttl
	sync.RWMutex
}
return copy(p, buf[off:]), nil
}
-func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
- locator := fmt.Sprintf("%x+%d+A12345@abcde", md5.Sum(p), len(p))
- buf := make([]byte, len(p))
- copy(buf, p)
- if kcs.onPut != nil {
- kcs.onPut(buf)
+func (kcs *keepClientStub) BlockWrite(_ context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
+ if opts.Data == nil {
+ panic("oops, stub is not made for this")
+ }
+ locator := SignLocator(fmt.Sprintf("%x+%d", md5.Sum(opts.Data), len(opts.Data)), kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
+ buf := make([]byte, len(opts.Data))
+ copy(buf, opts.Data)
+ if kcs.onWrite != nil {
+ kcs.onWrite(buf)
+ }
+ for _, sc := range opts.StorageClasses {
+ if sc != "default" {
+ return BlockWriteResponse{}, fmt.Errorf("stub does not write storage class %q", sc)
+ }
}
kcs.Lock()
defer kcs.Unlock()
kcs.blocks[locator[:32]] = buf
- return locator, 1, nil
+ return BlockWriteResponse{Locator: locator, Replicas: 1}, nil
}
// reRemoteSignature matches a signature hint ("+A..." local or "+R..."
// remote) embedded in a locator string.
var reRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`)
func (kcs *keepClientStub) LocalLocator(locator string) (string, error) {
+ if strings.Contains(locator, "+A") {
+ return locator, nil
+ }
kcs.Lock()
defer kcs.Unlock()
if strings.Contains(locator, "+R") {
return "", fmt.Errorf("kcs.refreshable[%q]==false", locator)
}
}
- fakeSig := fmt.Sprintf("+A%x@%x", sha1.Sum(nil), time.Now().Add(time.Hour*24*14).Unix())
- return localOrRemoteSignature.ReplaceAllLiteralString(locator, fakeSig), nil
+ locator = reRemoteSignature.ReplaceAllLiteralString(locator, "")
+ locator = SignLocator(locator, kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
+ return locator, nil
}
type CollectionFSSuite struct {
s.kc = &keepClientStub{
blocks: map[string][]byte{
"3858f62230ac3c915f300c664312c63f": []byte("foobar"),
- }}
+ },
+ sigkey: fixtureBlobSigningKey,
+ sigttl: fixtureBlobSigningTTL,
+ authToken: fixtureActiveToken,
+ }
s.fs, err = s.coll.FileSystem(s.client, s.kc)
c.Assert(err, check.IsNil)
}
c.Check(ok, check.Equals, true)
}
+func (s *CollectionFSSuite) TestUnattainableStorageClasses(c *check.C) {
+ fs, err := (&Collection{
+ StorageClassesDesired: []string{"unobtainium"},
+ }).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+
+ f, err := fs.OpenFile("/foo", os.O_CREATE|os.O_WRONLY, 0777)
+ c.Assert(err, check.IsNil)
+ _, err = f.Write([]byte("food"))
+ c.Assert(err, check.IsNil)
+ err = f.Close()
+ c.Assert(err, check.IsNil)
+ _, err = fs.MarshalManifest(".")
+ c.Assert(err, check.ErrorMatches, `.*stub does not write storage class \"unobtainium\"`)
+}
+
func (s *CollectionFSSuite) TestColonInFilename(c *check.C) {
fs, err := (&Collection{
ManifestText: "./foo:foo 3858f62230ac3c915f300c664312c63f+3 0:3:bar:bar\n",
proceed := make(chan struct{})
var started, concurrent int32
blk2done := false
- s.kc.onPut = func([]byte) {
+ s.kc.onWrite = func([]byte) {
atomic.AddInt32(&concurrent, 1)
switch atomic.AddInt32(&started, 1) {
case 1:
fs, err := (&Collection{}).FileSystem(s.client, s.kc)
c.Assert(err, check.IsNil)
- s.kc.onPut = func([]byte) {
+ s.kc.onWrite = func([]byte) {
// discard flushed data -- otherwise the stub will use
// unlimited memory
time.Sleep(time.Millisecond)
fs.Flush("", true)
}
- size := fs.memorySize()
+ size := fs.MemorySize()
if !c.Check(size <= 1<<24, check.Equals, true) {
- c.Logf("at dir%d fs.memorySize()=%d", i, size)
+ c.Logf("at dir%d fs.MemorySize()=%d", i, size)
return
}
}
c.Assert(err, check.IsNil)
var flushed int64
- s.kc.onPut = func(p []byte) {
+ s.kc.onWrite = func(p []byte) {
atomic.AddInt64(&flushed, int64(len(p)))
}
c.Assert(err, check.IsNil)
}
}
- c.Check(fs.memorySize(), check.Equals, int64(nDirs*67<<20))
+ c.Check(fs.MemorySize(), check.Equals, int64(nDirs*67<<20))
c.Check(flushed, check.Equals, int64(0))
waitForFlush := func(expectUnflushed, expectFlushed int64) {
- for deadline := time.Now().Add(5 * time.Second); fs.memorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
+ for deadline := time.Now().Add(5 * time.Second); fs.MemorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
}
- c.Check(fs.memorySize(), check.Equals, expectUnflushed)
+ c.Check(fs.MemorySize(), check.Equals, expectUnflushed)
c.Check(flushed, check.Equals, expectFlushed)
}
time.AfterFunc(10*time.Second, func() { close(timeout) })
var putCount, concurrency int64
var unflushed int64
- s.kc.onPut = func(p []byte) {
+ s.kc.onWrite = func(p []byte) {
defer atomic.AddInt64(&unflushed, -int64(len(p)))
cur := atomic.AddInt64(&concurrency, 1)
defer atomic.AddInt64(&concurrency, -1)
})
wrote := 0
- s.kc.onPut = func(p []byte) {
+ s.kc.onWrite = func(p []byte) {
s.kc.Lock()
s.kc.blocks = map[string][]byte{}
wrote++
}
func (s *CollectionFSSuite) TestFlushShort(c *check.C) {
- s.kc.onPut = func([]byte) {
+ s.kc.onWrite = func([]byte) {
s.kc.Lock()
s.kc.blocks = map[string][]byte{}
s.kc.Unlock()
}
}
// bigmanifest is a large synthetic manifest used for parser benchmarks:
// 2000 directories, each referencing 100 copies of the same empty-ish
// block and containing 2000 file tokens.
var bigmanifest = func() string {
	var sb strings.Builder
	for dir := 0; dir < 2000; dir++ {
		fmt.Fprintf(&sb, "./dir%d", dir)
		for blk := 0; blk < 100; blk++ {
			sb.WriteString(" d41d8cd98f00b204e9800998ecf8427e+99999")
		}
		for fnum := 0; fnum < 2000; fnum++ {
			fmt.Fprintf(&sb, " 1200000:300000:file%d", fnum)
		}
		sb.WriteString("\n")
	}
	return sb.String()
}()
+
+func (s *CollectionFSSuite) BenchmarkParseManifest(c *check.C) {
+ DebugLocksPanicMode = false
+ c.Logf("test manifest is %d bytes", len(bigmanifest))
+ for i := 0; i < c.N; i++ {
+ fs, err := (&Collection{ManifestText: bigmanifest}).FileSystem(s.client, s.kc)
+ c.Check(err, check.IsNil)
+ c.Check(fs, check.NotNil)
+ }
+}
+
func (s *CollectionFSSuite) checkMemSize(c *check.C, f File) {
fn := f.(*filehandle).inode.(*filenode)
var memsize int64