Add 'sdk/java-v2/' from commit '55f103e336ca9fb8bf1720d2ef4ee8dd4e221118'
[arvados.git] / sdk / go / arvados / fs_collection.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "context"
9         "encoding/json"
10         "fmt"
11         "io"
12         "os"
13         "path"
14         "regexp"
15         "sort"
16         "strconv"
17         "strings"
18         "sync"
19         "time"
20 )
21
22 var (
23         maxBlockSize      = 1 << 26
24         concurrentWriters = 4 // max goroutines writing to Keep during sync()
25         writeAheadBlocks  = 1 // max background jobs flushing to Keep before blocking writes
26 )
27
28 // A CollectionFileSystem is a FileSystem that can be serialized as a
29 // manifest and stored as a collection.
30 type CollectionFileSystem interface {
31         FileSystem
32
33         // Flush all file data to Keep and return a snapshot of the
34         // filesystem suitable for saving as (Collection)ManifestText.
35         // Prefix (normally ".") is a top level directory, effectively
36         // prepended to all paths in the returned manifest.
37         MarshalManifest(prefix string) (string, error)
38
39         // Total data bytes in all files.
40         Size() int64
41 }
42
43 type collectionFileSystem struct {
44         fileSystem
45         uuid string
46 }
47
48 // FileSystem returns a CollectionFileSystem for the collection.
49 func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFileSystem, error) {
50         var modTime time.Time
51         if c.ModifiedAt == nil {
52                 modTime = time.Now()
53         } else {
54                 modTime = *c.ModifiedAt
55         }
56         fs := &collectionFileSystem{
57                 uuid: c.UUID,
58                 fileSystem: fileSystem{
59                         fsBackend: keepBackend{apiClient: client, keepClient: kc},
60                 },
61         }
62         root := &dirnode{
63                 fs: fs,
64                 treenode: treenode{
65                         fileinfo: fileinfo{
66                                 name:    ".",
67                                 mode:    os.ModeDir | 0755,
68                                 modTime: modTime,
69                         },
70                         inodes: make(map[string]inode),
71                 },
72         }
73         root.SetParent(root, ".")
74         if err := root.loadManifest(c.ManifestText); err != nil {
75                 return nil, err
76         }
77         backdateTree(root, modTime)
78         fs.root = root
79         return fs, nil
80 }
81
82 func backdateTree(n inode, modTime time.Time) {
83         switch n := n.(type) {
84         case *filenode:
85                 n.fileinfo.modTime = modTime
86         case *dirnode:
87                 n.fileinfo.modTime = modTime
88                 for _, n := range n.inodes {
89                         backdateTree(n, modTime)
90                 }
91         }
92 }
93
94 func (fs *collectionFileSystem) newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error) {
95         if name == "" || name == "." || name == ".." {
96                 return nil, ErrInvalidArgument
97         }
98         if perm.IsDir() {
99                 return &dirnode{
100                         fs: fs,
101                         treenode: treenode{
102                                 fileinfo: fileinfo{
103                                         name:    name,
104                                         mode:    perm | os.ModeDir,
105                                         modTime: modTime,
106                                 },
107                                 inodes: make(map[string]inode),
108                         },
109                 }, nil
110         } else {
111                 return &filenode{
112                         fs: fs,
113                         fileinfo: fileinfo{
114                                 name:    name,
115                                 mode:    perm & ^os.ModeDir,
116                                 modTime: modTime,
117                         },
118                 }, nil
119         }
120 }
121
122 func (fs *collectionFileSystem) Sync() error {
123         if fs.uuid == "" {
124                 return nil
125         }
126         txt, err := fs.MarshalManifest(".")
127         if err != nil {
128                 return fmt.Errorf("sync failed: %s", err)
129         }
130         coll := &Collection{
131                 UUID:         fs.uuid,
132                 ManifestText: txt,
133         }
134         err = fs.RequestAndDecode(nil, "PUT", "arvados/v1/collections/"+fs.uuid, fs.UpdateBody(coll), map[string]interface{}{"select": []string{"uuid"}})
135         if err != nil {
136                 return fmt.Errorf("sync failed: update %s: %s", fs.uuid, err)
137         }
138         return nil
139 }
140
141 func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
142         fs.fileSystem.root.Lock()
143         defer fs.fileSystem.root.Unlock()
144         return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix, newThrottle(concurrentWriters))
145 }
146
147 func (fs *collectionFileSystem) Size() int64 {
148         return fs.fileSystem.root.(*dirnode).TreeSize()
149 }
150
151 // filenodePtr is an offset into a file that is (usually) efficient to
152 // seek to. Specifically, if filenode.repacked==filenodePtr.repacked
153 // then
154 // filenode.segments[filenodePtr.segmentIdx][filenodePtr.segmentOff]
155 // corresponds to file offset filenodePtr.off. Otherwise, it is
156 // necessary to reexamine len(filenode.segments[0]) etc. to find the
157 // correct segment and offset.
158 type filenodePtr struct {
159         off        int64
160         segmentIdx int
161         segmentOff int
162         repacked   int64
163 }
164
165 // seek returns a ptr that is consistent with both startPtr.off and
166 // the current state of fn. The caller must already hold fn.RLock() or
167 // fn.Lock().
168 //
169 // If startPtr is beyond EOF, ptr.segment* will indicate precisely
170 // EOF.
171 //
172 // After seeking:
173 //
174 //     ptr.segmentIdx == len(filenode.segments) // i.e., at EOF
175 //     ||
176 //     filenode.segments[ptr.segmentIdx].Len() > ptr.segmentOff
177 func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
178         ptr = startPtr
179         if ptr.off < 0 {
180                 // meaningless anyway
181                 return
182         } else if ptr.off >= fn.fileinfo.size {
183                 ptr.segmentIdx = len(fn.segments)
184                 ptr.segmentOff = 0
185                 ptr.repacked = fn.repacked
186                 return
187         } else if ptr.repacked == fn.repacked {
188                 // segmentIdx and segmentOff accurately reflect
189                 // ptr.off, but might have fallen off the end of a
190                 // segment
191                 if ptr.segmentOff >= fn.segments[ptr.segmentIdx].Len() {
192                         ptr.segmentIdx++
193                         ptr.segmentOff = 0
194                 }
195                 return
196         }
197         defer func() {
198                 ptr.repacked = fn.repacked
199         }()
200         if ptr.off >= fn.fileinfo.size {
201                 ptr.segmentIdx, ptr.segmentOff = len(fn.segments), 0
202                 return
203         }
204         // Recompute segmentIdx and segmentOff.  We have already
205         // established fn.fileinfo.size > ptr.off >= 0, so we don't
206         // have to deal with edge cases here.
207         var off int64
208         for ptr.segmentIdx, ptr.segmentOff = 0, 0; off < ptr.off; ptr.segmentIdx++ {
209                 // This would panic (index out of range) if
210                 // fn.fileinfo.size were larger than
211                 // sum(fn.segments[i].Len()) -- but that can't happen
212                 // because we have ensured fn.fileinfo.size is always
213                 // accurate.
214                 segLen := int64(fn.segments[ptr.segmentIdx].Len())
215                 if off+segLen > ptr.off {
216                         ptr.segmentOff = int(ptr.off - off)
217                         break
218                 }
219                 off += segLen
220         }
221         return
222 }
223
224 // filenode implements inode.
225 type filenode struct {
226         parent   inode
227         fs       FileSystem
228         fileinfo fileinfo
229         segments []segment
230         // number of times `segments` has changed in a
231         // way that might invalidate a filenodePtr
232         repacked int64
233         memsize  int64 // bytes in memSegments
234         sync.RWMutex
235         nullnode
236         throttle *throttle
237 }
238
239 // caller must have lock
240 func (fn *filenode) appendSegment(e segment) {
241         fn.segments = append(fn.segments, e)
242         fn.fileinfo.size += int64(e.Len())
243 }
244
245 func (fn *filenode) SetParent(p inode, name string) {
246         fn.Lock()
247         defer fn.Unlock()
248         fn.parent = p
249         fn.fileinfo.name = name
250 }
251
252 func (fn *filenode) Parent() inode {
253         fn.RLock()
254         defer fn.RUnlock()
255         return fn.parent
256 }
257
258 func (fn *filenode) FS() FileSystem {
259         return fn.fs
260 }
261
262 // Read reads file data from a single segment, starting at startPtr,
263 // into p. startPtr is assumed not to be up-to-date. Caller must have
264 // RLock or Lock.
265 func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
266         ptr = fn.seek(startPtr)
267         if ptr.off < 0 {
268                 err = ErrNegativeOffset
269                 return
270         }
271         if ptr.segmentIdx >= len(fn.segments) {
272                 err = io.EOF
273                 return
274         }
275         n, err = fn.segments[ptr.segmentIdx].ReadAt(p, int64(ptr.segmentOff))
276         if n > 0 {
277                 ptr.off += int64(n)
278                 ptr.segmentOff += n
279                 if ptr.segmentOff == fn.segments[ptr.segmentIdx].Len() {
280                         ptr.segmentIdx++
281                         ptr.segmentOff = 0
282                         if ptr.segmentIdx < len(fn.segments) && err == io.EOF {
283                                 err = nil
284                         }
285                 }
286         }
287         return
288 }
289
290 func (fn *filenode) Size() int64 {
291         fn.RLock()
292         defer fn.RUnlock()
293         return fn.fileinfo.Size()
294 }
295
296 func (fn *filenode) FileInfo() os.FileInfo {
297         fn.RLock()
298         defer fn.RUnlock()
299         return fn.fileinfo
300 }
301
302 func (fn *filenode) Truncate(size int64) error {
303         fn.Lock()
304         defer fn.Unlock()
305         return fn.truncate(size)
306 }
307
308 func (fn *filenode) truncate(size int64) error {
309         if size == fn.fileinfo.size {
310                 return nil
311         }
312         fn.repacked++
313         if size < fn.fileinfo.size {
314                 ptr := fn.seek(filenodePtr{off: size})
315                 for i := ptr.segmentIdx; i < len(fn.segments); i++ {
316                         if seg, ok := fn.segments[i].(*memSegment); ok {
317                                 fn.memsize -= int64(seg.Len())
318                         }
319                 }
320                 if ptr.segmentOff == 0 {
321                         fn.segments = fn.segments[:ptr.segmentIdx]
322                 } else {
323                         fn.segments = fn.segments[:ptr.segmentIdx+1]
324                         switch seg := fn.segments[ptr.segmentIdx].(type) {
325                         case *memSegment:
326                                 seg.Truncate(ptr.segmentOff)
327                                 fn.memsize += int64(seg.Len())
328                         default:
329                                 fn.segments[ptr.segmentIdx] = seg.Slice(0, ptr.segmentOff)
330                         }
331                 }
332                 fn.fileinfo.size = size
333                 return nil
334         }
335         for size > fn.fileinfo.size {
336                 grow := size - fn.fileinfo.size
337                 var seg *memSegment
338                 var ok bool
339                 if len(fn.segments) == 0 {
340                         seg = &memSegment{}
341                         fn.segments = append(fn.segments, seg)
342                 } else if seg, ok = fn.segments[len(fn.segments)-1].(*memSegment); !ok || seg.Len() >= maxBlockSize {
343                         seg = &memSegment{}
344                         fn.segments = append(fn.segments, seg)
345                 }
346                 if maxgrow := int64(maxBlockSize - seg.Len()); maxgrow < grow {
347                         grow = maxgrow
348                 }
349                 seg.Truncate(seg.Len() + int(grow))
350                 fn.fileinfo.size += grow
351                 fn.memsize += grow
352         }
353         return nil
354 }
355
356 // Write writes data from p to the file, starting at startPtr,
357 // extending the file size if necessary. Caller must have Lock.
358 func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
359         if startPtr.off > fn.fileinfo.size {
360                 if err = fn.truncate(startPtr.off); err != nil {
361                         return 0, startPtr, err
362                 }
363         }
364         ptr = fn.seek(startPtr)
365         if ptr.off < 0 {
366                 err = ErrNegativeOffset
367                 return
368         }
369         for len(p) > 0 && err == nil {
370                 cando := p
371                 if len(cando) > maxBlockSize {
372                         cando = cando[:maxBlockSize]
373                 }
374                 // Rearrange/grow fn.segments (and shrink cando if
375                 // needed) such that cando can be copied to
376                 // fn.segments[ptr.segmentIdx] at offset
377                 // ptr.segmentOff.
378                 cur := ptr.segmentIdx
379                 prev := ptr.segmentIdx - 1
380                 var curWritable bool
381                 if cur < len(fn.segments) {
382                         _, curWritable = fn.segments[cur].(*memSegment)
383                 }
384                 var prevAppendable bool
385                 if prev >= 0 && fn.segments[prev].Len() < maxBlockSize {
386                         _, prevAppendable = fn.segments[prev].(*memSegment)
387                 }
388                 if ptr.segmentOff > 0 && !curWritable {
389                         // Split a non-writable block.
390                         if max := fn.segments[cur].Len() - ptr.segmentOff; max <= len(cando) {
391                                 // Truncate cur, and insert a new
392                                 // segment after it.
393                                 cando = cando[:max]
394                                 fn.segments = append(fn.segments, nil)
395                                 copy(fn.segments[cur+1:], fn.segments[cur:])
396                         } else {
397                                 // Split cur into two copies, truncate
398                                 // the one on the left, shift the one
399                                 // on the right, and insert a new
400                                 // segment between them.
401                                 fn.segments = append(fn.segments, nil, nil)
402                                 copy(fn.segments[cur+2:], fn.segments[cur:])
403                                 fn.segments[cur+2] = fn.segments[cur+2].Slice(ptr.segmentOff+len(cando), -1)
404                         }
405                         cur++
406                         prev++
407                         seg := &memSegment{}
408                         seg.Truncate(len(cando))
409                         fn.memsize += int64(len(cando))
410                         fn.segments[cur] = seg
411                         fn.segments[prev] = fn.segments[prev].Slice(0, ptr.segmentOff)
412                         ptr.segmentIdx++
413                         ptr.segmentOff = 0
414                         fn.repacked++
415                         ptr.repacked++
416                 } else if curWritable {
417                         if fit := int(fn.segments[cur].Len()) - ptr.segmentOff; fit < len(cando) {
418                                 cando = cando[:fit]
419                         }
420                 } else {
421                         if prevAppendable {
422                                 // Shrink cando if needed to fit in
423                                 // prev segment.
424                                 if cangrow := maxBlockSize - fn.segments[prev].Len(); cangrow < len(cando) {
425                                         cando = cando[:cangrow]
426                                 }
427                         }
428
429                         if cur == len(fn.segments) {
430                                 // ptr is at EOF, filesize is changing.
431                                 fn.fileinfo.size += int64(len(cando))
432                         } else if el := fn.segments[cur].Len(); el <= len(cando) {
433                                 // cando is long enough that we won't
434                                 // need cur any more. shrink cando to
435                                 // be exactly as long as cur
436                                 // (otherwise we'd accidentally shift
437                                 // the effective position of all
438                                 // segments after cur).
439                                 cando = cando[:el]
440                                 copy(fn.segments[cur:], fn.segments[cur+1:])
441                                 fn.segments = fn.segments[:len(fn.segments)-1]
442                         } else {
443                                 // shrink cur by the same #bytes we're growing prev
444                                 fn.segments[cur] = fn.segments[cur].Slice(len(cando), -1)
445                         }
446
447                         if prevAppendable {
448                                 // Grow prev.
449                                 ptr.segmentIdx--
450                                 ptr.segmentOff = fn.segments[prev].Len()
451                                 fn.segments[prev].(*memSegment).Truncate(ptr.segmentOff + len(cando))
452                                 fn.memsize += int64(len(cando))
453                                 ptr.repacked++
454                                 fn.repacked++
455                         } else {
456                                 // Insert a segment between prev and
457                                 // cur, and advance prev/cur.
458                                 fn.segments = append(fn.segments, nil)
459                                 if cur < len(fn.segments) {
460                                         copy(fn.segments[cur+1:], fn.segments[cur:])
461                                         ptr.repacked++
462                                         fn.repacked++
463                                 } else {
464                                         // appending a new segment does
465                                         // not invalidate any ptrs
466                                 }
467                                 seg := &memSegment{}
468                                 seg.Truncate(len(cando))
469                                 fn.memsize += int64(len(cando))
470                                 fn.segments[cur] = seg
471                                 cur++
472                                 prev++
473                         }
474                 }
475
476                 // Finally we can copy bytes from cando to the current segment.
477                 fn.segments[ptr.segmentIdx].(*memSegment).WriteAt(cando, ptr.segmentOff)
478                 n += len(cando)
479                 p = p[len(cando):]
480
481                 ptr.off += int64(len(cando))
482                 ptr.segmentOff += len(cando)
483                 if ptr.segmentOff >= maxBlockSize {
484                         fn.pruneMemSegments()
485                 }
486                 if fn.segments[ptr.segmentIdx].Len() == ptr.segmentOff {
487                         ptr.segmentOff = 0
488                         ptr.segmentIdx++
489                 }
490
491                 fn.fileinfo.modTime = time.Now()
492         }
493         return
494 }
495
496 // Write some data out to disk to reduce memory use. Caller must have
497 // write lock.
498 func (fn *filenode) pruneMemSegments() {
499         // TODO: share code with (*dirnode)sync()
500         // TODO: pack/flush small blocks too, when fragmented
501         if fn.throttle == nil {
502                 // TODO: share a throttle with filesystem
503                 fn.throttle = newThrottle(writeAheadBlocks)
504         }
505         for idx, seg := range fn.segments {
506                 seg, ok := seg.(*memSegment)
507                 if !ok || seg.Len() < maxBlockSize || seg.flushing != nil {
508                         continue
509                 }
510                 // Setting seg.flushing guarantees seg.buf will not be
511                 // modified in place: WriteAt and Truncate will
512                 // allocate a new buf instead, if necessary.
513                 idx, buf := idx, seg.buf
514                 done := make(chan struct{})
515                 seg.flushing = done
516                 // If lots of background writes are already in
517                 // progress, block here until one finishes, rather
518                 // than pile up an unlimited number of buffered writes
519                 // and network flush operations.
520                 fn.throttle.Acquire()
521                 go func() {
522                         defer close(done)
523                         locator, _, err := fn.FS().PutB(buf)
524                         fn.throttle.Release()
525                         fn.Lock()
526                         defer fn.Unlock()
527                         if curbuf := seg.buf[:1]; &curbuf[0] != &buf[0] {
528                                 // A new seg.buf has been allocated.
529                                 return
530                         }
531                         seg.flushing = nil
532                         if err != nil {
533                                 // TODO: stall (or return errors from)
534                                 // subsequent writes until flushing
535                                 // starts to succeed.
536                                 return
537                         }
538                         if len(fn.segments) <= idx || fn.segments[idx] != seg || len(seg.buf) != len(buf) {
539                                 // Segment has been dropped/moved/resized.
540                                 return
541                         }
542                         fn.memsize -= int64(len(buf))
543                         fn.segments[idx] = storedSegment{
544                                 kc:      fn.FS(),
545                                 locator: locator,
546                                 size:    len(buf),
547                                 offset:  0,
548                                 length:  len(buf),
549                         }
550                 }()
551         }
552 }
553
554 // Block until all pending pruneMemSegments work is finished. Caller
555 // must NOT have lock.
556 func (fn *filenode) waitPrune() {
557         var pending []<-chan struct{}
558         fn.Lock()
559         for _, seg := range fn.segments {
560                 if seg, ok := seg.(*memSegment); ok && seg.flushing != nil {
561                         pending = append(pending, seg.flushing)
562                 }
563         }
564         fn.Unlock()
565         for _, p := range pending {
566                 <-p
567         }
568 }
569
570 type dirnode struct {
571         fs *collectionFileSystem
572         treenode
573 }
574
575 func (dn *dirnode) FS() FileSystem {
576         return dn.fs
577 }
578
579 func (dn *dirnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
580         if dn == dn.fs.rootnode() && name == ".arvados#collection" {
581                 gn := &getternode{Getter: func() ([]byte, error) {
582                         var coll Collection
583                         var err error
584                         coll.ManifestText, err = dn.fs.MarshalManifest(".")
585                         if err != nil {
586                                 return nil, err
587                         }
588                         data, err := json.Marshal(&coll)
589                         if err == nil {
590                                 data = append(data, '\n')
591                         }
592                         return data, err
593                 }}
594                 gn.SetParent(dn, name)
595                 return gn, nil
596         }
597         return dn.treenode.Child(name, replace)
598 }
599
600 type fnSegmentRef struct {
601         fn  *filenode
602         idx int
603 }
604
605 // commitBlock concatenates the data from the given filenode segments
606 // (which must be *memSegments), writes the data out to Keep as a
607 // single block, and replaces the filenodes' *memSegments with
608 // storedSegments that reference the relevant portions of the new
609 // block.
610 //
611 // Caller must have write lock.
612 func (dn *dirnode) commitBlock(ctx context.Context, throttle *throttle, refs []fnSegmentRef) error {
613         if len(refs) == 0 {
614                 return nil
615         }
616         throttle.Acquire()
617         defer throttle.Release()
618         if err := ctx.Err(); err != nil {
619                 return err
620         }
621         block := make([]byte, 0, maxBlockSize)
622         for _, ref := range refs {
623                 block = append(block, ref.fn.segments[ref.idx].(*memSegment).buf...)
624         }
625         locator, _, err := dn.fs.PutB(block)
626         if err != nil {
627                 return err
628         }
629         off := 0
630         for _, ref := range refs {
631                 data := ref.fn.segments[ref.idx].(*memSegment).buf
632                 ref.fn.segments[ref.idx] = storedSegment{
633                         kc:      dn.fs,
634                         locator: locator,
635                         size:    len(block),
636                         offset:  off,
637                         length:  len(data),
638                 }
639                 off += len(data)
640                 ref.fn.memsize -= int64(len(data))
641         }
642         return nil
643 }
644
645 // sync flushes in-memory data and remote block references (for the
646 // children with the given names, which must be children of dn) to
647 // local persistent storage. Caller must have write lock on dn and the
648 // named children.
649 func (dn *dirnode) sync(ctx context.Context, throttle *throttle, names []string) error {
650         cg := newContextGroup(ctx)
651         defer cg.Cancel()
652
653         goCommit := func(refs []fnSegmentRef) {
654                 cg.Go(func() error {
655                         return dn.commitBlock(cg.Context(), throttle, refs)
656                 })
657         }
658
659         var pending []fnSegmentRef
660         var pendingLen int = 0
661         localLocator := map[string]string{}
662         for _, name := range names {
663                 fn, ok := dn.inodes[name].(*filenode)
664                 if !ok {
665                         continue
666                 }
667                 for idx, seg := range fn.segments {
668                         switch seg := seg.(type) {
669                         case storedSegment:
670                                 loc, ok := localLocator[seg.locator]
671                                 if !ok {
672                                         var err error
673                                         loc, err = dn.fs.LocalLocator(seg.locator)
674                                         if err != nil {
675                                                 return err
676                                         }
677                                         localLocator[seg.locator] = loc
678                                 }
679                                 seg.locator = loc
680                                 fn.segments[idx] = seg
681                         case *memSegment:
682                                 if seg.Len() > maxBlockSize/2 {
683                                         goCommit([]fnSegmentRef{{fn, idx}})
684                                         continue
685                                 }
686                                 if pendingLen+seg.Len() > maxBlockSize {
687                                         goCommit(pending)
688                                         pending = nil
689                                         pendingLen = 0
690                                 }
691                                 pending = append(pending, fnSegmentRef{fn, idx})
692                                 pendingLen += seg.Len()
693                         default:
694                                 panic(fmt.Sprintf("can't sync segment type %T", seg))
695                         }
696                 }
697         }
698         goCommit(pending)
699         return cg.Wait()
700 }
701
702 // caller must have write lock.
703 func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle *throttle) (string, error) {
704         cg := newContextGroup(ctx)
705         defer cg.Cancel()
706
707         if len(dn.inodes) == 0 {
708                 if prefix == "." {
709                         return "", nil
710                 }
711                 // Express the existence of an empty directory by
712                 // adding an empty file named `\056`, which (unlike
713                 // the more obvious spelling `.`) is accepted by the
714                 // API's manifest validator.
715                 return manifestEscape(prefix) + " d41d8cd98f00b204e9800998ecf8427e+0 0:0:\\056\n", nil
716         }
717
718         names := make([]string, 0, len(dn.inodes))
719         for name := range dn.inodes {
720                 names = append(names, name)
721         }
722         sort.Strings(names)
723
724         // Wait for children to finish any pending write operations
725         // before locking them.
726         for _, name := range names {
727                 node := dn.inodes[name]
728                 if fn, ok := node.(*filenode); ok {
729                         fn.waitPrune()
730                 }
731         }
732
733         var dirnames []string
734         var filenames []string
735         for _, name := range names {
736                 node := dn.inodes[name]
737                 node.Lock()
738                 defer node.Unlock()
739                 switch node := node.(type) {
740                 case *dirnode:
741                         dirnames = append(dirnames, name)
742                 case *filenode:
743                         filenames = append(filenames, name)
744                 default:
745                         panic(fmt.Sprintf("can't marshal inode type %T", node))
746                 }
747         }
748
749         subdirs := make([]string, len(dirnames))
750         rootdir := ""
751         for i, name := range dirnames {
752                 i, name := i, name
753                 cg.Go(func() error {
754                         txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name, throttle)
755                         subdirs[i] = txt
756                         return err
757                 })
758         }
759
760         cg.Go(func() error {
761                 var streamLen int64
762                 type filepart struct {
763                         name   string
764                         offset int64
765                         length int64
766                 }
767
768                 var fileparts []filepart
769                 var blocks []string
770                 if err := dn.sync(cg.Context(), throttle, names); err != nil {
771                         return err
772                 }
773                 for _, name := range filenames {
774                         node := dn.inodes[name].(*filenode)
775                         if len(node.segments) == 0 {
776                                 fileparts = append(fileparts, filepart{name: name})
777                                 continue
778                         }
779                         for _, seg := range node.segments {
780                                 switch seg := seg.(type) {
781                                 case storedSegment:
782                                         if len(blocks) > 0 && blocks[len(blocks)-1] == seg.locator {
783                                                 streamLen -= int64(seg.size)
784                                         } else {
785                                                 blocks = append(blocks, seg.locator)
786                                         }
787                                         next := filepart{
788                                                 name:   name,
789                                                 offset: streamLen + int64(seg.offset),
790                                                 length: int64(seg.length),
791                                         }
792                                         if prev := len(fileparts) - 1; prev >= 0 &&
793                                                 fileparts[prev].name == name &&
794                                                 fileparts[prev].offset+fileparts[prev].length == next.offset {
795                                                 fileparts[prev].length += next.length
796                                         } else {
797                                                 fileparts = append(fileparts, next)
798                                         }
799                                         streamLen += int64(seg.size)
800                                 default:
801                                         // This can't happen: we
802                                         // haven't unlocked since
803                                         // calling sync().
804                                         panic(fmt.Sprintf("can't marshal segment type %T", seg))
805                                 }
806                         }
807                 }
808                 var filetokens []string
809                 for _, s := range fileparts {
810                         filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
811                 }
812                 if len(filetokens) == 0 {
813                         return nil
814                 } else if len(blocks) == 0 {
815                         blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
816                 }
817                 rootdir = manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n"
818                 return nil
819         })
820         err := cg.Wait()
821         return rootdir + strings.Join(subdirs, ""), err
822 }
823
824 func (dn *dirnode) loadManifest(txt string) error {
825         var dirname string
826         streams := strings.Split(txt, "\n")
827         if streams[len(streams)-1] != "" {
828                 return fmt.Errorf("line %d: no trailing newline", len(streams))
829         }
830         streams = streams[:len(streams)-1]
831         segments := []storedSegment{}
832         for i, stream := range streams {
833                 lineno := i + 1
834                 var anyFileTokens bool
835                 var pos int64
836                 var segIdx int
837                 segments = segments[:0]
838                 for i, token := range strings.Split(stream, " ") {
839                         if i == 0 {
840                                 dirname = manifestUnescape(token)
841                                 continue
842                         }
843                         if !strings.Contains(token, ":") {
844                                 if anyFileTokens {
845                                         return fmt.Errorf("line %d: bad file segment %q", lineno, token)
846                                 }
847                                 toks := strings.SplitN(token, "+", 3)
848                                 if len(toks) < 2 {
849                                         return fmt.Errorf("line %d: bad locator %q", lineno, token)
850                                 }
851                                 length, err := strconv.ParseInt(toks[1], 10, 32)
852                                 if err != nil || length < 0 {
853                                         return fmt.Errorf("line %d: bad locator %q", lineno, token)
854                                 }
855                                 segments = append(segments, storedSegment{
856                                         locator: token,
857                                         size:    int(length),
858                                         offset:  0,
859                                         length:  int(length),
860                                 })
861                                 continue
862                         } else if len(segments) == 0 {
863                                 return fmt.Errorf("line %d: bad locator %q", lineno, token)
864                         }
865
866                         toks := strings.SplitN(token, ":", 3)
867                         if len(toks) != 3 {
868                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
869                         }
870                         anyFileTokens = true
871
872                         offset, err := strconv.ParseInt(toks[0], 10, 64)
873                         if err != nil || offset < 0 {
874                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
875                         }
876                         length, err := strconv.ParseInt(toks[1], 10, 64)
877                         if err != nil || length < 0 {
878                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
879                         }
880                         name := dirname + "/" + manifestUnescape(toks[2])
881                         fnode, err := dn.createFileAndParents(name)
882                         if fnode == nil && err == nil && length == 0 {
883                                 // Special case: an empty file used as
884                                 // a marker to preserve an otherwise
885                                 // empty directory in a manifest.
886                                 continue
887                         }
888                         if err != nil || (fnode == nil && length != 0) {
889                                 return fmt.Errorf("line %d: cannot use path %q with length %d: %s", lineno, name, length, err)
890                         }
891                         // Map the stream offset/range coordinates to
892                         // block/offset/range coordinates and add
893                         // corresponding storedSegments to the filenode
894                         if pos > offset {
895                                 // Can't continue where we left off.
896                                 // TODO: binary search instead of
897                                 // rewinding all the way (but this
898                                 // situation might be rare anyway)
899                                 segIdx, pos = 0, 0
900                         }
901                         for next := int64(0); segIdx < len(segments); segIdx++ {
902                                 seg := segments[segIdx]
903                                 next = pos + int64(seg.Len())
904                                 if next <= offset || seg.Len() == 0 {
905                                         pos = next
906                                         continue
907                                 }
908                                 if pos >= offset+length {
909                                         break
910                                 }
911                                 var blkOff int
912                                 if pos < offset {
913                                         blkOff = int(offset - pos)
914                                 }
915                                 blkLen := seg.Len() - blkOff
916                                 if pos+int64(blkOff+blkLen) > offset+length {
917                                         blkLen = int(offset + length - pos - int64(blkOff))
918                                 }
919                                 fnode.appendSegment(storedSegment{
920                                         kc:      dn.fs,
921                                         locator: seg.locator,
922                                         size:    seg.size,
923                                         offset:  blkOff,
924                                         length:  blkLen,
925                                 })
926                                 if next > offset+length {
927                                         break
928                                 } else {
929                                         pos = next
930                                 }
931                         }
932                         if segIdx == len(segments) && pos < offset+length {
933                                 return fmt.Errorf("line %d: invalid segment in %d-byte stream: %q", lineno, pos, token)
934                         }
935                 }
936                 if !anyFileTokens {
937                         return fmt.Errorf("line %d: no file segments", lineno)
938                 } else if len(segments) == 0 {
939                         return fmt.Errorf("line %d: no locators", lineno)
940                 } else if dirname == "" {
941                         return fmt.Errorf("line %d: no stream name", lineno)
942                 }
943         }
944         return nil
945 }
946
947 // only safe to call from loadManifest -- no locking.
948 //
949 // If path is a "parent directory exists" marker (the last path
950 // component is "."), the returned values are both nil.
951 func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
952         var node inode = dn
953         names := strings.Split(path, "/")
954         basename := names[len(names)-1]
955         for _, name := range names[:len(names)-1] {
956                 switch name {
957                 case "", ".":
958                         continue
959                 case "..":
960                         if node == dn {
961                                 // can't be sure parent will be a *dirnode
962                                 return nil, ErrInvalidArgument
963                         }
964                         node = node.Parent()
965                         continue
966                 }
967                 node, err = node.Child(name, func(child inode) (inode, error) {
968                         if child == nil {
969                                 child, err := node.FS().newNode(name, 0755|os.ModeDir, node.Parent().FileInfo().ModTime())
970                                 if err != nil {
971                                         return nil, err
972                                 }
973                                 child.SetParent(node, name)
974                                 return child, nil
975                         } else if !child.IsDir() {
976                                 return child, ErrFileExists
977                         } else {
978                                 return child, nil
979                         }
980                 })
981                 if err != nil {
982                         return
983                 }
984         }
985         if basename == "." {
986                 return
987         } else if !permittedName(basename) {
988                 err = fmt.Errorf("invalid file part %q in path %q", basename, path)
989                 return
990         }
991         _, err = node.Child(basename, func(child inode) (inode, error) {
992                 switch child := child.(type) {
993                 case nil:
994                         child, err = node.FS().newNode(basename, 0755, node.FileInfo().ModTime())
995                         if err != nil {
996                                 return nil, err
997                         }
998                         child.SetParent(node, basename)
999                         fn = child.(*filenode)
1000                         return child, nil
1001                 case *filenode:
1002                         fn = child
1003                         return child, nil
1004                 case *dirnode:
1005                         return child, ErrIsDirectory
1006                 default:
1007                         return child, ErrInvalidArgument
1008                 }
1009         })
1010         return
1011 }
1012
1013 func (dn *dirnode) TreeSize() (bytes int64) {
1014         dn.RLock()
1015         defer dn.RUnlock()
1016         for _, i := range dn.inodes {
1017                 switch i := i.(type) {
1018                 case *filenode:
1019                         bytes += i.Size()
1020                 case *dirnode:
1021                         bytes += i.TreeSize()
1022                 }
1023         }
1024         return
1025 }
1026
1027 type segment interface {
1028         io.ReaderAt
1029         Len() int
1030         // Return a new segment with a subsection of the data from this
1031         // one. length<0 means length=Len()-off.
1032         Slice(off int, length int) segment
1033 }
1034
1035 type memSegment struct {
1036         buf []byte
1037         // If flushing is not nil, then a) buf is being shared by a
1038         // pruneMemSegments goroutine, and must be copied on write;
1039         // and b) the flushing channel will close when the goroutine
1040         // finishes, whether it succeeds or not.
1041         flushing <-chan struct{}
1042 }
1043
1044 func (me *memSegment) Len() int {
1045         return len(me.buf)
1046 }
1047
1048 func (me *memSegment) Slice(off, length int) segment {
1049         if length < 0 {
1050                 length = len(me.buf) - off
1051         }
1052         buf := make([]byte, length)
1053         copy(buf, me.buf[off:])
1054         return &memSegment{buf: buf}
1055 }
1056
1057 func (me *memSegment) Truncate(n int) {
1058         if n > cap(me.buf) || (me.flushing != nil && n > len(me.buf)) {
1059                 newsize := 1024
1060                 for newsize < n {
1061                         newsize = newsize << 2
1062                 }
1063                 newbuf := make([]byte, n, newsize)
1064                 copy(newbuf, me.buf)
1065                 me.buf, me.flushing = newbuf, nil
1066         } else {
1067                 // reclaim existing capacity, and zero reclaimed part
1068                 oldlen := len(me.buf)
1069                 me.buf = me.buf[:n]
1070                 for i := oldlen; i < n; i++ {
1071                         me.buf[i] = 0
1072                 }
1073         }
1074 }
1075
1076 func (me *memSegment) WriteAt(p []byte, off int) {
1077         if off+len(p) > len(me.buf) {
1078                 panic("overflowed segment")
1079         }
1080         if me.flushing != nil {
1081                 me.buf, me.flushing = append([]byte(nil), me.buf...), nil
1082         }
1083         copy(me.buf[off:], p)
1084 }
1085
1086 func (me *memSegment) ReadAt(p []byte, off int64) (n int, err error) {
1087         if off > int64(me.Len()) {
1088                 err = io.EOF
1089                 return
1090         }
1091         n = copy(p, me.buf[int(off):])
1092         if n < len(p) {
1093                 err = io.EOF
1094         }
1095         return
1096 }
1097
1098 type storedSegment struct {
1099         kc      fsBackend
1100         locator string
1101         size    int // size of stored block (also encoded in locator)
1102         offset  int // position of segment within the stored block
1103         length  int // bytes in this segment (offset + length <= size)
1104 }
1105
1106 func (se storedSegment) Len() int {
1107         return se.length
1108 }
1109
1110 func (se storedSegment) Slice(n, size int) segment {
1111         se.offset += n
1112         se.length -= n
1113         if size >= 0 && se.length > size {
1114                 se.length = size
1115         }
1116         return se
1117 }
1118
1119 func (se storedSegment) ReadAt(p []byte, off int64) (n int, err error) {
1120         if off > int64(se.length) {
1121                 return 0, io.EOF
1122         }
1123         maxlen := se.length - int(off)
1124         if len(p) > maxlen {
1125                 p = p[:maxlen]
1126                 n, err = se.kc.ReadAt(se.locator, p, int(off)+se.offset)
1127                 if err == nil {
1128                         err = io.EOF
1129                 }
1130                 return
1131         }
1132         return se.kc.ReadAt(se.locator, p, int(off)+se.offset)
1133 }
1134
1135 func canonicalName(name string) string {
1136         name = path.Clean("/" + name)
1137         if name == "/" || name == "./" {
1138                 name = "."
1139         } else if strings.HasPrefix(name, "/") {
1140                 name = "." + name
1141         }
1142         return name
1143 }
1144
1145 var manifestEscapeSeq = regexp.MustCompile(`\\([0-7]{3}|\\)`)
1146
1147 func manifestUnescapeFunc(seq string) string {
1148         if seq == `\\` {
1149                 return `\`
1150         }
1151         i, err := strconv.ParseUint(seq[1:], 8, 8)
1152         if err != nil {
1153                 // Invalid escape sequence: can't unescape.
1154                 return seq
1155         }
1156         return string([]byte{byte(i)})
1157 }
1158
1159 func manifestUnescape(s string) string {
1160         return manifestEscapeSeq.ReplaceAllStringFunc(s, manifestUnescapeFunc)
1161 }
1162
1163 var manifestEscapedChar = regexp.MustCompile(`[\000-\040:\s\\]`)
1164
1165 func manifestEscapeFunc(seq string) string {
1166         return fmt.Sprintf("\\%03o", byte(seq[0]))
1167 }
1168
1169 func manifestEscape(s string) string {
1170         return manifestEscapedChar.ReplaceAllStringFunc(s, manifestEscapeFunc)
1171 }