19362: Move non-S3 collection-addressed reqs to sitefs code path.
[arvados.git] / sdk / go / arvados / fs_collection.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "bytes"
9         "context"
10         "encoding/json"
11         "fmt"
12         "io"
13         "os"
14         "path"
15         "regexp"
16         "sort"
17         "strconv"
18         "strings"
19         "sync"
20         "sync/atomic"
21         "time"
22 )
23
24 var (
25         maxBlockSize      = 1 << 26
26         concurrentWriters = 4 // max goroutines writing to Keep in background and during flush()
27 )
28
29 // A CollectionFileSystem is a FileSystem that can be serialized as a
30 // manifest and stored as a collection.
31 type CollectionFileSystem interface {
32         FileSystem
33
34         // Flush all file data to Keep and return a snapshot of the
35         // filesystem suitable for saving as (Collection)ManifestText.
36         // Prefix (normally ".") is a top level directory, effectively
37         // prepended to all paths in the returned manifest.
38         MarshalManifest(prefix string) (string, error)
39
40         // Total data bytes in all files.
41         Size() int64
42 }
43
44 type collectionFileSystem struct {
45         fileSystem
46         uuid           string
47         savedPDH       atomic.Value
48         replicas       int
49         storageClasses []string
50         // guessSignatureTTL tracks a lower bound for the server's
51         // configured BlobSigningTTL. The guess is initially zero, and
52         // increases when we come across a signature with an expiry
53         // time further in the future than the previous guess.
54         //
55         // When the guessed TTL is much smaller than the real TTL,
56         // preemptive signature refresh is delayed or missed entirely,
57         // which is OK.
58         guessSignatureTTL time.Duration
59         holdCheckChanges  time.Time
60         lockCheckChanges  sync.Mutex
61 }
62
63 // FileSystem returns a CollectionFileSystem for the collection.
64 func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFileSystem, error) {
65         modTime := c.ModifiedAt
66         if modTime.IsZero() {
67                 modTime = time.Now()
68         }
69         fs := &collectionFileSystem{
70                 uuid:           c.UUID,
71                 storageClasses: c.StorageClassesDesired,
72                 fileSystem: fileSystem{
73                         fsBackend: keepBackend{apiClient: client, keepClient: kc},
74                         thr:       newThrottle(concurrentWriters),
75                 },
76         }
77         fs.savedPDH.Store(c.PortableDataHash)
78         if r := c.ReplicationDesired; r != nil {
79                 fs.replicas = *r
80         }
81         root := &dirnode{
82                 fs: fs,
83                 treenode: treenode{
84                         fileinfo: fileinfo{
85                                 name:    ".",
86                                 mode:    os.ModeDir | 0755,
87                                 modTime: modTime,
88                                 sys:     func() interface{} { return c },
89                         },
90                         inodes: make(map[string]inode),
91                 },
92         }
93         root.SetParent(root, ".")
94         if err := root.loadManifest(c.ManifestText); err != nil {
95                 return nil, err
96         }
97         backdateTree(root, modTime)
98         fs.root = root
99         return fs, nil
100 }
101
102 // caller must have lock (or guarantee no concurrent accesses somehow)
103 func eachNode(n inode, ffunc func(*filenode), dfunc func(*dirnode)) {
104         switch n := n.(type) {
105         case *filenode:
106                 if ffunc != nil {
107                         ffunc(n)
108                 }
109         case *dirnode:
110                 if dfunc != nil {
111                         dfunc(n)
112                 }
113                 for _, n := range n.inodes {
114                         eachNode(n, ffunc, dfunc)
115                 }
116         }
117 }
118
119 // caller must have lock (or guarantee no concurrent accesses somehow)
120 func backdateTree(n inode, modTime time.Time) {
121         eachNode(n, func(fn *filenode) {
122                 fn.fileinfo.modTime = modTime
123         }, func(dn *dirnode) {
124                 dn.fileinfo.modTime = modTime
125         })
126 }
127
128 // Approximate portion of signature TTL remaining, usually between 0
129 // and 1, or negative if some signatures have expired.
130 func (fs *collectionFileSystem) signatureTimeLeft() (float64, time.Duration) {
131         var (
132                 now      = time.Now()
133                 earliest = now.Add(time.Hour * 24 * 7 * 365)
134                 latest   time.Time
135         )
136         fs.fileSystem.root.RLock()
137         eachNode(fs.root, func(fn *filenode) {
138                 fn.Lock()
139                 defer fn.Unlock()
140                 for _, seg := range fn.segments {
141                         seg, ok := seg.(storedSegment)
142                         if !ok {
143                                 continue
144                         }
145                         expiryTime, err := signatureExpiryTime(seg.locator)
146                         if err != nil {
147                                 continue
148                         }
149                         if expiryTime.Before(earliest) {
150                                 earliest = expiryTime
151                         }
152                         if expiryTime.After(latest) {
153                                 latest = expiryTime
154                         }
155                 }
156         }, nil)
157         fs.fileSystem.root.RUnlock()
158
159         if latest.IsZero() {
160                 // No signatures == 100% of TTL remaining.
161                 return 1, 1
162         }
163
164         ttl := latest.Sub(now)
165         fs.fileSystem.root.Lock()
166         {
167                 if ttl > fs.guessSignatureTTL {
168                         // ttl is closer to the real TTL than
169                         // guessSignatureTTL.
170                         fs.guessSignatureTTL = ttl
171                 } else {
172                         // Use the previous best guess to compute the
173                         // portion remaining (below, after unlocking
174                         // mutex).
175                         ttl = fs.guessSignatureTTL
176                 }
177         }
178         fs.fileSystem.root.Unlock()
179
180         return earliest.Sub(now).Seconds() / ttl.Seconds(), ttl
181 }
182
183 func (fs *collectionFileSystem) updateSignatures(newmanifest string) {
184         newLoc := map[string]string{}
185         for _, tok := range regexp.MustCompile(`\S+`).FindAllString(newmanifest, -1) {
186                 if mBlkRe.MatchString(tok) {
187                         newLoc[stripAllHints(tok)] = tok
188                 }
189         }
190         fs.fileSystem.root.Lock()
191         defer fs.fileSystem.root.Unlock()
192         eachNode(fs.root, func(fn *filenode) {
193                 fn.Lock()
194                 defer fn.Unlock()
195                 for idx, seg := range fn.segments {
196                         seg, ok := seg.(storedSegment)
197                         if !ok {
198                                 continue
199                         }
200                         loc, ok := newLoc[stripAllHints(seg.locator)]
201                         if !ok {
202                                 continue
203                         }
204                         seg.locator = loc
205                         fn.segments[idx] = seg
206                 }
207         }, nil)
208 }
209
210 func (fs *collectionFileSystem) newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error) {
211         if name == "" || name == "." || name == ".." {
212                 return nil, ErrInvalidArgument
213         }
214         if perm.IsDir() {
215                 return &dirnode{
216                         fs: fs,
217                         treenode: treenode{
218                                 fileinfo: fileinfo{
219                                         name:    name,
220                                         mode:    perm | os.ModeDir,
221                                         modTime: modTime,
222                                 },
223                                 inodes: make(map[string]inode),
224                         },
225                 }, nil
226         }
227         return &filenode{
228                 fs: fs,
229                 fileinfo: fileinfo{
230                         name:    name,
231                         mode:    perm & ^os.ModeDir,
232                         modTime: modTime,
233                 },
234         }, nil
235 }
236
237 func (fs *collectionFileSystem) Child(name string, replace func(inode) (inode, error)) (inode, error) {
238         return fs.rootnode().Child(name, replace)
239 }
240
241 func (fs *collectionFileSystem) FS() FileSystem {
242         return fs
243 }
244
245 func (fs *collectionFileSystem) FileInfo() os.FileInfo {
246         return fs.rootnode().FileInfo()
247 }
248
249 func (fs *collectionFileSystem) IsDir() bool {
250         return true
251 }
252
253 func (fs *collectionFileSystem) Lock() {
254         fs.rootnode().Lock()
255 }
256
257 func (fs *collectionFileSystem) Unlock() {
258         fs.rootnode().Unlock()
259 }
260
261 func (fs *collectionFileSystem) RLock() {
262         fs.rootnode().RLock()
263 }
264
265 func (fs *collectionFileSystem) RUnlock() {
266         fs.rootnode().RUnlock()
267 }
268
269 func (fs *collectionFileSystem) Parent() inode {
270         return fs.rootnode().Parent()
271 }
272
273 func (fs *collectionFileSystem) Read(_ []byte, ptr filenodePtr) (int, filenodePtr, error) {
274         return 0, ptr, ErrInvalidOperation
275 }
276
277 func (fs *collectionFileSystem) Write(_ []byte, ptr filenodePtr) (int, filenodePtr, error) {
278         return 0, ptr, ErrInvalidOperation
279 }
280
281 func (fs *collectionFileSystem) Readdir() ([]os.FileInfo, error) {
282         return fs.rootnode().Readdir()
283 }
284
285 func (fs *collectionFileSystem) SetParent(parent inode, name string) {
286         fs.rootnode().SetParent(parent, name)
287 }
288
289 func (fs *collectionFileSystem) Truncate(int64) error {
290         return ErrInvalidOperation
291 }
292
293 // Check for and incorporate upstream changes. If force==false, this
294 // is a no-op except once every ttl/100 or so.
295 //
296 // Return value is true if new content was loaded from upstream and
297 // any unsaved local changes have been discarded.
298 func (fs *collectionFileSystem) checkChangesOnServer(force bool) (bool, error) {
299         if fs.uuid == "" && fs.savedPDH.Load() == "" {
300                 return false, nil
301         }
302
303         fs.lockCheckChanges.Lock()
304         if !force && fs.holdCheckChanges.After(time.Now()) {
305                 fs.lockCheckChanges.Unlock()
306                 return false, nil
307         }
308         remain, ttl := fs.signatureTimeLeft()
309         if remain > 0.01 {
310                 fs.holdCheckChanges = time.Now().Add(ttl / 100)
311         }
312         fs.lockCheckChanges.Unlock()
313
314         if !force && remain >= 0.5 {
315                 // plenty of time left on current signatures
316                 return false, nil
317         }
318
319         getparams := map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}}
320         if fs.uuid != "" {
321                 var coll Collection
322                 err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+fs.uuid, nil, getparams)
323                 if err != nil {
324                         return false, err
325                 }
326                 if coll.PortableDataHash != fs.savedPDH.Load().(string) {
327                         // collection has changed upstream since we
328                         // last loaded or saved. Refresh local data,
329                         // losing any unsaved local changes.
330                         newfs, err := coll.FileSystem(fs.fileSystem.fsBackend, fs.fileSystem.fsBackend)
331                         if err != nil {
332                                 return false, err
333                         }
334                         snap, err := Snapshot(newfs, "/")
335                         if err != nil {
336                                 return false, err
337                         }
338                         err = Splice(fs, "/", snap)
339                         if err != nil {
340                                 return false, err
341                         }
342                         fs.savedPDH.Store(coll.PortableDataHash)
343                         return true, nil
344                 }
345                 fs.updateSignatures(coll.ManifestText)
346                 return false, nil
347         }
348         if pdh := fs.savedPDH.Load().(string); pdh != "" {
349                 var coll Collection
350                 err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+pdh, nil, getparams)
351                 if err != nil {
352                         return false, err
353                 }
354                 fs.updateSignatures(coll.ManifestText)
355         }
356         return false, nil
357 }
358
359 // Refresh signature on a single locator, if necessary. Assume caller
360 // has lock. If an update is needed, and there are any storedSegments
361 // whose signatures can be updated, start a background task to update
362 // them asynchronously when the caller releases locks.
363 func (fs *collectionFileSystem) refreshSignature(locator string) string {
364         exp, err := signatureExpiryTime(locator)
365         if err != nil || exp.Sub(time.Now()) > time.Minute {
366                 // Synchronous update is not needed. Start an
367                 // asynchronous update if needed.
368                 go fs.checkChangesOnServer(false)
369                 return locator
370         }
371         var manifests string
372         for _, id := range []string{fs.uuid, fs.savedPDH.Load().(string)} {
373                 if id == "" {
374                         continue
375                 }
376                 var coll Collection
377                 err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}})
378                 if err != nil {
379                         continue
380                 }
381                 manifests += coll.ManifestText
382         }
383         hash := stripAllHints(locator)
384         for _, tok := range regexp.MustCompile(`\S+`).FindAllString(manifests, -1) {
385                 if mBlkRe.MatchString(tok) {
386                         if stripAllHints(tok) == hash {
387                                 locator = tok
388                                 break
389                         }
390                 }
391         }
392         go fs.updateSignatures(manifests)
393         return locator
394 }
395
396 func (fs *collectionFileSystem) Sync() error {
397         refreshed, err := fs.checkChangesOnServer(true)
398         if err != nil {
399                 return err
400         }
401         if refreshed || fs.uuid == "" {
402                 return nil
403         }
404         txt, err := fs.MarshalManifest(".")
405         if err != nil {
406                 return fmt.Errorf("sync failed: %s", err)
407         }
408         if PortableDataHash(txt) == fs.savedPDH.Load() {
409                 // No local changes since last save or initial load.
410                 return nil
411         }
412         coll := Collection{
413                 UUID:         fs.uuid,
414                 ManifestText: txt,
415         }
416
417         selectFields := []string{"uuid", "portable_data_hash"}
418         fs.lockCheckChanges.Lock()
419         remain, _ := fs.signatureTimeLeft()
420         fs.lockCheckChanges.Unlock()
421         if remain < 0.5 {
422                 selectFields = append(selectFields, "manifest_text")
423         }
424
425         err = fs.RequestAndDecode(&coll, "PUT", "arvados/v1/collections/"+fs.uuid, nil, map[string]interface{}{
426                 "collection": map[string]string{
427                         "manifest_text": coll.ManifestText,
428                 },
429                 "select": selectFields,
430         })
431         if err != nil {
432                 return fmt.Errorf("sync failed: update %s: %w", fs.uuid, err)
433         }
434         fs.updateSignatures(coll.ManifestText)
435         fs.savedPDH.Store(coll.PortableDataHash)
436         return nil
437 }
438
439 func (fs *collectionFileSystem) Flush(path string, shortBlocks bool) error {
440         node, err := rlookup(fs.fileSystem.root, path)
441         if err != nil {
442                 return err
443         }
444         dn, ok := node.(*dirnode)
445         if !ok {
446                 return ErrNotADirectory
447         }
448         dn.Lock()
449         defer dn.Unlock()
450         names := dn.sortedNames()
451         if path != "" {
452                 // Caller only wants to flush the specified dir,
453                 // non-recursively.  Drop subdirs from the list of
454                 // names.
455                 var filenames []string
456                 for _, name := range names {
457                         if _, ok := dn.inodes[name].(*filenode); ok {
458                                 filenames = append(filenames, name)
459                         }
460                 }
461                 names = filenames
462         }
463         for _, name := range names {
464                 child := dn.inodes[name]
465                 child.Lock()
466                 defer child.Unlock()
467         }
468         return dn.flush(context.TODO(), names, flushOpts{sync: false, shortBlocks: shortBlocks})
469 }
470
471 func (fs *collectionFileSystem) MemorySize() int64 {
472         fs.fileSystem.root.Lock()
473         defer fs.fileSystem.root.Unlock()
474         return fs.fileSystem.root.(*dirnode).MemorySize()
475 }
476
477 func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
478         fs.fileSystem.root.Lock()
479         defer fs.fileSystem.root.Unlock()
480         return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix)
481 }
482
483 func (fs *collectionFileSystem) Size() int64 {
484         return fs.fileSystem.root.(*dirnode).TreeSize()
485 }
486
487 func (fs *collectionFileSystem) Snapshot() (inode, error) {
488         return fs.fileSystem.root.Snapshot()
489 }
490
491 func (fs *collectionFileSystem) Splice(r inode) error {
492         return fs.fileSystem.root.Splice(r)
493 }
494
495 // filenodePtr is an offset into a file that is (usually) efficient to
496 // seek to. Specifically, if filenode.repacked==filenodePtr.repacked
497 // then
498 // filenode.segments[filenodePtr.segmentIdx][filenodePtr.segmentOff]
499 // corresponds to file offset filenodePtr.off. Otherwise, it is
500 // necessary to reexamine len(filenode.segments[0]) etc. to find the
501 // correct segment and offset.
502 type filenodePtr struct {
503         off        int64
504         segmentIdx int
505         segmentOff int
506         repacked   int64
507 }
508
509 // seek returns a ptr that is consistent with both startPtr.off and
510 // the current state of fn. The caller must already hold fn.RLock() or
511 // fn.Lock().
512 //
513 // If startPtr is beyond EOF, ptr.segment* will indicate precisely
514 // EOF.
515 //
516 // After seeking:
517 //
518 //     ptr.segmentIdx == len(filenode.segments) // i.e., at EOF
519 //     ||
520 //     filenode.segments[ptr.segmentIdx].Len() > ptr.segmentOff
521 func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
522         ptr = startPtr
523         if ptr.off < 0 {
524                 // meaningless anyway
525                 return
526         } else if ptr.off >= fn.fileinfo.size {
527                 ptr.segmentIdx = len(fn.segments)
528                 ptr.segmentOff = 0
529                 ptr.repacked = fn.repacked
530                 return
531         } else if ptr.repacked == fn.repacked {
532                 // segmentIdx and segmentOff accurately reflect
533                 // ptr.off, but might have fallen off the end of a
534                 // segment
535                 if ptr.segmentOff >= fn.segments[ptr.segmentIdx].Len() {
536                         ptr.segmentIdx++
537                         ptr.segmentOff = 0
538                 }
539                 return
540         }
541         defer func() {
542                 ptr.repacked = fn.repacked
543         }()
544         if ptr.off >= fn.fileinfo.size {
545                 ptr.segmentIdx, ptr.segmentOff = len(fn.segments), 0
546                 return
547         }
548         // Recompute segmentIdx and segmentOff.  We have already
549         // established fn.fileinfo.size > ptr.off >= 0, so we don't
550         // have to deal with edge cases here.
551         var off int64
552         for ptr.segmentIdx, ptr.segmentOff = 0, 0; off < ptr.off; ptr.segmentIdx++ {
553                 // This would panic (index out of range) if
554                 // fn.fileinfo.size were larger than
555                 // sum(fn.segments[i].Len()) -- but that can't happen
556                 // because we have ensured fn.fileinfo.size is always
557                 // accurate.
558                 segLen := int64(fn.segments[ptr.segmentIdx].Len())
559                 if off+segLen > ptr.off {
560                         ptr.segmentOff = int(ptr.off - off)
561                         break
562                 }
563                 off += segLen
564         }
565         return
566 }
567
568 // filenode implements inode.
569 type filenode struct {
570         parent   inode
571         fs       *collectionFileSystem
572         fileinfo fileinfo
573         segments []segment
574         // number of times `segments` has changed in a
575         // way that might invalidate a filenodePtr
576         repacked int64
577         memsize  int64 // bytes in memSegments
578         sync.RWMutex
579         nullnode
580 }
581
582 // caller must have lock
583 func (fn *filenode) appendSegment(e segment) {
584         fn.segments = append(fn.segments, e)
585         fn.fileinfo.size += int64(e.Len())
586 }
587
588 func (fn *filenode) SetParent(p inode, name string) {
589         fn.Lock()
590         defer fn.Unlock()
591         fn.parent = p
592         fn.fileinfo.name = name
593 }
594
595 func (fn *filenode) Parent() inode {
596         fn.RLock()
597         defer fn.RUnlock()
598         return fn.parent
599 }
600
601 func (fn *filenode) FS() FileSystem {
602         return fn.fs
603 }
604
605 // Read reads file data from a single segment, starting at startPtr,
606 // into p. startPtr is assumed not to be up-to-date. Caller must have
607 // RLock or Lock.
608 func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
609         ptr = fn.seek(startPtr)
610         if ptr.off < 0 {
611                 err = ErrNegativeOffset
612                 return
613         }
614         if ptr.segmentIdx >= len(fn.segments) {
615                 err = io.EOF
616                 return
617         }
618         if ss, ok := fn.segments[ptr.segmentIdx].(storedSegment); ok {
619                 ss.locator = fn.fs.refreshSignature(ss.locator)
620                 fn.segments[ptr.segmentIdx] = ss
621         }
622         n, err = fn.segments[ptr.segmentIdx].ReadAt(p, int64(ptr.segmentOff))
623         if n > 0 {
624                 ptr.off += int64(n)
625                 ptr.segmentOff += n
626                 if ptr.segmentOff == fn.segments[ptr.segmentIdx].Len() {
627                         ptr.segmentIdx++
628                         ptr.segmentOff = 0
629                         if ptr.segmentIdx < len(fn.segments) && err == io.EOF {
630                                 err = nil
631                         }
632                 }
633         }
634         return
635 }
636
637 func (fn *filenode) Size() int64 {
638         fn.RLock()
639         defer fn.RUnlock()
640         return fn.fileinfo.Size()
641 }
642
643 func (fn *filenode) FileInfo() os.FileInfo {
644         fn.RLock()
645         defer fn.RUnlock()
646         return fn.fileinfo
647 }
648
649 func (fn *filenode) Truncate(size int64) error {
650         fn.Lock()
651         defer fn.Unlock()
652         return fn.truncate(size)
653 }
654
655 func (fn *filenode) truncate(size int64) error {
656         if size == fn.fileinfo.size {
657                 return nil
658         }
659         fn.repacked++
660         if size < fn.fileinfo.size {
661                 ptr := fn.seek(filenodePtr{off: size})
662                 for i := ptr.segmentIdx; i < len(fn.segments); i++ {
663                         if seg, ok := fn.segments[i].(*memSegment); ok {
664                                 fn.memsize -= int64(seg.Len())
665                         }
666                 }
667                 if ptr.segmentOff == 0 {
668                         fn.segments = fn.segments[:ptr.segmentIdx]
669                 } else {
670                         fn.segments = fn.segments[:ptr.segmentIdx+1]
671                         switch seg := fn.segments[ptr.segmentIdx].(type) {
672                         case *memSegment:
673                                 seg.Truncate(ptr.segmentOff)
674                                 fn.memsize += int64(seg.Len())
675                         default:
676                                 fn.segments[ptr.segmentIdx] = seg.Slice(0, ptr.segmentOff)
677                         }
678                 }
679                 fn.fileinfo.size = size
680                 return nil
681         }
682         for size > fn.fileinfo.size {
683                 grow := size - fn.fileinfo.size
684                 var seg *memSegment
685                 var ok bool
686                 if len(fn.segments) == 0 {
687                         seg = &memSegment{}
688                         fn.segments = append(fn.segments, seg)
689                 } else if seg, ok = fn.segments[len(fn.segments)-1].(*memSegment); !ok || seg.Len() >= maxBlockSize {
690                         seg = &memSegment{}
691                         fn.segments = append(fn.segments, seg)
692                 }
693                 if maxgrow := int64(maxBlockSize - seg.Len()); maxgrow < grow {
694                         grow = maxgrow
695                 }
696                 seg.Truncate(seg.Len() + int(grow))
697                 fn.fileinfo.size += grow
698                 fn.memsize += grow
699         }
700         return nil
701 }
702
703 // Write writes data from p to the file, starting at startPtr,
704 // extending the file size if necessary. Caller must have Lock.
705 func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
706         if startPtr.off > fn.fileinfo.size {
707                 if err = fn.truncate(startPtr.off); err != nil {
708                         return 0, startPtr, err
709                 }
710         }
711         ptr = fn.seek(startPtr)
712         if ptr.off < 0 {
713                 err = ErrNegativeOffset
714                 return
715         }
716         for len(p) > 0 && err == nil {
717                 cando := p
718                 if len(cando) > maxBlockSize {
719                         cando = cando[:maxBlockSize]
720                 }
721                 // Rearrange/grow fn.segments (and shrink cando if
722                 // needed) such that cando can be copied to
723                 // fn.segments[ptr.segmentIdx] at offset
724                 // ptr.segmentOff.
725                 cur := ptr.segmentIdx
726                 prev := ptr.segmentIdx - 1
727                 var curWritable bool
728                 if cur < len(fn.segments) {
729                         _, curWritable = fn.segments[cur].(*memSegment)
730                 }
731                 var prevAppendable bool
732                 if prev >= 0 && fn.segments[prev].Len() < maxBlockSize {
733                         _, prevAppendable = fn.segments[prev].(*memSegment)
734                 }
735                 if ptr.segmentOff > 0 && !curWritable {
736                         // Split a non-writable block.
737                         if max := fn.segments[cur].Len() - ptr.segmentOff; max <= len(cando) {
738                                 // Truncate cur, and insert a new
739                                 // segment after it.
740                                 cando = cando[:max]
741                                 fn.segments = append(fn.segments, nil)
742                                 copy(fn.segments[cur+1:], fn.segments[cur:])
743                         } else {
744                                 // Split cur into two copies, truncate
745                                 // the one on the left, shift the one
746                                 // on the right, and insert a new
747                                 // segment between them.
748                                 fn.segments = append(fn.segments, nil, nil)
749                                 copy(fn.segments[cur+2:], fn.segments[cur:])
750                                 fn.segments[cur+2] = fn.segments[cur+2].Slice(ptr.segmentOff+len(cando), -1)
751                         }
752                         cur++
753                         prev++
754                         seg := &memSegment{}
755                         seg.Truncate(len(cando))
756                         fn.memsize += int64(len(cando))
757                         fn.segments[cur] = seg
758                         fn.segments[prev] = fn.segments[prev].Slice(0, ptr.segmentOff)
759                         ptr.segmentIdx++
760                         ptr.segmentOff = 0
761                         fn.repacked++
762                         ptr.repacked++
763                 } else if curWritable {
764                         if fit := int(fn.segments[cur].Len()) - ptr.segmentOff; fit < len(cando) {
765                                 cando = cando[:fit]
766                         }
767                 } else {
768                         if prevAppendable {
769                                 // Shrink cando if needed to fit in
770                                 // prev segment.
771                                 if cangrow := maxBlockSize - fn.segments[prev].Len(); cangrow < len(cando) {
772                                         cando = cando[:cangrow]
773                                 }
774                         }
775
776                         if cur == len(fn.segments) {
777                                 // ptr is at EOF, filesize is changing.
778                                 fn.fileinfo.size += int64(len(cando))
779                         } else if el := fn.segments[cur].Len(); el <= len(cando) {
780                                 // cando is long enough that we won't
781                                 // need cur any more. shrink cando to
782                                 // be exactly as long as cur
783                                 // (otherwise we'd accidentally shift
784                                 // the effective position of all
785                                 // segments after cur).
786                                 cando = cando[:el]
787                                 copy(fn.segments[cur:], fn.segments[cur+1:])
788                                 fn.segments = fn.segments[:len(fn.segments)-1]
789                         } else {
790                                 // shrink cur by the same #bytes we're growing prev
791                                 fn.segments[cur] = fn.segments[cur].Slice(len(cando), -1)
792                         }
793
794                         if prevAppendable {
795                                 // Grow prev.
796                                 ptr.segmentIdx--
797                                 ptr.segmentOff = fn.segments[prev].Len()
798                                 fn.segments[prev].(*memSegment).Truncate(ptr.segmentOff + len(cando))
799                                 fn.memsize += int64(len(cando))
800                                 ptr.repacked++
801                                 fn.repacked++
802                         } else {
803                                 // Insert a segment between prev and
804                                 // cur, and advance prev/cur.
805                                 fn.segments = append(fn.segments, nil)
806                                 if cur < len(fn.segments) {
807                                         copy(fn.segments[cur+1:], fn.segments[cur:])
808                                         ptr.repacked++
809                                         fn.repacked++
810                                 } else {
811                                         // appending a new segment does
812                                         // not invalidate any ptrs
813                                 }
814                                 seg := &memSegment{}
815                                 seg.Truncate(len(cando))
816                                 fn.memsize += int64(len(cando))
817                                 fn.segments[cur] = seg
818                         }
819                 }
820
821                 // Finally we can copy bytes from cando to the current segment.
822                 fn.segments[ptr.segmentIdx].(*memSegment).WriteAt(cando, ptr.segmentOff)
823                 n += len(cando)
824                 p = p[len(cando):]
825
826                 ptr.off += int64(len(cando))
827                 ptr.segmentOff += len(cando)
828                 if ptr.segmentOff >= maxBlockSize {
829                         fn.pruneMemSegments()
830                 }
831                 if fn.segments[ptr.segmentIdx].Len() == ptr.segmentOff {
832                         ptr.segmentOff = 0
833                         ptr.segmentIdx++
834                 }
835
836                 fn.fileinfo.modTime = time.Now()
837         }
838         return
839 }
840
841 // Write some data out to disk to reduce memory use. Caller must have
842 // write lock.
843 func (fn *filenode) pruneMemSegments() {
844         // TODO: share code with (*dirnode)flush()
845         // TODO: pack/flush small blocks too, when fragmented
846         for idx, seg := range fn.segments {
847                 seg, ok := seg.(*memSegment)
848                 if !ok || seg.Len() < maxBlockSize || seg.flushing != nil {
849                         continue
850                 }
851                 // Setting seg.flushing guarantees seg.buf will not be
852                 // modified in place: WriteAt and Truncate will
853                 // allocate a new buf instead, if necessary.
854                 idx, buf := idx, seg.buf
855                 done := make(chan struct{})
856                 seg.flushing = done
857                 // If lots of background writes are already in
858                 // progress, block here until one finishes, rather
859                 // than pile up an unlimited number of buffered writes
860                 // and network flush operations.
861                 fn.fs.throttle().Acquire()
862                 go func() {
863                         defer close(done)
864                         resp, err := fn.FS().BlockWrite(context.Background(), BlockWriteOptions{
865                                 Data:           buf,
866                                 Replicas:       fn.fs.replicas,
867                                 StorageClasses: fn.fs.storageClasses,
868                         })
869                         fn.fs.throttle().Release()
870                         fn.Lock()
871                         defer fn.Unlock()
872                         if seg.flushing != done {
873                                 // A new seg.buf has been allocated.
874                                 return
875                         }
876                         if err != nil {
877                                 // TODO: stall (or return errors from)
878                                 // subsequent writes until flushing
879                                 // starts to succeed.
880                                 return
881                         }
882                         if len(fn.segments) <= idx || fn.segments[idx] != seg || len(seg.buf) != len(buf) {
883                                 // Segment has been dropped/moved/resized.
884                                 return
885                         }
886                         fn.memsize -= int64(len(buf))
887                         fn.segments[idx] = storedSegment{
888                                 kc:      fn.FS(),
889                                 locator: resp.Locator,
890                                 size:    len(buf),
891                                 offset:  0,
892                                 length:  len(buf),
893                         }
894                 }()
895         }
896 }
897
898 // Block until all pending pruneMemSegments/flush work is
899 // finished. Caller must NOT have lock.
900 func (fn *filenode) waitPrune() {
901         var pending []<-chan struct{}
902         fn.Lock()
903         for _, seg := range fn.segments {
904                 if seg, ok := seg.(*memSegment); ok && seg.flushing != nil {
905                         pending = append(pending, seg.flushing)
906                 }
907         }
908         fn.Unlock()
909         for _, p := range pending {
910                 <-p
911         }
912 }
913
914 func (fn *filenode) Snapshot() (inode, error) {
915         fn.RLock()
916         defer fn.RUnlock()
917         segments := make([]segment, 0, len(fn.segments))
918         for _, seg := range fn.segments {
919                 segments = append(segments, seg.Slice(0, seg.Len()))
920         }
921         return &filenode{
922                 fileinfo: fn.fileinfo,
923                 segments: segments,
924         }, nil
925 }
926
927 func (fn *filenode) Splice(repl inode) error {
928         repl, err := repl.Snapshot()
929         if err != nil {
930                 return err
931         }
932         fn.parent.Lock()
933         defer fn.parent.Unlock()
934         fn.Lock()
935         defer fn.Unlock()
936         _, err = fn.parent.Child(fn.fileinfo.name, func(inode) (inode, error) { return repl, nil })
937         if err != nil {
938                 return err
939         }
940         switch repl := repl.(type) {
941         case *dirnode:
942                 repl.parent = fn.parent
943                 repl.fileinfo.name = fn.fileinfo.name
944                 repl.setTreeFS(fn.fs)
945         case *filenode:
946                 repl.parent = fn.parent
947                 repl.fileinfo.name = fn.fileinfo.name
948                 repl.fs = fn.fs
949         default:
950                 return fmt.Errorf("cannot splice snapshot containing %T: %w", repl, ErrInvalidArgument)
951         }
952         return nil
953 }
954
955 type dirnode struct {
956         fs *collectionFileSystem
957         treenode
958 }
959
960 func (dn *dirnode) FS() FileSystem {
961         return dn.fs
962 }
963
964 func (dn *dirnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
965         if dn == dn.fs.rootnode() && name == ".arvados#collection" {
966                 gn := &getternode{Getter: func() ([]byte, error) {
967                         var coll Collection
968                         var err error
969                         coll.ManifestText, err = dn.fs.MarshalManifest(".")
970                         if err != nil {
971                                 return nil, err
972                         }
973                         coll.UUID = dn.fs.uuid
974                         data, err := json.Marshal(&coll)
975                         if err == nil {
976                                 data = append(data, '\n')
977                         }
978                         return data, err
979                 }}
980                 gn.SetParent(dn, name)
981                 return gn, nil
982         }
983         return dn.treenode.Child(name, replace)
984 }
985
986 type fnSegmentRef struct {
987         fn  *filenode
988         idx int
989 }
990
991 // commitBlock concatenates the data from the given filenode segments
992 // (which must be *memSegments), writes the data out to Keep as a
993 // single block, and replaces the filenodes' *memSegments with
994 // storedSegments that reference the relevant portions of the new
995 // block.
996 //
997 // bufsize is the total data size in refs. It is used to preallocate
998 // the correct amount of memory when len(refs)>1.
999 //
1000 // If sync is false, commitBlock returns right away, after starting a
1001 // goroutine to do the writes, reacquire the filenodes' locks, and
1002 // swap out the *memSegments. Some filenodes' segments might get
1003 // modified/rearranged in the meantime, in which case commitBlock
1004 // won't replace them.
1005 //
1006 // Caller must have write lock.
1007 func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize int, sync bool) error {
1008         if len(refs) == 0 {
1009                 return nil
1010         }
1011         if err := ctx.Err(); err != nil {
1012                 return err
1013         }
1014         done := make(chan struct{})
1015         var block []byte
1016         segs := make([]*memSegment, 0, len(refs))
1017         offsets := make([]int, 0, len(refs)) // location of segment's data within block
1018         for _, ref := range refs {
1019                 seg := ref.fn.segments[ref.idx].(*memSegment)
1020                 if !sync && seg.flushingUnfinished() {
1021                         // Let the other flushing goroutine finish. If
1022                         // it fails, we'll try again next time.
1023                         close(done)
1024                         return nil
1025                 }
1026                 // In sync mode, we proceed regardless of
1027                 // whether another flush is in progress: It
1028                 // can't finish before we do, because we hold
1029                 // fn's lock until we finish our own writes.
1030                 seg.flushing = done
1031                 offsets = append(offsets, len(block))
1032                 if len(refs) == 1 {
1033                         block = seg.buf
1034                 } else if block == nil {
1035                         block = append(make([]byte, 0, bufsize), seg.buf...)
1036                 } else {
1037                         block = append(block, seg.buf...)
1038                 }
1039                 segs = append(segs, seg)
1040         }
1041         blocksize := len(block)
1042         dn.fs.throttle().Acquire()
1043         errs := make(chan error, 1)
1044         go func() {
1045                 defer close(done)
1046                 defer close(errs)
1047                 resp, err := dn.fs.BlockWrite(context.Background(), BlockWriteOptions{
1048                         Data:           block,
1049                         Replicas:       dn.fs.replicas,
1050                         StorageClasses: dn.fs.storageClasses,
1051                 })
1052                 dn.fs.throttle().Release()
1053                 if err != nil {
1054                         errs <- err
1055                         return
1056                 }
1057                 for idx, ref := range refs {
1058                         if !sync {
1059                                 ref.fn.Lock()
1060                                 // In async mode, fn's lock was
1061                                 // released while we were waiting for
1062                                 // PutB(); lots of things might have
1063                                 // changed.
1064                                 if len(ref.fn.segments) <= ref.idx {
1065                                         // file segments have
1066                                         // rearranged or changed in
1067                                         // some way
1068                                         ref.fn.Unlock()
1069                                         continue
1070                                 } else if seg, ok := ref.fn.segments[ref.idx].(*memSegment); !ok || seg != segs[idx] {
1071                                         // segment has been replaced
1072                                         ref.fn.Unlock()
1073                                         continue
1074                                 } else if seg.flushing != done {
1075                                         // seg.buf has been replaced
1076                                         ref.fn.Unlock()
1077                                         continue
1078                                 }
1079                         }
1080                         data := ref.fn.segments[ref.idx].(*memSegment).buf
1081                         ref.fn.segments[ref.idx] = storedSegment{
1082                                 kc:      dn.fs,
1083                                 locator: resp.Locator,
1084                                 size:    blocksize,
1085                                 offset:  offsets[idx],
1086                                 length:  len(data),
1087                         }
1088                         // atomic is needed here despite caller having
1089                         // lock: caller might be running concurrent
1090                         // commitBlock() goroutines using the same
1091                         // lock, writing different segments from the
1092                         // same file.
1093                         atomic.AddInt64(&ref.fn.memsize, -int64(len(data)))
1094                         if !sync {
1095                                 ref.fn.Unlock()
1096                         }
1097                 }
1098         }()
1099         if sync {
1100                 return <-errs
1101         }
1102         return nil
1103 }
1104
1105 type flushOpts struct {
1106         sync        bool
1107         shortBlocks bool
1108 }
1109
1110 // flush in-memory data and remote-cluster block references (for the
1111 // children with the given names, which must be children of dn) to
1112 // local-cluster persistent storage.
1113 //
1114 // Caller must have write lock on dn and the named children.
1115 //
1116 // If any children are dirs, they will be flushed recursively.
1117 func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) error {
1118         cg := newContextGroup(ctx)
1119         defer cg.Cancel()
1120
1121         goCommit := func(refs []fnSegmentRef, bufsize int) {
1122                 cg.Go(func() error {
1123                         return dn.commitBlock(cg.Context(), refs, bufsize, opts.sync)
1124                 })
1125         }
1126
1127         var pending []fnSegmentRef
1128         var pendingLen int = 0
1129         localLocator := map[string]string{}
1130         for _, name := range names {
1131                 switch node := dn.inodes[name].(type) {
1132                 case *dirnode:
1133                         grandchildNames := node.sortedNames()
1134                         for _, grandchildName := range grandchildNames {
1135                                 grandchild := node.inodes[grandchildName]
1136                                 grandchild.Lock()
1137                                 defer grandchild.Unlock()
1138                         }
1139                         cg.Go(func() error { return node.flush(cg.Context(), grandchildNames, opts) })
1140                 case *filenode:
1141                         for idx, seg := range node.segments {
1142                                 switch seg := seg.(type) {
1143                                 case storedSegment:
1144                                         loc, ok := localLocator[seg.locator]
1145                                         if !ok {
1146                                                 var err error
1147                                                 loc, err = dn.fs.LocalLocator(seg.locator)
1148                                                 if err != nil {
1149                                                         return err
1150                                                 }
1151                                                 localLocator[seg.locator] = loc
1152                                         }
1153                                         seg.locator = loc
1154                                         node.segments[idx] = seg
1155                                 case *memSegment:
1156                                         if seg.Len() > maxBlockSize/2 {
1157                                                 goCommit([]fnSegmentRef{{node, idx}}, seg.Len())
1158                                                 continue
1159                                         }
1160                                         if pendingLen+seg.Len() > maxBlockSize {
1161                                                 goCommit(pending, pendingLen)
1162                                                 pending = nil
1163                                                 pendingLen = 0
1164                                         }
1165                                         pending = append(pending, fnSegmentRef{node, idx})
1166                                         pendingLen += seg.Len()
1167                                 default:
1168                                         panic(fmt.Sprintf("can't sync segment type %T", seg))
1169                                 }
1170                         }
1171                 }
1172         }
1173         if opts.shortBlocks {
1174                 goCommit(pending, pendingLen)
1175         }
1176         return cg.Wait()
1177 }
1178
1179 // caller must have write lock.
1180 func (dn *dirnode) MemorySize() (size int64) {
1181         for _, name := range dn.sortedNames() {
1182                 node := dn.inodes[name]
1183                 node.Lock()
1184                 defer node.Unlock()
1185                 switch node := node.(type) {
1186                 case *dirnode:
1187                         size += node.MemorySize()
1188                 case *filenode:
1189                         size += 64
1190                         for _, seg := range node.segments {
1191                                 switch seg := seg.(type) {
1192                                 case storedSegment:
1193                                         size += int64(len(seg.locator)) + 40
1194                                 case *memSegment:
1195                                         size += int64(seg.Len()) + 8
1196                                 }
1197                         }
1198                 }
1199         }
1200         return 64 + size
1201 }
1202
1203 // caller must have write lock.
1204 func (dn *dirnode) sortedNames() []string {
1205         names := make([]string, 0, len(dn.inodes))
1206         for name := range dn.inodes {
1207                 names = append(names, name)
1208         }
1209         sort.Strings(names)
1210         return names
1211 }
1212
1213 // caller must have write lock.
1214 func (dn *dirnode) marshalManifest(ctx context.Context, prefix string) (string, error) {
1215         cg := newContextGroup(ctx)
1216         defer cg.Cancel()
1217
1218         if len(dn.inodes) == 0 {
1219                 if prefix == "." {
1220                         return "", nil
1221                 }
1222                 // Express the existence of an empty directory by
1223                 // adding an empty file named `\056`, which (unlike
1224                 // the more obvious spelling `.`) is accepted by the
1225                 // API's manifest validator.
1226                 return manifestEscape(prefix) + " d41d8cd98f00b204e9800998ecf8427e+0 0:0:\\056\n", nil
1227         }
1228
1229         names := dn.sortedNames()
1230
1231         // Wait for children to finish any pending write operations
1232         // before locking them.
1233         for _, name := range names {
1234                 node := dn.inodes[name]
1235                 if fn, ok := node.(*filenode); ok {
1236                         fn.waitPrune()
1237                 }
1238         }
1239
1240         var dirnames []string
1241         var filenames []string
1242         for _, name := range names {
1243                 node := dn.inodes[name]
1244                 node.Lock()
1245                 defer node.Unlock()
1246                 switch node := node.(type) {
1247                 case *dirnode:
1248                         dirnames = append(dirnames, name)
1249                 case *filenode:
1250                         filenames = append(filenames, name)
1251                 default:
1252                         panic(fmt.Sprintf("can't marshal inode type %T", node))
1253                 }
1254         }
1255
1256         subdirs := make([]string, len(dirnames))
1257         rootdir := ""
1258         for i, name := range dirnames {
1259                 i, name := i, name
1260                 cg.Go(func() error {
1261                         txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name)
1262                         subdirs[i] = txt
1263                         return err
1264                 })
1265         }
1266
1267         cg.Go(func() error {
1268                 var streamLen int64
1269                 type filepart struct {
1270                         name   string
1271                         offset int64
1272                         length int64
1273                 }
1274
1275                 var fileparts []filepart
1276                 var blocks []string
1277                 if err := dn.flush(cg.Context(), filenames, flushOpts{sync: true, shortBlocks: true}); err != nil {
1278                         return err
1279                 }
1280                 for _, name := range filenames {
1281                         node := dn.inodes[name].(*filenode)
1282                         if len(node.segments) == 0 {
1283                                 fileparts = append(fileparts, filepart{name: name})
1284                                 continue
1285                         }
1286                         for _, seg := range node.segments {
1287                                 switch seg := seg.(type) {
1288                                 case storedSegment:
1289                                         if len(blocks) > 0 && blocks[len(blocks)-1] == seg.locator {
1290                                                 streamLen -= int64(seg.size)
1291                                         } else {
1292                                                 blocks = append(blocks, seg.locator)
1293                                         }
1294                                         next := filepart{
1295                                                 name:   name,
1296                                                 offset: streamLen + int64(seg.offset),
1297                                                 length: int64(seg.length),
1298                                         }
1299                                         if prev := len(fileparts) - 1; prev >= 0 &&
1300                                                 fileparts[prev].name == name &&
1301                                                 fileparts[prev].offset+fileparts[prev].length == next.offset {
1302                                                 fileparts[prev].length += next.length
1303                                         } else {
1304                                                 fileparts = append(fileparts, next)
1305                                         }
1306                                         streamLen += int64(seg.size)
1307                                 default:
1308                                         // This can't happen: we
1309                                         // haven't unlocked since
1310                                         // calling flush(sync=true).
1311                                         panic(fmt.Sprintf("can't marshal segment type %T", seg))
1312                                 }
1313                         }
1314                 }
1315                 var filetokens []string
1316                 for _, s := range fileparts {
1317                         filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
1318                 }
1319                 if len(filetokens) == 0 {
1320                         return nil
1321                 } else if len(blocks) == 0 {
1322                         blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
1323                 }
1324                 rootdir = manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n"
1325                 return nil
1326         })
1327         err := cg.Wait()
1328         return rootdir + strings.Join(subdirs, ""), err
1329 }
1330
1331 func (dn *dirnode) loadManifest(txt string) error {
1332         streams := bytes.Split([]byte(txt), []byte{'\n'})
1333         if len(streams[len(streams)-1]) != 0 {
1334                 return fmt.Errorf("line %d: no trailing newline", len(streams))
1335         }
1336         streams = streams[:len(streams)-1]
1337         segments := []storedSegment{}
1338         // To reduce allocs, we reuse a single "pathparts" slice
1339         // (pre-split on "/" separators) for the duration of this
1340         // func.
1341         var pathparts []string
1342         // To reduce allocs, we reuse a single "toks" slice of 3 byte
1343         // slices.
1344         var toks = make([][]byte, 3)
1345         // Similar to bytes.SplitN(token, []byte{c}, 3), but splits
1346         // into the toks slice rather than allocating a new one, and
1347         // returns the number of toks (1, 2, or 3).
1348         splitToToks := func(src []byte, c rune) int {
1349                 c1 := bytes.IndexRune(src, c)
1350                 if c1 < 0 {
1351                         toks[0] = src
1352                         return 1
1353                 }
1354                 toks[0], src = src[:c1], src[c1+1:]
1355                 c2 := bytes.IndexRune(src, c)
1356                 if c2 < 0 {
1357                         toks[1] = src
1358                         return 2
1359                 }
1360                 toks[1], toks[2] = src[:c2], src[c2+1:]
1361                 return 3
1362         }
1363         for i, stream := range streams {
1364                 lineno := i + 1
1365                 var anyFileTokens bool
1366                 var pos int64
1367                 var segIdx int
1368                 segments = segments[:0]
1369                 pathparts = nil
1370                 streamparts := 0
1371                 for i, token := range bytes.Split(stream, []byte{' '}) {
1372                         if i == 0 {
1373                                 pathparts = strings.Split(manifestUnescape(string(token)), "/")
1374                                 streamparts = len(pathparts)
1375                                 continue
1376                         }
1377                         if !bytes.ContainsRune(token, ':') {
1378                                 if anyFileTokens {
1379                                         return fmt.Errorf("line %d: bad file segment %q", lineno, token)
1380                                 }
1381                                 if splitToToks(token, '+') < 2 {
1382                                         return fmt.Errorf("line %d: bad locator %q", lineno, token)
1383                                 }
1384                                 length, err := strconv.ParseInt(string(toks[1]), 10, 32)
1385                                 if err != nil || length < 0 {
1386                                         return fmt.Errorf("line %d: bad locator %q", lineno, token)
1387                                 }
1388                                 segments = append(segments, storedSegment{
1389                                         locator: string(token),
1390                                         size:    int(length),
1391                                         offset:  0,
1392                                         length:  int(length),
1393                                 })
1394                                 continue
1395                         } else if len(segments) == 0 {
1396                                 return fmt.Errorf("line %d: bad locator %q", lineno, token)
1397                         }
1398                         if splitToToks(token, ':') != 3 {
1399                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
1400                         }
1401                         anyFileTokens = true
1402
1403                         offset, err := strconv.ParseInt(string(toks[0]), 10, 64)
1404                         if err != nil || offset < 0 {
1405                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
1406                         }
1407                         length, err := strconv.ParseInt(string(toks[1]), 10, 64)
1408                         if err != nil || length < 0 {
1409                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
1410                         }
1411                         if !bytes.ContainsAny(toks[2], `\/`) {
1412                                 // optimization for a common case
1413                                 pathparts = append(pathparts[:streamparts], string(toks[2]))
1414                         } else {
1415                                 pathparts = append(pathparts[:streamparts], strings.Split(manifestUnescape(string(toks[2])), "/")...)
1416                         }
1417                         fnode, err := dn.createFileAndParents(pathparts)
1418                         if fnode == nil && err == nil && length == 0 {
1419                                 // Special case: an empty file used as
1420                                 // a marker to preserve an otherwise
1421                                 // empty directory in a manifest.
1422                                 continue
1423                         }
1424                         if err != nil || (fnode == nil && length != 0) {
1425                                 return fmt.Errorf("line %d: cannot use name %q with length %d: %s", lineno, toks[2], length, err)
1426                         }
1427                         // Map the stream offset/range coordinates to
1428                         // block/offset/range coordinates and add
1429                         // corresponding storedSegments to the filenode
1430                         if pos > offset {
1431                                 // Can't continue where we left off.
1432                                 // TODO: binary search instead of
1433                                 // rewinding all the way (but this
1434                                 // situation might be rare anyway)
1435                                 segIdx, pos = 0, 0
1436                         }
1437                         for ; segIdx < len(segments); segIdx++ {
1438                                 seg := segments[segIdx]
1439                                 next := pos + int64(seg.Len())
1440                                 if next <= offset || seg.Len() == 0 {
1441                                         pos = next
1442                                         continue
1443                                 }
1444                                 if pos >= offset+length {
1445                                         break
1446                                 }
1447                                 var blkOff int
1448                                 if pos < offset {
1449                                         blkOff = int(offset - pos)
1450                                 }
1451                                 blkLen := seg.Len() - blkOff
1452                                 if pos+int64(blkOff+blkLen) > offset+length {
1453                                         blkLen = int(offset + length - pos - int64(blkOff))
1454                                 }
1455                                 fnode.appendSegment(storedSegment{
1456                                         kc:      dn.fs,
1457                                         locator: seg.locator,
1458                                         size:    seg.size,
1459                                         offset:  blkOff,
1460                                         length:  blkLen,
1461                                 })
1462                                 if next > offset+length {
1463                                         break
1464                                 } else {
1465                                         pos = next
1466                                 }
1467                         }
1468                         if segIdx == len(segments) && pos < offset+length {
1469                                 return fmt.Errorf("line %d: invalid segment in %d-byte stream: %q", lineno, pos, token)
1470                         }
1471                 }
1472                 if !anyFileTokens {
1473                         return fmt.Errorf("line %d: no file segments", lineno)
1474                 } else if len(segments) == 0 {
1475                         return fmt.Errorf("line %d: no locators", lineno)
1476                 } else if streamparts == 0 {
1477                         return fmt.Errorf("line %d: no stream name", lineno)
1478                 }
1479         }
1480         return nil
1481 }
1482
1483 // only safe to call from loadManifest -- no locking.
1484 //
1485 // If path is a "parent directory exists" marker (the last path
1486 // component is "."), the returned values are both nil.
1487 //
1488 // Newly added nodes have modtime==0. Caller is responsible for fixing
1489 // them with backdateTree.
1490 func (dn *dirnode) createFileAndParents(names []string) (fn *filenode, err error) {
1491         var node inode = dn
1492         basename := names[len(names)-1]
1493         for _, name := range names[:len(names)-1] {
1494                 switch name {
1495                 case "", ".":
1496                         continue
1497                 case "..":
1498                         if node == dn {
1499                                 // can't be sure parent will be a *dirnode
1500                                 return nil, ErrInvalidArgument
1501                         }
1502                         node = node.Parent()
1503                         continue
1504                 }
1505                 node.Lock()
1506                 unlock := node.Unlock
1507                 node, err = node.Child(name, func(child inode) (inode, error) {
1508                         if child == nil {
1509                                 // note modtime will be fixed later in backdateTree()
1510                                 child, err := node.FS().newNode(name, 0755|os.ModeDir, time.Time{})
1511                                 if err != nil {
1512                                         return nil, err
1513                                 }
1514                                 child.SetParent(node, name)
1515                                 return child, nil
1516                         } else if !child.IsDir() {
1517                                 return child, ErrFileExists
1518                         } else {
1519                                 return child, nil
1520                         }
1521                 })
1522                 unlock()
1523                 if err != nil {
1524                         return
1525                 }
1526         }
1527         if basename == "." {
1528                 return
1529         } else if !permittedName(basename) {
1530                 err = fmt.Errorf("invalid file part %q in path %q", basename, names)
1531                 return
1532         }
1533         node.Lock()
1534         defer node.Unlock()
1535         _, err = node.Child(basename, func(child inode) (inode, error) {
1536                 switch child := child.(type) {
1537                 case nil:
1538                         child, err = node.FS().newNode(basename, 0755, time.Time{})
1539                         if err != nil {
1540                                 return nil, err
1541                         }
1542                         child.SetParent(node, basename)
1543                         fn = child.(*filenode)
1544                         return child, nil
1545                 case *filenode:
1546                         fn = child
1547                         return child, nil
1548                 case *dirnode:
1549                         return child, ErrIsDirectory
1550                 default:
1551                         return child, ErrInvalidArgument
1552                 }
1553         })
1554         return
1555 }
1556
1557 func (dn *dirnode) TreeSize() (bytes int64) {
1558         dn.RLock()
1559         defer dn.RUnlock()
1560         for _, i := range dn.inodes {
1561                 switch i := i.(type) {
1562                 case *filenode:
1563                         bytes += i.Size()
1564                 case *dirnode:
1565                         bytes += i.TreeSize()
1566                 }
1567         }
1568         return
1569 }
1570
1571 func (dn *dirnode) Snapshot() (inode, error) {
1572         return dn.snapshot()
1573 }
1574
1575 func (dn *dirnode) snapshot() (*dirnode, error) {
1576         dn.RLock()
1577         defer dn.RUnlock()
1578         snap := &dirnode{
1579                 treenode: treenode{
1580                         inodes:   make(map[string]inode, len(dn.inodes)),
1581                         fileinfo: dn.fileinfo,
1582                 },
1583         }
1584         for name, child := range dn.inodes {
1585                 dupchild, err := child.Snapshot()
1586                 if err != nil {
1587                         return nil, err
1588                 }
1589                 snap.inodes[name] = dupchild
1590                 dupchild.SetParent(snap, name)
1591         }
1592         return snap, nil
1593 }
1594
1595 func (dn *dirnode) Splice(repl inode) error {
1596         repl, err := repl.Snapshot()
1597         if err != nil {
1598                 return fmt.Errorf("cannot copy snapshot: %w", err)
1599         }
1600         switch repl := repl.(type) {
1601         default:
1602                 return fmt.Errorf("cannot splice snapshot containing %T: %w", repl, ErrInvalidArgument)
1603         case *dirnode:
1604                 dn.Lock()
1605                 defer dn.Unlock()
1606                 dn.inodes = repl.inodes
1607                 dn.setTreeFS(dn.fs)
1608         case *filenode:
1609                 dn.parent.Lock()
1610                 defer dn.parent.Unlock()
1611                 removing, err := dn.parent.Child(dn.fileinfo.name, nil)
1612                 if err != nil {
1613                         return fmt.Errorf("cannot use Splice to replace a top-level directory with a file: %w", ErrInvalidOperation)
1614                 } else if removing != dn {
1615                         // If ../thisdirname is not this dirnode, it
1616                         // must be an inode that wraps a dirnode, like
1617                         // a collectionFileSystem or deferrednode.
1618                         if deferred, ok := removing.(*deferrednode); ok {
1619                                 // More useful to report the type of
1620                                 // the wrapped node rather than just
1621                                 // *deferrednode. (We know the real
1622                                 // inode is already loaded because dn
1623                                 // is inside it.)
1624                                 removing = deferred.realinode()
1625                         }
1626                         return fmt.Errorf("cannot use Splice to attach a file at top level of %T: %w", removing, ErrInvalidOperation)
1627                 }
1628                 dn.Lock()
1629                 defer dn.Unlock()
1630                 _, err = dn.parent.Child(dn.fileinfo.name, func(inode) (inode, error) { return repl, nil })
1631                 if err != nil {
1632                         return fmt.Errorf("error replacing filenode: dn.parent.Child(): %w", err)
1633                 }
1634                 repl.fs = dn.fs
1635         }
1636         return nil
1637 }
1638
1639 func (dn *dirnode) setTreeFS(fs *collectionFileSystem) {
1640         dn.fs = fs
1641         for _, child := range dn.inodes {
1642                 switch child := child.(type) {
1643                 case *dirnode:
1644                         child.setTreeFS(fs)
1645                 case *filenode:
1646                         child.fs = fs
1647                 }
1648         }
1649 }
1650
1651 type segment interface {
1652         io.ReaderAt
1653         Len() int
1654         // Return a new segment with a subsection of the data from this
1655         // one. length<0 means length=Len()-off.
1656         Slice(off int, length int) segment
1657 }
1658
1659 type memSegment struct {
1660         buf []byte
1661         // If flushing is not nil and not ready/closed, then a) buf is
1662         // being shared by a pruneMemSegments goroutine, and must be
1663         // copied on write; and b) the flushing channel will close
1664         // when the goroutine finishes, whether it succeeds or not.
1665         flushing <-chan struct{}
1666 }
1667
1668 func (me *memSegment) flushingUnfinished() bool {
1669         if me.flushing == nil {
1670                 return false
1671         }
1672         select {
1673         case <-me.flushing:
1674                 me.flushing = nil
1675                 return false
1676         default:
1677                 return true
1678         }
1679 }
1680
1681 func (me *memSegment) Len() int {
1682         return len(me.buf)
1683 }
1684
1685 func (me *memSegment) Slice(off, length int) segment {
1686         if length < 0 {
1687                 length = len(me.buf) - off
1688         }
1689         buf := make([]byte, length)
1690         copy(buf, me.buf[off:])
1691         return &memSegment{buf: buf}
1692 }
1693
1694 func (me *memSegment) Truncate(n int) {
1695         if n > cap(me.buf) || (me.flushing != nil && n > len(me.buf)) {
1696                 newsize := 1024
1697                 for newsize < n {
1698                         newsize = newsize << 2
1699                 }
1700                 newbuf := make([]byte, n, newsize)
1701                 copy(newbuf, me.buf)
1702                 me.buf, me.flushing = newbuf, nil
1703         } else {
1704                 // reclaim existing capacity, and zero reclaimed part
1705                 oldlen := len(me.buf)
1706                 me.buf = me.buf[:n]
1707                 for i := oldlen; i < n; i++ {
1708                         me.buf[i] = 0
1709                 }
1710         }
1711 }
1712
1713 func (me *memSegment) WriteAt(p []byte, off int) {
1714         if off+len(p) > len(me.buf) {
1715                 panic("overflowed segment")
1716         }
1717         if me.flushing != nil {
1718                 me.buf, me.flushing = append([]byte(nil), me.buf...), nil
1719         }
1720         copy(me.buf[off:], p)
1721 }
1722
1723 func (me *memSegment) ReadAt(p []byte, off int64) (n int, err error) {
1724         if off > int64(me.Len()) {
1725                 err = io.EOF
1726                 return
1727         }
1728         n = copy(p, me.buf[int(off):])
1729         if n < len(p) {
1730                 err = io.EOF
1731         }
1732         return
1733 }
1734
1735 type storedSegment struct {
1736         kc      fsBackend
1737         locator string
1738         size    int // size of stored block (also encoded in locator)
1739         offset  int // position of segment within the stored block
1740         length  int // bytes in this segment (offset + length <= size)
1741 }
1742
1743 func (se storedSegment) Len() int {
1744         return se.length
1745 }
1746
1747 func (se storedSegment) Slice(n, size int) segment {
1748         se.offset += n
1749         se.length -= n
1750         if size >= 0 && se.length > size {
1751                 se.length = size
1752         }
1753         return se
1754 }
1755
1756 func (se storedSegment) ReadAt(p []byte, off int64) (n int, err error) {
1757         if off > int64(se.length) {
1758                 return 0, io.EOF
1759         }
1760         maxlen := se.length - int(off)
1761         if len(p) > maxlen {
1762                 p = p[:maxlen]
1763                 n, err = se.kc.ReadAt(se.locator, p, int(off)+se.offset)
1764                 if err == nil {
1765                         err = io.EOF
1766                 }
1767                 return
1768         }
1769         return se.kc.ReadAt(se.locator, p, int(off)+se.offset)
1770 }
1771
1772 func canonicalName(name string) string {
1773         name = path.Clean("/" + name)
1774         if name == "/" || name == "./" {
1775                 name = "."
1776         } else if strings.HasPrefix(name, "/") {
1777                 name = "." + name
1778         }
1779         return name
1780 }
1781
1782 var manifestEscapeSeq = regexp.MustCompile(`\\([0-7]{3}|\\)`)
1783
1784 func manifestUnescapeFunc(seq string) string {
1785         if seq == `\\` {
1786                 return `\`
1787         }
1788         i, err := strconv.ParseUint(seq[1:], 8, 8)
1789         if err != nil {
1790                 // Invalid escape sequence: can't unescape.
1791                 return seq
1792         }
1793         return string([]byte{byte(i)})
1794 }
1795
1796 func manifestUnescape(s string) string {
1797         return manifestEscapeSeq.ReplaceAllStringFunc(s, manifestUnescapeFunc)
1798 }
1799
1800 var manifestEscapedChar = regexp.MustCompile(`[\000-\040:\s\\]`)
1801
1802 func manifestEscapeFunc(seq string) string {
1803         return fmt.Sprintf("\\%03o", byte(seq[0]))
1804 }
1805
1806 func manifestEscape(s string) string {
1807         return manifestEscapedChar.ReplaceAllStringFunc(s, manifestEscapeFunc)
1808 }