1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
24 ErrReadOnlyFile = errors.New("read-only file")
25 ErrNegativeOffset = errors.New("cannot seek to negative offset")
26 ErrFileExists = errors.New("file exists")
27 ErrInvalidOperation = errors.New("invalid operation")
28 ErrDirectoryNotEmpty = errors.New("directory not empty")
29 ErrPermission = os.ErrPermission
31 maxBlockSize = 1 << 26
40 Readdir(int) ([]os.FileInfo, error)
41 Stat() (os.FileInfo, error)
45 type keepClient interface {
46 ReadAt(locator string, p []byte, off int) (int, error)
49 type fileinfo struct {
56 // Name implements os.FileInfo.
57 func (fi fileinfo) Name() string {
61 // ModTime implements os.FileInfo.
62 func (fi fileinfo) ModTime() time.Time {
66 // Mode implements os.FileInfo.
67 func (fi fileinfo) Mode() os.FileMode {
71 // IsDir implements os.FileInfo.
72 func (fi fileinfo) IsDir() bool {
73 return fi.mode&os.ModeDir != 0
76 // Size implements os.FileInfo.
77 func (fi fileinfo) Size() int64 {
81 // Sys implements os.FileInfo.
82 func (fi fileinfo) Sys() interface{} {
86 func (fi fileinfo) Stat() os.FileInfo {
90 // A CollectionFileSystem is an http.FileSystem plus Stat() and
91 // support for opening writable files.
92 type CollectionFileSystem interface {
94 Stat(name string) (os.FileInfo, error)
95 Create(name string) (File, error)
96 OpenFile(name string, flag int, perm os.FileMode) (File, error)
97 Mkdir(name string, perm os.FileMode) error
98 Remove(name string) error
99 MarshalManifest(string) (string, error)
102 type fileSystem struct {
106 func (fs *fileSystem) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
107 return fs.dirnode.OpenFile(path.Clean(name), flag, perm)
110 func (fs *fileSystem) Open(name string) (http.File, error) {
111 return fs.dirnode.OpenFile(path.Clean(name), os.O_RDONLY, 0)
114 func (fs *fileSystem) Create(name string) (File, error) {
115 return fs.dirnode.OpenFile(path.Clean(name), os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0)
118 func (fs *fileSystem) Stat(name string) (os.FileInfo, error) {
119 f, err := fs.OpenFile(name, os.O_RDONLY, 0)
127 type inode interface {
130 Read([]byte, filenodePtr) (int, filenodePtr, error)
131 Write([]byte, filenodePtr) (int, filenodePtr, error)
132 Truncate(int64) error
133 Readdir() []os.FileInfo
140 // filenode implements inode.
141 type filenode struct {
145 repacked int64 // number of times anything in []extents has changed len
149 // filenodePtr is an offset into a file that is (usually) efficient to
150 // seek to. Specifically, if filenode.repacked==filenodePtr.repacked
151 // then filenode.extents[filenodePtr.extentIdx][filenodePtr.extentOff]
152 // corresponds to file offset filenodePtr.off. Otherwise, it is
153 // necessary to reexamine len(filenode.extents[0]) etc. to find the
154 // correct extent and offset.
155 type filenodePtr struct {
162 // seek returns a ptr that is consistent with both startPtr.off and
163 // the current state of fn. The caller must already hold fn.RLock() or
166 // If startPtr points beyond the end of the file, ptr will point to
167 // exactly the end of the file.
171 // ptr.extentIdx == len(filenode.extents) // i.e., at EOF
173 // filenode.extents[ptr.extentIdx].Len() >= ptr.extentOff
174 func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
177 // meaningless anyway
179 } else if ptr.off >= fn.fileinfo.size {
180 ptr.off = fn.fileinfo.size
181 ptr.extentIdx = len(fn.extents)
183 ptr.repacked = fn.repacked
185 } else if ptr.repacked == fn.repacked {
186 // extentIdx and extentOff accurately reflect ptr.off,
187 // but might have fallen off the end of an extent
188 if ptr.extentOff >= fn.extents[ptr.extentIdx].Len() {
195 ptr.repacked = fn.repacked
197 if ptr.off >= fn.fileinfo.size {
198 ptr.extentIdx, ptr.extentOff = len(fn.extents), 0
201 // Recompute extentIdx and extentOff. We have already
202 // established fn.fileinfo.size > ptr.off >= 0, so we don't
203 // have to deal with edge cases here.
205 for ptr.extentIdx, ptr.extentOff = 0, 0; off < ptr.off; ptr.extentIdx++ {
206 // This would panic (index out of range) if
207 // fn.fileinfo.size were larger than
208 // sum(fn.extents[i].Len()) -- but that can't happen
209 // because we have ensured fn.fileinfo.size is always
211 extLen := int64(fn.extents[ptr.extentIdx].Len())
212 if off+extLen > ptr.off {
213 ptr.extentOff = int(ptr.off - off)
221 func (fn *filenode) appendExtent(e extent) {
224 fn.extents = append(fn.extents, e)
225 fn.fileinfo.size += int64(e.Len())
228 func (fn *filenode) Parent() inode {
232 func (fn *filenode) Readdir() []os.FileInfo {
236 func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
239 ptr = fn.seek(startPtr)
241 err = ErrNegativeOffset
244 if ptr.extentIdx >= len(fn.extents) {
248 n, err = fn.extents[ptr.extentIdx].ReadAt(p, int64(ptr.extentOff))
252 if ptr.extentOff == fn.extents[ptr.extentIdx].Len() {
255 if ptr.extentIdx < len(fn.extents) && err == io.EOF {
263 func (fn *filenode) Truncate(size int64) error {
266 if size < fn.fileinfo.size {
267 ptr := fn.seek(filenodePtr{off: size, repacked: fn.repacked - 1})
268 if ptr.extentOff == 0 {
269 fn.extents = fn.extents[:ptr.extentIdx]
271 fn.extents = fn.extents[:ptr.extentIdx+1]
272 e := fn.extents[ptr.extentIdx]
273 if e, ok := e.(writableExtent); ok {
274 e.Truncate(ptr.extentOff)
276 fn.extents[ptr.extentIdx] = e.Slice(0, ptr.extentOff)
279 fn.fileinfo.size = size
283 for size > fn.fileinfo.size {
284 grow := size - fn.fileinfo.size
287 if len(fn.extents) == 0 {
289 fn.extents = append(fn.extents, e)
290 } else if e, ok = fn.extents[len(fn.extents)-1].(writableExtent); !ok || e.Len() >= maxBlockSize {
292 fn.extents = append(fn.extents, e)
296 if maxgrow := int64(maxBlockSize - e.Len()); maxgrow < grow {
299 e.Truncate(e.Len() + int(grow))
300 fn.fileinfo.size += grow
305 func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
308 ptr = fn.seek(startPtr)
310 err = ErrNegativeOffset
313 for len(p) > 0 && err == nil {
315 if len(cando) > maxBlockSize {
316 cando = cando[:maxBlockSize]
318 // Rearrange/grow fn.extents (and shrink cando if
319 // needed) such that cando can be copied to
320 // fn.extents[ptr.extentIdx] at offset ptr.extentOff.
322 prev := ptr.extentIdx - 1
324 if cur < len(fn.extents) {
325 _, curWritable = fn.extents[cur].(writableExtent)
327 var prevAppendable bool
328 if prev >= 0 && fn.extents[prev].Len() < maxBlockSize {
329 _, prevAppendable = fn.extents[prev].(writableExtent)
331 if ptr.extentOff > 0 && !curWritable {
332 // Split a non-writable block.
333 if max := fn.extents[cur].Len() - ptr.extentOff; max <= len(cando) {
334 // Truncate cur, and insert a new
337 fn.extents = append(fn.extents, nil)
338 copy(fn.extents[cur+1:], fn.extents[cur:])
340 // Split cur into two copies, truncate
341 // the one on the left, shift the one
342 // on the right, and insert a new
343 // extent between them.
344 fn.extents = append(fn.extents, nil, nil)
345 copy(fn.extents[cur+2:], fn.extents[cur:])
346 fn.extents[cur+2] = fn.extents[cur+2].Slice(ptr.extentOff+len(cando), -1)
351 e.Truncate(len(cando))
353 fn.extents[prev] = fn.extents[prev].Slice(0, ptr.extentOff)
358 } else if curWritable {
359 if fit := int(fn.extents[cur].Len()) - ptr.extentOff; fit < len(cando) {
364 // Shrink cando if needed to fit in prev extent.
365 if cangrow := maxBlockSize - fn.extents[prev].Len(); cangrow < len(cando) {
366 cando = cando[:cangrow]
370 if cur == len(fn.extents) {
371 // ptr is at EOF, filesize is changing.
372 fn.fileinfo.size += int64(len(cando))
373 } else if el := fn.extents[cur].Len(); el <= len(cando) {
374 // cando is long enough that we won't
375 // need cur any more. shrink cando to
376 // be exactly as long as cur
377 // (otherwise we'd accidentally shift
378 // the effective position of all
379 // extents after cur).
381 copy(fn.extents[cur:], fn.extents[cur+1:])
382 fn.extents = fn.extents[:len(fn.extents)-1]
384 // shrink cur by the same #bytes we're growing prev
385 fn.extents[cur] = fn.extents[cur].Slice(len(cando), -1)
391 ptr.extentOff = fn.extents[prev].Len()
392 fn.extents[prev].(writableExtent).Truncate(ptr.extentOff + len(cando))
396 // Insert an extent between prev and cur, and advance prev/cur.
397 fn.extents = append(fn.extents, nil)
398 if cur < len(fn.extents) {
399 copy(fn.extents[cur+1:], fn.extents[cur:])
403 // appending a new extent does
404 // not invalidate any ptrs
407 e.Truncate(len(cando))
414 // Finally we can copy bytes from cando to the current extent.
415 fn.extents[ptr.extentIdx].(writableExtent).WriteAt(cando, ptr.extentOff)
419 ptr.off += int64(len(cando))
420 ptr.extentOff += len(cando)
421 if fn.extents[ptr.extentIdx].Len() == ptr.extentOff {
429 // FileSystem returns a CollectionFileSystem for the collection.
430 func (c *Collection) FileSystem(client *Client, kc keepClient) CollectionFileSystem {
431 fs := &fileSystem{dirnode: dirnode{
432 cache: &keepBlockCache{kc: kc},
435 fileinfo: fileinfo{name: ".", mode: os.ModeDir | 0755},
437 inodes: make(map[string]inode),
439 fs.dirnode.parent = &fs.dirnode
440 fs.dirnode.loadManifest(c.ManifestText)
449 unreaddirs []os.FileInfo
452 func (f *file) Read(p []byte) (n int, err error) {
453 n, f.ptr, err = f.inode.Read(p, f.ptr)
457 func (f *file) Seek(off int64, whence int) (pos int64, err error) {
458 size := f.inode.Size()
469 return f.ptr.off, ErrNegativeOffset
474 if ptr.off != f.ptr.off {
476 // force filenode to recompute f.ptr fields on next
480 return f.ptr.off, nil
483 func (f *file) Truncate(size int64) error {
484 return f.inode.Truncate(size)
487 func (f *file) Write(p []byte) (n int, err error) {
489 return 0, ErrReadOnlyFile
491 n, f.ptr, err = f.inode.Write(p, f.ptr)
495 func (f *file) Readdir(count int) ([]os.FileInfo, error) {
496 if !f.inode.IsDir() {
497 return nil, ErrInvalidOperation
500 return f.inode.Readdir(), nil
502 if f.unreaddirs == nil {
503 f.unreaddirs = f.inode.Readdir()
505 if len(f.unreaddirs) == 0 {
508 if count > len(f.unreaddirs) {
509 count = len(f.unreaddirs)
511 ret := f.unreaddirs[:count]
512 f.unreaddirs = f.unreaddirs[count:]
516 func (f *file) Stat() (os.FileInfo, error) {
520 func (f *file) Close() error {
525 type dirnode struct {
531 inodes map[string]inode
535 // caller must hold dn.Lock().
536 func (dn *dirnode) sync() error {
537 type shortBlock struct {
541 var pending []shortBlock
544 flush := func(sbs []shortBlock) error {
550 for _, sb := range sbs {
551 data := sb.fn.extents[sb.idx].(*memExtent).buf
552 if _, err := hash.Write(data); err != nil {
557 // FIXME: write to keep
558 locator := fmt.Sprintf("%x+%d", hash.Sum(nil), size)
560 for _, sb := range sbs {
561 data := sb.fn.extents[sb.idx].(*memExtent).buf
562 sb.fn.extents[sb.idx] = storedExtent{
574 names := make([]string, 0, len(dn.inodes))
575 for name := range dn.inodes {
576 names = append(names, name)
580 for _, name := range names {
581 fn, ok := dn.inodes[name].(*filenode)
587 for idx, ext := range fn.extents {
588 ext, ok := ext.(*memExtent)
592 if ext.Len() > maxBlockSize/2 {
593 if err := flush([]shortBlock{{fn, idx}}); err != nil {
598 if pendingLen+ext.Len() > maxBlockSize {
599 if err := flush(pending); err != nil {
605 pending = append(pending, shortBlock{fn, idx})
606 pendingLen += ext.Len()
609 return flush(pending)
612 func (dn *dirnode) MarshalManifest(prefix string) (string, error) {
615 if err := dn.sync(); err != nil {
620 type m1segment struct {
625 var segments []m1segment
629 names := make([]string, 0, len(dn.inodes))
630 for name := range dn.inodes {
631 names = append(names, name)
635 for _, name := range names {
636 node := dn.inodes[name]
637 switch node := node.(type) {
639 subdir, err := node.MarshalManifest(prefix + "/" + node.Name())
643 subdirs = subdirs + subdir
645 for _, e := range node.extents {
646 switch e := e.(type) {
648 blocks = append(blocks, fmt.Sprintf("FIXME+%d", e.Len()))
649 segments = append(segments, m1segment{
652 length: int64(e.Len()),
654 streamLen += int64(e.Len())
656 if len(blocks) > 0 && blocks[len(blocks)-1] == e.locator {
657 streamLen -= int64(e.size)
659 blocks = append(blocks, e.locator)
661 segments = append(segments, m1segment{
663 offset: streamLen + int64(e.offset),
664 length: int64(e.length),
666 streamLen += int64(e.size)
668 panic(fmt.Sprintf("can't marshal extent type %T", e))
672 panic(fmt.Sprintf("can't marshal inode type %T", node))
675 var filetokens []string
676 for _, s := range segments {
677 filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, s.name))
679 if len(filetokens) == 0 {
681 } else if len(blocks) == 0 {
682 blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
684 return prefix + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
687 func (dn *dirnode) loadManifest(txt string) {
690 for _, stream := range strings.Split(txt, "\n") {
691 var extents []storedExtent
692 for i, token := range strings.Split(stream, " ") {
694 dirname = manifestUnescape(token)
697 if !strings.Contains(token, ":") {
698 toks := strings.SplitN(token, "+", 3)
703 length, err := strconv.ParseInt(toks[1], 10, 32)
704 if err != nil || length < 0 {
708 extents = append(extents, storedExtent{
716 toks := strings.Split(token, ":")
718 // FIXME: broken manifest
721 offset, err := strconv.ParseInt(toks[0], 10, 64)
722 if err != nil || offset < 0 {
723 // FIXME: broken manifest
726 length, err := strconv.ParseInt(toks[1], 10, 64)
727 if err != nil || length < 0 {
728 // FIXME: broken manifest
731 name := path.Clean(dirname + "/" + manifestUnescape(toks[2]))
732 dn.makeParentDirs(name)
733 f, err := dn.OpenFile(name, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0700)
738 if f.inode.Stat().IsDir() {
740 // FIXME: broken manifest
743 // Map the stream offset/range coordinates to
744 // block/offset/range coordinates and add
745 // corresponding storedExtents to the filenode
747 for _, e := range extents {
748 next := pos + int64(e.Len())
753 if pos > offset+length {
758 blkOff = int(offset - pos)
760 blkLen := e.Len() - blkOff
761 if pos+int64(blkOff+blkLen) > offset+length {
762 blkLen = int(offset + length - pos - int64(blkOff))
764 f.inode.(*filenode).appendExtent(storedExtent{
778 func (dn *dirnode) makeParentDirs(name string) (err error) {
779 names := strings.Split(name, "/")
780 for _, name := range names[:len(names)-1] {
781 f, err := dn.mkdir(name)
787 dn, ok = f.inode.(*dirnode)
795 func (dn *dirnode) mkdir(name string) (*file, error) {
796 return dn.OpenFile(name, os.O_CREATE|os.O_EXCL, os.ModeDir|0755)
799 func (dn *dirnode) Mkdir(name string, perm os.FileMode) error {
800 f, err := dn.mkdir(name)
807 func (dn *dirnode) Remove(name string) error {
808 dirname, name := path.Split(name)
809 if name == "" || name == "." || name == ".." {
810 return ErrInvalidOperation
812 dn, ok := dn.lookupPath(dirname).(*dirnode)
814 return os.ErrNotExist
818 switch node := dn.inodes[name].(type) {
820 return os.ErrNotExist
824 if len(node.inodes) > 0 {
825 return ErrDirectoryNotEmpty
828 delete(dn.inodes, name)
832 func (dn *dirnode) Parent() inode {
838 func (dn *dirnode) Readdir() (fi []os.FileInfo) {
841 fi = make([]os.FileInfo, 0, len(dn.inodes))
842 for _, inode := range dn.inodes {
843 fi = append(fi, inode.Stat())
848 func (dn *dirnode) Read(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
849 return 0, ptr, ErrInvalidOperation
852 func (dn *dirnode) Write(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
853 return 0, ptr, ErrInvalidOperation
856 func (dn *dirnode) Truncate(int64) error {
857 return ErrInvalidOperation
860 // lookupPath returns the inode for the file/directory with the given
861 // name (which may contain "/" separators). If no such file/directory
862 // exists, the returned node is nil.
863 func (dn *dirnode) lookupPath(path string) (node inode) {
865 for _, name := range strings.Split(path, "/") {
866 dn, ok := node.(*dirnode)
870 if name == "." || name == "" {
878 node = dn.inodes[name]
884 func (dn *dirnode) OpenFile(name string, flag int, perm os.FileMode) (*file, error) {
885 dirname, name := path.Split(name)
886 dn, ok := dn.lookupPath(dirname).(*dirnode)
888 return nil, os.ErrNotExist
890 writeMode := flag&(os.O_RDWR|os.O_WRONLY|os.O_CREATE) != 0
892 // A directory can be opened via "foo/", "foo/.", or
896 return &file{inode: dn}, nil
898 return &file{inode: dn.Parent()}, nil
901 createMode := flag&os.O_CREATE != 0
909 n, ok := dn.inodes[name]
912 return nil, os.ErrNotExist
921 mode: os.ModeDir | 0755,
933 if dn.inodes == nil {
934 dn.inodes = make(map[string]inode)
938 } else if flag&os.O_EXCL != 0 {
939 return nil, ErrFileExists
943 append: flag&os.O_APPEND != 0,
944 writable: flag&(os.O_WRONLY|os.O_RDWR) != 0,
948 type extent interface {
951 // Return a new extent with a subsection of the data from this
952 // one. length<0 means length=Len()-off.
953 Slice(off int, length int) extent
956 type writableExtent interface {
958 WriteAt(p []byte, off int)
962 type memExtent struct {
966 func (me *memExtent) Len() int {
970 func (me *memExtent) Slice(off, length int) extent {
972 length = len(me.buf) - off
974 buf := make([]byte, length)
975 copy(buf, me.buf[off:])
976 return &memExtent{buf: buf}
979 func (me *memExtent) Truncate(n int) {
983 newsize = newsize << 2
985 newbuf := make([]byte, n, newsize)
989 // Zero unused part when shrinking, in case we grow
990 // and start using it again later.
991 for i := n; i < len(me.buf); i++ {
998 func (me *memExtent) WriteAt(p []byte, off int) {
999 if off+len(p) > len(me.buf) {
1000 panic("overflowed extent")
1002 copy(me.buf[off:], p)
1005 func (me *memExtent) ReadAt(p []byte, off int64) (n int, err error) {
1006 if off > int64(me.Len()) {
1010 n = copy(p, me.buf[int(off):])
1017 type storedExtent struct {
1025 func (se storedExtent) Len() int {
1029 func (se storedExtent) Slice(n, size int) extent {
1032 if size >= 0 && se.length > size {
1038 func (se storedExtent) ReadAt(p []byte, off int64) (n int, err error) {
1039 if off > int64(se.length) {
1042 maxlen := se.length - int(off)
1043 if len(p) > maxlen {
1045 n, err = se.cache.ReadAt(se.locator, p, int(off)+se.offset)
1051 return se.cache.ReadAt(se.locator, p, int(off)+se.offset)
1054 type blockCache interface {
1055 ReadAt(locator string, p []byte, off int) (n int, err error)
1058 type keepBlockCache struct {
1062 var scratch = make([]byte, 2<<26)
1064 func (kbc *keepBlockCache) ReadAt(locator string, p []byte, off int) (int, error) {
1065 return kbc.kc.ReadAt(locator, p, off)
1068 func canonicalName(name string) string {
1069 name = path.Clean("/" + name)
1070 if name == "/" || name == "./" {
1072 } else if strings.HasPrefix(name, "/") {
1078 var manifestEscapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
1080 func manifestUnescapeSeq(seq string) string {
1084 i, err := strconv.ParseUint(seq[1:], 8, 8)
1086 // Invalid escape sequence: can't unescape.
1089 return string([]byte{byte(i)})
1092 func manifestUnescape(s string) string {
1093 return manifestEscapeSeq.ReplaceAllStringFunc(s, manifestUnescapeSeq)