Merge remote-tracking branch 'MajewskiKrzysztof/exclude_home_project_parameter' into...
[arvados.git] / sdk / go / arvados / fs_collection.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "bytes"
9         "context"
10         "encoding/json"
11         "fmt"
12         "io"
13         "os"
14         "path"
15         "regexp"
16         "sort"
17         "strconv"
18         "strings"
19         "sync"
20         "sync/atomic"
21         "time"
22 )
23
24 var (
25         maxBlockSize      = 1 << 26
26         concurrentWriters = 4 // max goroutines writing to Keep in background and during flush()
27 )
28
29 // A CollectionFileSystem is a FileSystem that can be serialized as a
30 // manifest and stored as a collection.
31 type CollectionFileSystem interface {
32         FileSystem
33
34         // Flush all file data to Keep and return a snapshot of the
35         // filesystem suitable for saving as (Collection)ManifestText.
36         // Prefix (normally ".") is a top level directory, effectively
37         // prepended to all paths in the returned manifest.
38         MarshalManifest(prefix string) (string, error)
39
40         // Total data bytes in all files.
41         Size() int64
42 }
43
44 type collectionFileSystem struct {
45         fileSystem
46         uuid           string
47         savedPDH       atomic.Value
48         replicas       int
49         storageClasses []string
50         // guessSignatureTTL tracks a lower bound for the server's
51         // configured BlobSigningTTL. The guess is initially zero, and
52         // increases when we come across a signature with an expiry
53         // time further in the future than the previous guess.
54         //
55         // When the guessed TTL is much smaller than the real TTL,
56         // preemptive signature refresh is delayed or missed entirely,
57         // which is OK.
58         guessSignatureTTL time.Duration
59         holdCheckChanges  time.Time
60         lockCheckChanges  sync.Mutex
61 }
62
63 // FileSystem returns a CollectionFileSystem for the collection.
64 func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFileSystem, error) {
65         modTime := c.ModifiedAt
66         if modTime.IsZero() {
67                 modTime = time.Now()
68         }
69         fs := &collectionFileSystem{
70                 uuid:           c.UUID,
71                 storageClasses: c.StorageClassesDesired,
72                 fileSystem: fileSystem{
73                         fsBackend: keepBackend{apiClient: client, keepClient: kc},
74                         thr:       newThrottle(concurrentWriters),
75                 },
76         }
77         fs.savedPDH.Store(c.PortableDataHash)
78         if r := c.ReplicationDesired; r != nil {
79                 fs.replicas = *r
80         }
81         root := &dirnode{
82                 fs: fs,
83                 treenode: treenode{
84                         fileinfo: fileinfo{
85                                 name:    ".",
86                                 mode:    os.ModeDir | 0755,
87                                 modTime: modTime,
88                         },
89                         inodes: make(map[string]inode),
90                 },
91         }
92         root.SetParent(root, ".")
93         if err := root.loadManifest(c.ManifestText); err != nil {
94                 return nil, err
95         }
96         backdateTree(root, modTime)
97         fs.root = root
98         return fs, nil
99 }
100
101 // caller must have lock (or guarantee no concurrent accesses somehow)
102 func eachNode(n inode, ffunc func(*filenode), dfunc func(*dirnode)) {
103         switch n := n.(type) {
104         case *filenode:
105                 if ffunc != nil {
106                         ffunc(n)
107                 }
108         case *dirnode:
109                 if dfunc != nil {
110                         dfunc(n)
111                 }
112                 for _, n := range n.inodes {
113                         eachNode(n, ffunc, dfunc)
114                 }
115         }
116 }
117
118 // caller must have lock (or guarantee no concurrent accesses somehow)
119 func backdateTree(n inode, modTime time.Time) {
120         eachNode(n, func(fn *filenode) {
121                 fn.fileinfo.modTime = modTime
122         }, func(dn *dirnode) {
123                 dn.fileinfo.modTime = modTime
124         })
125 }
126
127 // Approximate portion of signature TTL remaining, usually between 0
128 // and 1, or negative if some signatures have expired.
129 func (fs *collectionFileSystem) signatureTimeLeft() (float64, time.Duration) {
130         var (
131                 now      = time.Now()
132                 earliest = now.Add(time.Hour * 24 * 7 * 365)
133                 latest   time.Time
134         )
135         fs.fileSystem.root.RLock()
136         eachNode(fs.root, func(fn *filenode) {
137                 fn.Lock()
138                 defer fn.Unlock()
139                 for _, seg := range fn.segments {
140                         seg, ok := seg.(storedSegment)
141                         if !ok {
142                                 continue
143                         }
144                         expiryTime, err := signatureExpiryTime(seg.locator)
145                         if err != nil {
146                                 continue
147                         }
148                         if expiryTime.Before(earliest) {
149                                 earliest = expiryTime
150                         }
151                         if expiryTime.After(latest) {
152                                 latest = expiryTime
153                         }
154                 }
155         }, nil)
156         fs.fileSystem.root.RUnlock()
157
158         if latest.IsZero() {
159                 // No signatures == 100% of TTL remaining.
160                 return 1, 1
161         }
162
163         ttl := latest.Sub(now)
164         fs.fileSystem.root.Lock()
165         {
166                 if ttl > fs.guessSignatureTTL {
167                         // ttl is closer to the real TTL than
168                         // guessSignatureTTL.
169                         fs.guessSignatureTTL = ttl
170                 } else {
171                         // Use the previous best guess to compute the
172                         // portion remaining (below, after unlocking
173                         // mutex).
174                         ttl = fs.guessSignatureTTL
175                 }
176         }
177         fs.fileSystem.root.Unlock()
178
179         return earliest.Sub(now).Seconds() / ttl.Seconds(), ttl
180 }
181
182 func (fs *collectionFileSystem) updateSignatures(newmanifest string) {
183         newLoc := map[string]string{}
184         for _, tok := range regexp.MustCompile(`\S+`).FindAllString(newmanifest, -1) {
185                 if mBlkRe.MatchString(tok) {
186                         newLoc[stripAllHints(tok)] = tok
187                 }
188         }
189         fs.fileSystem.root.Lock()
190         defer fs.fileSystem.root.Unlock()
191         eachNode(fs.root, func(fn *filenode) {
192                 fn.Lock()
193                 defer fn.Unlock()
194                 for idx, seg := range fn.segments {
195                         seg, ok := seg.(storedSegment)
196                         if !ok {
197                                 continue
198                         }
199                         loc, ok := newLoc[stripAllHints(seg.locator)]
200                         if !ok {
201                                 continue
202                         }
203                         seg.locator = loc
204                         fn.segments[idx] = seg
205                 }
206         }, nil)
207 }
208
209 func (fs *collectionFileSystem) newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error) {
210         if name == "" || name == "." || name == ".." {
211                 return nil, ErrInvalidArgument
212         }
213         if perm.IsDir() {
214                 return &dirnode{
215                         fs: fs,
216                         treenode: treenode{
217                                 fileinfo: fileinfo{
218                                         name:    name,
219                                         mode:    perm | os.ModeDir,
220                                         modTime: modTime,
221                                 },
222                                 inodes: make(map[string]inode),
223                         },
224                 }, nil
225         }
226         return &filenode{
227                 fs: fs,
228                 fileinfo: fileinfo{
229                         name:    name,
230                         mode:    perm & ^os.ModeDir,
231                         modTime: modTime,
232                 },
233         }, nil
234 }
235
236 func (fs *collectionFileSystem) Child(name string, replace func(inode) (inode, error)) (inode, error) {
237         return fs.rootnode().Child(name, replace)
238 }
239
240 func (fs *collectionFileSystem) FS() FileSystem {
241         return fs
242 }
243
244 func (fs *collectionFileSystem) FileInfo() os.FileInfo {
245         return fs.rootnode().FileInfo()
246 }
247
248 func (fs *collectionFileSystem) IsDir() bool {
249         return true
250 }
251
252 func (fs *collectionFileSystem) Lock() {
253         fs.rootnode().Lock()
254 }
255
256 func (fs *collectionFileSystem) Unlock() {
257         fs.rootnode().Unlock()
258 }
259
260 func (fs *collectionFileSystem) RLock() {
261         fs.rootnode().RLock()
262 }
263
264 func (fs *collectionFileSystem) RUnlock() {
265         fs.rootnode().RUnlock()
266 }
267
268 func (fs *collectionFileSystem) Parent() inode {
269         return fs.rootnode().Parent()
270 }
271
272 func (fs *collectionFileSystem) Read(_ []byte, ptr filenodePtr) (int, filenodePtr, error) {
273         return 0, ptr, ErrInvalidOperation
274 }
275
276 func (fs *collectionFileSystem) Write(_ []byte, ptr filenodePtr) (int, filenodePtr, error) {
277         return 0, ptr, ErrInvalidOperation
278 }
279
280 func (fs *collectionFileSystem) Readdir() ([]os.FileInfo, error) {
281         return fs.rootnode().Readdir()
282 }
283
284 func (fs *collectionFileSystem) SetParent(parent inode, name string) {
285         fs.rootnode().SetParent(parent, name)
286 }
287
288 func (fs *collectionFileSystem) Truncate(int64) error {
289         return ErrInvalidOperation
290 }
291
292 // Check for and incorporate upstream changes -- unless that has
293 // already been done recently, in which case this func is a no-op.
294 func (fs *collectionFileSystem) checkChangesOnServer() error {
295         if fs.uuid == "" && fs.savedPDH.Load() == "" {
296                 return nil
297         }
298
299         // First try UUID if any, then last known PDH. Stop if all
300         // signatures are new enough.
301         checkingAll := false
302         for _, id := range []string{fs.uuid, fs.savedPDH.Load().(string)} {
303                 if id == "" {
304                         continue
305                 }
306
307                 fs.lockCheckChanges.Lock()
308                 if !checkingAll && fs.holdCheckChanges.After(time.Now()) {
309                         fs.lockCheckChanges.Unlock()
310                         return nil
311                 }
312                 remain, ttl := fs.signatureTimeLeft()
313                 if remain > 0.01 && !checkingAll {
314                         fs.holdCheckChanges = time.Now().Add(ttl / 100)
315                 }
316                 fs.lockCheckChanges.Unlock()
317
318                 if remain >= 0.5 {
319                         break
320                 }
321                 checkingAll = true
322                 var coll Collection
323                 err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}})
324                 if err != nil {
325                         continue
326                 }
327                 fs.updateSignatures(coll.ManifestText)
328         }
329         return nil
330 }
331
332 // Refresh signature on a single locator, if necessary. Assume caller
333 // has lock. If an update is needed, and there are any storedSegments
334 // whose signatures can be updated, start a background task to update
335 // them asynchronously when the caller releases locks.
336 func (fs *collectionFileSystem) refreshSignature(locator string) string {
337         exp, err := signatureExpiryTime(locator)
338         if err != nil || exp.Sub(time.Now()) > time.Minute {
339                 // Synchronous update is not needed. Start an
340                 // asynchronous update if needed.
341                 go fs.checkChangesOnServer()
342                 return locator
343         }
344         var manifests string
345         for _, id := range []string{fs.uuid, fs.savedPDH.Load().(string)} {
346                 if id == "" {
347                         continue
348                 }
349                 var coll Collection
350                 err := fs.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+id, nil, map[string]interface{}{"select": []string{"portable_data_hash", "manifest_text"}})
351                 if err != nil {
352                         continue
353                 }
354                 manifests += coll.ManifestText
355         }
356         hash := stripAllHints(locator)
357         for _, tok := range regexp.MustCompile(`\S+`).FindAllString(manifests, -1) {
358                 if mBlkRe.MatchString(tok) {
359                         if stripAllHints(tok) == hash {
360                                 locator = tok
361                                 break
362                         }
363                 }
364         }
365         go fs.updateSignatures(manifests)
366         return locator
367 }
368
369 func (fs *collectionFileSystem) Sync() error {
370         err := fs.checkChangesOnServer()
371         if err != nil {
372                 return err
373         }
374         if fs.uuid == "" {
375                 return nil
376         }
377         txt, err := fs.MarshalManifest(".")
378         if err != nil {
379                 return fmt.Errorf("sync failed: %s", err)
380         }
381         if PortableDataHash(txt) == fs.savedPDH.Load() {
382                 // No local changes since last save or initial load.
383                 return nil
384         }
385         coll := Collection{
386                 UUID:         fs.uuid,
387                 ManifestText: txt,
388         }
389
390         selectFields := []string{"uuid", "portable_data_hash"}
391         fs.lockCheckChanges.Lock()
392         remain, _ := fs.signatureTimeLeft()
393         fs.lockCheckChanges.Unlock()
394         if remain < 0.5 {
395                 selectFields = append(selectFields, "manifest_text")
396         }
397
398         err = fs.RequestAndDecode(&coll, "PUT", "arvados/v1/collections/"+fs.uuid, nil, map[string]interface{}{
399                 "collection": map[string]string{
400                         "manifest_text": coll.ManifestText,
401                 },
402                 "select": selectFields,
403         })
404         if err != nil {
405                 return fmt.Errorf("sync failed: update %s: %s", fs.uuid, err)
406         }
407         fs.updateSignatures(coll.ManifestText)
408         fs.savedPDH.Store(coll.PortableDataHash)
409         return nil
410 }
411
412 func (fs *collectionFileSystem) Flush(path string, shortBlocks bool) error {
413         node, err := rlookup(fs.fileSystem.root, path)
414         if err != nil {
415                 return err
416         }
417         dn, ok := node.(*dirnode)
418         if !ok {
419                 return ErrNotADirectory
420         }
421         dn.Lock()
422         defer dn.Unlock()
423         names := dn.sortedNames()
424         if path != "" {
425                 // Caller only wants to flush the specified dir,
426                 // non-recursively.  Drop subdirs from the list of
427                 // names.
428                 var filenames []string
429                 for _, name := range names {
430                         if _, ok := dn.inodes[name].(*filenode); ok {
431                                 filenames = append(filenames, name)
432                         }
433                 }
434                 names = filenames
435         }
436         for _, name := range names {
437                 child := dn.inodes[name]
438                 child.Lock()
439                 defer child.Unlock()
440         }
441         return dn.flush(context.TODO(), names, flushOpts{sync: false, shortBlocks: shortBlocks})
442 }
443
444 func (fs *collectionFileSystem) MemorySize() int64 {
445         fs.fileSystem.root.Lock()
446         defer fs.fileSystem.root.Unlock()
447         return fs.fileSystem.root.(*dirnode).MemorySize()
448 }
449
450 func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
451         fs.fileSystem.root.Lock()
452         defer fs.fileSystem.root.Unlock()
453         return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix)
454 }
455
456 func (fs *collectionFileSystem) Size() int64 {
457         return fs.fileSystem.root.(*dirnode).TreeSize()
458 }
459
460 // filenodePtr is an offset into a file that is (usually) efficient to
461 // seek to. Specifically, if filenode.repacked==filenodePtr.repacked
462 // then
463 // filenode.segments[filenodePtr.segmentIdx][filenodePtr.segmentOff]
464 // corresponds to file offset filenodePtr.off. Otherwise, it is
465 // necessary to reexamine len(filenode.segments[0]) etc. to find the
466 // correct segment and offset.
467 type filenodePtr struct {
468         off        int64
469         segmentIdx int
470         segmentOff int
471         repacked   int64
472 }
473
474 // seek returns a ptr that is consistent with both startPtr.off and
475 // the current state of fn. The caller must already hold fn.RLock() or
476 // fn.Lock().
477 //
478 // If startPtr is beyond EOF, ptr.segment* will indicate precisely
479 // EOF.
480 //
481 // After seeking:
482 //
483 //     ptr.segmentIdx == len(filenode.segments) // i.e., at EOF
484 //     ||
485 //     filenode.segments[ptr.segmentIdx].Len() > ptr.segmentOff
486 func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
487         ptr = startPtr
488         if ptr.off < 0 {
489                 // meaningless anyway
490                 return
491         } else if ptr.off >= fn.fileinfo.size {
492                 ptr.segmentIdx = len(fn.segments)
493                 ptr.segmentOff = 0
494                 ptr.repacked = fn.repacked
495                 return
496         } else if ptr.repacked == fn.repacked {
497                 // segmentIdx and segmentOff accurately reflect
498                 // ptr.off, but might have fallen off the end of a
499                 // segment
500                 if ptr.segmentOff >= fn.segments[ptr.segmentIdx].Len() {
501                         ptr.segmentIdx++
502                         ptr.segmentOff = 0
503                 }
504                 return
505         }
506         defer func() {
507                 ptr.repacked = fn.repacked
508         }()
509         if ptr.off >= fn.fileinfo.size {
510                 ptr.segmentIdx, ptr.segmentOff = len(fn.segments), 0
511                 return
512         }
513         // Recompute segmentIdx and segmentOff.  We have already
514         // established fn.fileinfo.size > ptr.off >= 0, so we don't
515         // have to deal with edge cases here.
516         var off int64
517         for ptr.segmentIdx, ptr.segmentOff = 0, 0; off < ptr.off; ptr.segmentIdx++ {
518                 // This would panic (index out of range) if
519                 // fn.fileinfo.size were larger than
520                 // sum(fn.segments[i].Len()) -- but that can't happen
521                 // because we have ensured fn.fileinfo.size is always
522                 // accurate.
523                 segLen := int64(fn.segments[ptr.segmentIdx].Len())
524                 if off+segLen > ptr.off {
525                         ptr.segmentOff = int(ptr.off - off)
526                         break
527                 }
528                 off += segLen
529         }
530         return
531 }
532
533 // filenode implements inode.
534 type filenode struct {
535         parent   inode
536         fs       *collectionFileSystem
537         fileinfo fileinfo
538         segments []segment
539         // number of times `segments` has changed in a
540         // way that might invalidate a filenodePtr
541         repacked int64
542         memsize  int64 // bytes in memSegments
543         sync.RWMutex
544         nullnode
545 }
546
547 // caller must have lock
548 func (fn *filenode) appendSegment(e segment) {
549         fn.segments = append(fn.segments, e)
550         fn.fileinfo.size += int64(e.Len())
551 }
552
553 func (fn *filenode) SetParent(p inode, name string) {
554         fn.Lock()
555         defer fn.Unlock()
556         fn.parent = p
557         fn.fileinfo.name = name
558 }
559
560 func (fn *filenode) Parent() inode {
561         fn.RLock()
562         defer fn.RUnlock()
563         return fn.parent
564 }
565
566 func (fn *filenode) FS() FileSystem {
567         return fn.fs
568 }
569
570 // Read reads file data from a single segment, starting at startPtr,
571 // into p. startPtr is assumed not to be up-to-date. Caller must have
572 // RLock or Lock.
573 func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
574         ptr = fn.seek(startPtr)
575         if ptr.off < 0 {
576                 err = ErrNegativeOffset
577                 return
578         }
579         if ptr.segmentIdx >= len(fn.segments) {
580                 err = io.EOF
581                 return
582         }
583         if ss, ok := fn.segments[ptr.segmentIdx].(storedSegment); ok {
584                 ss.locator = fn.fs.refreshSignature(ss.locator)
585                 fn.segments[ptr.segmentIdx] = ss
586         }
587         n, err = fn.segments[ptr.segmentIdx].ReadAt(p, int64(ptr.segmentOff))
588         if n > 0 {
589                 ptr.off += int64(n)
590                 ptr.segmentOff += n
591                 if ptr.segmentOff == fn.segments[ptr.segmentIdx].Len() {
592                         ptr.segmentIdx++
593                         ptr.segmentOff = 0
594                         if ptr.segmentIdx < len(fn.segments) && err == io.EOF {
595                                 err = nil
596                         }
597                 }
598         }
599         return
600 }
601
602 func (fn *filenode) Size() int64 {
603         fn.RLock()
604         defer fn.RUnlock()
605         return fn.fileinfo.Size()
606 }
607
608 func (fn *filenode) FileInfo() os.FileInfo {
609         fn.RLock()
610         defer fn.RUnlock()
611         return fn.fileinfo
612 }
613
614 func (fn *filenode) Truncate(size int64) error {
615         fn.Lock()
616         defer fn.Unlock()
617         return fn.truncate(size)
618 }
619
620 func (fn *filenode) truncate(size int64) error {
621         if size == fn.fileinfo.size {
622                 return nil
623         }
624         fn.repacked++
625         if size < fn.fileinfo.size {
626                 ptr := fn.seek(filenodePtr{off: size})
627                 for i := ptr.segmentIdx; i < len(fn.segments); i++ {
628                         if seg, ok := fn.segments[i].(*memSegment); ok {
629                                 fn.memsize -= int64(seg.Len())
630                         }
631                 }
632                 if ptr.segmentOff == 0 {
633                         fn.segments = fn.segments[:ptr.segmentIdx]
634                 } else {
635                         fn.segments = fn.segments[:ptr.segmentIdx+1]
636                         switch seg := fn.segments[ptr.segmentIdx].(type) {
637                         case *memSegment:
638                                 seg.Truncate(ptr.segmentOff)
639                                 fn.memsize += int64(seg.Len())
640                         default:
641                                 fn.segments[ptr.segmentIdx] = seg.Slice(0, ptr.segmentOff)
642                         }
643                 }
644                 fn.fileinfo.size = size
645                 return nil
646         }
647         for size > fn.fileinfo.size {
648                 grow := size - fn.fileinfo.size
649                 var seg *memSegment
650                 var ok bool
651                 if len(fn.segments) == 0 {
652                         seg = &memSegment{}
653                         fn.segments = append(fn.segments, seg)
654                 } else if seg, ok = fn.segments[len(fn.segments)-1].(*memSegment); !ok || seg.Len() >= maxBlockSize {
655                         seg = &memSegment{}
656                         fn.segments = append(fn.segments, seg)
657                 }
658                 if maxgrow := int64(maxBlockSize - seg.Len()); maxgrow < grow {
659                         grow = maxgrow
660                 }
661                 seg.Truncate(seg.Len() + int(grow))
662                 fn.fileinfo.size += grow
663                 fn.memsize += grow
664         }
665         return nil
666 }
667
668 // Write writes data from p to the file, starting at startPtr,
669 // extending the file size if necessary. Caller must have Lock.
670 func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
671         if startPtr.off > fn.fileinfo.size {
672                 if err = fn.truncate(startPtr.off); err != nil {
673                         return 0, startPtr, err
674                 }
675         }
676         ptr = fn.seek(startPtr)
677         if ptr.off < 0 {
678                 err = ErrNegativeOffset
679                 return
680         }
681         for len(p) > 0 && err == nil {
682                 cando := p
683                 if len(cando) > maxBlockSize {
684                         cando = cando[:maxBlockSize]
685                 }
686                 // Rearrange/grow fn.segments (and shrink cando if
687                 // needed) such that cando can be copied to
688                 // fn.segments[ptr.segmentIdx] at offset
689                 // ptr.segmentOff.
690                 cur := ptr.segmentIdx
691                 prev := ptr.segmentIdx - 1
692                 var curWritable bool
693                 if cur < len(fn.segments) {
694                         _, curWritable = fn.segments[cur].(*memSegment)
695                 }
696                 var prevAppendable bool
697                 if prev >= 0 && fn.segments[prev].Len() < maxBlockSize {
698                         _, prevAppendable = fn.segments[prev].(*memSegment)
699                 }
700                 if ptr.segmentOff > 0 && !curWritable {
701                         // Split a non-writable block.
702                         if max := fn.segments[cur].Len() - ptr.segmentOff; max <= len(cando) {
703                                 // Truncate cur, and insert a new
704                                 // segment after it.
705                                 cando = cando[:max]
706                                 fn.segments = append(fn.segments, nil)
707                                 copy(fn.segments[cur+1:], fn.segments[cur:])
708                         } else {
709                                 // Split cur into two copies, truncate
710                                 // the one on the left, shift the one
711                                 // on the right, and insert a new
712                                 // segment between them.
713                                 fn.segments = append(fn.segments, nil, nil)
714                                 copy(fn.segments[cur+2:], fn.segments[cur:])
715                                 fn.segments[cur+2] = fn.segments[cur+2].Slice(ptr.segmentOff+len(cando), -1)
716                         }
717                         cur++
718                         prev++
719                         seg := &memSegment{}
720                         seg.Truncate(len(cando))
721                         fn.memsize += int64(len(cando))
722                         fn.segments[cur] = seg
723                         fn.segments[prev] = fn.segments[prev].Slice(0, ptr.segmentOff)
724                         ptr.segmentIdx++
725                         ptr.segmentOff = 0
726                         fn.repacked++
727                         ptr.repacked++
728                 } else if curWritable {
729                         if fit := int(fn.segments[cur].Len()) - ptr.segmentOff; fit < len(cando) {
730                                 cando = cando[:fit]
731                         }
732                 } else {
733                         if prevAppendable {
734                                 // Shrink cando if needed to fit in
735                                 // prev segment.
736                                 if cangrow := maxBlockSize - fn.segments[prev].Len(); cangrow < len(cando) {
737                                         cando = cando[:cangrow]
738                                 }
739                         }
740
741                         if cur == len(fn.segments) {
742                                 // ptr is at EOF, filesize is changing.
743                                 fn.fileinfo.size += int64(len(cando))
744                         } else if el := fn.segments[cur].Len(); el <= len(cando) {
745                                 // cando is long enough that we won't
746                                 // need cur any more. shrink cando to
747                                 // be exactly as long as cur
748                                 // (otherwise we'd accidentally shift
749                                 // the effective position of all
750                                 // segments after cur).
751                                 cando = cando[:el]
752                                 copy(fn.segments[cur:], fn.segments[cur+1:])
753                                 fn.segments = fn.segments[:len(fn.segments)-1]
754                         } else {
755                                 // shrink cur by the same #bytes we're growing prev
756                                 fn.segments[cur] = fn.segments[cur].Slice(len(cando), -1)
757                         }
758
759                         if prevAppendable {
760                                 // Grow prev.
761                                 ptr.segmentIdx--
762                                 ptr.segmentOff = fn.segments[prev].Len()
763                                 fn.segments[prev].(*memSegment).Truncate(ptr.segmentOff + len(cando))
764                                 fn.memsize += int64(len(cando))
765                                 ptr.repacked++
766                                 fn.repacked++
767                         } else {
768                                 // Insert a segment between prev and
769                                 // cur, and advance prev/cur.
770                                 fn.segments = append(fn.segments, nil)
771                                 if cur < len(fn.segments) {
772                                         copy(fn.segments[cur+1:], fn.segments[cur:])
773                                         ptr.repacked++
774                                         fn.repacked++
775                                 } else {
776                                         // appending a new segment does
777                                         // not invalidate any ptrs
778                                 }
779                                 seg := &memSegment{}
780                                 seg.Truncate(len(cando))
781                                 fn.memsize += int64(len(cando))
782                                 fn.segments[cur] = seg
783                         }
784                 }
785
786                 // Finally we can copy bytes from cando to the current segment.
787                 fn.segments[ptr.segmentIdx].(*memSegment).WriteAt(cando, ptr.segmentOff)
788                 n += len(cando)
789                 p = p[len(cando):]
790
791                 ptr.off += int64(len(cando))
792                 ptr.segmentOff += len(cando)
793                 if ptr.segmentOff >= maxBlockSize {
794                         fn.pruneMemSegments()
795                 }
796                 if fn.segments[ptr.segmentIdx].Len() == ptr.segmentOff {
797                         ptr.segmentOff = 0
798                         ptr.segmentIdx++
799                 }
800
801                 fn.fileinfo.modTime = time.Now()
802         }
803         return
804 }
805
806 // Write some data out to disk to reduce memory use. Caller must have
807 // write lock.
808 func (fn *filenode) pruneMemSegments() {
809         // TODO: share code with (*dirnode)flush()
810         // TODO: pack/flush small blocks too, when fragmented
811         for idx, seg := range fn.segments {
812                 seg, ok := seg.(*memSegment)
813                 if !ok || seg.Len() < maxBlockSize || seg.flushing != nil {
814                         continue
815                 }
816                 // Setting seg.flushing guarantees seg.buf will not be
817                 // modified in place: WriteAt and Truncate will
818                 // allocate a new buf instead, if necessary.
819                 idx, buf := idx, seg.buf
820                 done := make(chan struct{})
821                 seg.flushing = done
822                 // If lots of background writes are already in
823                 // progress, block here until one finishes, rather
824                 // than pile up an unlimited number of buffered writes
825                 // and network flush operations.
826                 fn.fs.throttle().Acquire()
827                 go func() {
828                         defer close(done)
829                         resp, err := fn.FS().BlockWrite(context.Background(), BlockWriteOptions{
830                                 Data:           buf,
831                                 Replicas:       fn.fs.replicas,
832                                 StorageClasses: fn.fs.storageClasses,
833                         })
834                         fn.fs.throttle().Release()
835                         fn.Lock()
836                         defer fn.Unlock()
837                         if seg.flushing != done {
838                                 // A new seg.buf has been allocated.
839                                 return
840                         }
841                         if err != nil {
842                                 // TODO: stall (or return errors from)
843                                 // subsequent writes until flushing
844                                 // starts to succeed.
845                                 return
846                         }
847                         if len(fn.segments) <= idx || fn.segments[idx] != seg || len(seg.buf) != len(buf) {
848                                 // Segment has been dropped/moved/resized.
849                                 return
850                         }
851                         fn.memsize -= int64(len(buf))
852                         fn.segments[idx] = storedSegment{
853                                 kc:      fn.FS(),
854                                 locator: resp.Locator,
855                                 size:    len(buf),
856                                 offset:  0,
857                                 length:  len(buf),
858                         }
859                 }()
860         }
861 }
862
863 // Block until all pending pruneMemSegments/flush work is
864 // finished. Caller must NOT have lock.
865 func (fn *filenode) waitPrune() {
866         var pending []<-chan struct{}
867         fn.Lock()
868         for _, seg := range fn.segments {
869                 if seg, ok := seg.(*memSegment); ok && seg.flushing != nil {
870                         pending = append(pending, seg.flushing)
871                 }
872         }
873         fn.Unlock()
874         for _, p := range pending {
875                 <-p
876         }
877 }
878
879 type dirnode struct {
880         fs *collectionFileSystem
881         treenode
882 }
883
884 func (dn *dirnode) FS() FileSystem {
885         return dn.fs
886 }
887
888 func (dn *dirnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
889         if dn == dn.fs.rootnode() && name == ".arvados#collection" {
890                 gn := &getternode{Getter: func() ([]byte, error) {
891                         var coll Collection
892                         var err error
893                         coll.ManifestText, err = dn.fs.MarshalManifest(".")
894                         if err != nil {
895                                 return nil, err
896                         }
897                         coll.UUID = dn.fs.uuid
898                         data, err := json.Marshal(&coll)
899                         if err == nil {
900                                 data = append(data, '\n')
901                         }
902                         return data, err
903                 }}
904                 gn.SetParent(dn, name)
905                 return gn, nil
906         }
907         return dn.treenode.Child(name, replace)
908 }
909
910 type fnSegmentRef struct {
911         fn  *filenode
912         idx int
913 }
914
915 // commitBlock concatenates the data from the given filenode segments
916 // (which must be *memSegments), writes the data out to Keep as a
917 // single block, and replaces the filenodes' *memSegments with
918 // storedSegments that reference the relevant portions of the new
919 // block.
920 //
921 // bufsize is the total data size in refs. It is used to preallocate
922 // the correct amount of memory when len(refs)>1.
923 //
924 // If sync is false, commitBlock returns right away, after starting a
925 // goroutine to do the writes, reacquire the filenodes' locks, and
926 // swap out the *memSegments. Some filenodes' segments might get
927 // modified/rearranged in the meantime, in which case commitBlock
928 // won't replace them.
929 //
930 // Caller must have write lock.
931 func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize int, sync bool) error {
932         if len(refs) == 0 {
933                 return nil
934         }
935         if err := ctx.Err(); err != nil {
936                 return err
937         }
938         done := make(chan struct{})
939         var block []byte
940         segs := make([]*memSegment, 0, len(refs))
941         offsets := make([]int, 0, len(refs)) // location of segment's data within block
942         for _, ref := range refs {
943                 seg := ref.fn.segments[ref.idx].(*memSegment)
944                 if !sync && seg.flushingUnfinished() {
945                         // Let the other flushing goroutine finish. If
946                         // it fails, we'll try again next time.
947                         close(done)
948                         return nil
949                 }
950                 // In sync mode, we proceed regardless of
951                 // whether another flush is in progress: It
952                 // can't finish before we do, because we hold
953                 // fn's lock until we finish our own writes.
954                 seg.flushing = done
955                 offsets = append(offsets, len(block))
956                 if len(refs) == 1 {
957                         block = seg.buf
958                 } else if block == nil {
959                         block = append(make([]byte, 0, bufsize), seg.buf...)
960                 } else {
961                         block = append(block, seg.buf...)
962                 }
963                 segs = append(segs, seg)
964         }
965         blocksize := len(block)
966         dn.fs.throttle().Acquire()
967         errs := make(chan error, 1)
968         go func() {
969                 defer close(done)
970                 defer close(errs)
971                 resp, err := dn.fs.BlockWrite(context.Background(), BlockWriteOptions{
972                         Data:           block,
973                         Replicas:       dn.fs.replicas,
974                         StorageClasses: dn.fs.storageClasses,
975                 })
976                 dn.fs.throttle().Release()
977                 if err != nil {
978                         errs <- err
979                         return
980                 }
981                 for idx, ref := range refs {
982                         if !sync {
983                                 ref.fn.Lock()
984                                 // In async mode, fn's lock was
985                                 // released while we were waiting for
986                                 // PutB(); lots of things might have
987                                 // changed.
988                                 if len(ref.fn.segments) <= ref.idx {
989                                         // file segments have
990                                         // rearranged or changed in
991                                         // some way
992                                         ref.fn.Unlock()
993                                         continue
994                                 } else if seg, ok := ref.fn.segments[ref.idx].(*memSegment); !ok || seg != segs[idx] {
995                                         // segment has been replaced
996                                         ref.fn.Unlock()
997                                         continue
998                                 } else if seg.flushing != done {
999                                         // seg.buf has been replaced
1000                                         ref.fn.Unlock()
1001                                         continue
1002                                 }
1003                         }
1004                         data := ref.fn.segments[ref.idx].(*memSegment).buf
1005                         ref.fn.segments[ref.idx] = storedSegment{
1006                                 kc:      dn.fs,
1007                                 locator: resp.Locator,
1008                                 size:    blocksize,
1009                                 offset:  offsets[idx],
1010                                 length:  len(data),
1011                         }
1012                         // atomic is needed here despite caller having
1013                         // lock: caller might be running concurrent
1014                         // commitBlock() goroutines using the same
1015                         // lock, writing different segments from the
1016                         // same file.
1017                         atomic.AddInt64(&ref.fn.memsize, -int64(len(data)))
1018                         if !sync {
1019                                 ref.fn.Unlock()
1020                         }
1021                 }
1022         }()
1023         if sync {
1024                 return <-errs
1025         }
1026         return nil
1027 }
1028
1029 type flushOpts struct {
1030         sync        bool
1031         shortBlocks bool
1032 }
1033
1034 // flush in-memory data and remote-cluster block references (for the
1035 // children with the given names, which must be children of dn) to
1036 // local-cluster persistent storage.
1037 //
1038 // Caller must have write lock on dn and the named children.
1039 //
1040 // If any children are dirs, they will be flushed recursively.
1041 func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) error {
1042         cg := newContextGroup(ctx)
1043         defer cg.Cancel()
1044
1045         goCommit := func(refs []fnSegmentRef, bufsize int) {
1046                 cg.Go(func() error {
1047                         return dn.commitBlock(cg.Context(), refs, bufsize, opts.sync)
1048                 })
1049         }
1050
1051         var pending []fnSegmentRef
1052         var pendingLen int = 0
1053         localLocator := map[string]string{}
1054         for _, name := range names {
1055                 switch node := dn.inodes[name].(type) {
1056                 case *dirnode:
1057                         grandchildNames := node.sortedNames()
1058                         for _, grandchildName := range grandchildNames {
1059                                 grandchild := node.inodes[grandchildName]
1060                                 grandchild.Lock()
1061                                 defer grandchild.Unlock()
1062                         }
1063                         cg.Go(func() error { return node.flush(cg.Context(), grandchildNames, opts) })
1064                 case *filenode:
1065                         for idx, seg := range node.segments {
1066                                 switch seg := seg.(type) {
1067                                 case storedSegment:
1068                                         loc, ok := localLocator[seg.locator]
1069                                         if !ok {
1070                                                 var err error
1071                                                 loc, err = dn.fs.LocalLocator(seg.locator)
1072                                                 if err != nil {
1073                                                         return err
1074                                                 }
1075                                                 localLocator[seg.locator] = loc
1076                                         }
1077                                         seg.locator = loc
1078                                         node.segments[idx] = seg
1079                                 case *memSegment:
1080                                         if seg.Len() > maxBlockSize/2 {
1081                                                 goCommit([]fnSegmentRef{{node, idx}}, seg.Len())
1082                                                 continue
1083                                         }
1084                                         if pendingLen+seg.Len() > maxBlockSize {
1085                                                 goCommit(pending, pendingLen)
1086                                                 pending = nil
1087                                                 pendingLen = 0
1088                                         }
1089                                         pending = append(pending, fnSegmentRef{node, idx})
1090                                         pendingLen += seg.Len()
1091                                 default:
1092                                         panic(fmt.Sprintf("can't sync segment type %T", seg))
1093                                 }
1094                         }
1095                 }
1096         }
1097         if opts.shortBlocks {
1098                 goCommit(pending, pendingLen)
1099         }
1100         return cg.Wait()
1101 }
1102
1103 // caller must have write lock.
1104 func (dn *dirnode) MemorySize() (size int64) {
1105         for _, name := range dn.sortedNames() {
1106                 node := dn.inodes[name]
1107                 node.Lock()
1108                 defer node.Unlock()
1109                 switch node := node.(type) {
1110                 case *dirnode:
1111                         size += node.MemorySize()
1112                 case *filenode:
1113                         for _, seg := range node.segments {
1114                                 switch seg := seg.(type) {
1115                                 case *memSegment:
1116                                         size += int64(seg.Len())
1117                                 }
1118                         }
1119                 }
1120         }
1121         return
1122 }
1123
1124 // caller must have write lock.
1125 func (dn *dirnode) sortedNames() []string {
1126         names := make([]string, 0, len(dn.inodes))
1127         for name := range dn.inodes {
1128                 names = append(names, name)
1129         }
1130         sort.Strings(names)
1131         return names
1132 }
1133
1134 // caller must have write lock.
1135 func (dn *dirnode) marshalManifest(ctx context.Context, prefix string) (string, error) {
1136         cg := newContextGroup(ctx)
1137         defer cg.Cancel()
1138
1139         if len(dn.inodes) == 0 {
1140                 if prefix == "." {
1141                         return "", nil
1142                 }
1143                 // Express the existence of an empty directory by
1144                 // adding an empty file named `\056`, which (unlike
1145                 // the more obvious spelling `.`) is accepted by the
1146                 // API's manifest validator.
1147                 return manifestEscape(prefix) + " d41d8cd98f00b204e9800998ecf8427e+0 0:0:\\056\n", nil
1148         }
1149
1150         names := dn.sortedNames()
1151
1152         // Wait for children to finish any pending write operations
1153         // before locking them.
1154         for _, name := range names {
1155                 node := dn.inodes[name]
1156                 if fn, ok := node.(*filenode); ok {
1157                         fn.waitPrune()
1158                 }
1159         }
1160
1161         var dirnames []string
1162         var filenames []string
1163         for _, name := range names {
1164                 node := dn.inodes[name]
1165                 node.Lock()
1166                 defer node.Unlock()
1167                 switch node := node.(type) {
1168                 case *dirnode:
1169                         dirnames = append(dirnames, name)
1170                 case *filenode:
1171                         filenames = append(filenames, name)
1172                 default:
1173                         panic(fmt.Sprintf("can't marshal inode type %T", node))
1174                 }
1175         }
1176
1177         subdirs := make([]string, len(dirnames))
1178         rootdir := ""
1179         for i, name := range dirnames {
1180                 i, name := i, name
1181                 cg.Go(func() error {
1182                         txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name)
1183                         subdirs[i] = txt
1184                         return err
1185                 })
1186         }
1187
1188         cg.Go(func() error {
1189                 var streamLen int64
1190                 type filepart struct {
1191                         name   string
1192                         offset int64
1193                         length int64
1194                 }
1195
1196                 var fileparts []filepart
1197                 var blocks []string
1198                 if err := dn.flush(cg.Context(), filenames, flushOpts{sync: true, shortBlocks: true}); err != nil {
1199                         return err
1200                 }
1201                 for _, name := range filenames {
1202                         node := dn.inodes[name].(*filenode)
1203                         if len(node.segments) == 0 {
1204                                 fileparts = append(fileparts, filepart{name: name})
1205                                 continue
1206                         }
1207                         for _, seg := range node.segments {
1208                                 switch seg := seg.(type) {
1209                                 case storedSegment:
1210                                         if len(blocks) > 0 && blocks[len(blocks)-1] == seg.locator {
1211                                                 streamLen -= int64(seg.size)
1212                                         } else {
1213                                                 blocks = append(blocks, seg.locator)
1214                                         }
1215                                         next := filepart{
1216                                                 name:   name,
1217                                                 offset: streamLen + int64(seg.offset),
1218                                                 length: int64(seg.length),
1219                                         }
1220                                         if prev := len(fileparts) - 1; prev >= 0 &&
1221                                                 fileparts[prev].name == name &&
1222                                                 fileparts[prev].offset+fileparts[prev].length == next.offset {
1223                                                 fileparts[prev].length += next.length
1224                                         } else {
1225                                                 fileparts = append(fileparts, next)
1226                                         }
1227                                         streamLen += int64(seg.size)
1228                                 default:
1229                                         // This can't happen: we
1230                                         // haven't unlocked since
1231                                         // calling flush(sync=true).
1232                                         panic(fmt.Sprintf("can't marshal segment type %T", seg))
1233                                 }
1234                         }
1235                 }
1236                 var filetokens []string
1237                 for _, s := range fileparts {
1238                         filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
1239                 }
1240                 if len(filetokens) == 0 {
1241                         return nil
1242                 } else if len(blocks) == 0 {
1243                         blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
1244                 }
1245                 rootdir = manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n"
1246                 return nil
1247         })
1248         err := cg.Wait()
1249         return rootdir + strings.Join(subdirs, ""), err
1250 }
1251
1252 func (dn *dirnode) loadManifest(txt string) error {
1253         streams := bytes.Split([]byte(txt), []byte{'\n'})
1254         if len(streams[len(streams)-1]) != 0 {
1255                 return fmt.Errorf("line %d: no trailing newline", len(streams))
1256         }
1257         streams = streams[:len(streams)-1]
1258         segments := []storedSegment{}
1259         // To reduce allocs, we reuse a single "pathparts" slice
1260         // (pre-split on "/" separators) for the duration of this
1261         // func.
1262         var pathparts []string
1263         // To reduce allocs, we reuse a single "toks" slice of 3 byte
1264         // slices.
1265         var toks = make([][]byte, 3)
1266         // Similar to bytes.SplitN(token, []byte{c}, 3), but splits
1267         // into the toks slice rather than allocating a new one, and
1268         // returns the number of toks (1, 2, or 3).
1269         splitToToks := func(src []byte, c rune) int {
1270                 c1 := bytes.IndexRune(src, c)
1271                 if c1 < 0 {
1272                         toks[0] = src
1273                         return 1
1274                 }
1275                 toks[0], src = src[:c1], src[c1+1:]
1276                 c2 := bytes.IndexRune(src, c)
1277                 if c2 < 0 {
1278                         toks[1] = src
1279                         return 2
1280                 }
1281                 toks[1], toks[2] = src[:c2], src[c2+1:]
1282                 return 3
1283         }
1284         for i, stream := range streams {
1285                 lineno := i + 1
1286                 var anyFileTokens bool
1287                 var pos int64
1288                 var segIdx int
1289                 segments = segments[:0]
1290                 pathparts = nil
1291                 streamparts := 0
1292                 for i, token := range bytes.Split(stream, []byte{' '}) {
1293                         if i == 0 {
1294                                 pathparts = strings.Split(manifestUnescape(string(token)), "/")
1295                                 streamparts = len(pathparts)
1296                                 continue
1297                         }
1298                         if !bytes.ContainsRune(token, ':') {
1299                                 if anyFileTokens {
1300                                         return fmt.Errorf("line %d: bad file segment %q", lineno, token)
1301                                 }
1302                                 if splitToToks(token, '+') < 2 {
1303                                         return fmt.Errorf("line %d: bad locator %q", lineno, token)
1304                                 }
1305                                 length, err := strconv.ParseInt(string(toks[1]), 10, 32)
1306                                 if err != nil || length < 0 {
1307                                         return fmt.Errorf("line %d: bad locator %q", lineno, token)
1308                                 }
1309                                 segments = append(segments, storedSegment{
1310                                         locator: string(token),
1311                                         size:    int(length),
1312                                         offset:  0,
1313                                         length:  int(length),
1314                                 })
1315                                 continue
1316                         } else if len(segments) == 0 {
1317                                 return fmt.Errorf("line %d: bad locator %q", lineno, token)
1318                         }
1319                         if splitToToks(token, ':') != 3 {
1320                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
1321                         }
1322                         anyFileTokens = true
1323
1324                         offset, err := strconv.ParseInt(string(toks[0]), 10, 64)
1325                         if err != nil || offset < 0 {
1326                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
1327                         }
1328                         length, err := strconv.ParseInt(string(toks[1]), 10, 64)
1329                         if err != nil || length < 0 {
1330                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
1331                         }
1332                         if !bytes.ContainsAny(toks[2], `\/`) {
1333                                 // optimization for a common case
1334                                 pathparts = append(pathparts[:streamparts], string(toks[2]))
1335                         } else {
1336                                 pathparts = append(pathparts[:streamparts], strings.Split(manifestUnescape(string(toks[2])), "/")...)
1337                         }
1338                         fnode, err := dn.createFileAndParents(pathparts)
1339                         if fnode == nil && err == nil && length == 0 {
1340                                 // Special case: an empty file used as
1341                                 // a marker to preserve an otherwise
1342                                 // empty directory in a manifest.
1343                                 continue
1344                         }
1345                         if err != nil || (fnode == nil && length != 0) {
1346                                 return fmt.Errorf("line %d: cannot use name %q with length %d: %s", lineno, toks[2], length, err)
1347                         }
1348                         // Map the stream offset/range coordinates to
1349                         // block/offset/range coordinates and add
1350                         // corresponding storedSegments to the filenode
1351                         if pos > offset {
1352                                 // Can't continue where we left off.
1353                                 // TODO: binary search instead of
1354                                 // rewinding all the way (but this
1355                                 // situation might be rare anyway)
1356                                 segIdx, pos = 0, 0
1357                         }
1358                         for ; segIdx < len(segments); segIdx++ {
1359                                 seg := segments[segIdx]
1360                                 next := pos + int64(seg.Len())
1361                                 if next <= offset || seg.Len() == 0 {
1362                                         pos = next
1363                                         continue
1364                                 }
1365                                 if pos >= offset+length {
1366                                         break
1367                                 }
1368                                 var blkOff int
1369                                 if pos < offset {
1370                                         blkOff = int(offset - pos)
1371                                 }
1372                                 blkLen := seg.Len() - blkOff
1373                                 if pos+int64(blkOff+blkLen) > offset+length {
1374                                         blkLen = int(offset + length - pos - int64(blkOff))
1375                                 }
1376                                 fnode.appendSegment(storedSegment{
1377                                         kc:      dn.fs,
1378                                         locator: seg.locator,
1379                                         size:    seg.size,
1380                                         offset:  blkOff,
1381                                         length:  blkLen,
1382                                 })
1383                                 if next > offset+length {
1384                                         break
1385                                 } else {
1386                                         pos = next
1387                                 }
1388                         }
1389                         if segIdx == len(segments) && pos < offset+length {
1390                                 return fmt.Errorf("line %d: invalid segment in %d-byte stream: %q", lineno, pos, token)
1391                         }
1392                 }
1393                 if !anyFileTokens {
1394                         return fmt.Errorf("line %d: no file segments", lineno)
1395                 } else if len(segments) == 0 {
1396                         return fmt.Errorf("line %d: no locators", lineno)
1397                 } else if streamparts == 0 {
1398                         return fmt.Errorf("line %d: no stream name", lineno)
1399                 }
1400         }
1401         return nil
1402 }
1403
1404 // only safe to call from loadManifest -- no locking.
1405 //
1406 // If path is a "parent directory exists" marker (the last path
1407 // component is "."), the returned values are both nil.
1408 //
1409 // Newly added nodes have modtime==0. Caller is responsible for fixing
1410 // them with backdateTree.
1411 func (dn *dirnode) createFileAndParents(names []string) (fn *filenode, err error) {
1412         var node inode = dn
1413         basename := names[len(names)-1]
1414         for _, name := range names[:len(names)-1] {
1415                 switch name {
1416                 case "", ".":
1417                         continue
1418                 case "..":
1419                         if node == dn {
1420                                 // can't be sure parent will be a *dirnode
1421                                 return nil, ErrInvalidArgument
1422                         }
1423                         node = node.Parent()
1424                         continue
1425                 }
1426                 node.Lock()
1427                 unlock := node.Unlock
1428                 node, err = node.Child(name, func(child inode) (inode, error) {
1429                         if child == nil {
1430                                 // note modtime will be fixed later in backdateTree()
1431                                 child, err := node.FS().newNode(name, 0755|os.ModeDir, time.Time{})
1432                                 if err != nil {
1433                                         return nil, err
1434                                 }
1435                                 child.SetParent(node, name)
1436                                 return child, nil
1437                         } else if !child.IsDir() {
1438                                 return child, ErrFileExists
1439                         } else {
1440                                 return child, nil
1441                         }
1442                 })
1443                 unlock()
1444                 if err != nil {
1445                         return
1446                 }
1447         }
1448         if basename == "." {
1449                 return
1450         } else if !permittedName(basename) {
1451                 err = fmt.Errorf("invalid file part %q in path %q", basename, names)
1452                 return
1453         }
1454         node.Lock()
1455         defer node.Unlock()
1456         _, err = node.Child(basename, func(child inode) (inode, error) {
1457                 switch child := child.(type) {
1458                 case nil:
1459                         child, err = node.FS().newNode(basename, 0755, time.Time{})
1460                         if err != nil {
1461                                 return nil, err
1462                         }
1463                         child.SetParent(node, basename)
1464                         fn = child.(*filenode)
1465                         return child, nil
1466                 case *filenode:
1467                         fn = child
1468                         return child, nil
1469                 case *dirnode:
1470                         return child, ErrIsDirectory
1471                 default:
1472                         return child, ErrInvalidArgument
1473                 }
1474         })
1475         return
1476 }
1477
1478 func (dn *dirnode) TreeSize() (bytes int64) {
1479         dn.RLock()
1480         defer dn.RUnlock()
1481         for _, i := range dn.inodes {
1482                 switch i := i.(type) {
1483                 case *filenode:
1484                         bytes += i.Size()
1485                 case *dirnode:
1486                         bytes += i.TreeSize()
1487                 }
1488         }
1489         return
1490 }
1491
1492 type segment interface {
1493         io.ReaderAt
1494         Len() int
1495         // Return a new segment with a subsection of the data from this
1496         // one. length<0 means length=Len()-off.
1497         Slice(off int, length int) segment
1498 }
1499
1500 type memSegment struct {
1501         buf []byte
1502         // If flushing is not nil and not ready/closed, then a) buf is
1503         // being shared by a pruneMemSegments goroutine, and must be
1504         // copied on write; and b) the flushing channel will close
1505         // when the goroutine finishes, whether it succeeds or not.
1506         flushing <-chan struct{}
1507 }
1508
1509 func (me *memSegment) flushingUnfinished() bool {
1510         if me.flushing == nil {
1511                 return false
1512         }
1513         select {
1514         case <-me.flushing:
1515                 me.flushing = nil
1516                 return false
1517         default:
1518                 return true
1519         }
1520 }
1521
1522 func (me *memSegment) Len() int {
1523         return len(me.buf)
1524 }
1525
1526 func (me *memSegment) Slice(off, length int) segment {
1527         if length < 0 {
1528                 length = len(me.buf) - off
1529         }
1530         buf := make([]byte, length)
1531         copy(buf, me.buf[off:])
1532         return &memSegment{buf: buf}
1533 }
1534
1535 func (me *memSegment) Truncate(n int) {
1536         if n > cap(me.buf) || (me.flushing != nil && n > len(me.buf)) {
1537                 newsize := 1024
1538                 for newsize < n {
1539                         newsize = newsize << 2
1540                 }
1541                 newbuf := make([]byte, n, newsize)
1542                 copy(newbuf, me.buf)
1543                 me.buf, me.flushing = newbuf, nil
1544         } else {
1545                 // reclaim existing capacity, and zero reclaimed part
1546                 oldlen := len(me.buf)
1547                 me.buf = me.buf[:n]
1548                 for i := oldlen; i < n; i++ {
1549                         me.buf[i] = 0
1550                 }
1551         }
1552 }
1553
1554 func (me *memSegment) WriteAt(p []byte, off int) {
1555         if off+len(p) > len(me.buf) {
1556                 panic("overflowed segment")
1557         }
1558         if me.flushing != nil {
1559                 me.buf, me.flushing = append([]byte(nil), me.buf...), nil
1560         }
1561         copy(me.buf[off:], p)
1562 }
1563
1564 func (me *memSegment) ReadAt(p []byte, off int64) (n int, err error) {
1565         if off > int64(me.Len()) {
1566                 err = io.EOF
1567                 return
1568         }
1569         n = copy(p, me.buf[int(off):])
1570         if n < len(p) {
1571                 err = io.EOF
1572         }
1573         return
1574 }
1575
1576 type storedSegment struct {
1577         kc      fsBackend
1578         locator string
1579         size    int // size of stored block (also encoded in locator)
1580         offset  int // position of segment within the stored block
1581         length  int // bytes in this segment (offset + length <= size)
1582 }
1583
1584 func (se storedSegment) Len() int {
1585         return se.length
1586 }
1587
1588 func (se storedSegment) Slice(n, size int) segment {
1589         se.offset += n
1590         se.length -= n
1591         if size >= 0 && se.length > size {
1592                 se.length = size
1593         }
1594         return se
1595 }
1596
1597 func (se storedSegment) ReadAt(p []byte, off int64) (n int, err error) {
1598         if off > int64(se.length) {
1599                 return 0, io.EOF
1600         }
1601         maxlen := se.length - int(off)
1602         if len(p) > maxlen {
1603                 p = p[:maxlen]
1604                 n, err = se.kc.ReadAt(se.locator, p, int(off)+se.offset)
1605                 if err == nil {
1606                         err = io.EOF
1607                 }
1608                 return
1609         }
1610         return se.kc.ReadAt(se.locator, p, int(off)+se.offset)
1611 }
1612
1613 func canonicalName(name string) string {
1614         name = path.Clean("/" + name)
1615         if name == "/" || name == "./" {
1616                 name = "."
1617         } else if strings.HasPrefix(name, "/") {
1618                 name = "." + name
1619         }
1620         return name
1621 }
1622
1623 var manifestEscapeSeq = regexp.MustCompile(`\\([0-7]{3}|\\)`)
1624
1625 func manifestUnescapeFunc(seq string) string {
1626         if seq == `\\` {
1627                 return `\`
1628         }
1629         i, err := strconv.ParseUint(seq[1:], 8, 8)
1630         if err != nil {
1631                 // Invalid escape sequence: can't unescape.
1632                 return seq
1633         }
1634         return string([]byte{byte(i)})
1635 }
1636
1637 func manifestUnescape(s string) string {
1638         return manifestEscapeSeq.ReplaceAllStringFunc(s, manifestUnescapeFunc)
1639 }
1640
1641 var manifestEscapedChar = regexp.MustCompile(`[\000-\040:\s\\]`)
1642
1643 func manifestEscapeFunc(seq string) string {
1644         return fmt.Sprintf("\\%03o", byte(seq[0]))
1645 }
1646
1647 func manifestEscape(s string) string {
1648         return manifestEscapedChar.ReplaceAllStringFunc(s, manifestEscapeFunc)
1649 }