parent *dirnode
extents []extent
repacked int64 // number of times anything in []extents has changed len
+ memsize int64 // bytes in memExtents
sync.RWMutex
}
defer fn.Unlock()
if size < fn.fileinfo.size {
ptr := fn.seek(filenodePtr{off: size, repacked: fn.repacked - 1})
+ for i := ptr.extentIdx; i < len(fn.extents); i++ {
+ if ext, ok := fn.extents[i].(*memExtent); ok {
+ fn.memsize -= int64(ext.Len())
+ }
+ }
if ptr.extentOff == 0 {
fn.extents = fn.extents[:ptr.extentIdx]
} else {
fn.extents = fn.extents[:ptr.extentIdx+1]
- e := fn.extents[ptr.extentIdx]
- if e, ok := e.(writableExtent); ok {
- e.Truncate(ptr.extentOff)
- } else {
- fn.extents[ptr.extentIdx] = e.Slice(0, ptr.extentOff)
+ switch ext := fn.extents[ptr.extentIdx].(type) {
+ case *memExtent:
+ ext.Truncate(ptr.extentOff)
+ fn.memsize += int64(ext.Len())
+ default:
+ fn.extents[ptr.extentIdx] = ext.Slice(0, ptr.extentOff)
}
}
fn.fileinfo.size = size
}
e.Truncate(e.Len() + int(grow))
fn.fileinfo.size += grow
+ fn.memsize += grow
}
return nil
}
prev++
e := &memExtent{}
e.Truncate(len(cando))
+ fn.memsize += int64(len(cando))
fn.extents[cur] = e
fn.extents[prev] = fn.extents[prev].Slice(0, ptr.extentOff)
ptr.extentIdx++
ptr.extentIdx--
ptr.extentOff = fn.extents[prev].Len()
fn.extents[prev].(writableExtent).Truncate(ptr.extentOff + len(cando))
+ fn.memsize += int64(len(cando))
ptr.repacked++
fn.repacked++
} else {
}
e := &memExtent{}
e.Truncate(len(cando))
+ fn.memsize += int64(len(cando))
fn.extents[cur] = e
cur++
prev++
ptr.off += int64(len(cando))
ptr.extentOff += len(cando)
+ if ptr.extentOff >= maxBlockSize {
+ fn.pruneMemExtents()
+ }
if fn.extents[ptr.extentIdx].Len() == ptr.extentOff {
ptr.extentOff = 0
ptr.extentIdx++
return
}
+// pruneMemExtents reduces memory use by writing in-memory extents
+// out through fn.parent.kc. Every *memExtent whose length is at
+// least maxBlockSize is passed to PutB and, on success, replaced in
+// fn.extents by a storedExtent that refers to the returned locator;
+// fn.memsize is decreased by the extent's length. Extents that are
+// smaller, that are not *memExtent, or whose PutB call fails are
+// left untouched.
+//
+// Caller must have write lock.
+func (fn *filenode) pruneMemExtents() {
+	// TODO: async (don't hold Lock() while waiting for Keep)
+	// TODO: share code with (*dirnode)sync()
+	// TODO: pack/flush small blocks too, when fragmented
+	for i := range fn.extents {
+		mem, isMem := fn.extents[i].(*memExtent)
+		if !isMem {
+			continue
+		}
+		n := mem.Len()
+		if n < maxBlockSize {
+			// Only full-size blocks are flushed here.
+			continue
+		}
+		loc, _, err := fn.parent.kc.PutB(mem.buf)
+		if err != nil {
+			// TODO: stall (or return errors from)
+			// subsequent writes until flushing
+			// starts to succeed
+			continue
+		}
+		// The data is no longer held in memory: stop counting
+		// it and swap in a reference to the stored copy that
+		// covers the whole block.
+		fn.memsize -= int64(n)
+		fn.extents[i] = storedExtent{
+			kc:      fn.parent.kc,
+			locator: loc,
+			size:    n,
+			offset:  0,
+			length:  n,
+		}
+	}
+}
+
// FileSystem returns a CollectionFileSystem for the collection.
func (c *Collection) FileSystem(client *Client, kc keepClient) (CollectionFileSystem, error) {
fs := &fileSystem{dirnode: dirnode{
sync.RWMutex
}
-// caller must hold dn.Lock().
+// sync flushes in-memory data (for all files in the tree rooted at
+// dn) to persistent storage. Caller must hold dn.Lock().
func (dn *dirnode) sync() error {
type shortBlock struct {
fn *filenode
length: len(data),
}
off += len(data)
+ sb.fn.memsize -= int64(len(data))
}
return nil
}