12483: Track memory use. Flush filled blocks while writing.
author     Tom Clegg <tclegg@veritasgenetics.com>
           Tue, 14 Nov 2017 20:17:12 +0000 (15:17 -0500)
committer  Tom Clegg <tclegg@veritasgenetics.com>
           Tue, 14 Nov 2017 20:35:06 +0000 (15:35 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

sdk/go/arvados/collection_fs.go
sdk/go/arvados/collection_fs_test.go

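A sketch of the effect of this change from a caller's point of view (a sketch only, not part of the commit; the helper name, file name, and 1 MiB write size are illustrative, and the code is written as if inside this package, like the tests): a file larger than maxBlockSize no longer accumulates entirely in memExtents. Each block is stored in Keep as soon as it fills, and only the trailing partial block stays buffered until MarshalManifest runs.

    // Usage sketch; client and kc are assumed to be a configured *Client
    // and keepClient, as in the test suite.
    func exampleWriteLargeFile(client *Client, kc keepClient) error {
            fs, err := (&Collection{}).FileSystem(client, kc)
            if err != nil {
                    return err
            }
            f, err := fs.OpenFile("big", os.O_WRONLY|os.O_CREATE, 0)
            if err != nil {
                    return err
            }
            defer f.Close()
            buf := make([]byte, 1<<20)
            for i := 0; i < 100; i++ {
                    // Once a memExtent reaches maxBlockSize, Write calls
                    // pruneMemExtents, which stores the block via kc.PutB
                    // and swaps in a storedExtent, keeping memory bounded.
                    if _, err := f.Write(buf); err != nil {
                            return err
                    }
            }
            // MarshalManifest (via sync) flushes the trailing partial block.
            _, err = fs.MarshalManifest(".")
            return err
    }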
diff --git a/sdk/go/arvados/collection_fs.go b/sdk/go/arvados/collection_fs.go
index 0499a0447b53301ae26bb520bad4f7d38cbfff62..79626500c10acb8c392f8432ebc2add3abff4c7a 100644
--- a/sdk/go/arvados/collection_fs.go
+++ b/sdk/go/arvados/collection_fs.go
@@ -139,6 +139,7 @@ type filenode struct {
        parent   *dirnode
        extents  []extent
        repacked int64 // number of times anything in []extents has changed len
+       memsize  int64 // bytes in memExtents
        sync.RWMutex
 }
 
@@ -273,15 +274,21 @@ func (fn *filenode) Truncate(size int64) error {
        defer fn.Unlock()
        if size < fn.fileinfo.size {
                ptr := fn.seek(filenodePtr{off: size, repacked: fn.repacked - 1})
+               for i := ptr.extentIdx; i < len(fn.extents); i++ {
+                       if ext, ok := fn.extents[i].(*memExtent); ok {
+                               fn.memsize -= int64(ext.Len())
+                       }
+               }
                if ptr.extentOff == 0 {
                        fn.extents = fn.extents[:ptr.extentIdx]
                } else {
                        fn.extents = fn.extents[:ptr.extentIdx+1]
-                       e := fn.extents[ptr.extentIdx]
-                       if e, ok := e.(writableExtent); ok {
-                               e.Truncate(ptr.extentOff)
-                       } else {
-                               fn.extents[ptr.extentIdx] = e.Slice(0, ptr.extentOff)
+                       switch ext := fn.extents[ptr.extentIdx].(type) {
+                       case *memExtent:
+                               ext.Truncate(ptr.extentOff)
+                               fn.memsize += int64(ext.Len())
+                       default:
+                               fn.extents[ptr.extentIdx] = ext.Slice(0, ptr.extentOff)
                        }
                }
                fn.fileinfo.size = size
@@ -306,6 +313,7 @@ func (fn *filenode) Truncate(size int64) error {
                }
                e.Truncate(e.Len() + int(grow))
                fn.fileinfo.size += grow
+               fn.memsize += grow
        }
        return nil
 }
@@ -357,6 +365,7 @@ func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePt
                        prev++
                        e := &memExtent{}
                        e.Truncate(len(cando))
+                       fn.memsize += int64(len(cando))
                        fn.extents[cur] = e
                        fn.extents[prev] = fn.extents[prev].Slice(0, ptr.extentOff)
                        ptr.extentIdx++
@@ -398,6 +407,7 @@ func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePt
                                ptr.extentIdx--
                                ptr.extentOff = fn.extents[prev].Len()
                                fn.extents[prev].(writableExtent).Truncate(ptr.extentOff + len(cando))
+                               fn.memsize += int64(len(cando))
                                ptr.repacked++
                                fn.repacked++
                        } else {
@@ -413,6 +423,7 @@ func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePt
                                }
                                e := &memExtent{}
                                e.Truncate(len(cando))
+                               fn.memsize += int64(len(cando))
                                fn.extents[cur] = e
                                cur++
                                prev++
@@ -426,6 +437,9 @@ func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePt
 
                ptr.off += int64(len(cando))
                ptr.extentOff += len(cando)
+               if ptr.extentOff >= maxBlockSize {
+                       fn.pruneMemExtents()
+               }
                if fn.extents[ptr.extentIdx].Len() == ptr.extentOff {
                        ptr.extentOff = 0
                        ptr.extentIdx++
@@ -434,6 +448,35 @@ func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePt
        return
 }
 
+// Write some data out to disk to reduce memory use. Caller must have
+// write lock.
+func (fn *filenode) pruneMemExtents() {
+       // TODO: async (don't hold Lock() while waiting for Keep)
+       // TODO: share code with (*dirnode)sync()
+       // TODO: pack/flush small blocks too, when fragmented
+       for idx, ext := range fn.extents {
+               ext, ok := ext.(*memExtent)
+               if !ok || ext.Len() < maxBlockSize {
+                       continue
+               }
+               locator, _, err := fn.parent.kc.PutB(ext.buf)
+               if err != nil {
+                       // TODO: stall (or return errors from)
+                       // subsequent writes until flushing
+                       // starts to succeed
+                       continue
+               }
+               fn.memsize -= int64(ext.Len())
+               fn.extents[idx] = storedExtent{
+                       kc:      fn.parent.kc,
+                       locator: locator,
+                       size:    ext.Len(),
+                       offset:  0,
+                       length:  ext.Len(),
+               }
+       }
+}
+
 // FileSystem returns a CollectionFileSystem for the collection.
 func (c *Collection) FileSystem(client *Client, kc keepClient) (CollectionFileSystem, error) {
        fs := &fileSystem{dirnode: dirnode{
@@ -540,7 +583,8 @@ type dirnode struct {
        sync.RWMutex
 }
 
-// caller must hold dn.Lock().
+// sync flushes in-memory data (for all files in the tree rooted at
+// dn) to persistent storage. Caller must hold dn.Lock().
 func (dn *dirnode) sync() error {
        type shortBlock struct {
                fn  *filenode
@@ -572,6 +616,7 @@ func (dn *dirnode) sync() error {
                                length:  len(data),
                        }
                        off += len(data)
+                       sb.fn.memsize -= int64(len(data))
                }
                return nil
        }
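Note on the bookkeeping above: fn.memsize is intended to equal, at all times, the total number of bytes currently held in *memExtent entries of fn.extents, so every path that creates, grows, truncates, flushes, or replaces a memExtent adjusts it (Truncate, Write, pruneMemExtents, and (*dirnode)sync). A hedged sketch of that invariant as a standalone helper (the function name is hypothetical; the test suite's checkMemSize below verifies the same thing with gocheck):

    // memExtentBytes recomputes the buffered byte count from scratch.
    // Caller should hold at least fn.RLock().
    func memExtentBytes(fn *filenode) int64 {
            var n int64
            for _, ext := range fn.extents {
                    if me, ok := ext.(*memExtent); ok {
                            n += int64(me.Len())
                    }
            }
            return n
    }

    // Expected invariant: fn.memsize == memExtentBytes(fn)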
diff --git a/sdk/go/arvados/collection_fs_test.go b/sdk/go/arvados/collection_fs_test.go
index 4e2e84fba2ec8a382b509f966b341a611674f5fb..4be1a3d221069bf076d4728f76154924c3499f88 100644
--- a/sdk/go/arvados/collection_fs_test.go
+++ b/sdk/go/arvados/collection_fs_test.go
@@ -307,6 +307,11 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
        c.Check(string(buf2), check.Equals, "12345678abcdefg")
        c.Check(len(f.(*file).inode.(*filenode).extents), check.Equals, 2)
 
+       // Force flush to ensure the block "12345678" gets stored, so
+       // we know what to expect in the final manifest below.
+       _, err = s.fs.MarshalManifest(".")
+       c.Check(err, check.IsNil)
+
        // Truncate to size=3 while f2's ptr is at 15
        err = f.Truncate(3)
        c.Check(err, check.IsNil)
@@ -322,7 +327,7 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
        m, err := s.fs.MarshalManifest(".")
        c.Check(err, check.IsNil)
        m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
-       c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 202cb962ac59075b964b07152d234b70+3 3:3:bar 6:3:foo\n")
+       c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 25d55ad283aa400af464c76d713c07ad+8 3:3:bar 6:3:foo\n")
 }
 
 func (s *CollectionFSSuite) TestMarshalSmallBlocks(c *check.C) {
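The forced flush added above is what changes the expected manifest: without it, only the 3 bytes "123" remaining after Truncate(3) would be stored (202cb962ac59075b964b07152d234b70 is md5("123")); with it, the full 8-byte block "12345678" is stored first, so the manifest references 25d55ad283aa400af464c76d713c07ad+8 instead. A quick way to confirm the locator hash and size (a standalone sketch, not part of the test):

    package main

    import (
            "crypto/md5"
            "fmt"
    )

    func main() {
            data := []byte("12345678")
            // Keep-style locator: md5 hex digest, "+", block length.
            fmt.Printf("%x+%d\n", md5.Sum(data), len(data))
            // Output: 25d55ad283aa400af464c76d713c07ad+8
    }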
@@ -439,15 +444,18 @@ func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
 }
 
 func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
-       maxBlockSize = 8
+       maxBlockSize = 40
        defer func() { maxBlockSize = 2 << 26 }()
 
        var err error
        s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
        c.Assert(err, check.IsNil)
 
+       const nfiles = 256
+       const ngoroutines = 256
+
        var wg sync.WaitGroup
-       for n := 0; n < 128; n++ {
+       for n := 0; n < nfiles; n++ {
                wg.Add(1)
                go func(n int) {
                        defer wg.Done()
@@ -456,7 +464,7 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
                        f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
                        c.Assert(err, check.IsNil)
                        defer f.Close()
-                       for i := 0; i < 6502; i++ {
+                       for i := 0; i < ngoroutines; i++ {
                                trunc := rand.Intn(65)
                                woff := rand.Intn(trunc + 1)
                                wbytes = wbytes[:rand.Intn(64-woff+1)]
@@ -482,6 +490,7 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
                                c.Check(string(buf), check.Equals, string(expect))
                                c.Check(err, check.IsNil)
                        }
+                       s.checkMemSize(c, f)
                }(n)
        }
        wg.Wait()
@@ -491,7 +500,7 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
        defer root.Close()
        fi, err := root.Readdir(-1)
        c.Check(err, check.IsNil)
-       c.Check(len(fi), check.Equals, 128)
+       c.Check(len(fi), check.Equals, nfiles)
 
        _, err = s.fs.MarshalManifest(".")
        c.Check(err, check.IsNil)
@@ -565,6 +574,42 @@ func (s *CollectionFSSuite) TestPersist(c *check.C) {
        }
 }
 
+func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
+       maxBlockSize = 1024
+       defer func() { maxBlockSize = 2 << 26 }()
+
+       fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+       f, err := fs.OpenFile("50K", os.O_WRONLY|os.O_CREATE, 0)
+       c.Assert(err, check.IsNil)
+       defer f.Close()
+
+       data := make([]byte, 500)
+       rand.Read(data)
+
+       for i := 0; i < 100; i++ {
+               n, err := f.Write(data)
+               c.Assert(n, check.Equals, len(data))
+               c.Assert(err, check.IsNil)
+       }
+
+       currentMemExtents := func() (memExtents []int) {
+               for idx, e := range f.(*file).inode.(*filenode).extents {
+                       switch e.(type) {
+                       case *memExtent:
+                               memExtents = append(memExtents, idx)
+                       }
+               }
+               return
+       }
+       c.Check(currentMemExtents(), check.HasLen, 1)
+
+       m, err := fs.MarshalManifest(".")
+       c.Check(m, check.Not(check.Equals), "")
+       c.Check(err, check.IsNil)
+       c.Check(currentMemExtents(), check.HasLen, 0)
+}
+
 func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
        for _, txt := range []string{
                "\n",
@@ -606,6 +651,17 @@ func (s *CollectionFSSuite) TestEdgeCaseManifests(c *check.C) {
        }
 }
 
+func (s *CollectionFSSuite) checkMemSize(c *check.C, f File) {
+       fn := f.(*file).inode.(*filenode)
+       var memsize int64
+       for _, ext := range fn.extents {
+               if e, ok := ext.(*memExtent); ok {
+                       memsize += int64(len(e.buf))
+               }
+       }
+       c.Check(fn.memsize, check.Equals, memsize)
+}
+
 // Gocheck boilerplate
 func Test(t *testing.T) {
        check.TestingT(t)