"strconv"
"strings"
"sync"
+ "sync/atomic"
"time"
)
}
segs = append(segs, seg)
}
+ blocksize := len(block)
dn.fs.throttle().Acquire()
errs := make(chan error, 1)
go func() {
dn.fs.throttle().Release()
{
if !sync {
+ dn.Lock()
+ defer dn.Unlock()
for _, name := range dn.sortedNames() {
if fn, ok := dn.inodes[name].(*filenode); ok {
fn.Lock()
ref.fn.segments[ref.idx] = storedSegment{
kc: dn.fs,
locator: locator,
- size: len(block),
+ size: blocksize,
offset: offsets[idx],
length: len(data),
}
- ref.fn.memsize -= int64(len(data))
+ // atomic is needed here despite caller having
+ // lock: caller might be running concurrent
+ // commitBlock() goroutines using the same
+ // lock, writing different segments from the
+ // same file.
+ atomic.AddInt64(&ref.fn.memsize, -int64(len(data)))
}
}()
if sync {
fs.Flush("", true)
}
+func (s *CollectionFSSuite) TestFlushShort(c *check.C) { // Writes 100 files into each of two directories, calling Flush(dir, false) after every file.
+ s.kc.onPut = func([]byte) { // Callback assigned to s.kc.onPut: empties s.kc.blocks while holding s.kc's lock.
+ s.kc.Lock()
+ s.kc.blocks = map[string][]byte{} // NOTE(review): presumably keeps s.kc from retaining every block written -- confirm intent.
+ s.kc.Unlock()
+ }
+ fs, err := (&Collection{}).FileSystem(s.client, s.kc) // Filesystem built from a zero-value Collection.
+ c.Assert(err, check.IsNil)
+ for _, blocksize := range []int{8, 1000000} { // Bytes written per file: one tiny size, one large size.
+ dir := fmt.Sprintf("dir%d", blocksize) // One directory per size: "dir8" and "dir1000000".
+ err = fs.Mkdir(dir, 0755)
+ c.Assert(err, check.IsNil)
+ data := make([]byte, blocksize) // Zero-filled payload, reused for every file in this directory.
+ for i := 0; i < 100; i++ {
+ f, err := fs.OpenFile(fmt.Sprintf("%s/file%d", dir, i), os.O_WRONLY|os.O_CREATE, 0) // This err shadows the outer err for the rest of the loop body.
+ c.Assert(err, check.IsNil)
+ _, err = f.Write(data)
+ c.Assert(err, check.IsNil)
+ f.Close() // NOTE(review): any result from Close is not checked.
+ fs.Flush(dir, false) // Flush this directory after every file. NOTE(review): any result is unchecked, and the meaning of the false argument is not visible here -- confirm against Flush.
+ }
+ } // Nothing is asserted after the loops; only the FileSystem/Mkdir/OpenFile/Write checks above can fail the test explicitly.
+}
+
func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
for _, txt := range []string{
"\n",