15652: Fix shadowed variable.
[arvados.git] / sdk / go / arvados / fs_collection.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "context"
9         "encoding/json"
10         "fmt"
11         "io"
12         "os"
13         "path"
14         "regexp"
15         "sort"
16         "strconv"
17         "strings"
18         "sync"
19         "time"
20 )
21
var (
	maxBlockSize      = 1 << 26 // 64 MiB: upper bound on a single segment / Keep block
	concurrentWriters = 4       // max goroutines writing to Keep during flush()
	writeAheadBlocks  = 1       // max background jobs flushing to Keep before blocking writes
)
27
// A CollectionFileSystem is a FileSystem that can be serialized as a
// manifest and stored as a collection.
//
// Note memorySize is unexported, so only implementations in this
// package can satisfy this interface.
type CollectionFileSystem interface {
	FileSystem

	// Flush all file data to Keep and return a snapshot of the
	// filesystem suitable for saving as (Collection)ManifestText.
	// Prefix (normally ".") is a top level directory, effectively
	// prepended to all paths in the returned manifest.
	MarshalManifest(prefix string) (string, error)

	// Total data bytes in all files.
	Size() int64

	// Memory consumed by buffered file data.
	memorySize() int64
}
45
// collectionFileSystem implements CollectionFileSystem.
type collectionFileSystem struct {
	fileSystem
	// UUID of the backing collection record; "" if the filesystem
	// is not backed by an existing collection (Sync is then a
	// no-op).
	uuid string
}
50
51 // FileSystem returns a CollectionFileSystem for the collection.
52 func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFileSystem, error) {
53         var modTime time.Time
54         if c.ModifiedAt == nil {
55                 modTime = time.Now()
56         } else {
57                 modTime = *c.ModifiedAt
58         }
59         fs := &collectionFileSystem{
60                 uuid: c.UUID,
61                 fileSystem: fileSystem{
62                         fsBackend: keepBackend{apiClient: client, keepClient: kc},
63                 },
64         }
65         root := &dirnode{
66                 fs: fs,
67                 treenode: treenode{
68                         fileinfo: fileinfo{
69                                 name:    ".",
70                                 mode:    os.ModeDir | 0755,
71                                 modTime: modTime,
72                         },
73                         inodes: make(map[string]inode),
74                 },
75         }
76         root.SetParent(root, ".")
77         if err := root.loadManifest(c.ManifestText); err != nil {
78                 return nil, err
79         }
80         backdateTree(root, modTime)
81         fs.root = root
82         return fs, nil
83 }
84
85 func backdateTree(n inode, modTime time.Time) {
86         switch n := n.(type) {
87         case *filenode:
88                 n.fileinfo.modTime = modTime
89         case *dirnode:
90                 n.fileinfo.modTime = modTime
91                 for _, n := range n.inodes {
92                         backdateTree(n, modTime)
93                 }
94         }
95 }
96
97 func (fs *collectionFileSystem) newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error) {
98         if name == "" || name == "." || name == ".." {
99                 return nil, ErrInvalidArgument
100         }
101         if perm.IsDir() {
102                 return &dirnode{
103                         fs: fs,
104                         treenode: treenode{
105                                 fileinfo: fileinfo{
106                                         name:    name,
107                                         mode:    perm | os.ModeDir,
108                                         modTime: modTime,
109                                 },
110                                 inodes: make(map[string]inode),
111                         },
112                 }, nil
113         } else {
114                 return &filenode{
115                         fs: fs,
116                         fileinfo: fileinfo{
117                                 name:    name,
118                                 mode:    perm & ^os.ModeDir,
119                                 modTime: modTime,
120                         },
121                 }, nil
122         }
123 }
124
125 func (fs *collectionFileSystem) Sync() error {
126         if fs.uuid == "" {
127                 return nil
128         }
129         txt, err := fs.MarshalManifest(".")
130         if err != nil {
131                 return fmt.Errorf("sync failed: %s", err)
132         }
133         coll := &Collection{
134                 UUID:         fs.uuid,
135                 ManifestText: txt,
136         }
137         err = fs.RequestAndDecode(nil, "PUT", "arvados/v1/collections/"+fs.uuid, nil, map[string]interface{}{
138                 "collection": map[string]string{
139                         "manifest_text": coll.ManifestText,
140                 },
141                 "select": []string{"uuid"},
142         })
143         if err != nil {
144                 return fmt.Errorf("sync failed: update %s: %s", fs.uuid, err)
145         }
146         return nil
147 }
148
149 func (fs *collectionFileSystem) Flush(shortBlocks bool) error {
150         fs.fileSystem.root.Lock()
151         defer fs.fileSystem.root.Unlock()
152         dn := fs.fileSystem.root.(*dirnode)
153         return dn.flush(context.TODO(), newThrottle(concurrentWriters), dn.sortedNames(), flushOpts{sync: false, shortBlocks: shortBlocks})
154 }
155
156 func (fs *collectionFileSystem) memorySize() int64 {
157         fs.fileSystem.root.Lock()
158         defer fs.fileSystem.root.Unlock()
159         return fs.fileSystem.root.(*dirnode).memorySize()
160 }
161
162 func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
163         fs.fileSystem.root.Lock()
164         defer fs.fileSystem.root.Unlock()
165         return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix, newThrottle(concurrentWriters))
166 }
167
168 func (fs *collectionFileSystem) Size() int64 {
169         return fs.fileSystem.root.(*dirnode).TreeSize()
170 }
171
// filenodePtr is an offset into a file that is (usually) efficient to
// seek to. Specifically, if filenode.repacked==filenodePtr.repacked
// then
// filenode.segments[filenodePtr.segmentIdx][filenodePtr.segmentOff]
// corresponds to file offset filenodePtr.off. Otherwise, it is
// necessary to reexamine len(filenode.segments[0]) etc. to find the
// correct segment and offset.
type filenodePtr struct {
	off        int64 // byte offset in the file
	segmentIdx int   // index into filenode.segments
	segmentOff int   // byte offset within that segment
	repacked   int64 // filenode.repacked value when segmentIdx/segmentOff were computed
}
185
// seek returns a ptr that is consistent with both startPtr.off and
// the current state of fn. The caller must already hold fn.RLock() or
// fn.Lock().
//
// If startPtr is beyond EOF, ptr.segment* will indicate precisely
// EOF.
//
// After seeking:
//
//     ptr.segmentIdx == len(filenode.segments) // i.e., at EOF
//     ||
//     filenode.segments[ptr.segmentIdx].Len() > ptr.segmentOff
func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
	ptr = startPtr
	if ptr.off < 0 {
		// meaningless anyway
		return
	} else if ptr.off >= fn.fileinfo.size {
		// At/past EOF: point exactly at EOF.
		ptr.segmentIdx = len(fn.segments)
		ptr.segmentOff = 0
		ptr.repacked = fn.repacked
		return
	} else if ptr.repacked == fn.repacked {
		// segmentIdx and segmentOff accurately reflect
		// ptr.off, but might have fallen off the end of a
		// segment
		if ptr.segmentOff >= fn.segments[ptr.segmentIdx].Len() {
			ptr.segmentIdx++
			ptr.segmentOff = 0
		}
		return
	}
	// The segment list has been repacked since ptr was computed,
	// so segmentIdx/segmentOff are stale: recompute them from
	// ptr.off below.
	defer func() {
		ptr.repacked = fn.repacked
	}()
	if ptr.off >= fn.fileinfo.size {
		// Defensive; the same condition already returned above.
		ptr.segmentIdx, ptr.segmentOff = len(fn.segments), 0
		return
	}
	// Recompute segmentIdx and segmentOff.  We have already
	// established fn.fileinfo.size > ptr.off >= 0, so we don't
	// have to deal with edge cases here.
	var off int64
	for ptr.segmentIdx, ptr.segmentOff = 0, 0; off < ptr.off; ptr.segmentIdx++ {
		// This would panic (index out of range) if
		// fn.fileinfo.size were larger than
		// sum(fn.segments[i].Len()) -- but that can't happen
		// because we have ensured fn.fileinfo.size is always
		// accurate.
		segLen := int64(fn.segments[ptr.segmentIdx].Len())
		if off+segLen > ptr.off {
			ptr.segmentOff = int(ptr.off - off)
			break
		}
		off += segLen
	}
	return
}
244
// filenode implements inode.
type filenode struct {
	parent   inode
	fs       FileSystem
	fileinfo fileinfo
	// file content, in order; a mix of *memSegment (buffered in
	// memory) and storedSegment (references to Keep blocks)
	segments []segment
	// number of times `segments` has changed in a
	// way that might invalidate a filenodePtr
	repacked int64
	memsize  int64 // bytes in memSegments
	sync.RWMutex
	nullnode
	// limits concurrent background flushes started by
	// pruneMemSegments; created lazily
	throttle *throttle
}
259
260 // caller must have lock
261 func (fn *filenode) appendSegment(e segment) {
262         fn.segments = append(fn.segments, e)
263         fn.fileinfo.size += int64(e.Len())
264 }
265
266 func (fn *filenode) SetParent(p inode, name string) {
267         fn.Lock()
268         defer fn.Unlock()
269         fn.parent = p
270         fn.fileinfo.name = name
271 }
272
273 func (fn *filenode) Parent() inode {
274         fn.RLock()
275         defer fn.RUnlock()
276         return fn.parent
277 }
278
279 func (fn *filenode) FS() FileSystem {
280         return fn.fs
281 }
282
// Read reads file data from a single segment, starting at startPtr,
// into p. startPtr is assumed not to be up-to-date. Caller must have
// RLock or Lock.
//
// Because at most one segment is read per call, n may be smaller
// than len(p) even when more file data follows in later segments.
func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
	ptr = fn.seek(startPtr)
	if ptr.off < 0 {
		err = ErrNegativeOffset
		return
	}
	if ptr.segmentIdx >= len(fn.segments) {
		// ptr is at EOF.
		err = io.EOF
		return
	}
	n, err = fn.segments[ptr.segmentIdx].ReadAt(p, int64(ptr.segmentOff))
	if n > 0 {
		ptr.off += int64(n)
		ptr.segmentOff += n
		if ptr.segmentOff == fn.segments[ptr.segmentIdx].Len() {
			// Finished this segment: advance to the next
			// one, and suppress the segment-level EOF if
			// more segments remain.
			ptr.segmentIdx++
			ptr.segmentOff = 0
			if ptr.segmentIdx < len(fn.segments) && err == io.EOF {
				err = nil
			}
		}
	}
	return
}
310
311 func (fn *filenode) Size() int64 {
312         fn.RLock()
313         defer fn.RUnlock()
314         return fn.fileinfo.Size()
315 }
316
317 func (fn *filenode) FileInfo() os.FileInfo {
318         fn.RLock()
319         defer fn.RUnlock()
320         return fn.fileinfo
321 }
322
323 func (fn *filenode) Truncate(size int64) error {
324         fn.Lock()
325         defer fn.Unlock()
326         return fn.truncate(size)
327 }
328
// truncate shrinks or grows the file to the given size. Caller must
// have lock.
//
// Shrinking drops/slices trailing segments; growing extends the last
// memSegment (if growable) and appends new zero-filled memSegments,
// each at most maxBlockSize. fn.memsize is adjusted for in-memory
// segments added, dropped, or resized.
func (fn *filenode) truncate(size int64) error {
	if size == fn.fileinfo.size {
		return nil
	}
	// The segment list is about to change, invalidating any
	// outstanding filenodePtrs.
	fn.repacked++
	if size < fn.fileinfo.size {
		// Shrink: locate the segment containing the new EOF.
		ptr := fn.seek(filenodePtr{off: size})
		// Subtract the memory of every in-memory segment at or
		// after the new EOF. If the boundary segment is only
		// partially dropped, its remaining size is added back
		// below.
		for i := ptr.segmentIdx; i < len(fn.segments); i++ {
			if seg, ok := fn.segments[i].(*memSegment); ok {
				fn.memsize -= int64(seg.Len())
			}
		}
		if ptr.segmentOff == 0 {
			// New EOF falls exactly on a segment boundary.
			fn.segments = fn.segments[:ptr.segmentIdx]
		} else {
			// Keep a truncated (memSegment) or sliced
			// (stored segment) prefix of the boundary
			// segment.
			fn.segments = fn.segments[:ptr.segmentIdx+1]
			switch seg := fn.segments[ptr.segmentIdx].(type) {
			case *memSegment:
				seg.Truncate(ptr.segmentOff)
				fn.memsize += int64(seg.Len())
			default:
				fn.segments[ptr.segmentIdx] = seg.Slice(0, ptr.segmentOff)
			}
		}
		fn.fileinfo.size = size
		return nil
	}
	// Grow: append/extend memSegments until the requested size is
	// reached.
	for size > fn.fileinfo.size {
		grow := size - fn.fileinfo.size
		var seg *memSegment
		var ok bool
		if len(fn.segments) == 0 {
			seg = &memSegment{}
			fn.segments = append(fn.segments, seg)
		} else if seg, ok = fn.segments[len(fn.segments)-1].(*memSegment); !ok || seg.Len() >= maxBlockSize {
			// Last segment can't be grown in place; start a
			// fresh one.
			seg = &memSegment{}
			fn.segments = append(fn.segments, seg)
		}
		// Never grow a single segment past maxBlockSize.
		if maxgrow := int64(maxBlockSize - seg.Len()); maxgrow < grow {
			grow = maxgrow
		}
		seg.Truncate(seg.Len() + int(grow))
		fn.fileinfo.size += grow
		fn.memsize += grow
	}
	return nil
}
376
// Write writes data from p to the file, starting at startPtr,
// extending the file size if necessary. Caller must have Lock.
//
// Each loop iteration copies a chunk of p (at most maxBlockSize)
// into a single writable memSegment, creating, splitting, or merging
// segments as needed so that the destination is writable in place.
func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
	if startPtr.off > fn.fileinfo.size {
		// Writing past EOF: zero-fill the gap first.
		if err = fn.truncate(startPtr.off); err != nil {
			return 0, startPtr, err
		}
	}
	ptr = fn.seek(startPtr)
	if ptr.off < 0 {
		err = ErrNegativeOffset
		return
	}
	for len(p) > 0 && err == nil {
		// cando is the portion of p copied this iteration; it
		// may be shrunk below so it fits one target segment.
		cando := p
		if len(cando) > maxBlockSize {
			cando = cando[:maxBlockSize]
		}
		// Rearrange/grow fn.segments (and shrink cando if
		// needed) such that cando can be copied to
		// fn.segments[ptr.segmentIdx] at offset
		// ptr.segmentOff.
		cur := ptr.segmentIdx
		prev := ptr.segmentIdx - 1
		var curWritable bool
		if cur < len(fn.segments) {
			_, curWritable = fn.segments[cur].(*memSegment)
		}
		var prevAppendable bool
		if prev >= 0 && fn.segments[prev].Len() < maxBlockSize {
			_, prevAppendable = fn.segments[prev].(*memSegment)
		}
		if ptr.segmentOff > 0 && !curWritable {
			// Split a non-writable block.
			if max := fn.segments[cur].Len() - ptr.segmentOff; max <= len(cando) {
				// Truncate cur, and insert a new
				// segment after it.
				cando = cando[:max]
				fn.segments = append(fn.segments, nil)
				copy(fn.segments[cur+1:], fn.segments[cur:])
			} else {
				// Split cur into two copies, truncate
				// the one on the left, shift the one
				// on the right, and insert a new
				// segment between them.
				fn.segments = append(fn.segments, nil, nil)
				copy(fn.segments[cur+2:], fn.segments[cur:])
				fn.segments[cur+2] = fn.segments[cur+2].Slice(ptr.segmentOff+len(cando), -1)
			}
			cur++
			prev++
			seg := &memSegment{}
			seg.Truncate(len(cando))
			fn.memsize += int64(len(cando))
			fn.segments[cur] = seg
			fn.segments[prev] = fn.segments[prev].Slice(0, ptr.segmentOff)
			ptr.segmentIdx++
			ptr.segmentOff = 0
			fn.repacked++
			ptr.repacked++
		} else if curWritable {
			// Overwrite in place; shrink cando to what fits
			// in the current segment.
			if fit := int(fn.segments[cur].Len()) - ptr.segmentOff; fit < len(cando) {
				cando = cando[:fit]
			}
		} else {
			if prevAppendable {
				// Shrink cando if needed to fit in
				// prev segment.
				if cangrow := maxBlockSize - fn.segments[prev].Len(); cangrow < len(cando) {
					cando = cando[:cangrow]
				}
			}

			if cur == len(fn.segments) {
				// ptr is at EOF, filesize is changing.
				fn.fileinfo.size += int64(len(cando))
			} else if el := fn.segments[cur].Len(); el <= len(cando) {
				// cando is long enough that we won't
				// need cur any more. shrink cando to
				// be exactly as long as cur
				// (otherwise we'd accidentally shift
				// the effective position of all
				// segments after cur).
				cando = cando[:el]
				copy(fn.segments[cur:], fn.segments[cur+1:])
				fn.segments = fn.segments[:len(fn.segments)-1]
			} else {
				// shrink cur by the same #bytes we're growing prev
				fn.segments[cur] = fn.segments[cur].Slice(len(cando), -1)
			}

			if prevAppendable {
				// Grow prev.
				ptr.segmentIdx--
				ptr.segmentOff = fn.segments[prev].Len()
				fn.segments[prev].(*memSegment).Truncate(ptr.segmentOff + len(cando))
				fn.memsize += int64(len(cando))
				ptr.repacked++
				fn.repacked++
			} else {
				// Insert a segment between prev and
				// cur, and advance prev/cur.
				fn.segments = append(fn.segments, nil)
				if cur < len(fn.segments) {
					copy(fn.segments[cur+1:], fn.segments[cur:])
					ptr.repacked++
					fn.repacked++
				} else {
					// appending a new segment does
					// not invalidate any ptrs
				}
				seg := &memSegment{}
				seg.Truncate(len(cando))
				fn.memsize += int64(len(cando))
				fn.segments[cur] = seg
				cur++
				prev++
			}
		}

		// Finally we can copy bytes from cando to the current segment.
		fn.segments[ptr.segmentIdx].(*memSegment).WriteAt(cando, ptr.segmentOff)
		n += len(cando)
		p = p[len(cando):]

		ptr.off += int64(len(cando))
		ptr.segmentOff += len(cando)
		if ptr.segmentOff >= maxBlockSize {
			// The current segment is full; let
			// pruneMemSegments start flushing full segments
			// to Keep in the background.
			fn.pruneMemSegments()
		}
		if fn.segments[ptr.segmentIdx].Len() == ptr.segmentOff {
			ptr.segmentOff = 0
			ptr.segmentIdx++
		}

		fn.fileinfo.modTime = time.Now()
	}
	return
}
516
// Write some data out to disk to reduce memory use. Caller must have
// write lock.
//
// Each full-size (maxBlockSize) memSegment not already being flushed
// is written to Keep by a background goroutine, which then swaps the
// memSegment for an equivalent storedSegment -- unless the segment
// was modified, moved, or dropped in the meantime, in which case the
// swap is skipped.
func (fn *filenode) pruneMemSegments() {
	// TODO: share code with (*dirnode)flush()
	// TODO: pack/flush small blocks too, when fragmented
	if fn.throttle == nil {
		// TODO: share a throttle with filesystem
		fn.throttle = newThrottle(writeAheadBlocks)
	}
	for idx, seg := range fn.segments {
		seg, ok := seg.(*memSegment)
		if !ok || seg.Len() < maxBlockSize || seg.flushing != nil {
			continue
		}
		// Setting seg.flushing guarantees seg.buf will not be
		// modified in place: WriteAt and Truncate will
		// allocate a new buf instead, if necessary.
		idx, buf := idx, seg.buf
		done := make(chan struct{})
		seg.flushing = done
		// If lots of background writes are already in
		// progress, block here until one finishes, rather
		// than pile up an unlimited number of buffered writes
		// and network flush operations.
		fn.throttle.Acquire()
		go func() {
			defer close(done)
			// PutB runs without fn's lock; reacquire it
			// before touching fn state.
			locator, _, err := fn.FS().PutB(buf)
			fn.throttle.Release()
			fn.Lock()
			defer fn.Unlock()
			if seg.flushing != done {
				// A new seg.buf has been allocated.
				return
			}
			seg.flushing = nil
			if err != nil {
				// TODO: stall (or return errors from)
				// subsequent writes until flushing
				// starts to succeed.
				return
			}
			if len(fn.segments) <= idx || fn.segments[idx] != seg || len(seg.buf) != len(buf) {
				// Segment has been dropped/moved/resized.
				return
			}
			// Replace the in-memory segment with a reference
			// to the stored block, releasing its memory.
			fn.memsize -= int64(len(buf))
			fn.segments[idx] = storedSegment{
				kc:      fn.FS(),
				locator: locator,
				size:    len(buf),
				offset:  0,
				length:  len(buf),
			}
		}()
	}
}
574
575 // Block until all pending pruneMemSegments/flush work is
576 // finished. Caller must NOT have lock.
577 func (fn *filenode) waitPrune() {
578         var pending []<-chan struct{}
579         fn.Lock()
580         for _, seg := range fn.segments {
581                 if seg, ok := seg.(*memSegment); ok && seg.flushing != nil {
582                         pending = append(pending, seg.flushing)
583                 }
584         }
585         fn.Unlock()
586         for _, p := range pending {
587                 <-p
588         }
589 }
590
// dirnode implements inode for a directory in a collectionFileSystem.
type dirnode struct {
	fs *collectionFileSystem
	treenode
}
595
596 func (dn *dirnode) FS() FileSystem {
597         return dn.fs
598 }
599
600 func (dn *dirnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
601         if dn == dn.fs.rootnode() && name == ".arvados#collection" {
602                 gn := &getternode{Getter: func() ([]byte, error) {
603                         var coll Collection
604                         var err error
605                         coll.ManifestText, err = dn.fs.MarshalManifest(".")
606                         if err != nil {
607                                 return nil, err
608                         }
609                         data, err := json.Marshal(&coll)
610                         if err == nil {
611                                 data = append(data, '\n')
612                         }
613                         return data, err
614                 }}
615                 gn.SetParent(dn, name)
616                 return gn, nil
617         }
618         return dn.treenode.Child(name, replace)
619 }
620
// fnSegmentRef identifies one segment of a file: the filenode, and
// the segment's index within fn.segments.
type fnSegmentRef struct {
	fn  *filenode
	idx int
}
625
// commitBlock concatenates the data from the given filenode segments
// (which must be *memSegments), writes the data out to Keep as a
// single block, and replaces the filenodes' *memSegments with
// storedSegments that reference the relevant portions of the new
// block.
//
// If sync is false, commitBlock returns right away, after starting a
// goroutine to do the writes, reacquire the filenodes' locks, and
// swap out the *memSegments. Some filenodes' segments might get
// modified/rearranged in the meantime, in which case commitBlock
// won't replace them.
//
// Caller must have write lock.
func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, sync bool) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	// done is closed when the write (and any segment swaps) have
	// finished. It also serves as the "flushing" marker on each
	// segment handled by this call.
	done := make(chan struct{})
	block := make([]byte, 0, maxBlockSize)
	segs := make([]*memSegment, 0, len(refs))
	offsets := make([]int, 0, len(refs)) // location of segment's data within block
	for _, ref := range refs {
		seg := ref.fn.segments[ref.idx].(*memSegment)
		if seg.flushing != nil && !sync {
			// Let the other flushing goroutine finish. If
			// it fails, we'll try again next time.
			return nil
		} else {
			// In sync mode, we proceed regardless of
			// whether another flush is in progress: It
			// can't finish before we do, because we hold
			// fn's lock until we finish our own writes.
		}
		seg.flushing = done
		offsets = append(offsets, len(block))
		block = append(block, seg.buf...)
		segs = append(segs, seg)
	}
	// Buffered so the goroutine never blocks on send in sync mode.
	errs := make(chan error, 1)
	go func() {
		defer close(done)
		defer close(errs)
		locked := map[*filenode]bool{}
		locator, _, err := dn.fs.PutB(block)
		// This bare block exists to group the lock
		// reacquisition with the deferred cleanup of the
		// flushing markers.
		{
			if !sync {
				// In async mode, the filenodes' locks
				// are not held while PutB runs; lock
				// every filenode child of dn (noting
				// which ones are still present) before
				// touching any segments.
				for _, name := range dn.sortedNames() {
					if fn, ok := dn.inodes[name].(*filenode); ok {
						fn.Lock()
						defer fn.Unlock()
						locked[fn] = true
					}
				}
			}
			defer func() {
				// Clear the flushing markers set above,
				// unless another flush has since taken
				// over the segment.
				for _, seg := range segs {
					if seg.flushing == done {
						seg.flushing = nil
					}
				}
			}()
		}
		if err != nil {
			errs <- err
			return
		}
		for idx, ref := range refs {
			if !sync {
				// In async mode, fn's lock was
				// released while we were waiting for
				// PutB(); lots of things might have
				// changed.
				if len(ref.fn.segments) <= ref.idx {
					// file segments have
					// rearranged or changed in
					// some way
					continue
				} else if seg, ok := ref.fn.segments[ref.idx].(*memSegment); !ok || seg != segs[idx] {
					// segment has been replaced
					continue
				} else if seg.flushing != done {
					// seg.buf has been replaced
					continue
				} else if !locked[ref.fn] {
					// file was renamed, moved, or
					// deleted since we called
					// PutB
					continue
				}
			}
			data := ref.fn.segments[ref.idx].(*memSegment).buf
			ref.fn.segments[ref.idx] = storedSegment{
				kc:      dn.fs,
				locator: locator,
				size:    len(block),
				offset:  offsets[idx],
				length:  len(data),
			}
			ref.fn.memsize -= int64(len(data))
		}
	}()
	if sync {
		return <-errs
	} else {
		return nil
	}
}
733
// flushOpts controls the behavior of (*dirnode)flush().
type flushOpts struct {
	// sync: wait for Keep writes to finish before returning (see
	// commitBlock).
	sync bool
	// shortBlocks: NOTE(review) appears to permit flushing blocks
	// smaller than maxBlockSize -- confirm against the body of
	// (*dirnode)flush().
	shortBlocks bool
}
738
// flush in-memory data and remote-cluster block references (for the
// children with the given names, which must be children of dn) to
// local-cluster persistent storage.
//
// Caller must have write lock on dn and the named children.
//
// If any children are dirs, they will be flushed recursively.
func (dn *dirnode) flush(ctx context.Context, throttle *throttle, names []string, opts flushOpts) error {
	cg := newContextGroup(ctx)
	defer cg.Cancel()

	// goCommit commits the given segments to Keep in a background
	// goroutine, bounded by throttle so we don't start an
	// unlimited number of concurrent writes.
	goCommit := func(refs []fnSegmentRef) {
		if len(refs) == 0 {
			return
		}
		cg.Go(func() error {
			throttle.Acquire()
			defer throttle.Release()
			return dn.commitBlock(cg.Context(), refs, opts.sync)
		})
	}

	var pending []fnSegmentRef
	var pendingLen int = 0
	// localLocator caches LocalLocator results so each distinct
	// remote locator is translated at most once per flush.
	localLocator := map[string]string{}
	for _, name := range names {
		switch node := dn.inodes[name].(type) {
		case *dirnode:
			grandchildNames := node.sortedNames()
			// Lock all grandchildren now, and hold the
			// locks (via defer) until this flush returns,
			// satisfying the recursive call's locking
			// precondition.
			for _, grandchildName := range grandchildNames {
				grandchild := node.inodes[grandchildName]
				grandchild.Lock()
				defer grandchild.Unlock()
			}
			cg.Go(func() error { return node.flush(cg.Context(), throttle, grandchildNames, opts) })
		case *filenode:
			for idx, seg := range node.segments {
				switch seg := seg.(type) {
				case storedSegment:
					// Data is already in Keep;
					// just ensure the locator is
					// usable on the local cluster.
					loc, ok := localLocator[seg.locator]
					if !ok {
						var err error
						loc, err = dn.fs.LocalLocator(seg.locator)
						if err != nil {
							return err
						}
						localLocator[seg.locator] = loc
					}
					seg.locator = loc
					node.segments[idx] = seg
				case *memSegment:
					if seg.Len() > maxBlockSize/2 {
						// Big enough to commit
						// as a block of its own.
						goCommit([]fnSegmentRef{{node, idx}})
						continue
					}
					if pendingLen+seg.Len() > maxBlockSize {
						// Adding this segment
						// would overflow the
						// pending block: commit
						// what we have first.
						goCommit(pending)
						pending = nil
						pendingLen = 0
					}
					pending = append(pending, fnSegmentRef{node, idx})
					pendingLen += seg.Len()
				default:
					panic(fmt.Sprintf("can't sync segment type %T", seg))
				}
			}
		}
	}
	if opts.shortBlocks {
		// Commit the leftover (short) block, too.
		goCommit(pending)
	}
	return cg.Wait()
}
812
// caller must have write lock.
//
// memorySize returns the total number of bytes of file data buffered
// in memory (memSegments) in this directory tree. Segments already
// stored in Keep are not counted.
func (dn *dirnode) memorySize() (size int64) {
	for _, name := range dn.sortedNames() {
		node := dn.inodes[name]
		// Each child stays locked (via defer) until the whole
		// walk finishes, so the returned total is a consistent
		// snapshot of the subtree.
		node.Lock()
		defer node.Unlock()
		switch node := node.(type) {
		case *dirnode:
			size += node.memorySize()
		case *filenode:
			for _, seg := range node.segments {
				switch seg := seg.(type) {
				case *memSegment:
					size += int64(seg.Len())
				}
			}
		}
	}
	return
}
833
834 // caller must have write lock.
835 func (dn *dirnode) sortedNames() []string {
836         names := make([]string, 0, len(dn.inodes))
837         for name := range dn.inodes {
838                 names = append(names, name)
839         }
840         sort.Strings(names)
841         return names
842 }
843
// caller must have write lock.
//
// marshalManifest returns manifest text for the tree rooted at dn,
// flushing all buffered file data to Keep in the process. prefix is
// the stream name for dn itself ("." at the root).
func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle *throttle) (string, error) {
	cg := newContextGroup(ctx)
	defer cg.Cancel()

	if len(dn.inodes) == 0 {
		if prefix == "." {
			return "", nil
		}
		// Express the existence of an empty directory by
		// adding an empty file named `\056`, which (unlike
		// the more obvious spelling `.`) is accepted by the
		// API's manifest validator.
		return manifestEscape(prefix) + " d41d8cd98f00b204e9800998ecf8427e+0 0:0:\\056\n", nil
	}

	names := dn.sortedNames()

	// Wait for children to finish any pending write operations
	// before locking them.
	for _, name := range names {
		node := dn.inodes[name]
		if fn, ok := node.(*filenode); ok {
			fn.waitPrune()
		}
	}

	// Partition children into dirs and files, locking each child
	// (held via defer until this function returns).
	var dirnames []string
	var filenames []string
	for _, name := range names {
		node := dn.inodes[name]
		node.Lock()
		defer node.Unlock()
		switch node := node.(type) {
		case *dirnode:
			dirnames = append(dirnames, name)
		case *filenode:
			filenames = append(filenames, name)
		default:
			panic(fmt.Sprintf("can't marshal inode type %T", node))
		}
	}

	// Marshal each subdirectory concurrently, collecting results
	// in order so the final manifest is deterministic.
	subdirs := make([]string, len(dirnames))
	rootdir := ""
	for i, name := range dirnames {
		// Shadow the loop variables so each closure captures
		// its own copy.
		i, name := i, name
		cg.Go(func() error {
			txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name, throttle)
			subdirs[i] = txt
			return err
		})
	}

	// Build the stream for the files directly in dn.
	cg.Go(func() error {
		var streamLen int64
		type filepart struct {
			name   string
			offset int64
			length int64
		}

		var fileparts []filepart
		var blocks []string
		// After this flush (sync, shortBlocks), every segment
		// of every child file is a storedSegment.
		if err := dn.flush(cg.Context(), throttle, filenames, flushOpts{sync: true, shortBlocks: true}); err != nil {
			return err
		}
		for _, name := range filenames {
			node := dn.inodes[name].(*filenode)
			if len(node.segments) == 0 {
				// Zero-length file: file token only, no block.
				fileparts = append(fileparts, filepart{name: name})
				continue
			}
			for _, seg := range node.segments {
				switch seg := seg.(type) {
				case storedSegment:
					if len(blocks) > 0 && blocks[len(blocks)-1] == seg.locator {
						// Consecutive segments in the same
						// block: don't repeat the locator.
						streamLen -= int64(seg.size)
					} else {
						blocks = append(blocks, seg.locator)
					}
					next := filepart{
						name:   name,
						offset: streamLen + int64(seg.offset),
						length: int64(seg.length),
					}
					if prev := len(fileparts) - 1; prev >= 0 &&
						fileparts[prev].name == name &&
						fileparts[prev].offset+fileparts[prev].length == next.offset {
						// Merge with the preceding
						// contiguous part of the same file.
						fileparts[prev].length += next.length
					} else {
						fileparts = append(fileparts, next)
					}
					streamLen += int64(seg.size)
				default:
					// This can't happen: we
					// haven't unlocked since
					// calling flush(sync=true).
					panic(fmt.Sprintf("can't marshal segment type %T", seg))
				}
			}
		}
		var filetokens []string
		for _, s := range fileparts {
			filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
		}
		if len(filetokens) == 0 {
			return nil
		} else if len(blocks) == 0 {
			// Only zero-length files: use the empty block
			// so the stream is still valid.
			blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
		}
		rootdir = manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n"
		return nil
	})
	err := cg.Wait()
	return rootdir + strings.Join(subdirs, ""), err
}
961
// loadManifest parses manifest text and populates dn with the
// corresponding file/directory tree. No locking: only safe to call
// on a dirnode that is not yet shared with other goroutines.
func (dn *dirnode) loadManifest(txt string) error {
	var dirname string
	streams := strings.Split(txt, "\n")
	if streams[len(streams)-1] != "" {
		return fmt.Errorf("line %d: no trailing newline", len(streams))
	}
	streams = streams[:len(streams)-1]
	segments := []storedSegment{}
	for i, stream := range streams {
		// Keep a 1-based line number for error messages,
		// distinct from the inner loop's token index i.
		lineno := i + 1
		var anyFileTokens bool
		// pos is the stream offset at the start of
		// segments[segIdx]; together they track our position
		// in the stream so consecutive file tokens usually
		// continue where the previous one left off.
		var pos int64
		var segIdx int
		// Reuse the backing array across streams.
		segments = segments[:0]
		for i, token := range strings.Split(stream, " ") {
			if i == 0 {
				// First token is the stream (directory) name.
				dirname = manifestUnescape(token)
				continue
			}
			if !strings.Contains(token, ":") {
				// Block locator token ("hash+size+...").
				if anyFileTokens {
					return fmt.Errorf("line %d: bad file segment %q", lineno, token)
				}
				toks := strings.SplitN(token, "+", 3)
				if len(toks) < 2 {
					return fmt.Errorf("line %d: bad locator %q", lineno, token)
				}
				length, err := strconv.ParseInt(toks[1], 10, 32)
				if err != nil || length < 0 {
					return fmt.Errorf("line %d: bad locator %q", lineno, token)
				}
				segments = append(segments, storedSegment{
					locator: token,
					size:    int(length),
					offset:  0,
					length:  int(length),
				})
				continue
			} else if len(segments) == 0 {
				return fmt.Errorf("line %d: bad locator %q", lineno, token)
			}

			// File segment token ("offset:length:filename").
			toks := strings.SplitN(token, ":", 3)
			if len(toks) != 3 {
				return fmt.Errorf("line %d: bad file segment %q", lineno, token)
			}
			anyFileTokens = true

			offset, err := strconv.ParseInt(toks[0], 10, 64)
			if err != nil || offset < 0 {
				return fmt.Errorf("line %d: bad file segment %q", lineno, token)
			}
			length, err := strconv.ParseInt(toks[1], 10, 64)
			if err != nil || length < 0 {
				return fmt.Errorf("line %d: bad file segment %q", lineno, token)
			}
			name := dirname + "/" + manifestUnescape(toks[2])
			fnode, err := dn.createFileAndParents(name)
			if fnode == nil && err == nil && length == 0 {
				// Special case: an empty file used as
				// a marker to preserve an otherwise
				// empty directory in a manifest.
				continue
			}
			if err != nil || (fnode == nil && length != 0) {
				return fmt.Errorf("line %d: cannot use path %q with length %d: %s", lineno, name, length, err)
			}
			// Map the stream offset/range coordinates to
			// block/offset/range coordinates and add
			// corresponding storedSegments to the filenode
			if pos > offset {
				// Can't continue where we left off.
				// TODO: binary search instead of
				// rewinding all the way (but this
				// situation might be rare anyway)
				segIdx, pos = 0, 0
			}
			for next := int64(0); segIdx < len(segments); segIdx++ {
				seg := segments[segIdx]
				next = pos + int64(seg.Len())
				if next <= offset || seg.Len() == 0 {
					// Block ends before the file
					// segment starts: skip it.
					pos = next
					continue
				}
				if pos >= offset+length {
					break
				}
				// Intersect [offset, offset+length) with
				// this block's extent in the stream.
				var blkOff int
				if pos < offset {
					blkOff = int(offset - pos)
				}
				blkLen := seg.Len() - blkOff
				if pos+int64(blkOff+blkLen) > offset+length {
					blkLen = int(offset + length - pos - int64(blkOff))
				}
				fnode.appendSegment(storedSegment{
					kc:      dn.fs,
					locator: seg.locator,
					size:    seg.size,
					offset:  blkOff,
					length:  blkLen,
				})
				if next > offset+length {
					// File segment ends inside this
					// block: stay on it for the next
					// file token.
					break
				} else {
					pos = next
				}
			}
			if segIdx == len(segments) && pos < offset+length {
				// File segment extends past the end of
				// the stream's data.
				return fmt.Errorf("line %d: invalid segment in %d-byte stream: %q", lineno, pos, token)
			}
		}
		if !anyFileTokens {
			return fmt.Errorf("line %d: no file segments", lineno)
		} else if len(segments) == 0 {
			return fmt.Errorf("line %d: no locators", lineno)
		} else if dirname == "" {
			return fmt.Errorf("line %d: no stream name", lineno)
		}
	}
	return nil
}
1084
// only safe to call from loadManifest -- no locking.
//
// createFileAndParents returns the filenode at path, creating it and
// any missing parent directories along the way.
//
// If path is a "parent directory exists" marker (the last path
// component is "."), the returned values are both nil.
func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
	var node inode = dn
	names := strings.Split(path, "/")
	basename := names[len(names)-1]
	// Walk (creating as needed) the parent directories.
	for _, name := range names[:len(names)-1] {
		switch name {
		case "", ".":
			// No-op path components.
			continue
		case "..":
			if node == dn {
				// can't be sure parent will be a *dirnode
				return nil, ErrInvalidArgument
			}
			node = node.Parent()
			continue
		}
		node, err = node.Child(name, func(child inode) (inode, error) {
			if child == nil {
				// Directory doesn't exist yet: create it.
				// (Here `err` is deliberately a fresh
				// variable, shadowing the named result.)
				child, err := node.FS().newNode(name, 0755|os.ModeDir, node.Parent().FileInfo().ModTime())
				if err != nil {
					return nil, err
				}
				child.SetParent(node, name)
				return child, nil
			} else if !child.IsDir() {
				return child, ErrFileExists
			} else {
				return child, nil
			}
		})
		if err != nil {
			return
		}
	}
	if basename == "." {
		// "Parent directory exists" marker: nothing to create.
		return
	} else if !permittedName(basename) {
		err = fmt.Errorf("invalid file part %q in path %q", basename, path)
		return
	}
	// Find or create the file node itself.
	_, err = node.Child(basename, func(child inode) (inode, error) {
		switch child := child.(type) {
		case nil:
			// Note `=` (not `:=`): this assigns the
			// switch-scoped `child` and the function's
			// named result `err` -- intentionally not
			// shadowing `err`.
			child, err = node.FS().newNode(basename, 0755, node.FileInfo().ModTime())
			if err != nil {
				return nil, err
			}
			child.SetParent(node, basename)
			fn = child.(*filenode)
			return child, nil
		case *filenode:
			fn = child
			return child, nil
		case *dirnode:
			return child, ErrIsDirectory
		default:
			return child, ErrInvalidArgument
		}
	})
	return
}
1150
1151 func (dn *dirnode) TreeSize() (bytes int64) {
1152         dn.RLock()
1153         defer dn.RUnlock()
1154         for _, i := range dn.inodes {
1155                 switch i := i.(type) {
1156                 case *filenode:
1157                         bytes += i.Size()
1158                 case *dirnode:
1159                         bytes += i.TreeSize()
1160                 }
1161         }
1162         return
1163 }
1164
// A segment is a contiguous portion of a file's data: either bytes
// buffered in memory (memSegment) or a reference to part of a stored
// block (storedSegment).
type segment interface {
	io.ReaderAt
	// Len returns the number of data bytes in the segment.
	Len() int
	// Return a new segment with a subsection of the data from this
	// one. length<0 means length=Len()-off.
	Slice(off int, length int) segment
}
1172
// A memSegment is file data that is buffered in memory, not yet
// committed to Keep.
type memSegment struct {
	// buf holds the segment's data.
	buf []byte
	// If flushing is not nil, then a) buf is being shared by a
	// pruneMemSegments goroutine, and must be copied on write;
	// and b) the flushing channel will close when the goroutine
	// finishes, whether it succeeds or not.
	flushing <-chan struct{}
}
1181
// Len returns the number of buffered data bytes.
func (me *memSegment) Len() int {
	return len(me.buf)
}
1185
1186 func (me *memSegment) Slice(off, length int) segment {
1187         if length < 0 {
1188                 length = len(me.buf) - off
1189         }
1190         buf := make([]byte, length)
1191         copy(buf, me.buf[off:])
1192         return &memSegment{buf: buf}
1193 }
1194
1195 func (me *memSegment) Truncate(n int) {
1196         if n > cap(me.buf) || (me.flushing != nil && n > len(me.buf)) {
1197                 newsize := 1024
1198                 for newsize < n {
1199                         newsize = newsize << 2
1200                 }
1201                 newbuf := make([]byte, n, newsize)
1202                 copy(newbuf, me.buf)
1203                 me.buf, me.flushing = newbuf, nil
1204         } else {
1205                 // reclaim existing capacity, and zero reclaimed part
1206                 oldlen := len(me.buf)
1207                 me.buf = me.buf[:n]
1208                 for i := oldlen; i < n; i++ {
1209                         me.buf[i] = 0
1210                 }
1211         }
1212 }
1213
1214 func (me *memSegment) WriteAt(p []byte, off int) {
1215         if off+len(p) > len(me.buf) {
1216                 panic("overflowed segment")
1217         }
1218         if me.flushing != nil {
1219                 me.buf, me.flushing = append([]byte(nil), me.buf...), nil
1220         }
1221         copy(me.buf[off:], p)
1222 }
1223
1224 func (me *memSegment) ReadAt(p []byte, off int64) (n int, err error) {
1225         if off > int64(me.Len()) {
1226                 err = io.EOF
1227                 return
1228         }
1229         n = copy(p, me.buf[int(off):])
1230         if n < len(p) {
1231                 err = io.EOF
1232         }
1233         return
1234 }
1235
// A storedSegment is a reference to (part of) a block stored in
// Keep; reads go through the kc backend.
type storedSegment struct {
	kc      fsBackend
	locator string
	size    int // size of stored block (also encoded in locator)
	offset  int // position of segment within the stored block
	length  int // bytes in this segment (offset + length <= size)
}
1243
// Len returns the number of data bytes in this segment (not the size
// of the underlying block).
func (se storedSegment) Len() int {
	return se.length
}
1247
1248 func (se storedSegment) Slice(n, size int) segment {
1249         se.offset += n
1250         se.length -= n
1251         if size >= 0 && se.length > size {
1252                 se.length = size
1253         }
1254         return se
1255 }
1256
1257 func (se storedSegment) ReadAt(p []byte, off int64) (n int, err error) {
1258         if off > int64(se.length) {
1259                 return 0, io.EOF
1260         }
1261         maxlen := se.length - int(off)
1262         if len(p) > maxlen {
1263                 p = p[:maxlen]
1264                 n, err = se.kc.ReadAt(se.locator, p, int(off)+se.offset)
1265                 if err == nil {
1266                         err = io.EOF
1267                 }
1268                 return
1269         }
1270         return se.kc.ReadAt(se.locator, p, int(off)+se.offset)
1271 }
1272
// canonicalName returns the manifest-style form of name: "." for the
// root, otherwise a cleaned relative path starting with "./".
func canonicalName(name string) string {
	cleaned := path.Clean("/" + name)
	switch {
	case cleaned == "/" || cleaned == "./":
		return "."
	case strings.HasPrefix(cleaned, "/"):
		return "." + cleaned
	}
	return cleaned
}
1282
// manifestEscapeSeq matches one escape sequence in a manifest token:
// a backslash followed by three octal digits, or a doubled backslash.
var manifestEscapeSeq = regexp.MustCompile(`\\([0-7]{3}|\\)`)

// manifestUnescapeFunc decodes a single sequence matched by
// manifestEscapeSeq. Sequences that don't decode to a single byte
// are returned unchanged.
func manifestUnescapeFunc(seq string) string {
	if seq == `\\` {
		return `\`
	}
	code, err := strconv.ParseUint(seq[1:], 8, 8)
	if err != nil {
		// Octal value out of byte range: can't unescape.
		return seq
	}
	// Note: must stay a raw byte -- converting via rune would
	// UTF-8-encode values >= 0x80.
	return string([]byte{byte(code)})
}

// manifestUnescape expands all escape sequences in s.
func manifestUnescape(s string) string {
	return manifestEscapeSeq.ReplaceAllStringFunc(s, manifestUnescapeFunc)
}
1300
// manifestEscapedChar matches any byte that must be escaped in a
// manifest token: control chars, space, colon, whitespace, backslash.
var manifestEscapedChar = regexp.MustCompile(`[\000-\040:\s\\]`)

// manifestEscapeFunc encodes one matched byte as a three-digit octal
// escape sequence.
func manifestEscapeFunc(seq string) string {
	return fmt.Sprintf(`\%03o`, seq[0])
}

// manifestEscape escapes all special bytes in s for use as a
// manifest stream or file name.
func manifestEscape(s string) string {
	return manifestEscapedChar.ReplaceAllStringFunc(s, manifestEscapeFunc)
}