// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0

	ErrReadOnlyFile      = errors.New("read-only file")
	ErrNegativeOffset    = errors.New("cannot seek to negative offset")
	ErrFileExists        = errors.New("file exists")
	ErrInvalidOperation  = errors.New("invalid operation")
	ErrDirectoryNotEmpty = errors.New("directory not empty")
	ErrPermission        = os.ErrPermission

	maxBlockSize = 1 << 26

	Readdir(int) ([]os.FileInfo, error)
	Stat() (os.FileInfo, error)

type keepClient interface {
	ReadAt(locator string, p []byte, off int) (int, error)
	PutB(p []byte) (string, int, error)
}
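
// Example sketch (not part of the original file): a minimal in-memory
// keepClient stub, handy for exercising the collection filesystem in
// tests. It assumes crypto/md5 is added to the imports; locators use
// the usual "md5hex+size" form that PutB is expected to return.
type stubKeepClient struct {
	blocks map[string][]byte
}

func (kc *stubKeepClient) PutB(p []byte) (string, int, error) {
	locator := fmt.Sprintf("%x+%d", md5.Sum(p), len(p))
	if kc.blocks == nil {
		kc.blocks = map[string][]byte{}
	}
	kc.blocks[locator] = append([]byte(nil), p...)
	// Report one stored replica.
	return locator, 1, nil
}

func (kc *stubKeepClient) ReadAt(locator string, p []byte, off int) (int, error) {
	buf, ok := kc.blocks[locator]
	if !ok {
		return 0, os.ErrNotExist
	}
	if off >= len(buf) {
		return 0, io.EOF
	}
	n := copy(p, buf[off:])
	if n < len(p) {
		return n, io.EOF
	}
	return n, nil
}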
type fileinfo struct {

// Name implements os.FileInfo.
func (fi fileinfo) Name() string {

// ModTime implements os.FileInfo.
func (fi fileinfo) ModTime() time.Time {

// Mode implements os.FileInfo.
func (fi fileinfo) Mode() os.FileMode {

// IsDir implements os.FileInfo.
func (fi fileinfo) IsDir() bool {
	return fi.mode&os.ModeDir != 0

// Size implements os.FileInfo.
func (fi fileinfo) Size() int64 {

// Sys implements os.FileInfo.
func (fi fileinfo) Sys() interface{} {

func (fi fileinfo) Stat() os.FileInfo {

// A CollectionFileSystem is an http.FileSystem plus Stat() and
// support for opening writable files.
type CollectionFileSystem interface {
	http.FileSystem
	Stat(name string) (os.FileInfo, error)
	Create(name string) (File, error)
	OpenFile(name string, flag int, perm os.FileMode) (File, error)
	Mkdir(name string, perm os.FileMode) error
	Remove(name string) error
	MarshalManifest(string) (string, error)
}
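
// Example sketch (not part of the original file): because a
// CollectionFileSystem is also an http.FileSystem, it can be handed
// straight to the standard library's file server. coll, client and kc
// are assumed to be obtained elsewhere.
//
//	var cfs CollectionFileSystem = coll.FileSystem(client, kc)
//	http.Handle("/c/", http.StripPrefix("/c/", http.FileServer(cfs)))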
type fileSystem struct {

func (fs *fileSystem) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
	return fs.dirnode.OpenFile(path.Clean(name), flag, perm)

func (fs *fileSystem) Open(name string) (http.File, error) {
	return fs.dirnode.OpenFile(path.Clean(name), os.O_RDONLY, 0)

func (fs *fileSystem) Create(name string) (File, error) {
	return fs.dirnode.OpenFile(path.Clean(name), os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0)

func (fs *fileSystem) Stat(name string) (os.FileInfo, error) {
	f, err := fs.OpenFile(name, os.O_RDONLY, 0)

type inode interface {
	Read([]byte, filenodePtr) (int, filenodePtr, error)
	Write([]byte, filenodePtr) (int, filenodePtr, error)
	Truncate(int64) error
	Readdir() []os.FileInfo

// filenode implements inode.
type filenode struct {
	repacked int64 // number of times any extent in []extents has changed length (used to detect stale filenodePtrs)

// filenodePtr is an offset into a file that is (usually) efficient to
// seek to. Specifically, if filenode.repacked==filenodePtr.repacked
// then filenode.extents[filenodePtr.extentIdx][filenodePtr.extentOff]
// corresponds to file offset filenodePtr.off. Otherwise, it is
// necessary to reexamine len(filenode.extents[0]) etc. to find the
// correct extent and offset.
type filenodePtr struct {

// seek returns a ptr that is consistent with both startPtr.off and
// the current state of fn. The caller must already hold fn.RLock() or
// fn.Lock().
//
// If startPtr points beyond the end of the file, ptr will point to
// exactly the end of the file.
//
// After a seek, either
//
//	ptr.extentIdx == len(filenode.extents) // i.e., at EOF
//
// or
//
//	filenode.extents[ptr.extentIdx].Len() >= ptr.extentOff
func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
		// meaningless anyway
	} else if ptr.off >= fn.fileinfo.size {
		ptr.off = fn.fileinfo.size
		ptr.extentIdx = len(fn.extents)
		ptr.repacked = fn.repacked
	} else if ptr.repacked == fn.repacked {
		// extentIdx and extentOff accurately reflect ptr.off,
		// but might have fallen off the end of an extent
		if ptr.extentOff >= fn.extents[ptr.extentIdx].Len() {
	ptr.repacked = fn.repacked
	if ptr.off >= fn.fileinfo.size {
		ptr.extentIdx, ptr.extentOff = len(fn.extents), 0
	// Recompute extentIdx and extentOff. We have already
	// established fn.fileinfo.size > ptr.off >= 0, so we don't
	// have to deal with edge cases here.
	for ptr.extentIdx, ptr.extentOff = 0, 0; off < ptr.off; ptr.extentIdx++ {
		// This would panic (index out of range) if
		// fn.fileinfo.size were larger than
		// sum(fn.extents[i].Len()) -- but that can't happen
		// because we have ensured fn.fileinfo.size is always
		// consistent with the extents.
		extLen := int64(fn.extents[ptr.extentIdx].Len())
		if off+extLen > ptr.off {
			ptr.extentOff = int(ptr.off - off)
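
// Example sketch (not part of the original file): how filenodePtr
// caching behaves. A pointer whose repacked counter no longer matches
// the filenode's is treated as stale, so seek() recomputes
// extentIdx/extentOff from the absolute offset alone. This assumes
// filenode embeds a sync.RWMutex, as the locking comments above imply.
func exampleFilenodePtrSeek() {
	fn := &filenode{}
	fn.Lock()
	fn.appendExtent(&memExtent{buf: []byte("hello")}) // extent 0, length 5
	fn.appendExtent(&memExtent{buf: []byte("world")}) // extent 1, length 5
	fn.Unlock()

	fn.RLock()
	// Deliberately stale pointer: the repacked mismatch forces a recompute.
	ptr := fn.seek(filenodePtr{off: 7, repacked: fn.repacked - 1})
	fn.RUnlock()
	fmt.Println(ptr.extentIdx, ptr.extentOff) // prints: 1 2
}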
func (fn *filenode) appendExtent(e extent) {
	fn.extents = append(fn.extents, e)
	fn.fileinfo.size += int64(e.Len())

func (fn *filenode) Parent() inode {

func (fn *filenode) Readdir() []os.FileInfo {

func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
	ptr = fn.seek(startPtr)
		err = ErrNegativeOffset
	if ptr.extentIdx >= len(fn.extents) {
	n, err = fn.extents[ptr.extentIdx].ReadAt(p, int64(ptr.extentOff))
		if ptr.extentOff == fn.extents[ptr.extentIdx].Len() {
			if ptr.extentIdx < len(fn.extents) && err == io.EOF {

func (fn *filenode) Truncate(size int64) error {
	if size < fn.fileinfo.size {
		ptr := fn.seek(filenodePtr{off: size, repacked: fn.repacked - 1})
		if ptr.extentOff == 0 {
			fn.extents = fn.extents[:ptr.extentIdx]
			fn.extents = fn.extents[:ptr.extentIdx+1]
			e := fn.extents[ptr.extentIdx]
			if e, ok := e.(writableExtent); ok {
				e.Truncate(ptr.extentOff)
				fn.extents[ptr.extentIdx] = e.Slice(0, ptr.extentOff)
		fn.fileinfo.size = size
	for size > fn.fileinfo.size {
		grow := size - fn.fileinfo.size
		if len(fn.extents) == 0 {
			fn.extents = append(fn.extents, e)
		} else if e, ok = fn.extents[len(fn.extents)-1].(writableExtent); !ok || e.Len() >= maxBlockSize {
			fn.extents = append(fn.extents, e)
		if maxgrow := int64(maxBlockSize - e.Len()); maxgrow < grow {
		e.Truncate(e.Len() + int(grow))
		fn.fileinfo.size += grow

func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
	ptr = fn.seek(startPtr)
		err = ErrNegativeOffset
	for len(p) > 0 && err == nil {
		if len(cando) > maxBlockSize {
			cando = cando[:maxBlockSize]
		// Rearrange/grow fn.extents (and shrink cando if
		// needed) such that cando can be copied to
		// fn.extents[ptr.extentIdx] at offset ptr.extentOff.
		prev := ptr.extentIdx - 1
		if cur < len(fn.extents) {
			_, curWritable = fn.extents[cur].(writableExtent)
		var prevAppendable bool
		if prev >= 0 && fn.extents[prev].Len() < maxBlockSize {
			_, prevAppendable = fn.extents[prev].(writableExtent)
		if ptr.extentOff > 0 && !curWritable {
			// Split a non-writable block.
			if max := fn.extents[cur].Len() - ptr.extentOff; max <= len(cando) {
				// Truncate cur, and insert a new
				fn.extents = append(fn.extents, nil)
				copy(fn.extents[cur+1:], fn.extents[cur:])
				// Split cur into two copies, truncate
				// the one on the left, shift the one
				// on the right, and insert a new
				// extent between them.
				fn.extents = append(fn.extents, nil, nil)
				copy(fn.extents[cur+2:], fn.extents[cur:])
				fn.extents[cur+2] = fn.extents[cur+2].Slice(ptr.extentOff+len(cando), -1)
			e.Truncate(len(cando))
			fn.extents[prev] = fn.extents[prev].Slice(0, ptr.extentOff)
		} else if curWritable {
			if fit := int(fn.extents[cur].Len()) - ptr.extentOff; fit < len(cando) {
				// Shrink cando if needed to fit in prev extent.
				if cangrow := maxBlockSize - fn.extents[prev].Len(); cangrow < len(cando) {
					cando = cando[:cangrow]
			if cur == len(fn.extents) {
				// ptr is at EOF, filesize is changing.
				fn.fileinfo.size += int64(len(cando))
			} else if el := fn.extents[cur].Len(); el <= len(cando) {
				// cando is long enough that we won't
				// need cur any more. shrink cando to
				// be exactly as long as cur
				// (otherwise we'd accidentally shift
				// the effective position of all
				// extents after cur).
				copy(fn.extents[cur:], fn.extents[cur+1:])
				fn.extents = fn.extents[:len(fn.extents)-1]
				// shrink cur by the same #bytes we're growing prev
				fn.extents[cur] = fn.extents[cur].Slice(len(cando), -1)
				ptr.extentOff = fn.extents[prev].Len()
				fn.extents[prev].(writableExtent).Truncate(ptr.extentOff + len(cando))
				// Insert an extent between prev and cur, and advance prev/cur.
				fn.extents = append(fn.extents, nil)
				if cur < len(fn.extents) {
					copy(fn.extents[cur+1:], fn.extents[cur:])
					// appending a new extent does
					// not invalidate any ptrs
				e.Truncate(len(cando))

		// Finally we can copy bytes from cando to the current extent.
		fn.extents[ptr.extentIdx].(writableExtent).WriteAt(cando, ptr.extentOff)
		ptr.off += int64(len(cando))
		ptr.extentOff += len(cando)
		if fn.extents[ptr.extentIdx].Len() == ptr.extentOff {

// FileSystem returns a CollectionFileSystem for the collection.
func (c *Collection) FileSystem(client *Client, kc keepClient) CollectionFileSystem {
	fs := &fileSystem{dirnode: dirnode{
		fileinfo: fileinfo{name: ".", mode: os.ModeDir | 0755},
		inodes:   make(map[string]inode),
	}}
	fs.dirnode.parent = &fs.dirnode
	fs.dirnode.loadManifest(c.ManifestText)
	return fs
}
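
// Example sketch (not part of the original file): typical use of the
// filesystem returned by (*Collection).FileSystem. client and kc are
// assumed to be a configured *Client and keepClient obtained
// elsewhere.
func exampleCollectionFileSystem(coll *Collection, client *Client, kc keepClient) (string, error) {
	cfs := coll.FileSystem(client, kc)
	f, err := cfs.OpenFile("foo.txt", os.O_CREATE|os.O_RDWR, 0644)
	if err != nil {
		return "", err
	}
	if _, err := f.Write([]byte("foo")); err != nil {
		f.Close()
		return "", err
	}
	if err := f.Close(); err != nil {
		return "", err
	}
	// MarshalManifest flushes any in-memory extents to Keep (via
	// PutB) and returns the updated manifest text.
	return cfs.MarshalManifest(".")
}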
	unreaddirs []os.FileInfo

func (f *file) Read(p []byte) (n int, err error) {
	n, f.ptr, err = f.inode.Read(p, f.ptr)

func (f *file) Seek(off int64, whence int) (pos int64, err error) {
	size := f.inode.Size()
		return f.ptr.off, ErrNegativeOffset
	if ptr.off != f.ptr.off {
		// force filenode to recompute f.ptr fields on next
	return f.ptr.off, nil

func (f *file) Truncate(size int64) error {
	return f.inode.Truncate(size)

func (f *file) Write(p []byte) (n int, err error) {
		return 0, ErrReadOnlyFile
	n, f.ptr, err = f.inode.Write(p, f.ptr)

func (f *file) Readdir(count int) ([]os.FileInfo, error) {
	if !f.inode.IsDir() {
		return nil, ErrInvalidOperation
		return f.inode.Readdir(), nil
	if f.unreaddirs == nil {
		f.unreaddirs = f.inode.Readdir()
	if len(f.unreaddirs) == 0 {
	if count > len(f.unreaddirs) {
		count = len(f.unreaddirs)
	ret := f.unreaddirs[:count]
	f.unreaddirs = f.unreaddirs[count:]

func (f *file) Stat() (os.FileInfo, error) {

func (f *file) Close() error {

type dirnode struct {
	inodes map[string]inode

// caller must hold dn.Lock().
func (dn *dirnode) sync() error {
	type shortBlock struct {
	var pending []shortBlock
	flush := func(sbs []shortBlock) error {
		block := make([]byte, 0, maxBlockSize)
		for _, sb := range sbs {
			block = append(block, sb.fn.extents[sb.idx].(*memExtent).buf...)
		locator, _, err := dn.kc.PutB(block)
		for _, sb := range sbs {
			data := sb.fn.extents[sb.idx].(*memExtent).buf
			sb.fn.extents[sb.idx] = storedExtent{
	names := make([]string, 0, len(dn.inodes))
	for name := range dn.inodes {
		names = append(names, name)
	for _, name := range names {
		fn, ok := dn.inodes[name].(*filenode)
		for idx, ext := range fn.extents {
			ext, ok := ext.(*memExtent)
			if ext.Len() > maxBlockSize/2 {
				if err := flush([]shortBlock{{fn, idx}}); err != nil {
			if pendingLen+ext.Len() > maxBlockSize {
				if err := flush(pending); err != nil {
			pending = append(pending, shortBlock{fn, idx})
			pendingLen += ext.Len()
	return flush(pending)
}
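
// Worked example (not part of the original file): with maxBlockSize =
// 64 MiB, a modified extent bigger than 32 MiB is written to Keep as
// its own block, while smaller extents are accumulated and flushed
// together once the next one would push the pending total past 64
// MiB -- e.g. three 20 MiB memExtents end up packed into one ~60 MiB
// block, each replaced by a storedExtent pointing at its offset
// within that block.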
func (dn *dirnode) MarshalManifest(prefix string) (string, error) {
	return dn.marshalManifest(prefix)

// caller must have read lock.
func (dn *dirnode) marshalManifest(prefix string) (string, error) {
	type m1segment struct {
	var segments []m1segment
	if err := dn.sync(); err != nil {
	names := make([]string, 0, len(dn.inodes))
	for name, node := range dn.inodes {
		names = append(names, name)
	for _, name := range names {
		node := dn.inodes[name]
		switch node := node.(type) {
			subdir, err := node.marshalManifest(prefix + "/" + node.Name())
			subdirs = subdirs + subdir
			for _, e := range node.extents {
				switch e := e.(type) {
					if len(blocks) > 0 && blocks[len(blocks)-1] == e.locator {
						streamLen -= int64(e.size)
						blocks = append(blocks, e.locator)
					segments = append(segments, m1segment{
						offset: streamLen + int64(e.offset),
						length: int64(e.length),
					streamLen += int64(e.size)
					// This can't happen: we
					// haven't unlocked since
					// calling sync().
					panic(fmt.Sprintf("can't marshal extent type %T", e))
			panic(fmt.Sprintf("can't marshal inode type %T", node))
	var filetokens []string
	for _, s := range segments {
		filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
	if len(filetokens) == 0 {
	} else if len(blocks) == 0 {
		blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
	return manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
}
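
// Worked example (not part of the original file): for a collection
// holding a single 3-byte file "foo.txt" stored in one Keep block,
// marshalManifest(".") returns a stream like
//
//	. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt
//
// i.e. the escaped stream name, the block locators, and one
// offset:length:name token per file segment, terminated by "\n". A
// stream whose files are all empty has no data blocks and instead
// references the well-known empty block
// d41d8cd98f00b204e9800998ecf8427e+0 used above.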
func (dn *dirnode) loadManifest(txt string) {
	for _, stream := range strings.Split(txt, "\n") {
		var extents []storedExtent
		for i, token := range strings.Split(stream, " ") {
				dirname = manifestUnescape(token)
			if !strings.Contains(token, ":") {
				toks := strings.SplitN(token, "+", 3)
				length, err := strconv.ParseInt(toks[1], 10, 32)
				if err != nil || length < 0 {
				extents = append(extents, storedExtent{
			toks := strings.Split(token, ":")
				// FIXME: broken manifest
			offset, err := strconv.ParseInt(toks[0], 10, 64)
			if err != nil || offset < 0 {
				// FIXME: broken manifest
			length, err := strconv.ParseInt(toks[1], 10, 64)
			if err != nil || length < 0 {
				// FIXME: broken manifest
			name := path.Clean(dirname + "/" + manifestUnescape(toks[2]))
			dn.makeParentDirs(name)
			f, err := dn.OpenFile(name, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0700)
				// FIXME: don't panic
				panic(fmt.Errorf("cannot append to %q: %s", name, err))
			if f.inode.Stat().IsDir() {
				// FIXME: don't panic
				panic(fmt.Errorf("cannot append to %q: is a directory", name))
			// Map the stream offset/range coordinates to
			// block/offset/range coordinates and add
			// corresponding storedExtents to the filenode
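			//
			// Worked example (not part of the original file):
			// given two 5-byte blocks and a file token "3:4:f",
			// the file is mapped to storedExtent{block 0,
			// offset 3, length 2} followed by storedExtent{block
			// 1, offset 0, length 2}.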
			for _, e := range extents {
				next := pos + int64(e.Len())
				if pos > offset+length {
					blkOff = int(offset - pos)
				blkLen := e.Len() - blkOff
				if pos+int64(blkOff+blkLen) > offset+length {
					blkLen = int(offset + length - pos - int64(blkOff))
				f.inode.(*filenode).appendExtent(storedExtent{
func (dn *dirnode) makeParentDirs(name string) (err error) {
	names := strings.Split(name, "/")
	for _, name := range names[:len(names)-1] {
		f, err := dn.mkdir(name)
		dn, ok = f.inode.(*dirnode)

func (dn *dirnode) mkdir(name string) (*file, error) {
	return dn.OpenFile(name, os.O_CREATE|os.O_EXCL, os.ModeDir|0755)

func (dn *dirnode) Mkdir(name string, perm os.FileMode) error {
	f, err := dn.mkdir(name)

func (dn *dirnode) Remove(name string) error {
	dirname, name := path.Split(name)
	if name == "" || name == "." || name == ".." {
		return ErrInvalidOperation
	dn, ok := dn.lookupPath(dirname).(*dirnode)
		return os.ErrNotExist
	switch node := dn.inodes[name].(type) {
		return os.ErrNotExist
		if len(node.inodes) > 0 {
			return ErrDirectoryNotEmpty
	delete(dn.inodes, name)

func (dn *dirnode) Parent() inode {

func (dn *dirnode) Readdir() (fi []os.FileInfo) {
	fi = make([]os.FileInfo, 0, len(dn.inodes))
	for _, inode := range dn.inodes {
		fi = append(fi, inode.Stat())

func (dn *dirnode) Read(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
	return 0, ptr, ErrInvalidOperation

func (dn *dirnode) Write(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
	return 0, ptr, ErrInvalidOperation

func (dn *dirnode) Truncate(int64) error {
	return ErrInvalidOperation

// lookupPath returns the inode for the file/directory with the given
// name (which may contain "/" separators), along with its parent
// node. If no such file/directory exists, the returned node is nil.
func (dn *dirnode) lookupPath(path string) (node inode) {
	for _, name := range strings.Split(path, "/") {
		dn, ok := node.(*dirnode)
		if name == "." || name == "" {
		node = dn.inodes[name]

func (dn *dirnode) OpenFile(name string, flag int, perm os.FileMode) (*file, error) {
	dirname, name := path.Split(name)
	dn, ok := dn.lookupPath(dirname).(*dirnode)
		return nil, os.ErrNotExist
	writeMode := flag&(os.O_RDWR|os.O_WRONLY|os.O_CREATE) != 0
	// A directory can be opened via "foo/", "foo/.", or
		return &file{inode: dn}, nil
		return &file{inode: dn.Parent()}, nil
	createMode := flag&os.O_CREATE != 0
	n, ok := dn.inodes[name]
			return nil, os.ErrNotExist
				mode: os.ModeDir | 0755,
		if dn.inodes == nil {
			dn.inodes = make(map[string]inode)
	} else if flag&os.O_EXCL != 0 {
		return nil, ErrFileExists
		append:   flag&os.O_APPEND != 0,
		writable: flag&(os.O_WRONLY|os.O_RDWR) != 0,

type extent interface {
	// Return a new extent with a subsection of the data from this
	// one. length<0 means length=Len()-off.
	Slice(off int, length int) extent

type writableExtent interface {
	WriteAt(p []byte, off int)

type memExtent struct {

func (me *memExtent) Len() int {

func (me *memExtent) Slice(off, length int) extent {
		length = len(me.buf) - off
	buf := make([]byte, length)
	copy(buf, me.buf[off:])
	return &memExtent{buf: buf}
}
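
// Example sketch (not part of the original file): Slice copies a
// subsection of the buffer, and a negative length means "through the
// end of the extent".
//
//	me := &memExtent{buf: []byte("abcdef")}
//	me.Slice(2, 3).Len()  // 3 ("cde")
//	me.Slice(4, -1).Len() // 2 ("ef")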
func (me *memExtent) Truncate(n int) {
			newsize = newsize << 2
		newbuf := make([]byte, n, newsize)
		// Zero unused part when shrinking, in case we grow
		// and start using it again later.
		for i := n; i < len(me.buf); i++ {

func (me *memExtent) WriteAt(p []byte, off int) {
	if off+len(p) > len(me.buf) {
		panic("overflowed extent")
	copy(me.buf[off:], p)

func (me *memExtent) ReadAt(p []byte, off int64) (n int, err error) {
	if off > int64(me.Len()) {
	n = copy(p, me.buf[int(off):])

type storedExtent struct {

func (se storedExtent) Len() int {

func (se storedExtent) Slice(n, size int) extent {
	if size >= 0 && se.length > size {

func (se storedExtent) ReadAt(p []byte, off int64) (n int, err error) {
	if off > int64(se.length) {
	maxlen := se.length - int(off)
	if len(p) > maxlen {
		n, err = se.kc.ReadAt(se.locator, p, int(off)+se.offset)
	return se.kc.ReadAt(se.locator, p, int(off)+se.offset)

func canonicalName(name string) string {
	name = path.Clean("/" + name)
	if name == "/" || name == "./" {
	} else if strings.HasPrefix(name, "/") {
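
// Example sketch (not part of the original file), assuming the hidden
// branches above normalize to "." and a "./"-prefixed path as the
// visible conditions suggest: canonicalName("foo//bar/") == "./foo/bar"
// and canonicalName("") == ".".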
var manifestEscapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)

func manifestUnescapeFunc(seq string) string {
	i, err := strconv.ParseUint(seq[1:], 8, 8)
		// Invalid escape sequence: can't unescape.
	return string([]byte{byte(i)})

func manifestUnescape(s string) string {
	return manifestEscapeSeq.ReplaceAllStringFunc(s, manifestUnescapeFunc)

var manifestEscapedChar = regexp.MustCompile(`[^\.\w/]`)

func manifestEscapeFunc(seq string) string {
	return fmt.Sprintf("\\%03o", byte(seq[0]))

func manifestEscape(s string) string {
	return manifestEscapedChar.ReplaceAllStringFunc(s, manifestEscapeFunc)
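
// Example sketch (not part of the original file): manifest tokens
// escape every byte outside [A-Za-z0-9_./] as a three-digit octal
// sequence, so a name containing a space round-trips like this:
//
//	manifestEscape("a b")      // `a\040b`
//	manifestUnescape(`a\040b`) // "a b"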