1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
// NOTE(review): non-contiguous fragment of a package-level var/const
// block; the enclosing "var (" / "const (" lines are not visible here.
// Sentinel errors returned by file/filesystem operations in this file.
24 ErrReadOnlyFile = errors.New("read-only file")
25 ErrNegativeOffset = errors.New("cannot seek to negative offset")
26 ErrFileExists = errors.New("file exists")
27 ErrInvalidOperation = errors.New("invalid operation")
// Alias the stdlib permission error so callers can compare with os.ErrPermission.
28 ErrPermission = os.ErrPermission
// maxBlockSize (64 MiB) caps the size of a single storage block; used
// throughout to decide when to split/stop growing writable extents.
30 maxBlockSize = 1 << 26
// NOTE(review): the two lines below are the tail of an interface whose
// opening declaration (presumably the exported File interface) is not
// visible in this fragment.
39 Readdir(int) ([]os.FileInfo, error)
40 Stat() (os.FileInfo, error)
// keepClient is the minimal Keep-store client surface this package
// needs: random-access reads of a stored block by locator.
44 type keepClient interface {
45 ReadAt(locator string, p []byte, off int) (int, error)
48 type fileinfo struct {
55 // Name implements os.FileInfo.
56 func (fi fileinfo) Name() string {
60 // ModTime implements os.FileInfo.
61 func (fi fileinfo) ModTime() time.Time {
65 // Mode implements os.FileInfo.
66 func (fi fileinfo) Mode() os.FileMode {
70 // IsDir implements os.FileInfo.
71 func (fi fileinfo) IsDir() bool {
72 return fi.mode&os.ModeDir != 0
75 // Size implements os.FileInfo.
76 func (fi fileinfo) Size() int64 {
80 // Sys implements os.FileInfo.
81 func (fi fileinfo) Sys() interface{} {
85 func (fi fileinfo) Stat() os.FileInfo {
89 // A CollectionFileSystem is an http.Filesystem plus Stat() and
90 // support for opening writable files.
91 type CollectionFileSystem interface {
93 Stat(name string) (os.FileInfo, error)
94 Create(name string) (File, error)
95 OpenFile(name string, flag int, perm os.FileMode) (File, error)
96 MarshalManifest(string) (string, error)
99 type fileSystem struct {
103 func (fs *fileSystem) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
104 return fs.dirnode.OpenFile(path.Clean(name), flag, perm)
107 func (fs *fileSystem) Open(name string) (http.File, error) {
108 return fs.dirnode.OpenFile(path.Clean(name), os.O_RDONLY, 0)
111 func (fs *fileSystem) Create(name string) (File, error) {
112 return fs.dirnode.OpenFile(path.Clean(name), os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0)
115 func (fs *fileSystem) Stat(name string) (os.FileInfo, error) {
116 f, err := fs.OpenFile(name, os.O_RDONLY, 0)
// inode is the common interface of files and directories in the
// in-memory collection tree.
// NOTE(review): non-contiguous fragment — some interface methods and
// struct fields are missing from this extraction.
124 type inode interface {
126 OpenFile(string, int, os.FileMode) (*file, error)
128 Read([]byte, filenodePtr) (int, filenodePtr, error)
129 Write([]byte, filenodePtr) (int, filenodePtr, error)
130 Truncate(int64) error
131 Readdir() []os.FileInfo
138 // filenode implements inode.
139 type filenode struct {
// repacked is bumped whenever the extent list changes shape, so stale
// filenodePtr values can be detected and recomputed (see seek below).
143 repacked int64 // number of times anything in []extents has changed len
147 // filenodePtr is an offset into a file that is (usually) efficient to
148 // seek to. Specifically, if filenode.repacked==filenodePtr.repacked
149 // then filenode.extents[filenodePtr.extentIdx][filenodePtr.extentOff]
150 // corresponds to file offset filenodePtr.off. Otherwise, it is
151 // necessary to reexamine len(filenode.extents[0]) etc. to find the
152 // correct extent and offset.
153 type filenodePtr struct {
// NOTE(review): non-contiguous fragment of filenode.seek; several
// lines (including some branch bodies and the closing brace) are
// missing from this extraction. Kept byte-identical; comments only.
160 // seek returns a ptr that is consistent with both startPtr.off and
161 // the current state of fn. The caller must already hold fn.RLock() or
164 // If startPtr points beyond the end of the file, ptr will point to
165 // exactly the end of the file.
164 // Postconditions (partial, per visible comments):
169 // ptr.extentIdx == len(filenode.extents) // i.e., at EOF
171 // filenode.extents[ptr.extentIdx].Len() >= ptr.extentOff
172 func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
// Fast paths: negative offsets are meaningless; at/past EOF clamp to
// exactly EOF; a matching repacked counter means the cached
// extentIdx/extentOff are still valid (modulo falling off an extent).
175 // meaningless anyway
177 } else if ptr.off >= fn.fileinfo.size {
178 ptr.off = fn.fileinfo.size
179 ptr.extentIdx = len(fn.extents)
181 ptr.repacked = fn.repacked
183 } else if ptr.repacked == fn.repacked {
184 // extentIdx and extentOff accurately reflect ptr.off,
185 // but might have fallen off the end of an extent
186 if ptr.extentOff >= fn.extents[ptr.extentIdx].Len() {
// Slow path: the extent list was repacked since ptr was computed, so
// walk the extents from the start to rediscover the position.
193 ptr.repacked = fn.repacked
195 if ptr.off >= fn.fileinfo.size {
196 ptr.extentIdx, ptr.extentOff = len(fn.extents), 0
199 // Recompute extentIdx and extentOff. We have already
200 // established fn.fileinfo.size > ptr.off >= 0, so we don't
201 // have to deal with edge cases here.
203 for ptr.extentIdx, ptr.extentOff = 0, 0; off < ptr.off; ptr.extentIdx++ {
204 // This would panic (index out of range) if
205 // fn.fileinfo.size were larger than
206 // sum(fn.extents[i].Len()) -- but that can't happen
207 // because we have ensured fn.fileinfo.size is always
209 extLen := int64(fn.extents[ptr.extentIdx].Len())
210 if off+extLen > ptr.off {
211 ptr.extentOff = int(ptr.off - off)
// appendExtent adds e to the end of the file and grows the recorded
// size accordingly. NOTE(review): lock acquisition (if any) is not
// visible in this fragment — presumably the caller or a missing line
// holds fn's lock; confirm against the original.
219 func (fn *filenode) appendExtent(e extent) {
222 fn.extents = append(fn.extents, e)
223 fn.fileinfo.size += int64(e.Len())
// OpenFile on a plain file always fails: only directories have
// children to open.
226 func (fn *filenode) OpenFile(string, int, os.FileMode) (*file, error) {
227 return nil, os.ErrNotExist
// Parent/Readdir bodies are missing from this extraction.
230 func (fn *filenode) Parent() inode {
234 func (fn *filenode) Readdir() []os.FileInfo {
// Read copies bytes into p starting at startPtr and returns the
// updated pointer. NOTE(review): non-contiguous fragment — locking,
// EOF return, and the closing brace are missing from this extraction.
238 func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
241 ptr = fn.seek(startPtr)
// A negative offset can only arise from a bad caller-supplied ptr.
243 err = ErrNegativeOffset
246 if ptr.extentIdx >= len(fn.extents) {
// Delegate the actual copy to the extent containing ptr.
250 n, err = fn.extents[ptr.extentIdx].ReadAt(p, int64(ptr.extentOff))
// Advance to the next extent when this one is exhausted; an EOF from a
// non-final extent is not a real end-of-file.
254 if ptr.extentOff == fn.extents[ptr.extentIdx].Len() {
257 if ptr.extentIdx < len(fn.extents) && err == io.EOF {
// Truncate changes the file size, dropping/slicing extents when
// shrinking and growing the last writable extent (adding new
// memExtents as needed) when extending. NOTE(review): non-contiguous
// fragment — locking, repacked bookkeeping, and several branch bodies
// are missing from this extraction.
265 func (fn *filenode) Truncate(size int64) error {
268 if size < fn.fileinfo.size {
// Shrink: find the extent containing the new EOF. Passing a stale
// repacked value forces seek() to recompute from scratch.
269 ptr := fn.seek(filenodePtr{off: size, repacked: fn.repacked - 1})
270 if ptr.extentOff == 0 {
271 fn.extents = fn.extents[:ptr.extentIdx]
273 fn.extents = fn.extents[:ptr.extentIdx+1]
274 e := fn.extents[ptr.extentIdx]
// A writable extent can be truncated in place; a stored extent must
// be replaced by a shorter slice of itself.
275 if e, ok := e.(writableExtent); ok {
276 e.Truncate(ptr.extentOff)
278 fn.extents[ptr.extentIdx] = e.Slice(0, ptr.extentOff)
281 fn.fileinfo.size = size
// Grow: extend the trailing writable extent, or append fresh
// memExtents, until the file reaches the requested size. Each extent
// is capped at maxBlockSize.
285 for size > fn.fileinfo.size {
286 grow := size - fn.fileinfo.size
289 if len(fn.extents) == 0 {
291 fn.extents = append(fn.extents, e)
292 } else if e, ok = fn.extents[len(fn.extents)-1].(writableExtent); !ok || e.Len() >= maxBlockSize {
294 fn.extents = append(fn.extents, e)
298 if maxgrow := int64(maxBlockSize - e.Len()); maxgrow < grow {
301 e.Truncate(e.Len() + int(grow))
302 fn.fileinfo.size += grow
// Write copies p into the file at startPtr, splitting stored extents
// and creating/growing writable extents as needed, and returns the
// updated pointer. NOTE(review): non-contiguous fragment — locking,
// several branch bodies, repacked updates, and the closing brace are
// missing from this extraction. Kept byte-identical; comments only.
307 func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
310 ptr = fn.seek(startPtr)
312 err = ErrNegativeOffset
// Each loop iteration writes at most one extent's worth ("cando") of
// the remaining input.
315 for len(p) > 0 && err == nil {
317 if len(cando) > maxBlockSize {
318 cando = cando[:maxBlockSize]
320 // Rearrange/grow fn.extents (and shrink cando if
321 // needed) such that cando can be copied to
322 // fn.extents[ptr.extentIdx] at offset ptr.extentOff.
324 prev := ptr.extentIdx - 1
326 if cur < len(fn.extents) {
327 _, curWritable = fn.extents[cur].(writableExtent)
329 var prevAppendable bool
// prev can absorb more data only if it is writable and under the
// block-size cap.
330 if prev >= 0 && fn.extents[prev].Len() < maxBlockSize {
331 _, prevAppendable = fn.extents[prev].(writableExtent)
333 if ptr.extentOff > 0 && !curWritable {
334 // Split a non-writable block.
335 if max := fn.extents[cur].Len() - ptr.extentOff; max <= len(cando) {
336 // Truncate cur, and insert a new
339 fn.extents = append(fn.extents, nil)
340 copy(fn.extents[cur+1:], fn.extents[cur:])
342 // Split cur into two copies, truncate
343 // the one on the left, shift the one
344 // on the right, and insert a new
345 // extent between them.
346 fn.extents = append(fn.extents, nil, nil)
347 copy(fn.extents[cur+2:], fn.extents[cur:])
348 fn.extents[cur+2] = fn.extents[cur+2].Slice(ptr.extentOff+len(cando), -1)
353 e.Truncate(len(cando))
355 fn.extents[prev] = fn.extents[prev].Slice(0, ptr.extentOff)
360 } else if curWritable {
361 if fit := int(fn.extents[cur].Len()) - ptr.extentOff; fit < len(cando) {
// ptr.extentOff == 0 here: either append to prev or insert a new
// extent before cur.
366 // Shrink cando if needed to fit in prev extent.
367 if cangrow := maxBlockSize - fn.extents[prev].Len(); cangrow < len(cando) {
368 cando = cando[:cangrow]
372 if cur == len(fn.extents) {
373 // ptr is at EOF, filesize is changing.
374 fn.fileinfo.size += int64(len(cando))
375 } else if el := fn.extents[cur].Len(); el <= len(cando) {
376 // cando is long enough that we won't
377 // need cur any more. shrink cando to
378 // be exactly as long as cur
379 // (otherwise we'd accidentally shift
380 // the effective position of all
381 // extents after cur).
383 copy(fn.extents[cur:], fn.extents[cur+1:])
384 fn.extents = fn.extents[:len(fn.extents)-1]
386 // shrink cur by the same #bytes we're growing prev
387 fn.extents[cur] = fn.extents[cur].Slice(len(cando), -1)
// Append into prev and point ptr at the newly grown region.
393 ptr.extentOff = fn.extents[prev].Len()
394 fn.extents[prev].(writableExtent).Truncate(ptr.extentOff + len(cando))
398 // Insert an extent between prev and cur, and advance prev/cur.
399 fn.extents = append(fn.extents, nil)
400 if cur < len(fn.extents) {
401 copy(fn.extents[cur+1:], fn.extents[cur:])
405 // appending a new extent does
406 // not invalidate any ptrs
409 e.Truncate(len(cando))
416 // Finally we can copy bytes from cando to the current extent.
417 fn.extents[ptr.extentIdx].(writableExtent).WriteAt(cando, ptr.extentOff)
// Advance input and pointer past the bytes just written.
421 ptr.off += int64(len(cando))
422 ptr.extentOff += len(cando)
423 if fn.extents[ptr.extentIdx].Len() == ptr.extentOff {
// NOTE(review): non-contiguous fragment — some struct-literal fields,
// the return statement, and the closing brace are missing. The client
// parameter's use is not visible here; confirm against the original.
431 // FileSystem returns a CollectionFileSystem for the collection.
432 func (c *Collection) FileSystem(client *Client, kc keepClient) CollectionFileSystem {
// Build the root directory node with a block cache wrapping kc.
433 fs := &fileSystem{dirnode: dirnode{
434 cache: &keepBlockCache{kc: kc},
437 fileinfo: fileinfo{name: ".", mode: os.ModeDir | 0755},
439 inodes: make(map[string]inode),
// The root is its own parent, so ".." at the root resolves to itself.
441 fs.dirnode.parent = &fs.dirnode
// Populate the tree from the collection's manifest text.
442 fs.dirnode.loadManifest(c.ManifestText)
// NOTE(review): non-contiguous fragment of the file type and its
// methods — the type declaration header, several statements, and
// closing braces are missing from this extraction.
// unreaddirs caches the remaining entries between partial Readdir
// calls (see Readdir below).
451 unreaddirs []os.FileInfo
// Read delegates to the inode, threading the file's position pointer.
454 func (f *file) Read(p []byte) (n int, err error) {
455 n, f.ptr, err = f.inode.Read(p, f.ptr)
459 func (f *file) Seek(off int64, whence int) (pos int64, err error) {
460 size := f.inode.Size()
// Seeking to a negative offset leaves the position unchanged.
471 return f.ptr.off, ErrNegativeOffset
476 if ptr.off != f.ptr.off {
478 // force filenode to recompute f.ptr fields on next
482 return f.ptr.off, nil
485 func (f *file) Truncate(size int64) error {
486 return f.inode.Truncate(size)
// Write refuses on files not opened writable, then delegates.
489 func (f *file) Write(p []byte) (n int, err error) {
491 return 0, ErrReadOnlyFile
493 n, f.ptr, err = f.inode.Write(p, f.ptr)
// Readdir implements http.File: count<=0 returns everything at once;
// otherwise entries are doled out in batches via unreaddirs.
497 func (f *file) Readdir(count int) ([]os.FileInfo, error) {
498 if !f.inode.IsDir() {
499 return nil, ErrInvalidOperation
502 return f.inode.Readdir(), nil
504 if f.unreaddirs == nil {
505 f.unreaddirs = f.inode.Readdir()
507 if len(f.unreaddirs) == 0 {
510 if count > len(f.unreaddirs) {
511 count = len(f.unreaddirs)
513 ret := f.unreaddirs[:count]
514 f.unreaddirs = f.unreaddirs[count:]
518 func (f *file) Stat() (os.FileInfo, error) {
522 func (f *file) Close() error {
// OpenFile resolves name relative to this file's inode (only succeeds
// for directories; filenode.OpenFile returns os.ErrNotExist).
527 func (f *file) OpenFile(name string, flag int, perm os.FileMode) (*file, error) {
528 return f.inode.OpenFile(name, flag, perm)
// NOTE(review): non-contiguous fragment — dirnode's remaining fields,
// sync's locals, and several statements are missing from this
// extraction. Kept byte-identical; comments only.
531 type dirnode struct {
537 inodes map[string]inode
541 // caller must hold dn.Lock().
// sync flushes this directory's in-memory (mem) extents to storage,
// packing small extents together into shared blocks up to
// maxBlockSize before writing.
542 func (dn *dirnode) sync() error {
// shortBlock identifies one memExtent (by owning filenode and extent
// index) awaiting flush.
543 type shortBlock struct {
547 var pending []shortBlock
// flush hashes the concatenated pending extents, "writes" the block
// (still a FIXME below), and replaces each memExtent with a
// storedExtent pointing into the new block.
550 flush := func(sbs []shortBlock) error {
556 for _, sb := range sbs {
557 data := sb.fn.extents[sb.idx].(*memExtent).buf
558 if _, err := hash.Write(data); err != nil {
563 // FIXME: write to keep
// NOTE(review): locator is computed but the block is never uploaded
// yet (see FIXME above) — data-loss risk until implemented.
564 locator := fmt.Sprintf("%x+%d", hash.Sum(nil), size)
566 for _, sb := range sbs {
567 data := sb.fn.extents[sb.idx].(*memExtent).buf
568 sb.fn.extents[sb.idx] = storedExtent{
// Iterate children in sorted name order for deterministic output.
580 names := make([]string, 0, len(dn.inodes))
581 for name := range dn.inodes {
582 names = append(names, name)
586 for _, name := range names {
587 fn, ok := dn.inodes[name].(*filenode)
593 for idx, ext := range fn.extents {
594 ext, ok := ext.(*memExtent)
// Large extents flush alone; small ones are batched until the batch
// would exceed maxBlockSize.
598 if ext.Len() > maxBlockSize/2 {
599 if err := flush([]shortBlock{{fn, idx}}); err != nil {
604 if pendingLen+ext.Len() > maxBlockSize {
605 if err := flush(pending); err != nil {
611 pending = append(pending, shortBlock{fn, idx})
612 pendingLen += ext.Len()
// Flush whatever remains batched.
615 return flush(pending)
// MarshalManifest renders this directory (and, recursively, its
// subdirectories) as Arvados manifest text with the given stream-name
// prefix. NOTE(review): non-contiguous fragment — locking, several
// locals, and some statements are missing from this extraction.
618 func (dn *dirnode) MarshalManifest(prefix string) (string, error) {
// Flush in-memory extents first so every extent has a locator.
621 if err := dn.sync(); err != nil {
// m1segment is one file segment expressed in stream coordinates
// (offset/length within the concatenated block list).
626 type m1segment struct {
631 var segments []m1segment
// Deterministic output: visit children in sorted name order.
635 names := make([]string, 0, len(dn.inodes))
636 for name := range dn.inodes {
637 names = append(names, name)
641 for _, name := range names {
642 node := dn.inodes[name]
643 switch node := node.(type) {
// Subdirectories become separate streams appended after this one.
645 subdir, err := node.MarshalManifest(prefix + "/" + node.Name())
649 subdirs = subdirs + subdir
651 for _, e := range node.extents {
652 switch e := e.(type) {
// NOTE(review): "FIXME" placeholder locator for unflushed mem
// extents — sync() above should have eliminated these; confirm.
654 blocks = append(blocks, fmt.Sprintf("FIXME+%d", e.Len()))
655 segments = append(segments, m1segment{
658 length: int64(e.Len()),
660 streamLen += int64(e.Len())
// Coalesce consecutive references to the same stored block.
662 if len(blocks) > 0 && blocks[len(blocks)-1] == e.locator {
663 streamLen -= int64(e.size)
665 blocks = append(blocks, e.locator)
667 segments = append(segments, m1segment{
669 offset: streamLen + int64(e.offset),
670 length: int64(e.length),
672 streamLen += int64(e.size)
674 panic(fmt.Sprintf("can't marshal extent type %T", e))
678 panic(fmt.Sprintf("can't marshal inode type %T", node))
681 var filetokens []string
682 for _, s := range segments {
683 filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, s.name))
// A stream with files but no blocks still needs the canonical empty
// block locator (md5 of "").
685 if len(filetokens) == 0 {
687 } else if len(blocks) == 0 {
688 blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
690 return prefix + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
// loadManifest populates the directory tree from Arvados manifest
// text: one stream per line, each a stream name, block locators, and
// pos:length:filename tokens. NOTE(review): non-contiguous fragment —
// several statements (including error returns marked FIXME) are
// missing from this extraction; malformed manifests are currently
// skipped silently rather than reported.
693 func (dn *dirnode) loadManifest(txt string) {
696 for _, stream := range strings.Split(txt, "\n") {
697 var extents []storedExtent
698 for i, token := range strings.Split(stream, " ") {
// First token is the stream (directory) name, manifest-escaped.
700 dirname = manifestUnescape(token)
// Tokens without ":" are block locators (hash+size[+hints]).
703 if !strings.Contains(token, ":") {
704 toks := strings.SplitN(token, "+", 3)
709 length, err := strconv.ParseInt(toks[1], 10, 32)
710 if err != nil || length < 0 {
714 extents = append(extents, storedExtent{
// Tokens with ":" are file segments: offset:length:name.
722 toks := strings.Split(token, ":")
724 // FIXME: broken manifest
727 offset, err := strconv.ParseInt(toks[0], 10, 64)
728 if err != nil || offset < 0 {
729 // FIXME: broken manifest
732 length, err := strconv.ParseInt(toks[1], 10, 64)
733 if err != nil || length < 0 {
734 // FIXME: broken manifest
737 name := path.Clean(dirname + "/" + manifestUnescape(toks[2]))
738 dn.makeParentDirs(name)
739 f, err := dn.OpenFile(name, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0700)
// A file token whose name collides with a directory is invalid.
744 if f.inode.Stat().IsDir() {
746 // FIXME: broken manifest
749 // Map the stream offset/range coordinates to
750 // block/offset/range coordinates and add
751 // corresponding storedExtents to the filenode
753 for _, e := range extents {
754 next := pos + int64(e.Len())
759 if pos > offset+length {
764 blkOff = int(offset - pos)
766 blkLen := e.Len() - blkOff
767 if pos+int64(blkOff+blkLen) > offset+length {
768 blkLen = int(offset + length - pos - int64(blkOff))
770 f.inode.(*filenode).appendExtent(storedExtent{
// makeParentDirs creates any missing intermediate directories for
// name (all path components except the last). NOTE(review):
// non-contiguous fragment — descent into the child dirnode and
// locking are not visible in this extraction.
784 func (dn *dirnode) makeParentDirs(name string) {
785 names := strings.Split(name, "/")
786 for _, name := range names[:len(names)-1] {
// Create a new directory node if this component doesn't exist yet.
789 if n, ok := dn.inodes[name]; !ok {
796 mode: os.ModeDir | 0755,
// Lazily allocate the child map (zero-value dirnode has nil inodes).
799 if dn.inodes == nil {
800 dn.inodes = make(map[string]inode)
805 } else if n, ok := n.(*dirnode); ok {
// Parent body is missing from this extraction.
814 func (dn *dirnode) Parent() inode {
// Readdir returns a FileInfo for each child (order unspecified —
// map iteration).
818 func (dn *dirnode) Readdir() (fi []os.FileInfo) {
821 fi = make([]os.FileInfo, 0, len(dn.inodes))
822 for _, inode := range dn.inodes {
823 fi = append(fi, inode.Stat())
828 func (dn *dirnode) Read(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
829 return 0, ptr, ErrInvalidOperation
832 func (dn *dirnode) Write(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
833 return 0, ptr, ErrInvalidOperation
836 func (dn *dirnode) Truncate(int64) error {
837 return ErrInvalidOperation
// OpenFile opens (and with O_CREATE, creates) the named file or
// directory under dn, recursing through path components.
// NOTE(review): non-contiguous fragment — locking, the ".." case
// context, node creation, and permission checks are partially missing
// from this extraction.
840 func (dn *dirnode) OpenFile(name string, flag int, perm os.FileMode) (*file, error) {
841 name = strings.TrimSuffix(name, "/")
// "." (or empty after trimming) refers to this directory itself.
842 if name == "." || name == "" {
843 return &file{inode: dn}, nil
// Multi-component paths: resolve the directory part first, then open
// the final component inside it.
845 if dirname, name := path.Split(name); dirname != "" {
846 // OpenFile("foo/bar/baz") =>
847 // OpenFile("foo/bar").OpenFile("baz") (or
848 // ErrNotExist, if foo/bar is a file)
849 f, err := dn.OpenFile(dirname, os.O_RDONLY, 0)
854 if dn, ok := f.inode.(*dirnode); ok {
855 return dn.OpenFile(name, flag, perm)
857 return nil, os.ErrNotExist
// ".." resolves to the parent (the root is its own parent).
863 return &file{inode: dn.parent}, nil
865 n, ok := dn.inodes[name]
// Missing entry: only O_CREATE may create it.
867 if flag&os.O_CREATE == 0 {
868 return nil, os.ErrNotExist
877 if dn.inodes == nil {
878 dn.inodes = make(map[string]inode)
// Existing entry with O_EXCL is an error.
882 } else if flag&os.O_EXCL != 0 {
883 return nil, ErrFileExists
887 append: flag&os.O_APPEND != 0,
888 writable: flag&(os.O_WRONLY|os.O_RDWR) != 0,
// extent is a contiguous run of file data, either in memory
// (memExtent) or in a stored block (storedExtent). NOTE(review):
// non-contiguous fragment — several interface methods, struct fields,
// statements, and closing braces are missing from this extraction.
892 type extent interface {
895 // Return a new extent with a subsection of the data from this
896 // one. length<0 means length=Len()-off.
897 Slice(off int, length int) extent
// writableExtent adds in-place mutation to extent.
900 type writableExtent interface {
902 WriteAt(p []byte, off int)
// memExtent holds not-yet-flushed data in a byte slice.
906 type memExtent struct {
910 func (me *memExtent) Len() int {
// Slice copies the selected range into a fresh memExtent (no aliasing
// with the original buffer).
914 func (me *memExtent) Slice(off, length int) extent {
916 length = len(me.buf) - off
918 buf := make([]byte, length)
919 copy(buf, me.buf[off:])
920 return &memExtent{buf: buf}
// Truncate grows (with capacity headroom) or shrinks the buffer to n
// bytes.
923 func (me *memExtent) Truncate(n int) {
// Grow capacity geometrically to amortize reallocation.
927 newsize = newsize << 2
929 newbuf := make([]byte, n, newsize)
933 // Zero unused part when shrinking, in case we grow
934 // and start using it again later.
935 for i := n; i < len(me.buf); i++ {
// WriteAt requires the destination range to exist already (Truncate
// first); writing past the end is a caller bug.
942 func (me *memExtent) WriteAt(p []byte, off int) {
943 if off+len(p) > len(me.buf) {
944 panic("overflowed extent")
946 copy(me.buf[off:], p)
949 func (me *memExtent) ReadAt(p []byte, off int64) (n int, err error) {
950 if off > int64(me.Len()) {
954 n = copy(p, me.buf[int(off):])
// storedExtent refers to a range (offset/length) within a stored
// block identified by locator, read through a blockCache.
961 type storedExtent struct {
969 func (se storedExtent) Len() int {
973 func (se storedExtent) Slice(n, size int) extent {
976 if size >= 0 && se.length > size {
982 func (se storedExtent) ReadAt(p []byte, off int64) (n int, err error) {
986 maxlen := se.length - int(off)
// Reads are clamped to this extent's range; the cache read is offset
// by the extent's position within the stored block.
989 n, err = se.cache.ReadAt(se.locator, p, int(off)+se.offset)
995 return se.cache.ReadAt(se.locator, p, int(off)+se.offset)
// blockCache abstracts block reads so a caching layer can sit in
// front of the keepClient.
998 type blockCache interface {
999 ReadAt(locator string, p []byte, off int) (n int, err error)
1002 type keepBlockCache struct {
// NOTE(review): scratch (128 MiB) appears unused in the visible
// fragment — presumably reserved for the caching TODO; confirm.
1006 var scratch = make([]byte, 2<<26)
// ReadAt currently delegates straight to the client with no caching.
1008 func (kbc *keepBlockCache) ReadAt(locator string, p []byte, off int) (int, error) {
1009 return kbc.kc.ReadAt(locator, p, off)
// canonicalName returns a cleaned path relative to the filesystem
// root: "." for the root itself, "./a/b" for anything below it.
//
// NOTE(review): reconstructed from a garbled extraction; the two
// assignment branches are inferred from the visible conditions —
// confirm against the original file.
func canonicalName(name string) string {
	name = path.Clean("/" + name)
	if name == "/" || name == "./" {
		name = "."
	} else if strings.HasPrefix(name, "/") {
		name = "." + name
	}
	return name
}
// manifestEscapeSeq matches one manifest escape sequence: a backslash
// followed by three octal digits, or an escaped backslash.
var manifestEscapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)

// manifestUnescapeSeq decodes a single sequence matched by
// manifestEscapeSeq. Sequences that don't parse as an 8-bit octal
// value are returned unchanged.
//
// NOTE(review): reconstructed from a garbled extraction; the
// literal-backslash branch is inferred from the regexp's alternation —
// confirm against the original file.
func manifestUnescapeSeq(seq string) string {
	if seq == `\\` {
		return `\`
	}
	i, err := strconv.ParseUint(seq[1:], 8, 8)
	if err != nil {
		// Invalid escape sequence: can't unescape.
		return seq
	}
	return string([]byte{byte(i)})
}

// manifestUnescape replaces every escape sequence in s with the byte
// it represents.
func manifestUnescape(s string) string {
	return manifestEscapeSeq.ReplaceAllStringFunc(s, manifestUnescapeSeq)
}