Merge branch '19428-webdav-performance'
[arvados.git] / sdk / go / arvados / fs_collection.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "bytes"
9         "context"
10         "encoding/json"
11         "fmt"
12         "io"
13         "os"
14         "path"
15         "regexp"
16         "sort"
17         "strconv"
18         "strings"
19         "sync"
20         "sync/atomic"
21         "time"
22 )
23
24 var (
25         maxBlockSize      = 1 << 26
26         concurrentWriters = 4 // max goroutines writing to Keep in background and during flush()
27 )
28
29 // A CollectionFileSystem is a FileSystem that can be serialized as a
30 // manifest and stored as a collection.
31 type CollectionFileSystem interface {
32         FileSystem
33
34         // Flush all file data to Keep and return a snapshot of the
35         // filesystem suitable for saving as (Collection)ManifestText.
36         // Prefix (normally ".") is a top level directory, effectively
37         // prepended to all paths in the returned manifest.
38         MarshalManifest(prefix string) (string, error)
39
40         // Total data bytes in all files.
41         Size() int64
42 }
43
44 type collectionFileSystem struct {
45         fileSystem
46         uuid           string
47         savedPDH       atomic.Value
48         replicas       int
49         storageClasses []string
50         // guessSignatureTTL tracks a lower bound for the server's
51         // configured BlobSigningTTL. The guess is initially zero, and
52         // increases when we come across a signature with an expiry
53         // time further in the future than the previous guess.
54         //
55         // When the guessed TTL is much smaller than the real TTL,
56         // preemptive signature refresh is delayed or missed entirely,
57         // which is OK.
58         guessSignatureTTL time.Duration
59         holdCheckChanges  time.Time
60         lockCheckChanges  sync.Mutex
61 }
62
63 // FileSystem returns a CollectionFileSystem for the collection.
64 func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFileSystem, error) {
65         modTime := c.ModifiedAt
66         if modTime.IsZero() {
67                 modTime = time.Now()
68         }
69         fs := &collectionFileSystem{
70                 uuid:           c.UUID,
71                 storageClasses: c.StorageClassesDesired,
72                 fileSystem: fileSystem{
73                         fsBackend: keepBackend{apiClient: client, keepClient: kc},
74                         thr:       newThrottle(concurrentWriters),
75                 },
76         }
77         fs.savedPDH.Store(c.PortableDataHash)
78         if r := c.ReplicationDesired; r != nil {
79                 fs.replicas = *r
80         }
81         root := &dirnode{
82                 fs: fs,
83                 treenode: treenode{
84                         fileinfo: fileinfo{
85                                 name:    ".",
86                                 mode:    os.ModeDir | 0755,
87                                 modTime: modTime,
88                                 sys:     func() interface{} { return c },
89                         },
90                         inodes: make(map[string]inode),
91                 },
92         }
93         root.SetParent(root, ".")
94         if err := root.loadManifest(c.ManifestText); err != nil {
95                 return nil, err
96         }
97         backdateTree(root, modTime)
98         fs.root = root
99         return fs, nil
100 }
101
102 // caller must have lock (or guarantee no concurrent accesses somehow)
103 func eachNode(n inode, ffunc func(*filenode), dfunc func(*dirnode)) {
104         switch n := n.(type) {
105         case *filenode:
106                 if ffunc != nil {
107                         ffunc(n)
108                 }
109         case *dirnode:
110                 if dfunc != nil {
111                         dfunc(n)
112                 }
113                 for _, n := range n.inodes {
114                         eachNode(n, ffunc, dfunc)
115                 }
116         }
117 }
118
119 // caller must have lock (or guarantee no concurrent accesses somehow)
120 func backdateTree(n inode, modTime time.Time) {
121         eachNode(n, func(fn *filenode) {
122                 fn.fileinfo.modTime = modTime
123         }, func(dn *dirnode) {
124                 dn.fileinfo.modTime = modTime
125         })
126 }
127
128 // Approximate portion of signature TTL remaining, usually between 0
129 // and 1, or negative if some signatures have expired.
130 func (fs *collectionFileSystem) signatureTimeLeft() (float64, time.Duration) {
131         var (
132                 now      = time.Now()
133                 earliest = now.Add(time.Hour * 24 * 7 * 365)
134                 latest   time.Time
135         )
136         fs.fileSystem.root.RLock()
137         eachNode(fs.root, func(fn *filenode) {
138                 fn.Lock()
139                 defer fn.Unlock()
140                 for _, seg := range fn.segments {
141                         seg, ok := seg.(storedSegment)
142                         if !ok {
143                                 continue
144                         }
145                         expiryTime, err := signatureExpiryTime(seg.locator)
146                         if err != nil {
147                                 continue
148                         }
149                         if expiryTime.Before(earliest) {
150                                 earliest = expiryTime
151                         }
152                         if expiryTime.After(latest) {
153                                 latest = expiryTime
154                         }
155                 }
156         }, nil)
157         fs.fileSystem.root.RUnlock()
158
159         if latest.IsZero() {
160                 // No signatures == 100% of TTL remaining.
161                 return 1, 1
162         }
163
164         ttl := latest.Sub(now)
165         fs.fileSystem.root.Lock()
166         {
167                 if ttl > fs.guessSignatureTTL {
168                         // ttl is closer to the real TTL than
169                         // guessSignatureTTL.
170                         fs.guessSignatureTTL = ttl
171                 } else {
172                         // Use the previous best guess to compute the
173                         // portion remaining (below, after unlocking
174                         // mutex).
175                         ttl = fs.guessSignatureTTL
176                 }
177         }
178         fs.fileSystem.root.Unlock()
179
180         return earliest.Sub(now).Seconds() / ttl.Seconds(), ttl
181 }
182
183 func (fs *collectionFileSystem) updateSignatures(newmanifest string) {
184         newLoc := map[string]string{}
185         for _, tok := range regexp.MustCompile(`\S+`).FindAllString(newmanifest, -1) {
186                 if mBlkRe.MatchString(tok) {
187                         newLoc[stripAllHints(tok)] = tok
188                 }
189         }
190         fs.fileSystem.root.Lock()
191         defer fs.fileSystem.root.Unlock()
192         eachNode(fs.root, func(fn *filenode) {
193                 fn.Lock()
194                 defer fn.Unlock()
195                 for idx, seg := range fn.segments {
196                         seg, ok := seg.(storedSegment)
197                         if !ok {
198                                 continue
199                         }
200                         loc, ok := newLoc[stripAllHints(seg.locator)]
201                         if !ok {
202                                 continue
203                         }
204                         seg.locator = loc
205                         fn.segments[idx] = seg
206                 }
207         }, nil)
208 }
209
210 func (fs *collectionFileSystem) newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error) {
211         if name == "" || name == "." || name == ".." {
212                 return nil, ErrInvalidArgument
213         }
214         if perm.IsDir() {
215                 return &dirnode{
216                         fs: fs,
217                         treenode: treenode{
218                                 fileinfo: fileinfo{
219                                         name:    name,
220                                         mode:    perm | os.ModeDir,
221                                         modTime: modTime,
222                                 },
223                                 inodes: make(map[string]inode),
224                         },
225                 }, nil
226         }
227         return &filenode{
228                 fs: fs,
229                 fileinfo: fileinfo{
230                         name:    name,
231                         mode:    perm & ^os.ModeDir,
232                         modTime: modTime,
233                 },
234         }, nil
235 }
236
237 func (fs *collectionFileSystem) Child(name string, replace func(inode) (inode, error)) (inode, error) {
238         return fs.rootnode().Child(name, replace)
239 }
240
241 func (fs *collectionFileSystem) FS() FileSystem {
242         return fs
243 }
244
245 func (fs *collectionFileSystem) FileInfo() os.FileInfo {
246         return fs.rootnode().FileInfo()
247 }
248
249 func (fs *collectionFileSystem) IsDir() bool {
250         return true
251 }
252
253 func (fs *collectionFileSystem) Lock() {
254         fs.rootnode().Lock()
255 }
256
257 func (fs *collectionFileSystem) Unlock() {
258         fs.rootnode().Unlock()
259 }
260
261 func (fs *collectionFileSystem) RLock() {
262         fs.rootnode().RLock()
263 }
264
265 func (fs *collectionFileSystem) RUnlock() {
266         fs.rootnode().RUnlock()
267 }
268
269 func (fs *collectionFileSystem) Parent() inode {
270         return fs.rootnode().Parent()
271 }
272
273 func (fs *collectionFileSystem) Read(_ []byte, ptr filenodePtr) (int, filenodePtr, error) {
274         return 0, ptr, ErrInvalidOperation
275 }
276
277 func (fs *collectionFileSystem) Write(_ []byte, ptr filenodePtr) (int, filenodePtr, error) {
278         return 0, ptr, ErrInvalidOperation
279 }
280
281 func (fs *collectionFileSystem) Readdir() ([]os.FileInfo, error) {
282         return fs.rootnode().Readdir()
283 }
284
285 func (fs *collectionFileSystem) SetParent(parent inode, name string) {
286         fs.rootnode().SetParent(parent, name)
287 }
288
289 func (fs *collectionFileSystem) Truncate(int64) error {
290         return ErrInvalidOperation
291 }
292
293 // Check for and incorporate upstream changes -- unless that has
294 // already been done recently, in which case this func is a no-op.
295 func (fs *collectionFileSystem) checkChangesOnServer() error {
296         if fs.uuid == "" && fs.savedPDH.Load() == "" {
297                 return nil
298         }
299
300         // First try UUID if any, then last known PDH. Stop if all
301         // signatures are new enough.
302         checkingAll := false
303         for _, id := range []string{fs.uuid, fs.savedPDH.Load().(string)} {
304                 if id == "" {
305                         continue
306                 }
307
308                 fs.lockCheckChanges.Lock()
309                 if !checkingAll && fs.holdCheckChanges.After(time.Now()) {
310                         fs.lockCheckChanges.Unlock()
311                         return nil
312                 }
313                 remain, ttl := fs.signatureTimeLeft()
314                 if remain > 0.01 && !checkingAll {
315                         fs.holdCheckChanges = time.Now().Add(ttl / 100)
316                 }
317                 fs.lockCheckChanges.Unlock()
318
319                 if remain >= 0.5 {
320                         break
321                 }
322                 checkingAll = true
323                 var coll Collection
324                 err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}})
325                 if err != nil {
326                         continue
327                 }
328                 fs.updateSignatures(coll.ManifestText)
329         }
330         return nil
331 }
332
333 // Refresh signature on a single locator, if necessary. Assume caller
334 // has lock. If an update is needed, and there are any storedSegments
335 // whose signatures can be updated, start a background task to update
336 // them asynchronously when the caller releases locks.
337 func (fs *collectionFileSystem) refreshSignature(locator string) string {
338         exp, err := signatureExpiryTime(locator)
339         if err != nil || exp.Sub(time.Now()) > time.Minute {
340                 // Synchronous update is not needed. Start an
341                 // asynchronous update if needed.
342                 go fs.checkChangesOnServer()
343                 return locator
344         }
345         var manifests string
346         for _, id := range []string{fs.uuid, fs.savedPDH.Load().(string)} {
347                 if id == "" {
348                         continue
349                 }
350                 var coll Collection
351                 err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}})
352                 if err != nil {
353                         continue
354                 }
355                 manifests += coll.ManifestText
356         }
357         hash := stripAllHints(locator)
358         for _, tok := range regexp.MustCompile(`\S+`).FindAllString(manifests, -1) {
359                 if mBlkRe.MatchString(tok) {
360                         if stripAllHints(tok) == hash {
361                                 locator = tok
362                                 break
363                         }
364                 }
365         }
366         go fs.updateSignatures(manifests)
367         return locator
368 }
369
370 func (fs *collectionFileSystem) Sync() error {
371         err := fs.checkChangesOnServer()
372         if err != nil {
373                 return err
374         }
375         if fs.uuid == "" {
376                 return nil
377         }
378         txt, err := fs.MarshalManifest(".")
379         if err != nil {
380                 return fmt.Errorf("sync failed: %s", err)
381         }
382         if PortableDataHash(txt) == fs.savedPDH.Load() {
383                 // No local changes since last save or initial load.
384                 return nil
385         }
386         coll := Collection{
387                 UUID:         fs.uuid,
388                 ManifestText: txt,
389         }
390
391         selectFields := []string{"uuid", "portable_data_hash"}
392         fs.lockCheckChanges.Lock()
393         remain, _ := fs.signatureTimeLeft()
394         fs.lockCheckChanges.Unlock()
395         if remain < 0.5 {
396                 selectFields = append(selectFields, "manifest_text")
397         }
398
399         err = fs.RequestAndDecode(&coll, "PUT", "arvados/v1/collections/"+fs.uuid, nil, map[string]interface{}{
400                 "collection": map[string]string{
401                         "manifest_text": coll.ManifestText,
402                 },
403                 "select": selectFields,
404         })
405         if err != nil {
406                 return fmt.Errorf("sync failed: update %s: %s", fs.uuid, err)
407         }
408         fs.updateSignatures(coll.ManifestText)
409         fs.savedPDH.Store(coll.PortableDataHash)
410         return nil
411 }
412
413 func (fs *collectionFileSystem) Flush(path string, shortBlocks bool) error {
414         node, err := rlookup(fs.fileSystem.root, path)
415         if err != nil {
416                 return err
417         }
418         dn, ok := node.(*dirnode)
419         if !ok {
420                 return ErrNotADirectory
421         }
422         dn.Lock()
423         defer dn.Unlock()
424         names := dn.sortedNames()
425         if path != "" {
426                 // Caller only wants to flush the specified dir,
427                 // non-recursively.  Drop subdirs from the list of
428                 // names.
429                 var filenames []string
430                 for _, name := range names {
431                         if _, ok := dn.inodes[name].(*filenode); ok {
432                                 filenames = append(filenames, name)
433                         }
434                 }
435                 names = filenames
436         }
437         for _, name := range names {
438                 child := dn.inodes[name]
439                 child.Lock()
440                 defer child.Unlock()
441         }
442         return dn.flush(context.TODO(), names, flushOpts{sync: false, shortBlocks: shortBlocks})
443 }
444
445 func (fs *collectionFileSystem) MemorySize() int64 {
446         return fs.fileSystem.root.(*dirnode).MemorySize()
447 }
448
449 func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
450         fs.fileSystem.root.Lock()
451         defer fs.fileSystem.root.Unlock()
452         return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix)
453 }
454
455 func (fs *collectionFileSystem) Size() int64 {
456         return fs.fileSystem.root.(*dirnode).TreeSize()
457 }
458
459 func (fs *collectionFileSystem) Snapshot() (inode, error) {
460         return fs.fileSystem.root.Snapshot()
461 }
462
463 func (fs *collectionFileSystem) Splice(r inode) error {
464         return fs.fileSystem.root.Splice(r)
465 }
466
467 // filenodePtr is an offset into a file that is (usually) efficient to
468 // seek to. Specifically, if filenode.repacked==filenodePtr.repacked
469 // then
470 // filenode.segments[filenodePtr.segmentIdx][filenodePtr.segmentOff]
471 // corresponds to file offset filenodePtr.off. Otherwise, it is
472 // necessary to reexamine len(filenode.segments[0]) etc. to find the
473 // correct segment and offset.
474 type filenodePtr struct {
475         off        int64
476         segmentIdx int
477         segmentOff int
478         repacked   int64
479 }
480
481 // seek returns a ptr that is consistent with both startPtr.off and
482 // the current state of fn. The caller must already hold fn.RLock() or
483 // fn.Lock().
484 //
485 // If startPtr is beyond EOF, ptr.segment* will indicate precisely
486 // EOF.
487 //
488 // After seeking:
489 //
490 //     ptr.segmentIdx == len(filenode.segments) // i.e., at EOF
491 //     ||
492 //     filenode.segments[ptr.segmentIdx].Len() > ptr.segmentOff
493 func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
494         ptr = startPtr
495         if ptr.off < 0 {
496                 // meaningless anyway
497                 return
498         } else if ptr.off >= fn.fileinfo.size {
499                 ptr.segmentIdx = len(fn.segments)
500                 ptr.segmentOff = 0
501                 ptr.repacked = fn.repacked
502                 return
503         } else if ptr.repacked == fn.repacked {
504                 // segmentIdx and segmentOff accurately reflect
505                 // ptr.off, but might have fallen off the end of a
506                 // segment
507                 if ptr.segmentOff >= fn.segments[ptr.segmentIdx].Len() {
508                         ptr.segmentIdx++
509                         ptr.segmentOff = 0
510                 }
511                 return
512         }
513         defer func() {
514                 ptr.repacked = fn.repacked
515         }()
516         if ptr.off >= fn.fileinfo.size {
517                 ptr.segmentIdx, ptr.segmentOff = len(fn.segments), 0
518                 return
519         }
520         // Recompute segmentIdx and segmentOff.  We have already
521         // established fn.fileinfo.size > ptr.off >= 0, so we don't
522         // have to deal with edge cases here.
523         var off int64
524         for ptr.segmentIdx, ptr.segmentOff = 0, 0; off < ptr.off; ptr.segmentIdx++ {
525                 // This would panic (index out of range) if
526                 // fn.fileinfo.size were larger than
527                 // sum(fn.segments[i].Len()) -- but that can't happen
528                 // because we have ensured fn.fileinfo.size is always
529                 // accurate.
530                 segLen := int64(fn.segments[ptr.segmentIdx].Len())
531                 if off+segLen > ptr.off {
532                         ptr.segmentOff = int(ptr.off - off)
533                         break
534                 }
535                 off += segLen
536         }
537         return
538 }
539
540 // filenode implements inode.
541 type filenode struct {
542         parent   inode
543         fs       *collectionFileSystem
544         fileinfo fileinfo
545         segments []segment
546         // number of times `segments` has changed in a
547         // way that might invalidate a filenodePtr
548         repacked int64
549         memsize  int64 // bytes in memSegments
550         sync.RWMutex
551         nullnode
552 }
553
554 // caller must have lock
555 func (fn *filenode) appendSegment(e segment) {
556         fn.segments = append(fn.segments, e)
557         fn.fileinfo.size += int64(e.Len())
558 }
559
560 func (fn *filenode) SetParent(p inode, name string) {
561         fn.Lock()
562         defer fn.Unlock()
563         fn.parent = p
564         fn.fileinfo.name = name
565 }
566
567 func (fn *filenode) Parent() inode {
568         fn.RLock()
569         defer fn.RUnlock()
570         return fn.parent
571 }
572
573 func (fn *filenode) FS() FileSystem {
574         return fn.fs
575 }
576
577 func (fn *filenode) MemorySize() (size int64) {
578         fn.RLock()
579         defer fn.RUnlock()
580         size = 64
581         for _, seg := range fn.segments {
582                 size += 64
583                 if seg, ok := seg.(*memSegment); ok {
584                         size += int64(seg.Len())
585                 }
586         }
587         return
588 }
589
590 // Read reads file data from a single segment, starting at startPtr,
591 // into p. startPtr is assumed not to be up-to-date. Caller must have
592 // RLock or Lock.
593 func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
594         ptr = fn.seek(startPtr)
595         if ptr.off < 0 {
596                 err = ErrNegativeOffset
597                 return
598         }
599         if ptr.segmentIdx >= len(fn.segments) {
600                 err = io.EOF
601                 return
602         }
603         if ss, ok := fn.segments[ptr.segmentIdx].(storedSegment); ok {
604                 ss.locator = fn.fs.refreshSignature(ss.locator)
605                 fn.segments[ptr.segmentIdx] = ss
606         }
607         n, err = fn.segments[ptr.segmentIdx].ReadAt(p, int64(ptr.segmentOff))
608         if n > 0 {
609                 ptr.off += int64(n)
610                 ptr.segmentOff += n
611                 if ptr.segmentOff == fn.segments[ptr.segmentIdx].Len() {
612                         ptr.segmentIdx++
613                         ptr.segmentOff = 0
614                         if ptr.segmentIdx < len(fn.segments) && err == io.EOF {
615                                 err = nil
616                         }
617                 }
618         }
619         return
620 }
621
622 func (fn *filenode) Size() int64 {
623         fn.RLock()
624         defer fn.RUnlock()
625         return fn.fileinfo.Size()
626 }
627
628 func (fn *filenode) FileInfo() os.FileInfo {
629         fn.RLock()
630         defer fn.RUnlock()
631         return fn.fileinfo
632 }
633
634 func (fn *filenode) Truncate(size int64) error {
635         fn.Lock()
636         defer fn.Unlock()
637         return fn.truncate(size)
638 }
639
640 func (fn *filenode) truncate(size int64) error {
641         if size == fn.fileinfo.size {
642                 return nil
643         }
644         fn.repacked++
645         if size < fn.fileinfo.size {
646                 ptr := fn.seek(filenodePtr{off: size})
647                 for i := ptr.segmentIdx; i < len(fn.segments); i++ {
648                         if seg, ok := fn.segments[i].(*memSegment); ok {
649                                 fn.memsize -= int64(seg.Len())
650                         }
651                 }
652                 if ptr.segmentOff == 0 {
653                         fn.segments = fn.segments[:ptr.segmentIdx]
654                 } else {
655                         fn.segments = fn.segments[:ptr.segmentIdx+1]
656                         switch seg := fn.segments[ptr.segmentIdx].(type) {
657                         case *memSegment:
658                                 seg.Truncate(ptr.segmentOff)
659                                 fn.memsize += int64(seg.Len())
660                         default:
661                                 fn.segments[ptr.segmentIdx] = seg.Slice(0, ptr.segmentOff)
662                         }
663                 }
664                 fn.fileinfo.size = size
665                 return nil
666         }
667         for size > fn.fileinfo.size {
668                 grow := size - fn.fileinfo.size
669                 var seg *memSegment
670                 var ok bool
671                 if len(fn.segments) == 0 {
672                         seg = &memSegment{}
673                         fn.segments = append(fn.segments, seg)
674                 } else if seg, ok = fn.segments[len(fn.segments)-1].(*memSegment); !ok || seg.Len() >= maxBlockSize {
675                         seg = &memSegment{}
676                         fn.segments = append(fn.segments, seg)
677                 }
678                 if maxgrow := int64(maxBlockSize - seg.Len()); maxgrow < grow {
679                         grow = maxgrow
680                 }
681                 seg.Truncate(seg.Len() + int(grow))
682                 fn.fileinfo.size += grow
683                 fn.memsize += grow
684         }
685         return nil
686 }
687
688 // Write writes data from p to the file, starting at startPtr,
689 // extending the file size if necessary. Caller must have Lock.
690 func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
691         if startPtr.off > fn.fileinfo.size {
692                 if err = fn.truncate(startPtr.off); err != nil {
693                         return 0, startPtr, err
694                 }
695         }
696         ptr = fn.seek(startPtr)
697         if ptr.off < 0 {
698                 err = ErrNegativeOffset
699                 return
700         }
701         for len(p) > 0 && err == nil {
702                 cando := p
703                 if len(cando) > maxBlockSize {
704                         cando = cando[:maxBlockSize]
705                 }
706                 // Rearrange/grow fn.segments (and shrink cando if
707                 // needed) such that cando can be copied to
708                 // fn.segments[ptr.segmentIdx] at offset
709                 // ptr.segmentOff.
710                 cur := ptr.segmentIdx
711                 prev := ptr.segmentIdx - 1
712                 var curWritable bool
713                 if cur < len(fn.segments) {
714                         _, curWritable = fn.segments[cur].(*memSegment)
715                 }
716                 var prevAppendable bool
717                 if prev >= 0 && fn.segments[prev].Len() < maxBlockSize {
718                         _, prevAppendable = fn.segments[prev].(*memSegment)
719                 }
720                 if ptr.segmentOff > 0 && !curWritable {
721                         // Split a non-writable block.
722                         if max := fn.segments[cur].Len() - ptr.segmentOff; max <= len(cando) {
723                                 // Truncate cur, and insert a new
724                                 // segment after it.
725                                 cando = cando[:max]
726                                 fn.segments = append(fn.segments, nil)
727                                 copy(fn.segments[cur+1:], fn.segments[cur:])
728                         } else {
729                                 // Split cur into two copies, truncate
730                                 // the one on the left, shift the one
731                                 // on the right, and insert a new
732                                 // segment between them.
733                                 fn.segments = append(fn.segments, nil, nil)
734                                 copy(fn.segments[cur+2:], fn.segments[cur:])
735                                 fn.segments[cur+2] = fn.segments[cur+2].Slice(ptr.segmentOff+len(cando), -1)
736                         }
737                         cur++
738                         prev++
739                         seg := &memSegment{}
740                         seg.Truncate(len(cando))
741                         fn.memsize += int64(len(cando))
742                         fn.segments[cur] = seg
743                         fn.segments[prev] = fn.segments[prev].Slice(0, ptr.segmentOff)
744                         ptr.segmentIdx++
745                         ptr.segmentOff = 0
746                         fn.repacked++
747                         ptr.repacked++
748                 } else if curWritable {
749                         if fit := int(fn.segments[cur].Len()) - ptr.segmentOff; fit < len(cando) {
750                                 cando = cando[:fit]
751                         }
752                 } else {
753                         if prevAppendable {
754                                 // Shrink cando if needed to fit in
755                                 // prev segment.
756                                 if cangrow := maxBlockSize - fn.segments[prev].Len(); cangrow < len(cando) {
757                                         cando = cando[:cangrow]
758                                 }
759                         }
760
761                         if cur == len(fn.segments) {
762                                 // ptr is at EOF, filesize is changing.
763                                 fn.fileinfo.size += int64(len(cando))
764                         } else if el := fn.segments[cur].Len(); el <= len(cando) {
765                                 // cando is long enough that we won't
766                                 // need cur any more. shrink cando to
767                                 // be exactly as long as cur
768                                 // (otherwise we'd accidentally shift
769                                 // the effective position of all
770                                 // segments after cur).
771                                 cando = cando[:el]
772                                 copy(fn.segments[cur:], fn.segments[cur+1:])
773                                 fn.segments = fn.segments[:len(fn.segments)-1]
774                         } else {
775                                 // shrink cur by the same #bytes we're growing prev
776                                 fn.segments[cur] = fn.segments[cur].Slice(len(cando), -1)
777                         }
778
779                         if prevAppendable {
780                                 // Grow prev.
781                                 ptr.segmentIdx--
782                                 ptr.segmentOff = fn.segments[prev].Len()
783                                 fn.segments[prev].(*memSegment).Truncate(ptr.segmentOff + len(cando))
784                                 fn.memsize += int64(len(cando))
785                                 ptr.repacked++
786                                 fn.repacked++
787                         } else {
788                                 // Insert a segment between prev and
789                                 // cur, and advance prev/cur.
790                                 fn.segments = append(fn.segments, nil)
791                                 if cur < len(fn.segments) {
792                                         copy(fn.segments[cur+1:], fn.segments[cur:])
793                                         ptr.repacked++
794                                         fn.repacked++
795                                 } else {
796                                         // appending a new segment does
797                                         // not invalidate any ptrs
798                                 }
799                                 seg := &memSegment{}
800                                 seg.Truncate(len(cando))
801                                 fn.memsize += int64(len(cando))
802                                 fn.segments[cur] = seg
803                         }
804                 }
805
806                 // Finally we can copy bytes from cando to the current segment.
807                 fn.segments[ptr.segmentIdx].(*memSegment).WriteAt(cando, ptr.segmentOff)
808                 n += len(cando)
809                 p = p[len(cando):]
810
811                 ptr.off += int64(len(cando))
812                 ptr.segmentOff += len(cando)
813                 if ptr.segmentOff >= maxBlockSize {
814                         fn.pruneMemSegments()
815                 }
816                 if fn.segments[ptr.segmentIdx].Len() == ptr.segmentOff {
817                         ptr.segmentOff = 0
818                         ptr.segmentIdx++
819                 }
820
821                 fn.fileinfo.modTime = time.Now()
822         }
823         return
824 }
825
826 // Write some data out to disk to reduce memory use. Caller must have
827 // write lock.
828 func (fn *filenode) pruneMemSegments() {
829         // TODO: share code with (*dirnode)flush()
830         // TODO: pack/flush small blocks too, when fragmented
831         for idx, seg := range fn.segments {
832                 seg, ok := seg.(*memSegment)
833                 if !ok || seg.Len() < maxBlockSize || seg.flushing != nil {
834                         continue
835                 }
836                 // Setting seg.flushing guarantees seg.buf will not be
837                 // modified in place: WriteAt and Truncate will
838                 // allocate a new buf instead, if necessary.
839                 idx, buf := idx, seg.buf
840                 done := make(chan struct{})
841                 seg.flushing = done
842                 // If lots of background writes are already in
843                 // progress, block here until one finishes, rather
844                 // than pile up an unlimited number of buffered writes
845                 // and network flush operations.
846                 fn.fs.throttle().Acquire()
847                 go func() {
848                         defer close(done)
849                         resp, err := fn.FS().BlockWrite(context.Background(), BlockWriteOptions{
850                                 Data:           buf,
851                                 Replicas:       fn.fs.replicas,
852                                 StorageClasses: fn.fs.storageClasses,
853                         })
854                         fn.fs.throttle().Release()
855                         fn.Lock()
856                         defer fn.Unlock()
857                         if seg.flushing != done {
858                                 // A new seg.buf has been allocated.
859                                 return
860                         }
861                         if err != nil {
862                                 // TODO: stall (or return errors from)
863                                 // subsequent writes until flushing
864                                 // starts to succeed.
865                                 return
866                         }
867                         if len(fn.segments) <= idx || fn.segments[idx] != seg || len(seg.buf) != len(buf) {
868                                 // Segment has been dropped/moved/resized.
869                                 return
870                         }
871                         fn.memsize -= int64(len(buf))
872                         fn.segments[idx] = storedSegment{
873                                 kc:      fn.FS(),
874                                 locator: resp.Locator,
875                                 size:    len(buf),
876                                 offset:  0,
877                                 length:  len(buf),
878                         }
879                 }()
880         }
881 }
882
883 // Block until all pending pruneMemSegments/flush work is
884 // finished. Caller must NOT have lock.
885 func (fn *filenode) waitPrune() {
886         var pending []<-chan struct{}
887         fn.Lock()
888         for _, seg := range fn.segments {
889                 if seg, ok := seg.(*memSegment); ok && seg.flushing != nil {
890                         pending = append(pending, seg.flushing)
891                 }
892         }
893         fn.Unlock()
894         for _, p := range pending {
895                 <-p
896         }
897 }
898
899 func (fn *filenode) Snapshot() (inode, error) {
900         fn.RLock()
901         defer fn.RUnlock()
902         segments := make([]segment, 0, len(fn.segments))
903         for _, seg := range fn.segments {
904                 segments = append(segments, seg.Slice(0, seg.Len()))
905         }
906         return &filenode{
907                 fileinfo: fn.fileinfo,
908                 segments: segments,
909         }, nil
910 }
911
912 func (fn *filenode) Splice(repl inode) error {
913         repl, err := repl.Snapshot()
914         if err != nil {
915                 return err
916         }
917         fn.parent.Lock()
918         defer fn.parent.Unlock()
919         fn.Lock()
920         defer fn.Unlock()
921         _, err = fn.parent.Child(fn.fileinfo.name, func(inode) (inode, error) { return repl, nil })
922         if err != nil {
923                 return err
924         }
925         switch repl := repl.(type) {
926         case *dirnode:
927                 repl.parent = fn.parent
928                 repl.fileinfo.name = fn.fileinfo.name
929                 repl.setTreeFS(fn.fs)
930         case *filenode:
931                 repl.parent = fn.parent
932                 repl.fileinfo.name = fn.fileinfo.name
933                 repl.fs = fn.fs
934         default:
935                 return fmt.Errorf("cannot splice snapshot containing %T: %w", repl, ErrInvalidArgument)
936         }
937         return nil
938 }
939
940 type dirnode struct {
941         fs *collectionFileSystem
942         treenode
943 }
944
945 func (dn *dirnode) FS() FileSystem {
946         return dn.fs
947 }
948
949 func (dn *dirnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
950         if dn == dn.fs.rootnode() && name == ".arvados#collection" {
951                 gn := &getternode{Getter: func() ([]byte, error) {
952                         var coll Collection
953                         var err error
954                         coll.ManifestText, err = dn.fs.MarshalManifest(".")
955                         if err != nil {
956                                 return nil, err
957                         }
958                         coll.UUID = dn.fs.uuid
959                         data, err := json.Marshal(&coll)
960                         if err == nil {
961                                 data = append(data, '\n')
962                         }
963                         return data, err
964                 }}
965                 gn.SetParent(dn, name)
966                 return gn, nil
967         }
968         return dn.treenode.Child(name, replace)
969 }
970
971 type fnSegmentRef struct {
972         fn  *filenode
973         idx int
974 }
975
976 // commitBlock concatenates the data from the given filenode segments
977 // (which must be *memSegments), writes the data out to Keep as a
978 // single block, and replaces the filenodes' *memSegments with
979 // storedSegments that reference the relevant portions of the new
980 // block.
981 //
982 // bufsize is the total data size in refs. It is used to preallocate
983 // the correct amount of memory when len(refs)>1.
984 //
985 // If sync is false, commitBlock returns right away, after starting a
986 // goroutine to do the writes, reacquire the filenodes' locks, and
987 // swap out the *memSegments. Some filenodes' segments might get
988 // modified/rearranged in the meantime, in which case commitBlock
989 // won't replace them.
990 //
991 // Caller must have write lock.
992 func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize int, sync bool) error {
993         if len(refs) == 0 {
994                 return nil
995         }
996         if err := ctx.Err(); err != nil {
997                 return err
998         }
999         done := make(chan struct{})
1000         var block []byte
1001         segs := make([]*memSegment, 0, len(refs))
1002         offsets := make([]int, 0, len(refs)) // location of segment's data within block
1003         for _, ref := range refs {
1004                 seg := ref.fn.segments[ref.idx].(*memSegment)
1005                 if !sync && seg.flushingUnfinished() {
1006                         // Let the other flushing goroutine finish. If
1007                         // it fails, we'll try again next time.
1008                         close(done)
1009                         return nil
1010                 }
1011                 // In sync mode, we proceed regardless of
1012                 // whether another flush is in progress: It
1013                 // can't finish before we do, because we hold
1014                 // fn's lock until we finish our own writes.
1015                 seg.flushing = done
1016                 offsets = append(offsets, len(block))
1017                 if len(refs) == 1 {
1018                         block = seg.buf
1019                 } else if block == nil {
1020                         block = append(make([]byte, 0, bufsize), seg.buf...)
1021                 } else {
1022                         block = append(block, seg.buf...)
1023                 }
1024                 segs = append(segs, seg)
1025         }
1026         blocksize := len(block)
1027         dn.fs.throttle().Acquire()
1028         errs := make(chan error, 1)
1029         go func() {
1030                 defer close(done)
1031                 defer close(errs)
1032                 resp, err := dn.fs.BlockWrite(context.Background(), BlockWriteOptions{
1033                         Data:           block,
1034                         Replicas:       dn.fs.replicas,
1035                         StorageClasses: dn.fs.storageClasses,
1036                 })
1037                 dn.fs.throttle().Release()
1038                 if err != nil {
1039                         errs <- err
1040                         return
1041                 }
1042                 for idx, ref := range refs {
1043                         if !sync {
1044                                 ref.fn.Lock()
1045                                 // In async mode, fn's lock was
1046                                 // released while we were waiting for
1047                                 // PutB(); lots of things might have
1048                                 // changed.
1049                                 if len(ref.fn.segments) <= ref.idx {
1050                                         // file segments have
1051                                         // rearranged or changed in
1052                                         // some way
1053                                         ref.fn.Unlock()
1054                                         continue
1055                                 } else if seg, ok := ref.fn.segments[ref.idx].(*memSegment); !ok || seg != segs[idx] {
1056                                         // segment has been replaced
1057                                         ref.fn.Unlock()
1058                                         continue
1059                                 } else if seg.flushing != done {
1060                                         // seg.buf has been replaced
1061                                         ref.fn.Unlock()
1062                                         continue
1063                                 }
1064                         }
1065                         data := ref.fn.segments[ref.idx].(*memSegment).buf
1066                         ref.fn.segments[ref.idx] = storedSegment{
1067                                 kc:      dn.fs,
1068                                 locator: resp.Locator,
1069                                 size:    blocksize,
1070                                 offset:  offsets[idx],
1071                                 length:  len(data),
1072                         }
1073                         // atomic is needed here despite caller having
1074                         // lock: caller might be running concurrent
1075                         // commitBlock() goroutines using the same
1076                         // lock, writing different segments from the
1077                         // same file.
1078                         atomic.AddInt64(&ref.fn.memsize, -int64(len(data)))
1079                         if !sync {
1080                                 ref.fn.Unlock()
1081                         }
1082                 }
1083         }()
1084         if sync {
1085                 return <-errs
1086         }
1087         return nil
1088 }
1089
1090 type flushOpts struct {
1091         sync        bool
1092         shortBlocks bool
1093 }
1094
1095 // flush in-memory data and remote-cluster block references (for the
1096 // children with the given names, which must be children of dn) to
1097 // local-cluster persistent storage.
1098 //
1099 // Caller must have write lock on dn and the named children.
1100 //
1101 // If any children are dirs, they will be flushed recursively.
1102 func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) error {
1103         cg := newContextGroup(ctx)
1104         defer cg.Cancel()
1105
1106         goCommit := func(refs []fnSegmentRef, bufsize int) {
1107                 cg.Go(func() error {
1108                         return dn.commitBlock(cg.Context(), refs, bufsize, opts.sync)
1109                 })
1110         }
1111
1112         var pending []fnSegmentRef
1113         var pendingLen int = 0
1114         localLocator := map[string]string{}
1115         for _, name := range names {
1116                 switch node := dn.inodes[name].(type) {
1117                 case *dirnode:
1118                         grandchildNames := node.sortedNames()
1119                         for _, grandchildName := range grandchildNames {
1120                                 grandchild := node.inodes[grandchildName]
1121                                 grandchild.Lock()
1122                                 defer grandchild.Unlock()
1123                         }
1124                         cg.Go(func() error { return node.flush(cg.Context(), grandchildNames, opts) })
1125                 case *filenode:
1126                         for idx, seg := range node.segments {
1127                                 switch seg := seg.(type) {
1128                                 case storedSegment:
1129                                         loc, ok := localLocator[seg.locator]
1130                                         if !ok {
1131                                                 var err error
1132                                                 loc, err = dn.fs.LocalLocator(seg.locator)
1133                                                 if err != nil {
1134                                                         return err
1135                                                 }
1136                                                 localLocator[seg.locator] = loc
1137                                         }
1138                                         seg.locator = loc
1139                                         node.segments[idx] = seg
1140                                 case *memSegment:
1141                                         if seg.Len() > maxBlockSize/2 {
1142                                                 goCommit([]fnSegmentRef{{node, idx}}, seg.Len())
1143                                                 continue
1144                                         }
1145                                         if pendingLen+seg.Len() > maxBlockSize {
1146                                                 goCommit(pending, pendingLen)
1147                                                 pending = nil
1148                                                 pendingLen = 0
1149                                         }
1150                                         pending = append(pending, fnSegmentRef{node, idx})
1151                                         pendingLen += seg.Len()
1152                                 default:
1153                                         panic(fmt.Sprintf("can't sync segment type %T", seg))
1154                                 }
1155                         }
1156                 }
1157         }
1158         if opts.shortBlocks {
1159                 goCommit(pending, pendingLen)
1160         }
1161         return cg.Wait()
1162 }
1163
1164 func (dn *dirnode) MemorySize() (size int64) {
1165         dn.RLock()
1166         todo := make([]inode, 0, len(dn.inodes))
1167         for _, node := range dn.inodes {
1168                 todo = append(todo, node)
1169         }
1170         dn.RUnlock()
1171         size = 64
1172         for _, node := range todo {
1173                 size += node.MemorySize()
1174         }
1175         return
1176 }
1177
1178 // caller must have write lock.
1179 func (dn *dirnode) sortedNames() []string {
1180         names := make([]string, 0, len(dn.inodes))
1181         for name := range dn.inodes {
1182                 names = append(names, name)
1183         }
1184         sort.Strings(names)
1185         return names
1186 }
1187
1188 // caller must have write lock.
1189 func (dn *dirnode) marshalManifest(ctx context.Context, prefix string) (string, error) {
1190         cg := newContextGroup(ctx)
1191         defer cg.Cancel()
1192
1193         if len(dn.inodes) == 0 {
1194                 if prefix == "." {
1195                         return "", nil
1196                 }
1197                 // Express the existence of an empty directory by
1198                 // adding an empty file named `\056`, which (unlike
1199                 // the more obvious spelling `.`) is accepted by the
1200                 // API's manifest validator.
1201                 return manifestEscape(prefix) + " d41d8cd98f00b204e9800998ecf8427e+0 0:0:\\056\n", nil
1202         }
1203
1204         names := dn.sortedNames()
1205
1206         // Wait for children to finish any pending write operations
1207         // before locking them.
1208         for _, name := range names {
1209                 node := dn.inodes[name]
1210                 if fn, ok := node.(*filenode); ok {
1211                         fn.waitPrune()
1212                 }
1213         }
1214
1215         var dirnames []string
1216         var filenames []string
1217         for _, name := range names {
1218                 node := dn.inodes[name]
1219                 node.Lock()
1220                 defer node.Unlock()
1221                 switch node := node.(type) {
1222                 case *dirnode:
1223                         dirnames = append(dirnames, name)
1224                 case *filenode:
1225                         filenames = append(filenames, name)
1226                 default:
1227                         panic(fmt.Sprintf("can't marshal inode type %T", node))
1228                 }
1229         }
1230
1231         subdirs := make([]string, len(dirnames))
1232         rootdir := ""
1233         for i, name := range dirnames {
1234                 i, name := i, name
1235                 cg.Go(func() error {
1236                         txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name)
1237                         subdirs[i] = txt
1238                         return err
1239                 })
1240         }
1241
1242         cg.Go(func() error {
1243                 var streamLen int64
1244                 type filepart struct {
1245                         name   string
1246                         offset int64
1247                         length int64
1248                 }
1249
1250                 var fileparts []filepart
1251                 var blocks []string
1252                 if err := dn.flush(cg.Context(), filenames, flushOpts{sync: true, shortBlocks: true}); err != nil {
1253                         return err
1254                 }
1255                 for _, name := range filenames {
1256                         node := dn.inodes[name].(*filenode)
1257                         if len(node.segments) == 0 {
1258                                 fileparts = append(fileparts, filepart{name: name})
1259                                 continue
1260                         }
1261                         for _, seg := range node.segments {
1262                                 switch seg := seg.(type) {
1263                                 case storedSegment:
1264                                         if len(blocks) > 0 && blocks[len(blocks)-1] == seg.locator {
1265                                                 streamLen -= int64(seg.size)
1266                                         } else {
1267                                                 blocks = append(blocks, seg.locator)
1268                                         }
1269                                         next := filepart{
1270                                                 name:   name,
1271                                                 offset: streamLen + int64(seg.offset),
1272                                                 length: int64(seg.length),
1273                                         }
1274                                         if prev := len(fileparts) - 1; prev >= 0 &&
1275                                                 fileparts[prev].name == name &&
1276                                                 fileparts[prev].offset+fileparts[prev].length == next.offset {
1277                                                 fileparts[prev].length += next.length
1278                                         } else {
1279                                                 fileparts = append(fileparts, next)
1280                                         }
1281                                         streamLen += int64(seg.size)
1282                                 default:
1283                                         // This can't happen: we
1284                                         // haven't unlocked since
1285                                         // calling flush(sync=true).
1286                                         panic(fmt.Sprintf("can't marshal segment type %T", seg))
1287                                 }
1288                         }
1289                 }
1290                 var filetokens []string
1291                 for _, s := range fileparts {
1292                         filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
1293                 }
1294                 if len(filetokens) == 0 {
1295                         return nil
1296                 } else if len(blocks) == 0 {
1297                         blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
1298                 }
1299                 rootdir = manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n"
1300                 return nil
1301         })
1302         err := cg.Wait()
1303         return rootdir + strings.Join(subdirs, ""), err
1304 }
1305
1306 func (dn *dirnode) loadManifest(txt string) error {
1307         streams := bytes.Split([]byte(txt), []byte{'\n'})
1308         if len(streams[len(streams)-1]) != 0 {
1309                 return fmt.Errorf("line %d: no trailing newline", len(streams))
1310         }
1311         streams = streams[:len(streams)-1]
1312         segments := []storedSegment{}
1313         // To reduce allocs, we reuse a single "pathparts" slice
1314         // (pre-split on "/" separators) for the duration of this
1315         // func.
1316         var pathparts []string
1317         // To reduce allocs, we reuse a single "toks" slice of 3 byte
1318         // slices.
1319         var toks = make([][]byte, 3)
1320         // Similar to bytes.SplitN(token, []byte{c}, 3), but splits
1321         // into the toks slice rather than allocating a new one, and
1322         // returns the number of toks (1, 2, or 3).
1323         splitToToks := func(src []byte, c rune) int {
1324                 c1 := bytes.IndexRune(src, c)
1325                 if c1 < 0 {
1326                         toks[0] = src
1327                         return 1
1328                 }
1329                 toks[0], src = src[:c1], src[c1+1:]
1330                 c2 := bytes.IndexRune(src, c)
1331                 if c2 < 0 {
1332                         toks[1] = src
1333                         return 2
1334                 }
1335                 toks[1], toks[2] = src[:c2], src[c2+1:]
1336                 return 3
1337         }
1338         for i, stream := range streams {
1339                 lineno := i + 1
1340                 var anyFileTokens bool
1341                 var pos int64
1342                 var segIdx int
1343                 segments = segments[:0]
1344                 pathparts = nil
1345                 streamparts := 0
1346                 for i, token := range bytes.Split(stream, []byte{' '}) {
1347                         if i == 0 {
1348                                 pathparts = strings.Split(manifestUnescape(string(token)), "/")
1349                                 streamparts = len(pathparts)
1350                                 continue
1351                         }
1352                         if !bytes.ContainsRune(token, ':') {
1353                                 if anyFileTokens {
1354                                         return fmt.Errorf("line %d: bad file segment %q", lineno, token)
1355                                 }
1356                                 if splitToToks(token, '+') < 2 {
1357                                         return fmt.Errorf("line %d: bad locator %q", lineno, token)
1358                                 }
1359                                 length, err := strconv.ParseInt(string(toks[1]), 10, 32)
1360                                 if err != nil || length < 0 {
1361                                         return fmt.Errorf("line %d: bad locator %q", lineno, token)
1362                                 }
1363                                 segments = append(segments, storedSegment{
1364                                         locator: string(token),
1365                                         size:    int(length),
1366                                         offset:  0,
1367                                         length:  int(length),
1368                                 })
1369                                 continue
1370                         } else if len(segments) == 0 {
1371                                 return fmt.Errorf("line %d: bad locator %q", lineno, token)
1372                         }
1373                         if splitToToks(token, ':') != 3 {
1374                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
1375                         }
1376                         anyFileTokens = true
1377
1378                         offset, err := strconv.ParseInt(string(toks[0]), 10, 64)
1379                         if err != nil || offset < 0 {
1380                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
1381                         }
1382                         length, err := strconv.ParseInt(string(toks[1]), 10, 64)
1383                         if err != nil || length < 0 {
1384                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
1385                         }
1386                         if !bytes.ContainsAny(toks[2], `\/`) {
1387                                 // optimization for a common case
1388                                 pathparts = append(pathparts[:streamparts], string(toks[2]))
1389                         } else {
1390                                 pathparts = append(pathparts[:streamparts], strings.Split(manifestUnescape(string(toks[2])), "/")...)
1391                         }
1392                         fnode, err := dn.createFileAndParents(pathparts)
1393                         if fnode == nil && err == nil && length == 0 {
1394                                 // Special case: an empty file used as
1395                                 // a marker to preserve an otherwise
1396                                 // empty directory in a manifest.
1397                                 continue
1398                         }
1399                         if err != nil || (fnode == nil && length != 0) {
1400                                 return fmt.Errorf("line %d: cannot use name %q with length %d: %s", lineno, toks[2], length, err)
1401                         }
1402                         // Map the stream offset/range coordinates to
1403                         // block/offset/range coordinates and add
1404                         // corresponding storedSegments to the filenode
1405                         if pos > offset {
1406                                 // Can't continue where we left off.
1407                                 // TODO: binary search instead of
1408                                 // rewinding all the way (but this
1409                                 // situation might be rare anyway)
1410                                 segIdx, pos = 0, 0
1411                         }
1412                         for ; segIdx < len(segments); segIdx++ {
1413                                 seg := segments[segIdx]
1414                                 next := pos + int64(seg.Len())
1415                                 if next <= offset || seg.Len() == 0 {
1416                                         pos = next
1417                                         continue
1418                                 }
1419                                 if pos >= offset+length {
1420                                         break
1421                                 }
1422                                 var blkOff int
1423                                 if pos < offset {
1424                                         blkOff = int(offset - pos)
1425                                 }
1426                                 blkLen := seg.Len() - blkOff
1427                                 if pos+int64(blkOff+blkLen) > offset+length {
1428                                         blkLen = int(offset + length - pos - int64(blkOff))
1429                                 }
1430                                 fnode.appendSegment(storedSegment{
1431                                         kc:      dn.fs,
1432                                         locator: seg.locator,
1433                                         size:    seg.size,
1434                                         offset:  blkOff,
1435                                         length:  blkLen,
1436                                 })
1437                                 if next > offset+length {
1438                                         break
1439                                 } else {
1440                                         pos = next
1441                                 }
1442                         }
1443                         if segIdx == len(segments) && pos < offset+length {
1444                                 return fmt.Errorf("line %d: invalid segment in %d-byte stream: %q", lineno, pos, token)
1445                         }
1446                 }
1447                 if !anyFileTokens {
1448                         return fmt.Errorf("line %d: no file segments", lineno)
1449                 } else if len(segments) == 0 {
1450                         return fmt.Errorf("line %d: no locators", lineno)
1451                 } else if streamparts == 0 {
1452                         return fmt.Errorf("line %d: no stream name", lineno)
1453                 }
1454         }
1455         return nil
1456 }
1457
1458 // only safe to call from loadManifest -- no locking.
1459 //
1460 // If path is a "parent directory exists" marker (the last path
1461 // component is "."), the returned values are both nil.
1462 //
1463 // Newly added nodes have modtime==0. Caller is responsible for fixing
1464 // them with backdateTree.
1465 func (dn *dirnode) createFileAndParents(names []string) (fn *filenode, err error) {
1466         var node inode = dn
1467         basename := names[len(names)-1]
1468         for _, name := range names[:len(names)-1] {
1469                 switch name {
1470                 case "", ".":
1471                         continue
1472                 case "..":
1473                         if node == dn {
1474                                 // can't be sure parent will be a *dirnode
1475                                 return nil, ErrInvalidArgument
1476                         }
1477                         node = node.Parent()
1478                         continue
1479                 }
1480                 node.Lock()
1481                 unlock := node.Unlock
1482                 node, err = node.Child(name, func(child inode) (inode, error) {
1483                         if child == nil {
1484                                 // note modtime will be fixed later in backdateTree()
1485                                 child, err := node.FS().newNode(name, 0755|os.ModeDir, time.Time{})
1486                                 if err != nil {
1487                                         return nil, err
1488                                 }
1489                                 child.SetParent(node, name)
1490                                 return child, nil
1491                         } else if !child.IsDir() {
1492                                 return child, ErrFileExists
1493                         } else {
1494                                 return child, nil
1495                         }
1496                 })
1497                 unlock()
1498                 if err != nil {
1499                         return
1500                 }
1501         }
1502         if basename == "." {
1503                 return
1504         } else if !permittedName(basename) {
1505                 err = fmt.Errorf("invalid file part %q in path %q", basename, names)
1506                 return
1507         }
1508         node.Lock()
1509         defer node.Unlock()
1510         _, err = node.Child(basename, func(child inode) (inode, error) {
1511                 switch child := child.(type) {
1512                 case nil:
1513                         child, err = node.FS().newNode(basename, 0755, time.Time{})
1514                         if err != nil {
1515                                 return nil, err
1516                         }
1517                         child.SetParent(node, basename)
1518                         fn = child.(*filenode)
1519                         return child, nil
1520                 case *filenode:
1521                         fn = child
1522                         return child, nil
1523                 case *dirnode:
1524                         return child, ErrIsDirectory
1525                 default:
1526                         return child, ErrInvalidArgument
1527                 }
1528         })
1529         return
1530 }
1531
1532 func (dn *dirnode) TreeSize() (bytes int64) {
1533         dn.RLock()
1534         defer dn.RUnlock()
1535         for _, i := range dn.inodes {
1536                 switch i := i.(type) {
1537                 case *filenode:
1538                         bytes += i.Size()
1539                 case *dirnode:
1540                         bytes += i.TreeSize()
1541                 }
1542         }
1543         return
1544 }
1545
1546 func (dn *dirnode) Snapshot() (inode, error) {
1547         return dn.snapshot()
1548 }
1549
1550 func (dn *dirnode) snapshot() (*dirnode, error) {
1551         dn.RLock()
1552         defer dn.RUnlock()
1553         snap := &dirnode{
1554                 treenode: treenode{
1555                         inodes:   make(map[string]inode, len(dn.inodes)),
1556                         fileinfo: dn.fileinfo,
1557                 },
1558         }
1559         for name, child := range dn.inodes {
1560                 dupchild, err := child.Snapshot()
1561                 if err != nil {
1562                         return nil, err
1563                 }
1564                 snap.inodes[name] = dupchild
1565                 dupchild.SetParent(snap, name)
1566         }
1567         return snap, nil
1568 }
1569
1570 func (dn *dirnode) Splice(repl inode) error {
1571         repl, err := repl.Snapshot()
1572         if err != nil {
1573                 return fmt.Errorf("cannot copy snapshot: %w", err)
1574         }
1575         switch repl := repl.(type) {
1576         default:
1577                 return fmt.Errorf("cannot splice snapshot containing %T: %w", repl, ErrInvalidArgument)
1578         case *dirnode:
1579                 dn.Lock()
1580                 defer dn.Unlock()
1581                 dn.inodes = repl.inodes
1582                 dn.setTreeFS(dn.fs)
1583         case *filenode:
1584                 dn.parent.Lock()
1585                 defer dn.parent.Unlock()
1586                 removing, err := dn.parent.Child(dn.fileinfo.name, nil)
1587                 if err != nil {
1588                         return fmt.Errorf("cannot use Splice to replace a top-level directory with a file: %w", ErrInvalidOperation)
1589                 } else if removing != dn {
1590                         // If ../thisdirname is not this dirnode, it
1591                         // must be an inode that wraps a dirnode, like
1592                         // a collectionFileSystem or deferrednode.
1593                         if deferred, ok := removing.(*deferrednode); ok {
1594                                 // More useful to report the type of
1595                                 // the wrapped node rather than just
1596                                 // *deferrednode. (We know the real
1597                                 // inode is already loaded because dn
1598                                 // is inside it.)
1599                                 removing = deferred.realinode()
1600                         }
1601                         return fmt.Errorf("cannot use Splice to attach a file at top level of %T: %w", removing, ErrInvalidOperation)
1602                 }
1603                 dn.Lock()
1604                 defer dn.Unlock()
1605                 _, err = dn.parent.Child(dn.fileinfo.name, func(inode) (inode, error) { return repl, nil })
1606                 if err != nil {
1607                         return fmt.Errorf("error replacing filenode: dn.parent.Child(): %w", err)
1608                 }
1609                 repl.fs = dn.fs
1610         }
1611         return nil
1612 }
1613
1614 func (dn *dirnode) setTreeFS(fs *collectionFileSystem) {
1615         dn.fs = fs
1616         for _, child := range dn.inodes {
1617                 switch child := child.(type) {
1618                 case *dirnode:
1619                         child.setTreeFS(fs)
1620                 case *filenode:
1621                         child.fs = fs
1622                 }
1623         }
1624 }
1625
1626 type segment interface {
1627         io.ReaderAt
1628         Len() int
1629         // Return a new segment with a subsection of the data from this
1630         // one. length<0 means length=Len()-off.
1631         Slice(off int, length int) segment
1632 }
1633
1634 type memSegment struct {
1635         buf []byte
1636         // If flushing is not nil and not ready/closed, then a) buf is
1637         // being shared by a pruneMemSegments goroutine, and must be
1638         // copied on write; and b) the flushing channel will close
1639         // when the goroutine finishes, whether it succeeds or not.
1640         flushing <-chan struct{}
1641 }
1642
1643 func (me *memSegment) flushingUnfinished() bool {
1644         if me.flushing == nil {
1645                 return false
1646         }
1647         select {
1648         case <-me.flushing:
1649                 me.flushing = nil
1650                 return false
1651         default:
1652                 return true
1653         }
1654 }
1655
1656 func (me *memSegment) Len() int {
1657         return len(me.buf)
1658 }
1659
1660 func (me *memSegment) Slice(off, length int) segment {
1661         if length < 0 {
1662                 length = len(me.buf) - off
1663         }
1664         buf := make([]byte, length)
1665         copy(buf, me.buf[off:])
1666         return &memSegment{buf: buf}
1667 }
1668
1669 func (me *memSegment) Truncate(n int) {
1670         if n > cap(me.buf) || (me.flushing != nil && n > len(me.buf)) {
1671                 newsize := 1024
1672                 for newsize < n {
1673                         newsize = newsize << 2
1674                 }
1675                 newbuf := make([]byte, n, newsize)
1676                 copy(newbuf, me.buf)
1677                 me.buf, me.flushing = newbuf, nil
1678         } else {
1679                 // reclaim existing capacity, and zero reclaimed part
1680                 oldlen := len(me.buf)
1681                 me.buf = me.buf[:n]
1682                 for i := oldlen; i < n; i++ {
1683                         me.buf[i] = 0
1684                 }
1685         }
1686 }
1687
1688 func (me *memSegment) WriteAt(p []byte, off int) {
1689         if off+len(p) > len(me.buf) {
1690                 panic("overflowed segment")
1691         }
1692         if me.flushing != nil {
1693                 me.buf, me.flushing = append([]byte(nil), me.buf...), nil
1694         }
1695         copy(me.buf[off:], p)
1696 }
1697
1698 func (me *memSegment) ReadAt(p []byte, off int64) (n int, err error) {
1699         if off > int64(me.Len()) {
1700                 err = io.EOF
1701                 return
1702         }
1703         n = copy(p, me.buf[int(off):])
1704         if n < len(p) {
1705                 err = io.EOF
1706         }
1707         return
1708 }
1709
1710 type storedSegment struct {
1711         kc      fsBackend
1712         locator string
1713         size    int // size of stored block (also encoded in locator)
1714         offset  int // position of segment within the stored block
1715         length  int // bytes in this segment (offset + length <= size)
1716 }
1717
1718 func (se storedSegment) Len() int {
1719         return se.length
1720 }
1721
1722 func (se storedSegment) Slice(n, size int) segment {
1723         se.offset += n
1724         se.length -= n
1725         if size >= 0 && se.length > size {
1726                 se.length = size
1727         }
1728         return se
1729 }
1730
1731 func (se storedSegment) ReadAt(p []byte, off int64) (n int, err error) {
1732         if off > int64(se.length) {
1733                 return 0, io.EOF
1734         }
1735         maxlen := se.length - int(off)
1736         if len(p) > maxlen {
1737                 p = p[:maxlen]
1738                 n, err = se.kc.ReadAt(se.locator, p, int(off)+se.offset)
1739                 if err == nil {
1740                         err = io.EOF
1741                 }
1742                 return
1743         }
1744         return se.kc.ReadAt(se.locator, p, int(off)+se.offset)
1745 }
1746
1747 func canonicalName(name string) string {
1748         name = path.Clean("/" + name)
1749         if name == "/" || name == "./" {
1750                 name = "."
1751         } else if strings.HasPrefix(name, "/") {
1752                 name = "." + name
1753         }
1754         return name
1755 }
1756
1757 var manifestEscapeSeq = regexp.MustCompile(`\\([0-7]{3}|\\)`)
1758
1759 func manifestUnescapeFunc(seq string) string {
1760         if seq == `\\` {
1761                 return `\`
1762         }
1763         i, err := strconv.ParseUint(seq[1:], 8, 8)
1764         if err != nil {
1765                 // Invalid escape sequence: can't unescape.
1766                 return seq
1767         }
1768         return string([]byte{byte(i)})
1769 }
1770
1771 func manifestUnescape(s string) string {
1772         return manifestEscapeSeq.ReplaceAllStringFunc(s, manifestUnescapeFunc)
1773 }
1774
1775 var manifestEscapedChar = regexp.MustCompile(`[\000-\040:\s\\]`)
1776
1777 func manifestEscapeFunc(seq string) string {
1778         return fmt.Sprintf("\\%03o", byte(seq[0]))
1779 }
1780
1781 func manifestEscape(s string) string {
1782         return manifestEscapedChar.ReplaceAllStringFunc(s, manifestEscapeFunc)
1783 }