"runtime"
"strings"
"sync"
+ "sync/atomic"
"testing"
"time"
// keepClientStub holds block data in a map guarded by the embedded
// RWMutex. It also exposes an optional onPut hook so a test can observe
// (and delay) each block write.
type keepClientStub struct {
// blocks maps a block key to the stored bytes. The put code visible in
// this file stores under the first 32 characters of the locator.
blocks map[string][]byte
// NOTE(review): how refreshable is used is not visible in this chunk.
refreshable map[string]bool
+ onPut func(bufcopy []byte) // called from PutB, before acquiring lock
// Embedded lock; the put code holds the write lock while updating blocks.
sync.RWMutex
}
locator := fmt.Sprintf("%x+%d+A12345@abcde", md5.Sum(p), len(p))
buf := make([]byte, len(p))
copy(buf, p)
+ if kcs.onPut != nil {
+ kcs.onPut(buf)
+ }
kcs.Lock()
defer kcs.Unlock()
kcs.blocks[locator[:32]] = buf
c.Check(string(buf), check.Equals, string(expect))
c.Check(err, check.IsNil)
}
- s.checkMemSize(c, f)
}(n)
}
wg.Wait()
+ for n := 0; n < ngoroutines; n++ {
+ f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDONLY, 0)
+ c.Assert(err, check.IsNil)
+ f.(*filehandle).inode.(*filenode).waitPrune()
+ s.checkMemSize(c, f)
+ defer f.Close()
+ }
+
root, err := s.fs.Open("/")
c.Assert(err, check.IsNil)
defer root.Close()
}
func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
+ defer func(wab, mbs int) {
+ writeAheadBlocks = wab
+ maxBlockSize = mbs
+ }(writeAheadBlocks, maxBlockSize)
+ writeAheadBlocks = 2
maxBlockSize = 1024
- defer func() { maxBlockSize = 2 << 26 }()
+
+ proceed := make(chan struct{})
+ var started, concurrent int32
+ blk2done := false
// Install an onPut hook that choreographs the first three block writes.
// started numbers the hook calls in arrival order; concurrent counts how
// many hook calls are in flight at the same time.
+ s.kc.onPut = func([]byte) {
+ atomic.AddInt32(&concurrent, 1)
+ switch atomic.AddInt32(&started, 1) {
+ case 1:
+ // Wait until block 2 starts and finishes, and block 3 starts
+ select {
+ case <-proceed:
// NOTE(review): blk2done is a plain bool written by call 2 and read
// here; no synchronization between the two is visible in this chunk,
// so this presumably relies on call 3 only starting after call 2 has
// returned -- confirm, especially under -race.
+ c.Check(blk2done, check.Equals, true)
+ case <-time.After(time.Second):
// proceed was not closed within one second, i.e. a third call never
// arrived while the first was still blocked.
+ c.Error("timed out")
+ }
+ case 2:
// Second call: stay in flight briefly, then record that it finished.
+ time.Sleep(time.Millisecond)
+ blk2done = true
+ case 3:
// Third call: release the first call, which is blocked on proceed.
+ close(proceed)
+ default:
// Later calls just take a little time so that they can overlap.
+ time.Sleep(time.Millisecond)
+ }
// The count left after this call exits must be below writeAheadBlocks,
// i.e. at most writeAheadBlocks hook calls were in flight at this point.
+ c.Check(atomic.AddInt32(&concurrent, -1) < int32(writeAheadBlocks), check.Equals, true)
+ }
fs, err := (&Collection{}).FileSystem(s.client, s.kc)
c.Assert(err, check.IsNil)
}
return
}
-
- c.Check(len(currentMemExtents()) <= writeAheadBlocks+1, check.Equals, true)
f.(*filehandle).inode.(*filenode).waitPrune()
c.Check(currentMemExtents(), check.HasLen, 1)