12483: Speed up manifest loading.
[arvados.git] / sdk / go / arvados / collection_fs.go
index 459b476428c74b62b394a7081243ed18cc06aeca..45c0731b0805bd36426261d590b4c44798f141f5 100644 (file)
@@ -803,6 +803,8 @@ func (dn *dirnode) loadManifest(txt string) error {
                lineno := i + 1
                var extents []storedExtent
                var anyFileTokens bool
+               var pos int64
+               var extIdx int
                for i, token := range strings.Split(stream, " ") {
                        if i == 0 {
                                dirname = manifestUnescape(token)
@@ -846,29 +848,28 @@ func (dn *dirnode) loadManifest(txt string) error {
                                return fmt.Errorf("line %d: bad file segment %q", lineno, token)
                        }
                        name := path.Clean(dirname + "/" + manifestUnescape(toks[2]))
-                       err = dn.makeParentDirs(name)
+                       fnode, err := dn.createFileAndParents(name)
                        if err != nil {
                                return fmt.Errorf("line %d: cannot use path %q: %s", lineno, name, err)
                        }
-                       f, err := dn.OpenFile(name, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0700)
-                       if err != nil {
-                               return fmt.Errorf("line %d: cannot append to %q: %s", lineno, name, err)
-                       }
-                       if f.inode.Stat().IsDir() {
-                               f.Close()
-                               return fmt.Errorf("line %d: cannot append to %q: is a directory", lineno, name)
-                       }
                        // Map the stream offset/range coordinates to
                        // block/offset/range coordinates and add
                        // corresponding storedExtents to the filenode
-                       var pos int64
-                       for _, e := range extents {
-                               next := pos + int64(e.Len())
+                       if pos > offset {
+                               // Can't continue where we left off.
+                               // TODO: binary search instead of
+                               // rewinding all the way (but this
+                               // situation might be rare anyway)
+                               extIdx, pos = 0, 0
+                       }
+                       for next := int64(0); extIdx < len(extents); extIdx, pos = extIdx+1, next {
+                               e := extents[extIdx]
+                               next = pos + int64(e.Len())
                                if next <= offset || e.Len() == 0 {
                                        pos = next
                                        continue
                                }
-                               if pos > offset+length {
+                               if pos >= offset+length {
                                        break
                                }
                                var blkOff int
@@ -879,17 +880,18 @@ func (dn *dirnode) loadManifest(txt string) error {
                                if pos+int64(blkOff+blkLen) > offset+length {
                                        blkLen = int(offset + length - pos - int64(blkOff))
                                }
-                               f.inode.(*filenode).appendExtent(storedExtent{
+                               fnode.appendExtent(storedExtent{
                                        kc:      dn.kc,
                                        locator: e.locator,
                                        size:    e.size,
                                        offset:  blkOff,
                                        length:  blkLen,
                                })
-                               pos = next
+                               if next > offset+length {
+                                       break
+                               }
                        }
-                       f.Close()
-                       if pos < offset+length {
+                       if extIdx == len(extents) && pos < offset+length {
                                return fmt.Errorf("line %d: invalid segment in %d-byte stream: %q", lineno, pos, token)
                        }
                }
@@ -904,21 +906,41 @@ func (dn *dirnode) loadManifest(txt string) error {
        return nil
 }
 
-func (dn *dirnode) makeParentDirs(name string) (err error) {
-       names := strings.Split(name, "/")
-       for _, name := range names[:len(names)-1] {
-               f, err := dn.OpenFile(name, os.O_CREATE, os.ModeDir|0755)
-               if err != nil {
-                       return err
+// createFileAndParents returns the filenode at the slash-separated path below dn, creating the file and any missing parent directories on the way. Only safe to call from loadManifest -- no locking.
+func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
+       names := strings.Split(path, "/") // Split with a non-empty separator always yields at least one element, so names[len(names)-1] below cannot panic
+       if basename := names[len(names)-1]; basename == "" || basename == "." || basename == ".." { // the last component must be a real file name
+               err = fmt.Errorf("invalid filename")
+               return
+       }
+       var node inode = dn // node is the inode reached so far; the walk starts at the receiver
+       for i, name := range names {
+               dn, ok := node.(*dirnode) // NB: this dn shadows the receiver for the rest of the loop body
+               if !ok {
+                       err = ErrFileExists // a previous component resolved to something that is not a directory
+                       return
                 }
-               defer f.Close()
-               var ok bool
-               dn, ok = f.inode.(*dirnode)
+               if name == "" || name == "." { // empty and "." components leave the current directory unchanged
+                       continue
+               }
+               if name == ".." {
+                       node = dn.parent // step up one level; NOTE(review): relies on parent being set for every dirnode reached here -- confirm for the root
+                       continue
+               }
+               node, ok = dn.inodes[name] // look up an existing child; indexing a nil map is safe and just reports !ok
                 if !ok {
-                       return ErrFileExists
+                       if i == len(names)-1 { // missing final component: create the file itself and return it
+                               fn = dn.newFilenode(name, 0755)
+                               return
+                       }
+                       node = dn.newDirnode(name, 0755) // missing intermediate component: create the directory and keep walking
                 }
         }
-       return nil
+       var ok bool
+       if fn, ok = node.(*filenode); !ok { // the final component already existed: it must be a regular file, not e.g. a directory
+               err = ErrInvalidArgument
+       }
+       return
 }
 
 func (dn *dirnode) mkdir(name string) (*file, error) {
@@ -1103,6 +1125,40 @@ func (dn *dirnode) lookupPath(path string) (node inode) {
        return
 }
 
+func (dn *dirnode) newDirnode(name string, perm os.FileMode) *dirnode { // newDirnode creates an empty directory node called name, registers it as a child of dn, and returns it; it takes no lock itself
+       child := &dirnode{
+               parent: dn,
+               client: dn.client, // the child reuses dn's client and kc
+               kc:     dn.kc,
+               fileinfo: fileinfo{
+                       name: name,
+                       mode: os.ModeDir | perm, // always flagged as a directory, whatever bits perm carries
+               },
+       }
+       if dn.inodes == nil { // the child map is allocated lazily, on first insert
+               dn.inodes = make(map[string]inode)
+       }
+       dn.inodes[name] = child // replaces any entry already stored under name
+       dn.fileinfo.size++ // incremented unconditionally; NOTE(review): presumably size counts directory entries and callers guarantee name is new -- confirm
+       return child
+}
+
+func (dn *dirnode) newFilenode(name string, perm os.FileMode) *filenode { // newFilenode creates a file node called name with no extents set, registers it as a child of dn, and returns it; it takes no lock itself
+       child := &filenode{
+               parent: dn,
+               fileinfo: fileinfo{
+                       name: name,
+                       mode: perm, // perm is stored as given; no type bits are added
+               },
+       }
+       if dn.inodes == nil { // the child map is allocated lazily, on first insert
+               dn.inodes = make(map[string]inode)
+       }
+       dn.inodes[name] = child // replaces any entry already stored under name
+       dn.fileinfo.size++ // incremented unconditionally; NOTE(review): presumably size counts directory entries and callers guarantee name is new -- confirm
+       return child
+}
+
 // OpenFile is analogous to os.OpenFile().
 func (dn *dirnode) OpenFile(name string, flag int, perm os.FileMode) (*file, error) {
        if flag&os.O_SYNC != 0 {
@@ -1149,29 +1205,10 @@ func (dn *dirnode) OpenFile(name string, flag int, perm os.FileMode) (*file, err
                        return nil, os.ErrNotExist
                }
                if perm.IsDir() {
-                       n = &dirnode{
-                               parent: dn,
-                               client: dn.client,
-                               kc:     dn.kc,
-                               fileinfo: fileinfo{
-                                       name: name,
-                                       mode: os.ModeDir | 0755,
-                               },
-                       }
+                       n = dn.newDirnode(name, 0755)
                } else {
-                       n = &filenode{
-                               parent: dn,
-                               fileinfo: fileinfo{
-                                       name: name,
-                                       mode: 0755,
-                               },
-                       }
-               }
-               if dn.inodes == nil {
-                       dn.inodes = make(map[string]inode)
+                       n = dn.newFilenode(name, 0755)
                }
-               dn.inodes[name] = n
-               dn.fileinfo.size++
        } else if flag&os.O_EXCL != 0 {
                return nil, ErrFileExists
        } else if flag&os.O_TRUNC != 0 {