1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
// Sentinel errors returned by collection-filesystem operations.
// Callers compare against these with == / errors.Is.
24 ErrReadOnlyFile = errors.New("read-only file")
25 ErrNegativeOffset = errors.New("cannot seek to negative offset")
26 ErrFileExists = errors.New("file exists")
27 ErrInvalidOperation = errors.New("invalid operation")
28 ErrDirectoryNotEmpty = errors.New("directory not empty")
29 ErrPermission = os.ErrPermission
// maxBlockSize caps the size of a single writable extent / Keep block
// (1<<26 = 64 MiB).
31 maxBlockSize = 1 << 26
// NOTE(review): the two method signatures below belong to an enclosing
// interface whose opening lines are not visible in this extract.
40 Readdir(int) ([]os.FileInfo, error)
41 Stat() (os.FileInfo, error)
// keepClient is the minimal read capability this package needs from a
// Keep client: read a byte range from the block named by locator.
45 type keepClient interface {
46 ReadAt(locator string, p []byte, off int) (int, error)
// fileinfo is a plain-value implementation of os.FileInfo used for
// both files and directories in the collection filesystem.
// NOTE(review): struct fields are not visible in this extract; the
// methods below reference at least name, mode, and size fields.
49 type fileinfo struct {
56 // Name implements os.FileInfo.
57 func (fi fileinfo) Name() string {
61 // ModTime implements os.FileInfo.
62 func (fi fileinfo) ModTime() time.Time {
66 // Mode implements os.FileInfo.
67 func (fi fileinfo) Mode() os.FileMode {
71 // IsDir implements os.FileInfo.
// A node is a directory iff the ModeDir bit is set in its mode.
72 func (fi fileinfo) IsDir() bool {
73 return fi.mode&os.ModeDir != 0
76 // Size implements os.FileInfo.
77 func (fi fileinfo) Size() int64 {
81 // Sys implements os.FileInfo.
82 func (fi fileinfo) Sys() interface{} {
// Stat lets a fileinfo-embedding node satisfy the inode Stat() method;
// presumably it returns fi itself — body not visible, TODO confirm.
86 func (fi fileinfo) Stat() os.FileInfo {
90 // A CollectionFileSystem is an http.Filesystem plus Stat() and
91 // support for opening writable files.
92 type CollectionFileSystem interface {
// Stat returns file metadata for the named path.
94 Stat(name string) (os.FileInfo, error)
// Create opens a new writable file (truncating if it exists — see the
// fileSystem.Create implementation below).
95 Create(name string) (File, error)
// OpenFile is the general open entry point, mirroring os.OpenFile.
96 OpenFile(name string, flag int, perm os.FileMode) (File, error)
97 Mkdir(name string, perm os.FileMode) error
98 Remove(name string) error
// MarshalManifest returns the collection as a manifest text, with the
// given string as the stream name prefix.
99 MarshalManifest(string) (string, error)
// fileSystem implements CollectionFileSystem by delegating everything
// to its root dirnode after cleaning the path.
102 type fileSystem struct {
106 func (fs *fileSystem) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
107 return fs.dirnode.OpenFile(path.Clean(name), flag, perm)
// Open satisfies http.FileSystem: read-only open.
110 func (fs *fileSystem) Open(name string) (http.File, error) {
111 return fs.dirnode.OpenFile(path.Clean(name), os.O_RDONLY, 0)
// Create opens a new read/write file, truncating any existing content.
114 func (fs *fileSystem) Create(name string) (File, error) {
115 return fs.dirnode.OpenFile(path.Clean(name), os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0)
// Stat opens the name read-only and stats it (close/return not visible
// in this extract).
118 func (fs *fileSystem) Stat(name string) (os.FileInfo, error) {
119 f, err := fs.OpenFile(name, os.O_RDONLY, 0)
// inode is the common interface of filenode and dirnode. Read/Write
// take and return a filenodePtr so the caller (file) can track its
// position cheaply across calls.
127 type inode interface {
129 OpenFile(string, int, os.FileMode) (*file, error)
131 Read([]byte, filenodePtr) (int, filenodePtr, error)
132 Write([]byte, filenodePtr) (int, filenodePtr, error)
133 Truncate(int64) error
134 Readdir() []os.FileInfo
141 // filenode implements inode.
142 type filenode struct {
// repacked is a generation counter: any change to the extent layout
// invalidates previously computed filenodePtr positions.
146 repacked int64 // number of times anything in []extents has changed len
150 // filenodePtr is an offset into a file that is (usually) efficient to
151 // seek to. Specifically, if filenode.repacked==filenodePtr.repacked
152 // then filenode.extents[filenodePtr.extentIdx][filenodePtr.extentOff]
153 // corresponds to file offset filenodePtr.off. Otherwise, it is
154 // necessary to reexamine len(filenode.extents[0]) etc. to find the
155 // correct extent and offset.
156 type filenodePtr struct {
163 // seek returns a ptr that is consistent with both startPtr.off and
164 // the current state of fn. The caller must already hold fn.RLock() or
// fn.Lock() — NOTE(review): the rest of this sentence is not visible
// in this extract.
167 // If startPtr points beyond the end of the file, ptr will point to
168 // exactly the end of the file.
//
// Postconditions (partial list visible here):
172 // ptr.extentIdx == len(filenode.extents) // i.e., at EOF
174 // filenode.extents[ptr.extentIdx].Len() >= ptr.extentOff
175 func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
// Negative offsets are not repaired here; the caller reports the error.
178 // meaningless anyway
180 } else if ptr.off >= fn.fileinfo.size {
// Clamp to EOF: one-past-the-last-extent with offset 0.
181 ptr.off = fn.fileinfo.size
182 ptr.extentIdx = len(fn.extents)
184 ptr.repacked = fn.repacked
186 } else if ptr.repacked == fn.repacked {
187 // extentIdx and extentOff accurately reflect ptr.off,
188 // but might have fallen off the end of an extent
189 if ptr.extentOff >= fn.extents[ptr.extentIdx].Len() {
// Layout changed since ptr was computed: recompute from scratch.
196 ptr.repacked = fn.repacked
198 if ptr.off >= fn.fileinfo.size {
199 ptr.extentIdx, ptr.extentOff = len(fn.extents), 0
202 // Recompute extentIdx and extentOff. We have already
203 // established fn.fileinfo.size > ptr.off >= 0, so we don't
204 // have to deal with edge cases here.
206 for ptr.extentIdx, ptr.extentOff = 0, 0; off < ptr.off; ptr.extentIdx++ {
207 // This would panic (index out of range) if
208 // fn.fileinfo.size were larger than
209 // sum(fn.extents[i].Len()) -- but that can't happen
210 // because we have ensured fn.fileinfo.size is always
// accurate (invariant maintained by appendExtent/Truncate/Write).
212 extLen := int64(fn.extents[ptr.extentIdx].Len())
213 if off+extLen > ptr.off {
214 ptr.extentOff = int(ptr.off - off)
// appendExtent adds e to the end of the file and grows the recorded
// size accordingly. NOTE(review): locking/repacked bookkeeping lines
// are not visible in this extract.
222 func (fn *filenode) appendExtent(e extent) {
225 fn.extents = append(fn.extents, e)
226 fn.fileinfo.size += int64(e.Len())
// OpenFile on a regular file always fails: only dirnodes resolve names.
229 func (fn *filenode) OpenFile(string, int, os.FileMode) (*file, error) {
230 return nil, os.ErrNotExist
233 func (fn *filenode) Parent() inode {
// Readdir on a regular file: presumably returns nil — body not
// visible, TODO confirm.
237 func (fn *filenode) Readdir() []os.FileInfo {
// Read copies bytes from the extent under ptr into p and returns the
// advanced pointer. At most one extent is read per call; the caller
// loops as needed.
241 func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
244 ptr = fn.seek(startPtr)
246 err = ErrNegativeOffset
249 if ptr.extentIdx >= len(fn.extents) {
// Past the last extent means EOF (error set in lines not visible here).
253 n, err = fn.extents[ptr.extentIdx].ReadAt(p, int64(ptr.extentOff))
// If we consumed the extent exactly, step to the next one and suppress
// the per-extent io.EOF so the caller keeps reading.
257 if ptr.extentOff == fn.extents[ptr.extentIdx].Len() {
260 if ptr.extentIdx < len(fn.extents) && err == io.EOF {
// Truncate resizes the file to size: shrinking drops/slices trailing
// extents; growing extends (or appends) a writable memExtent with
// zeroes, never letting a single extent exceed maxBlockSize.
268 func (fn *filenode) Truncate(size int64) error {
271 if size < fn.fileinfo.size {
// repacked-1 forces seek() to recompute extentIdx/extentOff.
272 ptr := fn.seek(filenodePtr{off: size, repacked: fn.repacked - 1})
273 if ptr.extentOff == 0 {
// Cut lands on an extent boundary: just drop the tail extents.
274 fn.extents = fn.extents[:ptr.extentIdx]
// Otherwise keep the extent containing the cut and shorten it.
276 fn.extents = fn.extents[:ptr.extentIdx+1]
277 e := fn.extents[ptr.extentIdx]
278 if e, ok := e.(writableExtent); ok {
279 e.Truncate(ptr.extentOff)
// Read-only (stored) extents can't be truncated in place; replace
// with a slice view.
281 fn.extents[ptr.extentIdx] = e.Slice(0, ptr.extentOff)
284 fn.fileinfo.size = size
// Grow loop: each iteration extends the last writable extent up to
// maxBlockSize, appending a fresh memExtent when necessary.
288 for size > fn.fileinfo.size {
289 grow := size - fn.fileinfo.size
292 if len(fn.extents) == 0 {
294 fn.extents = append(fn.extents, e)
295 } else if e, ok = fn.extents[len(fn.extents)-1].(writableExtent); !ok || e.Len() >= maxBlockSize {
297 fn.extents = append(fn.extents, e)
// Cap this iteration's growth so the extent stays <= maxBlockSize.
301 if maxgrow := int64(maxBlockSize - e.Len()); maxgrow < grow {
304 e.Truncate(e.Len() + int(grow))
305 fn.fileinfo.size += grow
// Write copies p into the file at startPtr, rearranging/splitting
// extents as needed so each chunk ("cando") lands in a writable
// extent. Returns bytes written and the advanced pointer.
// NOTE(review): this is the most intricate block in the file and this
// extract is missing many interior lines; comments below annotate only
// what is visible.
310 func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
313 ptr = fn.seek(startPtr)
315 err = ErrNegativeOffset
318 for len(p) > 0 && err == nil {
// Never write more than one max-size block per iteration.
320 if len(cando) > maxBlockSize {
321 cando = cando[:maxBlockSize]
323 // Rearrange/grow fn.extents (and shrink cando if
324 // needed) such that cando can be copied to
325 // fn.extents[ptr.extentIdx] at offset ptr.extentOff.
// cur/prev: the extent under the pointer and its predecessor.
327 prev := ptr.extentIdx - 1
329 if cur < len(fn.extents) {
330 _, curWritable = fn.extents[cur].(writableExtent)
332 var prevAppendable bool
// prev is appendable if it is writable and still under maxBlockSize.
333 if prev >= 0 && fn.extents[prev].Len() < maxBlockSize {
334 _, prevAppendable = fn.extents[prev].(writableExtent)
336 if ptr.extentOff > 0 && !curWritable {
337 // Split a non-writable block.
338 if max := fn.extents[cur].Len() - ptr.extentOff; max <= len(cando) {
339 // Truncate cur, and insert a new
// extent after it (continuation not visible here).
342 fn.extents = append(fn.extents, nil)
343 copy(fn.extents[cur+1:], fn.extents[cur:])
345 // Split cur into two copies, truncate
346 // the one on the left, shift the one
347 // on the right, and insert a new
348 // extent between them.
349 fn.extents = append(fn.extents, nil, nil)
350 copy(fn.extents[cur+2:], fn.extents[cur:])
351 fn.extents[cur+2] = fn.extents[cur+2].Slice(ptr.extentOff+len(cando), -1)
// Size the freshly inserted writable extent to hold cando.
356 e.Truncate(len(cando))
358 fn.extents[prev] = fn.extents[prev].Slice(0, ptr.extentOff)
363 } else if curWritable {
// Current extent is writable: shrink cando to what fits in place.
364 if fit := int(fn.extents[cur].Len()) - ptr.extentOff; fit < len(cando) {
// Append-to-prev path: grow the previous writable extent instead.
369 // Shrink cando if needed to fit in prev extent.
370 if cangrow := maxBlockSize - fn.extents[prev].Len(); cangrow < len(cando) {
371 cando = cando[:cangrow]
375 if cur == len(fn.extents) {
376 // ptr is at EOF, filesize is changing.
377 fn.fileinfo.size += int64(len(cando))
378 } else if el := fn.extents[cur].Len(); el <= len(cando) {
379 // cando is long enough that we won't
380 // need cur any more. shrink cando to
381 // be exactly as long as cur
382 // (otherwise we'd accidentally shift
383 // the effective position of all
384 // extents after cur).
// Remove cur from the slice (it is fully overwritten).
386 copy(fn.extents[cur:], fn.extents[cur+1:])
387 fn.extents = fn.extents[:len(fn.extents)-1]
389 // shrink cur by the same #bytes we're growing prev
390 fn.extents[cur] = fn.extents[cur].Slice(len(cando), -1)
// Point ptr at the end of prev, then grow prev to receive cando.
396 ptr.extentOff = fn.extents[prev].Len()
397 fn.extents[prev].(writableExtent).Truncate(ptr.extentOff + len(cando))
401 // Insert an extent between prev and cur, and advance prev/cur.
402 fn.extents = append(fn.extents, nil)
403 if cur < len(fn.extents) {
404 copy(fn.extents[cur+1:], fn.extents[cur:])
408 // appending a new extent does
409 // not invalidate any ptrs
412 e.Truncate(len(cando))
419 // Finally we can copy bytes from cando to the current extent.
420 fn.extents[ptr.extentIdx].(writableExtent).WriteAt(cando, ptr.extentOff)
// Advance the file position past what we just wrote.
424 ptr.off += int64(len(cando))
425 ptr.extentOff += len(cando)
426 if fn.extents[ptr.extentIdx].Len() == ptr.extentOff {
434 // FileSystem returns a CollectionFileSystem for the collection.
// The root dirnode is its own parent (so ".." at the root resolves to
// the root), and the tree is populated from the manifest text.
435 func (c *Collection) FileSystem(client *Client, kc keepClient) CollectionFileSystem {
436 fs := &fileSystem{dirnode: dirnode{
437 cache: &keepBlockCache{kc: kc},
440 fileinfo: fileinfo{name: ".", mode: os.ModeDir | 0755},
442 inodes: make(map[string]inode),
444 fs.dirnode.parent = &fs.dirnode
// NOTE(review): loadManifest's error handling (if any) is not visible
// in this extract.
445 fs.dirnode.loadManifest(c.ManifestText)
// file field (fragment): Readdir entries not yet returned to the caller.
454 unreaddirs []os.FileInfo
// Read delegates to the inode, threading the file's position pointer.
457 func (f *file) Read(p []byte) (n int, err error) {
458 n, f.ptr, err = f.inode.Read(p, f.ptr)
// Seek implements io.Seeker (whence handling lines not visible here).
462 func (f *file) Seek(off int64, whence int) (pos int64, err error) {
463 size := f.inode.Size()
// Seeking before offset 0 is an error; position is left unchanged.
474 return f.ptr.off, ErrNegativeOffset
479 if ptr.off != f.ptr.off {
481 // force filenode to recompute f.ptr fields on next
// read/write (by invalidating the repacked generation).
485 return f.ptr.off, nil
488 func (f *file) Truncate(size int64) error {
489 return f.inode.Truncate(size)
// Write fails on files not opened writable.
492 func (f *file) Write(p []byte) (n int, err error) {
494 return 0, ErrReadOnlyFile
496 n, f.ptr, err = f.inode.Write(p, f.ptr)
// Readdir implements http.File: count<=0 returns everything at once;
// otherwise entries are paged through f.unreaddirs.
500 func (f *file) Readdir(count int) ([]os.FileInfo, error) {
501 if !f.inode.IsDir() {
502 return nil, ErrInvalidOperation
505 return f.inode.Readdir(), nil
507 if f.unreaddirs == nil {
508 f.unreaddirs = f.inode.Readdir()
// Exhausted: presumably returns io.EOF — line not visible, TODO confirm.
510 if len(f.unreaddirs) == 0 {
513 if count > len(f.unreaddirs) {
514 count = len(f.unreaddirs)
516 ret := f.unreaddirs[:count]
517 f.unreaddirs = f.unreaddirs[count:]
521 func (f *file) Stat() (os.FileInfo, error) {
525 func (f *file) Close() error {
530 func (f *file) OpenFile(name string, flag int, perm os.FileMode) (*file, error) {
531 return f.inode.OpenFile(name, flag, perm)
// dirnode implements inode for directories; children live in inodes,
// keyed by base name.
534 type dirnode struct {
540 inodes map[string]inode
544 // caller must hold dn.Lock().
// sync flushes in-memory extents (memExtents) to Keep-style blocks,
// batching small extents together so blocks approach maxBlockSize.
545 func (dn *dirnode) sync() error {
// shortBlock identifies one memExtent: the filenode and extent index.
546 type shortBlock struct {
550 var pending []shortBlock
// flush concatenates the given extents into one block, hashes it, and
// replaces each memExtent with a storedExtent pointing into the block.
553 flush := func(sbs []shortBlock) error {
559 for _, sb := range sbs {
560 data := sb.fn.extents[sb.idx].(*memExtent).buf
561 if _, err := hash.Write(data); err != nil {
566 // FIXME: write to keep
// Locator is synthesized from the md5 hash + total size; the actual
// Keep PUT is not implemented yet (see FIXME above).
567 locator := fmt.Sprintf("%x+%d", hash.Sum(nil), size)
569 for _, sb := range sbs {
570 data := sb.fn.extents[sb.idx].(*memExtent).buf
571 sb.fn.extents[sb.idx] = storedExtent{
// Iterate children in sorted-name order for deterministic output.
583 names := make([]string, 0, len(dn.inodes))
584 for name := range dn.inodes {
585 names = append(names, name)
589 for _, name := range names {
590 fn, ok := dn.inodes[name].(*filenode)
596 for idx, ext := range fn.extents {
597 ext, ok := ext.(*memExtent)
// Big extents flush alone; small ones accumulate into `pending`.
601 if ext.Len() > maxBlockSize/2 {
602 if err := flush([]shortBlock{{fn, idx}}); err != nil {
607 if pendingLen+ext.Len() > maxBlockSize {
608 if err := flush(pending); err != nil {
614 pending = append(pending, shortBlock{fn, idx})
615 pendingLen += ext.Len()
// Flush whatever small extents remain.
618 return flush(pending)
// MarshalManifest renders this directory (stream name = prefix) and
// all subdirectories as Arvados manifest text: one stream line of
// "prefix blocks... offset:length:name..." followed by subdir streams.
621 func (dn *dirnode) MarshalManifest(prefix string) (string, error) {
// Flush in-memory data first so every extent has a locator.
624 if err := dn.sync(); err != nil {
// m1segment is one offset:length:name token in stream coordinates.
629 type m1segment struct {
634 var segments []m1segment
// Sort child names for deterministic manifest output.
638 names := make([]string, 0, len(dn.inodes))
639 for name := range dn.inodes {
640 names = append(names, name)
644 for _, name := range names {
645 node := dn.inodes[name]
646 switch node := node.(type) {
// Directory child: recurse, appending its stream(s) after ours.
648 subdir, err := node.MarshalManifest(prefix + "/" + node.Name())
652 subdirs = subdirs + subdir
654 for _, e := range node.extents {
655 switch e := e.(type) {
// Unflushed extent: should not survive sync(); placeholder locator.
657 blocks = append(blocks, fmt.Sprintf("FIXME+%d", e.Len()))
658 segments = append(segments, m1segment{
661 length: int64(e.Len()),
663 streamLen += int64(e.Len())
// Stored extent: reuse the previous block token when consecutive
// extents share a locator, adjusting streamLen so offsets stay right.
665 if len(blocks) > 0 && blocks[len(blocks)-1] == e.locator {
666 streamLen -= int64(e.size)
668 blocks = append(blocks, e.locator)
670 segments = append(segments, m1segment{
672 offset: streamLen + int64(e.offset),
673 length: int64(e.length),
675 streamLen += int64(e.size)
677 panic(fmt.Sprintf("can't marshal extent type %T", e))
681 panic(fmt.Sprintf("can't marshal inode type %T", node))
684 var filetokens []string
685 for _, s := range segments {
686 filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, s.name))
688 if len(filetokens) == 0 {
// Files exist but no blocks (all empty): use the canonical empty-block
// locator (md5 of zero bytes) so the stream line stays valid.
690 } else if len(blocks) == 0 {
691 blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
693 return prefix + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
// loadManifest populates the directory tree from manifest text: each
// stream line is "dirname block-locators... offset:length:filename...".
// NOTE(review): malformed-manifest handling is mostly FIXME stubs.
696 func (dn *dirnode) loadManifest(txt string) {
699 for _, stream := range strings.Split(txt, "\n") {
700 var extents []storedExtent
701 for i, token := range strings.Split(stream, " ") {
// First token is the (escaped) stream/directory name.
703 dirname = manifestUnescape(token)
// Tokens without ":" are block locators ("hash+size[+hints]").
706 if !strings.Contains(token, ":") {
707 toks := strings.SplitN(token, "+", 3)
712 length, err := strconv.ParseInt(toks[1], 10, 32)
713 if err != nil || length < 0 {
717 extents = append(extents, storedExtent{
// Tokens with ":" are file segments "offset:length:name".
725 toks := strings.Split(token, ":")
727 // FIXME: broken manifest
730 offset, err := strconv.ParseInt(toks[0], 10, 64)
731 if err != nil || offset < 0 {
732 // FIXME: broken manifest
735 length, err := strconv.ParseInt(toks[1], 10, 64)
736 if err != nil || length < 0 {
737 // FIXME: broken manifest
740 name := path.Clean(dirname + "/" + manifestUnescape(toks[2]))
741 dn.makeParentDirs(name)
742 f, err := dn.OpenFile(name, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0700)
// A segment naming an existing directory is invalid.
747 if f.inode.Stat().IsDir() {
749 // FIXME: broken manifest
752 // Map the stream offset/range coordinates to
753 // block/offset/range coordinates and add
754 // corresponding storedExtents to the filenode
// pos tracks the stream offset of extent e's first byte.
756 for _, e := range extents {
757 next := pos + int64(e.Len())
// Skip blocks entirely before/after the wanted range.
762 if pos > offset+length {
767 blkOff = int(offset - pos)
// Clip the block-relative range to the segment's end.
769 blkLen := e.Len() - blkOff
770 if pos+int64(blkOff+blkLen) > offset+length {
771 blkLen = int(offset + length - pos - int64(blkOff))
773 f.inode.(*filenode).appendExtent(storedExtent{
// makeParentDirs creates every intermediate directory of name (all
// path components except the last), descending as it goes.
787 func (dn *dirnode) makeParentDirs(name string) (err error) {
788 names := strings.Split(name, "/")
789 for _, name := range names[:len(names)-1] {
790 f, err := dn.mkdir(name)
// Descend into the (new or existing) child directory.
796 dn, ok = f.inode.(*dirnode)
// mkdir creates a single directory component via OpenFile; O_EXCL
// means an existing entry surfaces as ErrFileExists.
804 func (dn *dirnode) mkdir(name string) (*file, error) {
805 return dn.OpenFile(name, os.O_CREATE|os.O_EXCL, os.ModeDir|0755)
808 func (dn *dirnode) Mkdir(name string, perm os.FileMode) error {
809 f, err := dn.mkdir(name)
// Remove deletes the named file or empty directory.
816 func (dn *dirnode) Remove(name string) error {
817 dirname, name := path.Split(name)
818 if name == "" || name == "." || name == ".." {
819 return ErrInvalidOperation
// Resolve the parent directory; shadows dn with the target's parent.
821 dn, ok := dn.lookupPath(dirname).(*dirnode)
823 return os.ErrNotExist
827 switch node := dn.inodes[name].(type) {
829 return os.ErrNotExist
// Refuse to remove a non-empty directory.
833 if len(node.inodes) > 0 {
834 return ErrDirectoryNotEmpty
837 delete(dn.inodes, name)
841 func (dn *dirnode) Parent() inode {
// Readdir returns a FileInfo for each child. NOTE(review): iteration
// over the map is unordered; any sorting would be in lines not visible
// in this extract.
847 func (dn *dirnode) Readdir() (fi []os.FileInfo) {
850 fi = make([]os.FileInfo, 0, len(dn.inodes))
851 for _, inode := range dn.inodes {
852 fi = append(fi, inode.Stat())
// Read/Write/Truncate are invalid on a directory.
857 func (dn *dirnode) Read(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
858 return 0, ptr, ErrInvalidOperation
861 func (dn *dirnode) Write(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
862 return 0, ptr, ErrInvalidOperation
865 func (dn *dirnode) Truncate(int64) error {
866 return ErrInvalidOperation
869 // lookupPath returns the inode for the file/directory with the given
870 // name (which may contain "/" separators), along with its parent
871 // node. If no such file/directory exists, the returned node is nil.
872 func (dn *dirnode) lookupPath(path string) (node inode) {
874 for _, name := range strings.Split(path, "/") {
// Each step must pass through a directory.
875 dn, ok := node.(*dirnode)
879 if name == "." || name == "" {
// ("." and empty components are no-ops; ".." handling not visible.)
887 node = dn.inodes[name]
// OpenFile resolves dirname, then opens/creates the base name in that
// directory, honoring O_CREATE/O_EXCL/O_APPEND and read/write flags.
893 func (dn *dirnode) OpenFile(name string, flag int, perm os.FileMode) (*file, error) {
894 dirname, name := path.Split(name)
// Shadows dn with the parent directory of the target.
895 dn, ok := dn.lookupPath(dirname).(*dirnode)
897 return nil, os.ErrNotExist
899 writeMode := flag&(os.O_RDWR|os.O_WRONLY|os.O_CREATE) != 0
901 // A directory can be opened via "foo/", "foo/.", or
// "foo/.." style names (continuation not visible in this extract).
905 return &file{inode: dn}, nil
907 return &file{inode: dn.Parent()}, nil
910 createMode := flag&os.O_CREATE != 0
918 n, ok := dn.inodes[name]
// No such entry and not creating: fail.
921 return nil, os.ErrNotExist
// New directory entries get the default directory mode.
930 mode: os.ModeDir | 0755,
// Lazily allocate the child map on first insertion.
942 if dn.inodes == nil {
943 dn.inodes = make(map[string]inode)
947 } else if flag&os.O_EXCL != 0 {
948 return nil, ErrFileExists
// Record append/writable modes on the returned file handle.
952 append: flag&os.O_APPEND != 0,
953 writable: flag&(os.O_WRONLY|os.O_RDWR) != 0,
// extent is a read-only view of a contiguous run of file bytes.
957 type extent interface {
960 // Return a new extent with a subsection of the data from this
961 // one. length<0 means length=Len()-off.
962 Slice(off int, length int) extent
// writableExtent adds in-place mutation on top of extent.
965 type writableExtent interface {
967 WriteAt(p []byte, off int)
// memExtent holds not-yet-flushed data in an in-memory buffer.
971 type memExtent struct {
975 func (me *memExtent) Len() int {
// Slice copies the requested range into a new memExtent (no aliasing
// of me.buf).
979 func (me *memExtent) Slice(off, length int) extent {
981 length = len(me.buf) - off
983 buf := make([]byte, length)
984 copy(buf, me.buf[off:])
985 return &memExtent{buf: buf}
// Truncate resizes the buffer to n bytes, over-allocating capacity
// when growing to amortize future growth.
988 func (me *memExtent) Truncate(n int) {
992 newsize = newsize << 2
994 newbuf := make([]byte, n, newsize)
998 // Zero unused part when shrinking, in case we grow
999 // and start using it again later.
1000 for i := n; i < len(me.buf); i++ {
// WriteAt panics rather than growing: the caller (filenode.Write)
// must have pre-sized the extent via Truncate.
1007 func (me *memExtent) WriteAt(p []byte, off int) {
1008 if off+len(p) > len(me.buf) {
1009 panic("overflowed extent")
1011 copy(me.buf[off:], p)
1014 func (me *memExtent) ReadAt(p []byte, off int64) (n int, err error) {
1015 if off > int64(me.Len()) {
1019 n = copy(p, me.buf[int(off):])
// storedExtent references a byte range inside an already-stored Keep
// block, identified by locator, and reads through a blockCache.
1026 type storedExtent struct {
1034 func (se storedExtent) Len() int {
// Slice narrows the view; value receiver means the original extent is
// unmodified.
1038 func (se storedExtent) Slice(n, size int) extent {
1041 if size >= 0 && se.length > size {
// ReadAt reads from the underlying block, translating the extent-local
// offset to a block offset via se.offset.
1047 func (se storedExtent) ReadAt(p []byte, off int64) (n int, err error) {
1048 if off > int64(se.length) {
// Clamp p so we never read past the extent's end within the block.
1051 maxlen := se.length - int(off)
1052 if len(p) > maxlen {
1054 n, err = se.cache.ReadAt(se.locator, p, int(off)+se.offset)
1060 return se.cache.ReadAt(se.locator, p, int(off)+se.offset)
// blockCache abstracts (possibly cached) reads of stored blocks.
1063 type blockCache interface {
1064 ReadAt(locator string, p []byte, off int) (n int, err error)
// keepBlockCache currently passes reads straight through to the Keep
// client (no caching yet; `scratch` below appears unused here —
// NOTE(review): confirm whether it is used in lines not visible).
1067 type keepBlockCache struct {
1071 var scratch = make([]byte, 2<<26)
1073 func (kbc *keepBlockCache) ReadAt(locator string, p []byte, off int) (int, error) {
1074 return kbc.kc.ReadAt(locator, p, off)
// canonicalName normalizes a path for use as a map key: cleaned,
// rooted, then (per the visible branch) stripped of its leading "/".
1077 func canonicalName(name string) string {
1078 name = path.Clean("/" + name)
1079 if name == "/" || name == "./" {
1081 } else if strings.HasPrefix(name, "/") {
// manifestEscapeSeq matches manifest escapes: "\\" or "\NNN" (three
// octal digits).
1087 var manifestEscapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
// manifestUnescapeSeq decodes one matched escape sequence to its byte.
1089 func manifestUnescapeSeq(seq string) string {
// Parse the three octal digits into a byte value.
1093 i, err := strconv.ParseUint(seq[1:], 8, 8)
1095 // Invalid escape sequence: can't unescape.
1098 return string([]byte{byte(i)})
// manifestUnescape decodes all escape sequences in a manifest token.
1101 func manifestUnescape(s string) string {
1102 return manifestEscapeSeq.ReplaceAllStringFunc(s, manifestUnescapeSeq)