15910: Fix races in collectionfs flush/sync.
[arvados.git] / sdk / go / arvados / fs_collection_test.go
index 8d32eb2473fcfc4945a3ceffc7ea5a99fd721981..6ef7627db2943a0340a8f6eee14dd3a0654123d9 100644 (file)
@@ -7,6 +7,7 @@ package arvados
 import (
        "bytes"
        "crypto/md5"
+       "crypto/sha1"
        "errors"
        "fmt"
        "io"
@@ -16,7 +17,9 @@ import (
        "os"
        "regexp"
        "runtime"
+       "strings"
        "sync"
+       "sync/atomic"
        "testing"
        "time"
 
@@ -27,7 +30,9 @@ import (
 var _ = check.Suite(&CollectionFSSuite{})
 
 type keepClientStub struct {
-       blocks map[string][]byte
+       blocks      map[string][]byte
+       refreshable map[string]bool
+       onPut       func(bufcopy []byte) // called from PutB, before acquiring lock
        sync.RWMutex
 }
 
@@ -47,17 +52,37 @@ func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
        locator := fmt.Sprintf("%x+%d+A12345@abcde", md5.Sum(p), len(p))
        buf := make([]byte, len(p))
        copy(buf, p)
+       if kcs.onPut != nil {
+               kcs.onPut(buf)
+       }
        kcs.Lock()
        defer kcs.Unlock()
        kcs.blocks[locator[:32]] = buf
        return locator, 1, nil
 }
 
+var localOrRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`)
+
+func (kcs *keepClientStub) LocalLocator(locator string) (string, error) {
+       kcs.Lock()
+       defer kcs.Unlock()
+       if strings.Contains(locator, "+R") {
+               if len(locator) < 32 {
+                       return "", fmt.Errorf("bad locator: %q", locator)
+               }
+               if _, ok := kcs.blocks[locator[:32]]; !ok && !kcs.refreshable[locator[:32]] {
+                       return "", fmt.Errorf("kcs.refreshable[%q]==false", locator)
+               }
+       }
+       fakeSig := fmt.Sprintf("+A%x@%x", sha1.Sum(nil), time.Now().Add(time.Hour*24*14).Unix())
+       return localOrRemoteSignature.ReplaceAllLiteralString(locator, fakeSig), nil
+}
+
 type CollectionFSSuite struct {
        client *Client
        coll   Collection
        fs     CollectionFileSystem
-       kc     keepClient
+       kc     *keepClientStub
 }
 
 func (s *CollectionFSSuite) SetUpTest(c *check.C) {
@@ -213,6 +238,7 @@ func (s *CollectionFSSuite) TestCreateFile(c *check.C) {
        c.Check(f.Close(), check.IsNil)
 
        m, err := s.fs.MarshalManifest(".")
+       c.Assert(err, check.IsNil)
        c.Check(m, check.Matches, `. 37b51d194a7513e45b56f6524f2d51f2\+3\+\S+ 0:3:new-file\\0401\n./dir1 .* 3:3:bar 0:3:foo\n`)
 }
 
@@ -266,7 +292,9 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
        c.Check(err, check.IsNil)
        pos, err = f.Seek(0, io.SeekCurrent)
        c.Check(pos, check.Equals, int64(18))
+       c.Check(err, check.IsNil)
        pos, err = f.Seek(-18, io.SeekCurrent)
+       c.Check(pos, check.Equals, int64(0))
        c.Check(err, check.IsNil)
        n, err = io.ReadFull(f, buf)
        c.Check(n, check.Equals, 18)
@@ -279,6 +307,7 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
 
        // truncate to current size
        err = f.Truncate(18)
+       c.Check(err, check.IsNil)
        f2.Seek(0, io.SeekStart)
        buf2, err = ioutil.ReadAll(f2)
        c.Check(err, check.IsNil)
@@ -312,6 +341,7 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
 
        // shrink to block/extent boundary
        err = f.Truncate(32)
+       c.Check(err, check.IsNil)
        f2.Seek(0, io.SeekStart)
        buf2, err = ioutil.ReadAll(f2)
        c.Check(err, check.IsNil)
@@ -320,6 +350,7 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
 
        // shrink to partial block/extent
        err = f.Truncate(15)
+       c.Check(err, check.IsNil)
        f2.Seek(0, io.SeekStart)
        buf2, err = ioutil.ReadAll(f2)
        c.Check(err, check.IsNil)
@@ -347,6 +378,7 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
        c.Check(err, check.IsNil)
        m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
        c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 25d55ad283aa400af464c76d713c07ad+8 3:3:bar 6:3:foo\n")
+       c.Check(s.fs.Size(), check.Equals, int64(6))
 }
 
 func (s *CollectionFSSuite) TestSeekSparse(c *check.C) {
@@ -358,14 +390,17 @@ func (s *CollectionFSSuite) TestSeekSparse(c *check.C) {
 
        checkSize := func(size int64) {
                fi, err := f.Stat()
+               c.Assert(err, check.IsNil)
                c.Check(fi.Size(), check.Equals, size)
 
                f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
                c.Assert(err, check.IsNil)
                defer f.Close()
                fi, err = f.Stat()
+               c.Check(err, check.IsNil)
                c.Check(fi.Size(), check.Equals, size)
                pos, err := f.Seek(0, io.SeekEnd)
+               c.Check(err, check.IsNil)
                c.Check(pos, check.Equals, size)
        }
 
@@ -389,6 +424,37 @@ func (s *CollectionFSSuite) TestSeekSparse(c *check.C) {
        checkSize(11)
 }
 
+func (s *CollectionFSSuite) TestMarshalCopiesRemoteBlocks(c *check.C) {
+       foo := "foo"
+       bar := "bar"
+       hash := map[string]string{
+               foo: fmt.Sprintf("%x", md5.Sum([]byte(foo))),
+               bar: fmt.Sprintf("%x", md5.Sum([]byte(bar))),
+       }
+
+       fs, err := (&Collection{
+               ManifestText: ". " + hash[foo] + "+3+Rzaaaa-foo@bab " + hash[bar] + "+3+A12345@ffffff 0:2:fo.txt 2:4:obar.txt\n",
+       }).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+       manifest, err := fs.MarshalManifest(".")
+       c.Check(manifest, check.Equals, "")
+       c.Check(err, check.NotNil)
+
+       s.kc.refreshable = map[string]bool{hash[bar]: true}
+
+       for _, sigIn := range []string{"Rzaaaa-foo@bab", "A12345@abcde"} {
+               fs, err = (&Collection{
+                       ManifestText: ". " + hash[foo] + "+3+A12345@fffff " + hash[bar] + "+3+" + sigIn + " 0:2:fo.txt 2:4:obar.txt\n",
+               }).FileSystem(s.client, s.kc)
+               c.Assert(err, check.IsNil)
+               manifest, err := fs.MarshalManifest(".")
+               c.Check(err, check.IsNil)
+               // Both blocks should now have +A signatures.
+               c.Check(manifest, check.Matches, `.*\+A.* .*\+A.*\n`)
+               c.Check(manifest, check.Not(check.Matches), `.*\+R.*\n`)
+       }
+}
+
 func (s *CollectionFSSuite) TestMarshalSmallBlocks(c *check.C) {
        maxBlockSize = 8
        defer func() { maxBlockSize = 2 << 26 }()
@@ -433,7 +499,7 @@ func (s *CollectionFSSuite) TestMkdir(c *check.C) {
        err = s.fs.Remove("foo/bar")
        c.Check(err, check.IsNil)
 
-       // mkdir succeds after the file is deleted
+       // mkdir succeeds after the file is deleted
        err = s.fs.Mkdir("foo/bar", 0755)
        c.Check(err, check.IsNil)
 
@@ -470,7 +536,7 @@ func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
        }
 
        maxBlockSize = 8
-       defer func() { maxBlockSize = 2 << 26 }()
+       defer func() { maxBlockSize = 1 << 26 }()
 
        var wg sync.WaitGroup
        for n := 0; n < 128; n++ {
@@ -480,16 +546,20 @@ func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
                        f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
                        c.Assert(err, check.IsNil)
                        defer f.Close()
-                       for i := 0; i < 6502; i++ {
-                               switch rand.Int() & 3 {
-                               case 0:
+                       for i := 0; i < 1024; i++ {
+                               r := rand.Uint32()
+                               switch {
+                               case r%11 == 0:
+                                       _, err := s.fs.MarshalManifest(".")
+                                       c.Check(err, check.IsNil)
+                               case r&3 == 0:
                                        f.Truncate(int64(rand.Intn(64)))
-                               case 1:
+                               case r&3 == 1:
                                        f.Seek(int64(rand.Intn(64)), io.SeekStart)
-                               case 2:
+                               case r&3 == 2:
                                        _, err := f.Write([]byte("beep boop"))
                                        c.Check(err, check.IsNil)
-                               case 3:
+                               case r&3 == 3:
                                        _, err := ioutil.ReadAll(f)
                                        c.Check(err, check.IsNil)
                                }
@@ -518,7 +588,7 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
        const ngoroutines = 256
 
        var wg sync.WaitGroup
-       for n := 0; n < nfiles; n++ {
+       for n := 0; n < ngoroutines; n++ {
                wg.Add(1)
                go func(n int) {
                        defer wg.Done()
@@ -527,7 +597,7 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
                        f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
                        c.Assert(err, check.IsNil)
                        defer f.Close()
-                       for i := 0; i < ngoroutines; i++ {
+                       for i := 0; i < nfiles; i++ {
                                trunc := rand.Intn(65)
                                woff := rand.Intn(trunc + 1)
                                wbytes = wbytes[:rand.Intn(64-woff+1)]
@@ -553,11 +623,18 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
                                c.Check(string(buf), check.Equals, string(expect))
                                c.Check(err, check.IsNil)
                        }
-                       s.checkMemSize(c, f)
                }(n)
        }
        wg.Wait()
 
+       for n := 0; n < ngoroutines; n++ {
+               f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDONLY, 0)
+               c.Assert(err, check.IsNil)
+               f.(*filehandle).inode.(*filenode).waitPrune()
+               s.checkMemSize(c, f)
+               defer f.Close()
+       }
+
        root, err := s.fs.Open("/")
        c.Assert(err, check.IsNil)
        defer root.Close()
@@ -627,6 +704,25 @@ func (s *CollectionFSSuite) TestRenameError(c *check.C) {
        c.Check(data, check.DeepEquals, []byte{1, 2, 3, 4, 5})
 }
 
+func (s *CollectionFSSuite) TestRenameDirectory(c *check.C) {
+       fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+       err = fs.Mkdir("foo", 0755)
+       c.Assert(err, check.IsNil)
+       err = fs.Mkdir("bar", 0755)
+       c.Assert(err, check.IsNil)
+       err = fs.Rename("bar", "baz")
+       c.Check(err, check.IsNil)
+       err = fs.Rename("foo", "baz")
+       c.Check(err, check.NotNil)
+       err = fs.Rename("foo", "baz/")
+       c.Check(err, check.IsNil)
+       err = fs.Rename("baz/foo", ".")
+       c.Check(err, check.Equals, ErrInvalidArgument)
+       err = fs.Rename("baz/foo/", ".")
+       c.Check(err, check.Equals, ErrInvalidArgument)
+}
+
 func (s *CollectionFSSuite) TestRename(c *check.C) {
        fs, err := (&Collection{}).FileSystem(s.client, s.kc)
        c.Assert(err, check.IsNil)
@@ -777,21 +873,21 @@ func (s *CollectionFSSuite) TestPersist(c *check.C) {
        }
 }
 
-func (s *CollectionFSSuite) TestPersistEmptyFiles(c *check.C) {
+func (s *CollectionFSSuite) TestPersistEmptyFilesAndDirs(c *check.C) {
        var err error
        s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
        c.Assert(err, check.IsNil)
-       for _, name := range []string{"dir", "dir/zerodir", "zero", "zero/zero"} {
+       for _, name := range []string{"dir", "dir/zerodir", "empty", "not empty", "not empty/empty", "zero", "zero/zero"} {
                err = s.fs.Mkdir(name, 0755)
                c.Assert(err, check.IsNil)
        }
 
        expect := map[string][]byte{
                "0":                nil,
-               "00":               []byte{},
-               "one":              []byte{1},
+               "00":               {},
+               "one":              {1},
                "dir/0":            nil,
-               "dir/two":          []byte{1, 2},
+               "dir/two":          {1, 2},
                "dir/zero":         nil,
                "dir/zerodir/zero": nil,
                "zero/zero/zero":   nil,
@@ -814,10 +910,10 @@ func (s *CollectionFSSuite) TestPersistEmptyFiles(c *check.C) {
        c.Assert(err, check.IsNil)
 
        for name, data := range expect {
-               f, err := persisted.Open("bogus-" + name)
+               _, err = persisted.Open("bogus-" + name)
                c.Check(err, check.NotNil)
 
-               f, err = persisted.Open(name)
+               f, err := persisted.Open(name)
                c.Assert(err, check.IsNil)
 
                if data == nil {
@@ -827,6 +923,23 @@ func (s *CollectionFSSuite) TestPersistEmptyFiles(c *check.C) {
                c.Check(err, check.IsNil)
                c.Check(buf, check.DeepEquals, data)
        }
+
+       expectDir := map[string]int{
+               "empty":           0,
+               "not empty":       1,
+               "not empty/empty": 0,
+       }
+       for name, expectLen := range expectDir {
+               _, err := persisted.Open(name + "/bogus")
+               c.Check(err, check.NotNil)
+
+               d, err := persisted.Open(name)
+               defer d.Close()
+               c.Check(err, check.IsNil)
+               fi, err := d.Readdir(-1)
+               c.Check(err, check.IsNil)
+               c.Check(fi, check.HasLen, expectLen)
+       }
 }
 
 func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
@@ -875,9 +988,11 @@ func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
        c.Check(n, check.Equals, 1)
        c.Check(buf[:1], check.DeepEquals, []byte{1})
        pos, err = f.Seek(0, io.SeekCurrent)
+       c.Assert(err, check.IsNil)
        c.Check(pos, check.Equals, int64(1))
        f.Write([]byte{4, 5, 6})
        pos, err = f.Seek(0, io.SeekCurrent)
+       c.Assert(err, check.IsNil)
        c.Check(pos, check.Equals, int64(6))
        f.Seek(0, io.SeekStart)
        n, err = f.Read(buf)
@@ -895,6 +1010,7 @@ func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
        c.Check(pos, check.Equals, int64(3))
        f.Write([]byte{7, 8, 9})
        pos, err = f.Seek(0, io.SeekCurrent)
+       c.Check(err, check.IsNil)
        c.Check(pos, check.Equals, int64(9))
        f.Close()
 
@@ -924,9 +1040,38 @@ func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
        c.Check(err, check.ErrorMatches, `invalid flag.*`)
 }
 
-func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
+func (s *CollectionFSSuite) TestFlushFullBlocksWritingLongFile(c *check.C) {
+       defer func(cw, mbs int) {
+               concurrentWriters = cw
+               maxBlockSize = mbs
+       }(concurrentWriters, maxBlockSize)
+       concurrentWriters = 2
        maxBlockSize = 1024
-       defer func() { maxBlockSize = 2 << 26 }()
+
+       proceed := make(chan struct{})
+       var started, concurrent int32
+       blk2done := false
+       s.kc.onPut = func([]byte) {
+               atomic.AddInt32(&concurrent, 1)
+               switch atomic.AddInt32(&started, 1) {
+               case 1:
+                       // Wait until block 2 starts and finishes, and block 3 starts
+                       select {
+                       case <-proceed:
+                               c.Check(blk2done, check.Equals, true)
+                       case <-time.After(time.Second):
+                               c.Error("timed out")
+                       }
+               case 2:
+                       time.Sleep(time.Millisecond)
+                       blk2done = true
+               case 3:
+                       close(proceed)
+               default:
+                       time.Sleep(time.Millisecond)
+               }
+               c.Check(atomic.AddInt32(&concurrent, -1) < int32(concurrentWriters), check.Equals, true)
+       }
 
        fs, err := (&Collection{}).FileSystem(s.client, s.kc)
        c.Assert(err, check.IsNil)
@@ -952,6 +1097,7 @@ func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
                }
                return
        }
+       f.(*filehandle).inode.(*filenode).waitPrune()
        c.Check(currentMemExtents(), check.HasLen, 1)
 
        m, err := fs.MarshalManifest(".")
@@ -960,6 +1106,205 @@ func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
        c.Check(currentMemExtents(), check.HasLen, 0)
 }
 
+// Ensure blocks get flushed to disk if a lot of data is written to
+// small files/directories without calling sync().
+//
+// Write four 512KiB files into each of 256 top-level dirs (total
+// 512MiB), calling Flush() every 8 dirs. Ensure memory usage never
+// exceeds 24MiB (4 concurrentWriters * 2MiB + 8 unflushed dirs *
+// 2MiB).
+func (s *CollectionFSSuite) TestFlushAll(c *check.C) {
+       fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+
+       s.kc.onPut = func([]byte) {
+               // discard flushed data -- otherwise the stub will use
+               // unlimited memory
+               time.Sleep(time.Millisecond)
+               s.kc.Lock()
+               defer s.kc.Unlock()
+               s.kc.blocks = map[string][]byte{}
+       }
+       for i := 0; i < 256; i++ {
+               buf := bytes.NewBuffer(make([]byte, 524288))
+               fmt.Fprintf(buf, "test file in dir%d", i)
+
+               dir := fmt.Sprintf("dir%d", i)
+               fs.Mkdir(dir, 0755)
+               for j := 0; j < 2; j++ {
+                       f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
+                       c.Assert(err, check.IsNil)
+                       defer f.Close()
+                       _, err = io.Copy(f, buf)
+                       c.Assert(err, check.IsNil)
+               }
+
+               if i%8 == 0 {
+                       fs.Flush("", true)
+               }
+
+               size := fs.memorySize()
+               if !c.Check(size <= 1<<24, check.Equals, true) {
+                       c.Logf("at dir%d fs.memorySize()=%d", i, size)
+                       return
+               }
+       }
+}
+
+// Ensure short blocks at the end of a stream don't get flushed by
+// Flush(false).
+//
+// Write 67x 1MiB files to each of 8 dirs, and check that 8 full 64MiB
+// blocks have been flushed while 8x 3MiB is still buffered in memory.
+func (s *CollectionFSSuite) TestFlushFullBlocksOnly(c *check.C) {
+       fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+
+       var flushed int64
+       s.kc.onPut = func(p []byte) {
+               atomic.AddInt64(&flushed, int64(len(p)))
+       }
+
+       nDirs := int64(8)
+       megabyte := make([]byte, 1<<20)
+       for i := int64(0); i < nDirs; i++ {
+               dir := fmt.Sprintf("dir%d", i)
+               fs.Mkdir(dir, 0755)
+               for j := 0; j < 67; j++ {
+                       f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
+                       c.Assert(err, check.IsNil)
+                       defer f.Close()
+                       _, err = f.Write(megabyte)
+                       c.Assert(err, check.IsNil)
+               }
+       }
+       c.Check(fs.memorySize(), check.Equals, int64(nDirs*67<<20))
+       c.Check(flushed, check.Equals, int64(0))
+
+       waitForFlush := func(expectUnflushed, expectFlushed int64) {
+               for deadline := time.Now().Add(5 * time.Second); fs.memorySize() > expectUnflushed && time.Now().Before(deadline); time.Sleep(10 * time.Millisecond) {
+               }
+               c.Check(fs.memorySize(), check.Equals, expectUnflushed)
+               c.Check(flushed, check.Equals, expectFlushed)
+       }
+
+       // Nothing flushed yet
+       waitForFlush((nDirs*67)<<20, 0)
+
+       // Flushing a non-empty dir "/" is non-recursive and there are
+       // no top-level files, so this has no effect
+       fs.Flush("/", false)
+       waitForFlush((nDirs*67)<<20, 0)
+
+       // Flush the full block in dir0
+       fs.Flush("dir0", false)
+       waitForFlush((nDirs*67-64)<<20, 64<<20)
+
+       err = fs.Flush("dir-does-not-exist", false)
+       c.Check(err, check.NotNil)
+
+       // Flush full blocks in all dirs
+       fs.Flush("", false)
+       waitForFlush(nDirs*3<<20, nDirs*64<<20)
+
+       // Flush non-full blocks, too
+       fs.Flush("", true)
+       waitForFlush(0, nDirs*67<<20)
+}
+
+// Even when writing lots of files/dirs from different goroutines, as
+// long as Flush(dir,false) is called after writing each file,
+// unflushed data should be limited to one full block per
+// concurrentWriter, plus one nearly-full block at the end of each
+// dir/stream.
+func (s *CollectionFSSuite) TestMaxUnflushed(c *check.C) {
+       nDirs := int64(8)
+       maxUnflushed := (int64(concurrentWriters) + nDirs) << 26
+
+       fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+
+       release := make(chan struct{})
+       timeout := make(chan struct{})
+       time.AfterFunc(10*time.Second, func() { close(timeout) })
+       var putCount, concurrency int64
+       var unflushed int64
+       s.kc.onPut = func(p []byte) {
+               defer atomic.AddInt64(&unflushed, -int64(len(p)))
+               cur := atomic.AddInt64(&concurrency, 1)
+               defer atomic.AddInt64(&concurrency, -1)
+               pc := atomic.AddInt64(&putCount, 1)
+               if pc < int64(concurrentWriters) {
+                       // Block until we reach concurrentWriters, to
+                       // make sure we're really accepting concurrent
+                       // writes.
+                       select {
+                       case <-release:
+                       case <-timeout:
+                               c.Error("timeout")
+                       }
+               } else if pc == int64(concurrentWriters) {
+                       // Unblock the first N-1 PUT reqs.
+                       close(release)
+               }
+               c.Assert(cur <= int64(concurrentWriters), check.Equals, true)
+               c.Assert(atomic.LoadInt64(&unflushed) <= maxUnflushed, check.Equals, true)
+       }
+
+       var owg sync.WaitGroup
+       megabyte := make([]byte, 1<<20)
+       for i := int64(0); i < nDirs; i++ {
+               dir := fmt.Sprintf("dir%d", i)
+               fs.Mkdir(dir, 0755)
+               owg.Add(1)
+               go func() {
+                       defer owg.Done()
+                       defer fs.Flush(dir, true)
+                       var iwg sync.WaitGroup
+                       defer iwg.Wait()
+                       for j := 0; j < 67; j++ {
+                               iwg.Add(1)
+                               go func(j int) {
+                                       defer iwg.Done()
+                                       f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, j), os.O_WRONLY|os.O_CREATE, 0)
+                                       c.Assert(err, check.IsNil)
+                                       defer f.Close()
+                                       n, err := f.Write(megabyte)
+                                       c.Assert(err, check.IsNil)
+                                       atomic.AddInt64(&unflushed, int64(n))
+                                       fs.Flush(dir, false)
+                               }(j)
+                       }
+               }()
+       }
+       owg.Wait()
+       fs.Flush("", true)
+}
+
+func (s *CollectionFSSuite) TestFlushShort(c *check.C) {
+       s.kc.onPut = func([]byte) {
+               s.kc.Lock()
+               s.kc.blocks = map[string][]byte{}
+               s.kc.Unlock()
+       }
+       fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+       for _, blocksize := range []int{8, 1000000} {
+               dir := fmt.Sprintf("dir%d", blocksize)
+               err = fs.Mkdir(dir, 0755)
+               c.Assert(err, check.IsNil)
+               data := make([]byte, blocksize)
+               for i := 0; i < 100; i++ {
+                       f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, i), os.O_WRONLY|os.O_CREATE, 0)
+                       c.Assert(err, check.IsNil)
+                       _, err = f.Write(data)
+                       c.Assert(err, check.IsNil)
+                       f.Close()
+                       fs.Flush(dir, false)
+               }
+       }
+}
+
 func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
        for _, txt := range []string{
                "\n",
@@ -975,6 +1320,12 @@ func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
                ". d41d8cd98f00b204e9800998ecf8427e+0 foo:0:foo\n",
                ". d41d8cd98f00b204e9800998ecf8427e+0 0:foo:foo\n",
                ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:foo 1:1:bar\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:\\056\\057\\056\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:.\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:..\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:..\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/..\n",
                ". d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n",
                "./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n. d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n",
        } {
@@ -990,7 +1341,9 @@ func (s *CollectionFSSuite) TestEdgeCaseManifests(c *check.C) {
        for _, txt := range []string{
                "",
                ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo\n",
-               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:...\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:. 0:0:. 0:0:\\056 0:0:\\056\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/. 0:0:. 0:0:foo\\057bar\\057\\056\n",
                ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
                ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/bar\n./foo d41d8cd98f00b204e9800998ecf8427e+0 0:0:bar\n",
        } {
@@ -1048,6 +1401,7 @@ func (s *CollectionFSUnitSuite) TestLargeManifest(c *check.C) {
        f, err := coll.FileSystem(nil, nil)
        c.Check(err, check.IsNil)
        c.Logf("%s loaded", time.Now())
+       c.Check(f.Size(), check.Equals, int64(42*dirCount*fileCount))
 
        for i := 0; i < dirCount; i++ {
                for j := 0; j < fileCount; j++ {