Bump loofah from 2.2.3 to 2.3.1 in /apps/workbench
[arvados.git] / sdk / go / arvados / fs_collection.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "context"
9         "encoding/json"
10         "fmt"
11         "io"
12         "os"
13         "path"
14         "regexp"
15         "sort"
16         "strconv"
17         "strings"
18         "sync"
19         "time"
20 )
21
var (
	maxBlockSize      = 1 << 26 // 64 MiB: upper bound on a single data block / in-memory segment
	concurrentWriters = 4       // max goroutines writing to Keep in background and during flush()
)
26
// A CollectionFileSystem is a FileSystem that can be serialized as a
// manifest and stored as a collection.
type CollectionFileSystem interface {
	FileSystem

	// Flush all file data to Keep and return a snapshot of the
	// filesystem suitable for saving as (Collection)ManifestText.
	// Prefix (normally ".") is a top level directory, effectively
	// prepended to all paths in the returned manifest.
	MarshalManifest(prefix string) (string, error)

	// Total data bytes in all files.
	Size() int64

	// Memory consumed by buffered file data (unexported: for
	// internal accounting, not part of the public API).
	memorySize() int64
}
44
// collectionFileSystem implements CollectionFileSystem backed by an
// Arvados collection.
type collectionFileSystem struct {
	fileSystem
	uuid string // collection UUID; empty means Sync() is a no-op
}
49
50 // FileSystem returns a CollectionFileSystem for the collection.
51 func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFileSystem, error) {
52         var modTime time.Time
53         if c.ModifiedAt == nil {
54                 modTime = time.Now()
55         } else {
56                 modTime = *c.ModifiedAt
57         }
58         fs := &collectionFileSystem{
59                 uuid: c.UUID,
60                 fileSystem: fileSystem{
61                         fsBackend: keepBackend{apiClient: client, keepClient: kc},
62                         thr:       newThrottle(concurrentWriters),
63                 },
64         }
65         root := &dirnode{
66                 fs: fs,
67                 treenode: treenode{
68                         fileinfo: fileinfo{
69                                 name:    ".",
70                                 mode:    os.ModeDir | 0755,
71                                 modTime: modTime,
72                         },
73                         inodes: make(map[string]inode),
74                 },
75         }
76         root.SetParent(root, ".")
77         if err := root.loadManifest(c.ManifestText); err != nil {
78                 return nil, err
79         }
80         backdateTree(root, modTime)
81         fs.root = root
82         return fs, nil
83 }
84
85 func backdateTree(n inode, modTime time.Time) {
86         switch n := n.(type) {
87         case *filenode:
88                 n.fileinfo.modTime = modTime
89         case *dirnode:
90                 n.fileinfo.modTime = modTime
91                 for _, n := range n.inodes {
92                         backdateTree(n, modTime)
93                 }
94         }
95 }
96
97 func (fs *collectionFileSystem) newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error) {
98         if name == "" || name == "." || name == ".." {
99                 return nil, ErrInvalidArgument
100         }
101         if perm.IsDir() {
102                 return &dirnode{
103                         fs: fs,
104                         treenode: treenode{
105                                 fileinfo: fileinfo{
106                                         name:    name,
107                                         mode:    perm | os.ModeDir,
108                                         modTime: modTime,
109                                 },
110                                 inodes: make(map[string]inode),
111                         },
112                 }, nil
113         } else {
114                 return &filenode{
115                         fs: fs,
116                         fileinfo: fileinfo{
117                                 name:    name,
118                                 mode:    perm & ^os.ModeDir,
119                                 modTime: modTime,
120                         },
121                 }, nil
122         }
123 }
124
125 func (fs *collectionFileSystem) Sync() error {
126         if fs.uuid == "" {
127                 return nil
128         }
129         txt, err := fs.MarshalManifest(".")
130         if err != nil {
131                 return fmt.Errorf("sync failed: %s", err)
132         }
133         coll := &Collection{
134                 UUID:         fs.uuid,
135                 ManifestText: txt,
136         }
137         err = fs.RequestAndDecode(nil, "PUT", "arvados/v1/collections/"+fs.uuid, nil, map[string]interface{}{
138                 "collection": map[string]string{
139                         "manifest_text": coll.ManifestText,
140                 },
141                 "select": []string{"uuid"},
142         })
143         if err != nil {
144                 return fmt.Errorf("sync failed: update %s: %s", fs.uuid, err)
145         }
146         return nil
147 }
148
// Flush writes buffered data for the directory named by path (and,
// when path is "", the whole tree) out to Keep. shortBlocks is passed
// through to the flush options; its exact effect is implemented in
// (*dirnode)flush.
func (fs *collectionFileSystem) Flush(path string, shortBlocks bool) error {
	node, err := rlookup(fs.fileSystem.root, path)
	if err != nil {
		return err
	}
	dn, ok := node.(*dirnode)
	if !ok {
		return ErrNotADirectory
	}
	dn.Lock()
	defer dn.Unlock()
	names := dn.sortedNames()
	if path != "" {
		// Caller only wants to flush the specified dir,
		// non-recursively.  Drop subdirs from the list of
		// names.
		var filenames []string
		for _, name := range names {
			if _, ok := dn.inodes[name].(*filenode); ok {
				filenames = append(filenames, name)
			}
		}
		names = filenames
	}
	// Acquire and hold every child's lock until flush returns (the
	// deferred Unlocks run at function exit, not per iteration —
	// this is intentional: dn.flush requires the caller to hold
	// write locks on all named children).
	for _, name := range names {
		child := dn.inodes[name]
		child.Lock()
		defer child.Unlock()
	}
	return dn.flush(context.TODO(), names, flushOpts{sync: false, shortBlocks: shortBlocks})
}
180
181 func (fs *collectionFileSystem) memorySize() int64 {
182         fs.fileSystem.root.Lock()
183         defer fs.fileSystem.root.Unlock()
184         return fs.fileSystem.root.(*dirnode).memorySize()
185 }
186
187 func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
188         fs.fileSystem.root.Lock()
189         defer fs.fileSystem.root.Unlock()
190         return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix)
191 }
192
193 func (fs *collectionFileSystem) Size() int64 {
194         return fs.fileSystem.root.(*dirnode).TreeSize()
195 }
196
// filenodePtr is an offset into a file that is (usually) efficient to
// seek to. Specifically, if filenode.repacked==filenodePtr.repacked
// then
// filenode.segments[filenodePtr.segmentIdx][filenodePtr.segmentOff]
// corresponds to file offset filenodePtr.off. Otherwise, it is
// necessary to reexamine len(filenode.segments[0]) etc. to find the
// correct segment and offset.
type filenodePtr struct {
	off        int64 // byte offset into the file
	segmentIdx int   // index into filenode.segments
	segmentOff int   // byte offset within segments[segmentIdx]
	repacked   int64 // filenode.repacked value when segmentIdx/segmentOff were computed
}
210
// seek returns a ptr that is consistent with both startPtr.off and
// the current state of fn. The caller must already hold fn.RLock() or
// fn.Lock().
//
// If startPtr is beyond EOF, ptr.segment* will indicate precisely
// EOF.
//
// After seeking:
//
//     ptr.segmentIdx == len(filenode.segments) // i.e., at EOF
//     ||
//     filenode.segments[ptr.segmentIdx].Len() > ptr.segmentOff
func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
	ptr = startPtr
	if ptr.off < 0 {
		// meaningless anyway
		return
	} else if ptr.off >= fn.fileinfo.size {
		// Beyond (or at) EOF: normalize to the precise EOF position.
		ptr.segmentIdx = len(fn.segments)
		ptr.segmentOff = 0
		ptr.repacked = fn.repacked
		return
	} else if ptr.repacked == fn.repacked {
		// segmentIdx and segmentOff accurately reflect
		// ptr.off, but might have fallen off the end of a
		// segment
		if ptr.segmentOff >= fn.segments[ptr.segmentIdx].Len() {
			ptr.segmentIdx++
			ptr.segmentOff = 0
		}
		return
	}
	// Segments have been repacked since ptr was computed; the
	// cached segmentIdx/segmentOff are stale and must be rebuilt.
	defer func() {
		ptr.repacked = fn.repacked
	}()
	if ptr.off >= fn.fileinfo.size {
		ptr.segmentIdx, ptr.segmentOff = len(fn.segments), 0
		return
	}
	// Recompute segmentIdx and segmentOff.  We have already
	// established fn.fileinfo.size > ptr.off >= 0, so we don't
	// have to deal with edge cases here.
	var off int64
	for ptr.segmentIdx, ptr.segmentOff = 0, 0; off < ptr.off; ptr.segmentIdx++ {
		// This would panic (index out of range) if
		// fn.fileinfo.size were larger than
		// sum(fn.segments[i].Len()) -- but that can't happen
		// because we have ensured fn.fileinfo.size is always
		// accurate.
		segLen := int64(fn.segments[ptr.segmentIdx].Len())
		if off+segLen > ptr.off {
			ptr.segmentOff = int(ptr.off - off)
			break
		}
		off += segLen
	}
	return
}
269
// filenode implements inode for a regular file whose content is a
// sequence of segments (in-memory and/or stored blocks).
type filenode struct {
	parent   inode
	fs       FileSystem
	fileinfo fileinfo
	segments []segment
	// number of times `segments` has changed in a
	// way that might invalidate a filenodePtr
	repacked int64
	memsize  int64 // bytes in memSegments
	sync.RWMutex
	nullnode
}
283
284 // caller must have lock
285 func (fn *filenode) appendSegment(e segment) {
286         fn.segments = append(fn.segments, e)
287         fn.fileinfo.size += int64(e.Len())
288 }
289
290 func (fn *filenode) SetParent(p inode, name string) {
291         fn.Lock()
292         defer fn.Unlock()
293         fn.parent = p
294         fn.fileinfo.name = name
295 }
296
297 func (fn *filenode) Parent() inode {
298         fn.RLock()
299         defer fn.RUnlock()
300         return fn.parent
301 }
302
// FS returns the filesystem this file belongs to.
func (fn *filenode) FS() FileSystem {
	return fn.fs
}
306
// Read reads file data from a single segment, starting at startPtr,
// into p. startPtr is assumed not to be up-to-date. Caller must have
// RLock or Lock.
//
// Returns the number of bytes read and an updated pointer. io.EOF is
// suppressed when the segment boundary was reached but more segments
// remain, so callers can loop until a true EOF.
func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
	ptr = fn.seek(startPtr)
	if ptr.off < 0 {
		err = ErrNegativeOffset
		return
	}
	if ptr.segmentIdx >= len(fn.segments) {
		// Pointer is at (or past) the last segment: end of file.
		err = io.EOF
		return
	}
	n, err = fn.segments[ptr.segmentIdx].ReadAt(p, int64(ptr.segmentOff))
	if n > 0 {
		ptr.off += int64(n)
		ptr.segmentOff += n
		if ptr.segmentOff == fn.segments[ptr.segmentIdx].Len() {
			// Consumed this segment entirely; advance to the next.
			ptr.segmentIdx++
			ptr.segmentOff = 0
			if ptr.segmentIdx < len(fn.segments) && err == io.EOF {
				// Segment-level EOF, not file-level: clear it.
				err = nil
			}
		}
	}
	return
}
334
335 func (fn *filenode) Size() int64 {
336         fn.RLock()
337         defer fn.RUnlock()
338         return fn.fileinfo.Size()
339 }
340
341 func (fn *filenode) FileInfo() os.FileInfo {
342         fn.RLock()
343         defer fn.RUnlock()
344         return fn.fileinfo
345 }
346
347 func (fn *filenode) Truncate(size int64) error {
348         fn.Lock()
349         defer fn.Unlock()
350         return fn.truncate(size)
351 }
352
// truncate changes the file size to size, dropping or shortening
// trailing segments when shrinking and appending zero-filled
// memSegments when growing. Caller must have write lock.
func (fn *filenode) truncate(size int64) error {
	if size == fn.fileinfo.size {
		return nil
	}
	// Any size change may rearrange segments, so invalidate
	// outstanding filenodePtrs.
	fn.repacked++
	if size < fn.fileinfo.size {
		ptr := fn.seek(filenodePtr{off: size})
		// Deduct all in-memory bytes at/after the cut point; the
		// kept portion of a partially-truncated memSegment is
		// added back below.
		for i := ptr.segmentIdx; i < len(fn.segments); i++ {
			if seg, ok := fn.segments[i].(*memSegment); ok {
				fn.memsize -= int64(seg.Len())
			}
		}
		if ptr.segmentOff == 0 {
			// Cut falls on a segment boundary: drop whole segments.
			fn.segments = fn.segments[:ptr.segmentIdx]
		} else {
			// Cut falls mid-segment: keep a shortened version.
			fn.segments = fn.segments[:ptr.segmentIdx+1]
			switch seg := fn.segments[ptr.segmentIdx].(type) {
			case *memSegment:
				seg.Truncate(ptr.segmentOff)
				fn.memsize += int64(seg.Len())
			default:
				fn.segments[ptr.segmentIdx] = seg.Slice(0, ptr.segmentOff)
			}
		}
		fn.fileinfo.size = size
		return nil
	}
	// Growing: extend (or append) memSegments until the target size
	// is reached, never letting one exceed maxBlockSize.
	for size > fn.fileinfo.size {
		grow := size - fn.fileinfo.size
		var seg *memSegment
		var ok bool
		if len(fn.segments) == 0 {
			seg = &memSegment{}
			fn.segments = append(fn.segments, seg)
		} else if seg, ok = fn.segments[len(fn.segments)-1].(*memSegment); !ok || seg.Len() >= maxBlockSize {
			seg = &memSegment{}
			fn.segments = append(fn.segments, seg)
		}
		if maxgrow := int64(maxBlockSize - seg.Len()); maxgrow < grow {
			grow = maxgrow
		}
		seg.Truncate(seg.Len() + int(grow))
		fn.fileinfo.size += grow
		fn.memsize += grow
	}
	return nil
}
400
// Write writes data from p to the file, starting at startPtr,
// extending the file size if necessary. Caller must have Lock.
//
// Each loop iteration writes one chunk ("cando") into a single
// writable memSegment, first rearranging fn.segments so such a
// segment exists at the write position.
func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
	if startPtr.off > fn.fileinfo.size {
		// Writing past EOF: zero-fill the gap first.
		if err = fn.truncate(startPtr.off); err != nil {
			return 0, startPtr, err
		}
	}
	ptr = fn.seek(startPtr)
	if ptr.off < 0 {
		err = ErrNegativeOffset
		return
	}
	for len(p) > 0 && err == nil {
		cando := p
		if len(cando) > maxBlockSize {
			cando = cando[:maxBlockSize]
		}
		// Rearrange/grow fn.segments (and shrink cando if
		// needed) such that cando can be copied to
		// fn.segments[ptr.segmentIdx] at offset
		// ptr.segmentOff.
		cur := ptr.segmentIdx
		prev := ptr.segmentIdx - 1
		var curWritable bool
		if cur < len(fn.segments) {
			_, curWritable = fn.segments[cur].(*memSegment)
		}
		var prevAppendable bool
		if prev >= 0 && fn.segments[prev].Len() < maxBlockSize {
			_, prevAppendable = fn.segments[prev].(*memSegment)
		}
		if ptr.segmentOff > 0 && !curWritable {
			// Split a non-writable block.
			if max := fn.segments[cur].Len() - ptr.segmentOff; max <= len(cando) {
				// Truncate cur, and insert a new
				// segment after it.
				cando = cando[:max]
				fn.segments = append(fn.segments, nil)
				copy(fn.segments[cur+1:], fn.segments[cur:])
			} else {
				// Split cur into two copies, truncate
				// the one on the left, shift the one
				// on the right, and insert a new
				// segment between them.
				fn.segments = append(fn.segments, nil, nil)
				copy(fn.segments[cur+2:], fn.segments[cur:])
				fn.segments[cur+2] = fn.segments[cur+2].Slice(ptr.segmentOff+len(cando), -1)
			}
			cur++
			prev++
			seg := &memSegment{}
			seg.Truncate(len(cando))
			fn.memsize += int64(len(cando))
			fn.segments[cur] = seg
			fn.segments[prev] = fn.segments[prev].Slice(0, ptr.segmentOff)
			ptr.segmentIdx++
			ptr.segmentOff = 0
			fn.repacked++
			ptr.repacked++
		} else if curWritable {
			// Current segment is an in-memory segment: write in
			// place, clipping cando to what fits before its end.
			if fit := int(fn.segments[cur].Len()) - ptr.segmentOff; fit < len(cando) {
				cando = cando[:fit]
			}
		} else {
			if prevAppendable {
				// Shrink cando if needed to fit in
				// prev segment.
				if cangrow := maxBlockSize - fn.segments[prev].Len(); cangrow < len(cando) {
					cando = cando[:cangrow]
				}
			}

			if cur == len(fn.segments) {
				// ptr is at EOF, filesize is changing.
				fn.fileinfo.size += int64(len(cando))
			} else if el := fn.segments[cur].Len(); el <= len(cando) {
				// cando is long enough that we won't
				// need cur any more. shrink cando to
				// be exactly as long as cur
				// (otherwise we'd accidentally shift
				// the effective position of all
				// segments after cur).
				cando = cando[:el]
				copy(fn.segments[cur:], fn.segments[cur+1:])
				fn.segments = fn.segments[:len(fn.segments)-1]
			} else {
				// shrink cur by the same #bytes we're growing prev
				fn.segments[cur] = fn.segments[cur].Slice(len(cando), -1)
			}

			if prevAppendable {
				// Grow prev.
				ptr.segmentIdx--
				ptr.segmentOff = fn.segments[prev].Len()
				fn.segments[prev].(*memSegment).Truncate(ptr.segmentOff + len(cando))
				fn.memsize += int64(len(cando))
				ptr.repacked++
				fn.repacked++
			} else {
				// Insert a segment between prev and
				// cur, and advance prev/cur.
				fn.segments = append(fn.segments, nil)
				if cur < len(fn.segments) {
					copy(fn.segments[cur+1:], fn.segments[cur:])
					ptr.repacked++
					fn.repacked++
				} else {
					// appending a new segment does
					// not invalidate any ptrs
				}
				seg := &memSegment{}
				seg.Truncate(len(cando))
				fn.memsize += int64(len(cando))
				fn.segments[cur] = seg
				cur++
				prev++
			}
		}

		// Finally we can copy bytes from cando to the current segment.
		fn.segments[ptr.segmentIdx].(*memSegment).WriteAt(cando, ptr.segmentOff)
		n += len(cando)
		p = p[len(cando):]

		ptr.off += int64(len(cando))
		ptr.segmentOff += len(cando)
		if ptr.segmentOff >= maxBlockSize {
			// Segment is full: opportunistically flush big
			// in-memory segments to Keep.
			fn.pruneMemSegments()
		}
		if fn.segments[ptr.segmentIdx].Len() == ptr.segmentOff {
			ptr.segmentOff = 0
			ptr.segmentIdx++
		}

		fn.fileinfo.modTime = time.Now()
	}
	return
}
540
// Write some data out to disk to reduce memory use. Caller must have
// write lock.
//
// Each full-size in-memory segment that is not already being flushed
// is written to Keep in a background goroutine and, if it is still
// unchanged when the write finishes, replaced with a storedSegment.
func (fn *filenode) pruneMemSegments() {
	// TODO: share code with (*dirnode)flush()
	// TODO: pack/flush small blocks too, when fragmented
	for idx, seg := range fn.segments {
		seg, ok := seg.(*memSegment)
		if !ok || seg.Len() < maxBlockSize || seg.flushing != nil {
			continue
		}
		// Setting seg.flushing guarantees seg.buf will not be
		// modified in place: WriteAt and Truncate will
		// allocate a new buf instead, if necessary.
		idx, buf := idx, seg.buf
		done := make(chan struct{})
		seg.flushing = done
		// If lots of background writes are already in
		// progress, block here until one finishes, rather
		// than pile up an unlimited number of buffered writes
		// and network flush operations.
		fn.fs.throttle().Acquire()
		go func() {
			defer close(done)
			locator, _, err := fn.FS().PutB(buf)
			fn.fs.throttle().Release()
			fn.Lock()
			defer fn.Unlock()
			if seg.flushing != done {
				// A new seg.buf has been allocated.
				return
			}
			seg.flushing = nil
			if err != nil {
				// TODO: stall (or return errors from)
				// subsequent writes until flushing
				// starts to succeed.
				return
			}
			if len(fn.segments) <= idx || fn.segments[idx] != seg || len(seg.buf) != len(buf) {
				// Segment has been dropped/moved/resized.
				return
			}
			// Safe to swap in the stored copy: account for the
			// freed memory and reference the new Keep block.
			fn.memsize -= int64(len(buf))
			fn.segments[idx] = storedSegment{
				kc:      fn.FS(),
				locator: locator,
				size:    len(buf),
				offset:  0,
				length:  len(buf),
			}
		}()
	}
}
594
595 // Block until all pending pruneMemSegments/flush work is
596 // finished. Caller must NOT have lock.
597 func (fn *filenode) waitPrune() {
598         var pending []<-chan struct{}
599         fn.Lock()
600         for _, seg := range fn.segments {
601                 if seg, ok := seg.(*memSegment); ok && seg.flushing != nil {
602                         pending = append(pending, seg.flushing)
603                 }
604         }
605         fn.Unlock()
606         for _, p := range pending {
607                 <-p
608         }
609 }
610
// dirnode implements inode for a directory in a collectionFileSystem.
type dirnode struct {
	fs *collectionFileSystem
	treenode
}
615
// FS returns the filesystem this directory belongs to.
func (dn *dirnode) FS() FileSystem {
	return dn.fs
}
619
// Child looks up (or replaces, via the replace callback) the named
// child. The special name ".arvados#collection" at the root returns a
// synthetic read-only node whose content is the collection record
// (manifest included) as JSON.
func (dn *dirnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
	if dn == dn.fs.rootnode() && name == ".arvados#collection" {
		gn := &getternode{Getter: func() ([]byte, error) {
			var coll Collection
			var err error
			// Marshaling the manifest flushes buffered data first.
			coll.ManifestText, err = dn.fs.MarshalManifest(".")
			if err != nil {
				return nil, err
			}
			data, err := json.Marshal(&coll)
			if err == nil {
				data = append(data, '\n')
			}
			return data, err
		}}
		gn.SetParent(dn, name)
		return gn, nil
	}
	// Anything else is handled by the generic tree implementation.
	return dn.treenode.Child(name, replace)
}
640
// fnSegmentRef identifies one segment (by index) in a filenode's
// segment list.
type fnSegmentRef struct {
	fn  *filenode
	idx int
}
645
// commitBlock concatenates the data from the given filenode segments
// (which must be *memSegments), writes the data out to Keep as a
// single block, and replaces the filenodes' *memSegments with
// storedSegments that reference the relevant portions of the new
// block.
//
// bufsize is the total data size in refs. It is used to preallocate
// the correct amount of memory when len(refs)>1.
//
// If sync is false, commitBlock returns right away, after starting a
// goroutine to do the writes, reacquire the filenodes' locks, and
// swap out the *memSegments. Some filenodes' segments might get
// modified/rearranged in the meantime, in which case commitBlock
// won't replace them.
//
// Caller must have write lock.
func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize int, sync bool) error {
	if len(refs) == 0 {
		return nil
	}
	if err := ctx.Err(); err != nil {
		return err
	}
	done := make(chan struct{})
	var block []byte
	segs := make([]*memSegment, 0, len(refs))
	offsets := make([]int, 0, len(refs)) // location of segment's data within block
	for _, ref := range refs {
		seg := ref.fn.segments[ref.idx].(*memSegment)
		if seg.flushing != nil && !sync {
			// Let the other flushing goroutine finish. If
			// it fails, we'll try again next time.
			return nil
		} else {
			// In sync mode, we proceed regardless of
			// whether another flush is in progress: It
			// can't finish before we do, because we hold
			// fn's lock until we finish our own writes.
		}
		seg.flushing = done
		offsets = append(offsets, len(block))
		if len(refs) == 1 {
			// Single segment: write its buffer directly, no copy.
			block = seg.buf
		} else if block == nil {
			block = append(make([]byte, 0, bufsize), seg.buf...)
		} else {
			block = append(block, seg.buf...)
		}
		segs = append(segs, seg)
	}
	// Bound the number of concurrent Keep writes.
	dn.fs.throttle().Acquire()
	errs := make(chan error, 1)
	go func() {
		defer close(done)
		defer close(errs)
		locked := map[*filenode]bool{}
		locator, _, err := dn.fs.PutB(block)
		dn.fs.throttle().Release()
		{
			if !sync {
				// Async mode: reacquire every child filenode's
				// lock before touching segments; locks released
				// (deferred) when this goroutine returns.
				for _, name := range dn.sortedNames() {
					if fn, ok := dn.inodes[name].(*filenode); ok {
						fn.Lock()
						defer fn.Unlock()
						locked[fn] = true
					}
				}
			}
			defer func() {
				// Clear our flushing markers (unless another
				// flush has since replaced them).
				for _, seg := range segs {
					if seg.flushing == done {
						seg.flushing = nil
					}
				}
			}()
		}
		if err != nil {
			errs <- err
			return
		}
		for idx, ref := range refs {
			if !sync {
				// In async mode, fn's lock was
				// released while we were waiting for
				// PutB(); lots of things might have
				// changed.
				if len(ref.fn.segments) <= ref.idx {
					// file segments have
					// rearranged or changed in
					// some way
					continue
				} else if seg, ok := ref.fn.segments[ref.idx].(*memSegment); !ok || seg != segs[idx] {
					// segment has been replaced
					continue
				} else if seg.flushing != done {
					// seg.buf has been replaced
					continue
				} else if !locked[ref.fn] {
					// file was renamed, moved, or
					// deleted since we called
					// PutB
					continue
				}
			}
			// Replace the in-memory segment with a reference to
			// its slice of the new stored block.
			data := ref.fn.segments[ref.idx].(*memSegment).buf
			ref.fn.segments[ref.idx] = storedSegment{
				kc:      dn.fs,
				locator: locator,
				size:    len(block),
				offset:  offsets[idx],
				length:  len(data),
			}
			ref.fn.memsize -= int64(len(data))
		}
	}()
	if sync {
		return <-errs
	} else {
		return nil
	}
}
767
// flushOpts controls how (*dirnode)flush commits buffered data to Keep.
type flushOpts struct {
	// sync: wait for all Keep writes (commitBlock calls) to finish
	// and report their errors before returning, instead of
	// committing blocks asynchronously in the background.
	sync bool
	// shortBlocks: also commit buffered data that doesn't fill a
	// complete block; otherwise such data is left in memory to be
	// coalesced into a future block.
	shortBlocks bool
}
772
// flush in-memory data and remote-cluster block references (for the
// children with the given names, which must be children of dn) to
// local-cluster persistent storage.
//
// Caller must have write lock on dn and the named children.
//
// If any children are dirs, they will be flushed recursively.
func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) error {
	cg := newContextGroup(ctx)
	defer cg.Cancel()

	// goCommit writes the given in-memory segments to Keep as one
	// block, in a goroutine managed by cg. opts.sync determines
	// whether commitBlock waits for the write to finish.
	goCommit := func(refs []fnSegmentRef, bufsize int) {
		cg.Go(func() error {
			return dn.commitBlock(cg.Context(), refs, bufsize, opts.sync)
		})
	}

	var pending []fnSegmentRef
	var pendingLen int = 0
	// Cache remote->local locator translations so each distinct
	// locator is translated at most once per flush call.
	localLocator := map[string]string{}
	for _, name := range names {
		switch node := dn.inodes[name].(type) {
		case *dirnode:
			grandchildNames := node.sortedNames()
			// Lock all grandchildren, and keep them locked
			// until this flush returns, so the recursive
			// flush goroutine below holds the locks its
			// contract requires.
			for _, grandchildName := range grandchildNames {
				grandchild := node.inodes[grandchildName]
				grandchild.Lock()
				defer grandchild.Unlock()
			}
			cg.Go(func() error { return node.flush(cg.Context(), grandchildNames, opts) })
		case *filenode:
			for idx, seg := range node.segments {
				switch seg := seg.(type) {
				case storedSegment:
					// Already stored in Keep; just
					// rewrite the locator to its
					// local-cluster equivalent.
					loc, ok := localLocator[seg.locator]
					if !ok {
						var err error
						loc, err = dn.fs.LocalLocator(seg.locator)
						if err != nil {
							return err
						}
						localLocator[seg.locator] = loc
					}
					seg.locator = loc
					node.segments[idx] = seg
				case *memSegment:
					// A large segment gets its own
					// block rather than joining a batch.
					if seg.Len() > maxBlockSize/2 {
						goCommit([]fnSegmentRef{{node, idx}}, seg.Len())
						continue
					}
					// Commit the current batch first if
					// adding this segment would exceed
					// the maximum block size.
					if pendingLen+seg.Len() > maxBlockSize {
						goCommit(pending, pendingLen)
						pending = nil
						pendingLen = 0
					}
					pending = append(pending, fnSegmentRef{node, idx})
					pendingLen += seg.Len()
				default:
					panic(fmt.Sprintf("can't sync segment type %T", seg))
				}
			}
		}
	}
	// Leftover data is smaller than a full block; commit it only
	// if the caller asked for short blocks (e.g., during marshal).
	if opts.shortBlocks {
		goCommit(pending, pendingLen)
	}
	return cg.Wait()
}
841
842 // caller must have write lock.
843 func (dn *dirnode) memorySize() (size int64) {
844         for _, name := range dn.sortedNames() {
845                 node := dn.inodes[name]
846                 node.Lock()
847                 defer node.Unlock()
848                 switch node := node.(type) {
849                 case *dirnode:
850                         size += node.memorySize()
851                 case *filenode:
852                         for _, seg := range node.segments {
853                                 switch seg := seg.(type) {
854                                 case *memSegment:
855                                         size += int64(seg.Len())
856                                 }
857                         }
858                 }
859         }
860         return
861 }
862
863 // caller must have write lock.
864 func (dn *dirnode) sortedNames() []string {
865         names := make([]string, 0, len(dn.inodes))
866         for name := range dn.inodes {
867                 names = append(names, name)
868         }
869         sort.Strings(names)
870         return names
871 }
872
// marshalManifest returns the manifest text for the tree rooted at
// dn, with stream names prefixed by prefix. All in-memory file data
// is flushed to Keep in the process (sync=true, shortBlocks=true).
// Subdirectories are marshaled concurrently.
//
// caller must have write lock.
func (dn *dirnode) marshalManifest(ctx context.Context, prefix string) (string, error) {
	cg := newContextGroup(ctx)
	defer cg.Cancel()

	if len(dn.inodes) == 0 {
		if prefix == "." {
			return "", nil
		}
		// Express the existence of an empty directory by
		// adding an empty file named `\056`, which (unlike
		// the more obvious spelling `.`) is accepted by the
		// API's manifest validator.
		return manifestEscape(prefix) + " d41d8cd98f00b204e9800998ecf8427e+0 0:0:\\056\n", nil
	}

	names := dn.sortedNames()

	// Wait for children to finish any pending write operations
	// before locking them.
	for _, name := range names {
		node := dn.inodes[name]
		if fn, ok := node.(*filenode); ok {
			fn.waitPrune()
		}
	}

	// Lock every child (held until return) and partition children
	// into subdirectories and files.
	var dirnames []string
	var filenames []string
	for _, name := range names {
		node := dn.inodes[name]
		node.Lock()
		defer node.Unlock()
		switch node := node.(type) {
		case *dirnode:
			dirnames = append(dirnames, name)
		case *filenode:
			filenames = append(filenames, name)
		default:
			panic(fmt.Sprintf("can't marshal inode type %T", node))
		}
	}

	// Marshal each subdirectory concurrently; subdirs collects the
	// results in name order so the final output is deterministic.
	subdirs := make([]string, len(dirnames))
	rootdir := ""
	for i, name := range dirnames {
		i, name := i, name // capture loop variables for the closure
		cg.Go(func() error {
			txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name)
			subdirs[i] = txt
			return err
		})
	}

	// Build this directory's own stream line from its files.
	cg.Go(func() error {
		var streamLen int64
		type filepart struct {
			name   string
			offset int64
			length int64
		}

		var fileparts []filepart
		var blocks []string
		// Commit all in-memory segments (including short
		// blocks) so every segment below is a storedSegment.
		if err := dn.flush(cg.Context(), filenames, flushOpts{sync: true, shortBlocks: true}); err != nil {
			return err
		}
		for _, name := range filenames {
			node := dn.inodes[name].(*filenode)
			if len(node.segments) == 0 {
				// Zero-length file: emit a token with no
				// data range.
				fileparts = append(fileparts, filepart{name: name})
				continue
			}
			for _, seg := range node.segments {
				switch seg := seg.(type) {
				case storedSegment:
					if len(blocks) > 0 && blocks[len(blocks)-1] == seg.locator {
						// Same block as the previous
						// segment: don't list it twice
						// in the stream's block list.
						streamLen -= int64(seg.size)
					} else {
						blocks = append(blocks, seg.locator)
					}
					next := filepart{
						name:   name,
						offset: streamLen + int64(seg.offset),
						length: int64(seg.length),
					}
					if prev := len(fileparts) - 1; prev >= 0 &&
						fileparts[prev].name == name &&
						fileparts[prev].offset+fileparts[prev].length == next.offset {
						// Coalesce adjacent ranges of
						// the same file into one token.
						fileparts[prev].length += next.length
					} else {
						fileparts = append(fileparts, next)
					}
					streamLen += int64(seg.size)
				default:
					// This can't happen: we
					// haven't unlocked since
					// calling flush(sync=true).
					panic(fmt.Sprintf("can't marshal segment type %T", seg))
				}
			}
		}
		var filetokens []string
		for _, s := range fileparts {
			filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
		}
		if len(filetokens) == 0 {
			return nil
		} else if len(blocks) == 0 {
			// Only zero-length files: use the well-known
			// empty-block locator.
			blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
		}
		rootdir = manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n"
		return nil
	})
	err := cg.Wait()
	return rootdir + strings.Join(subdirs, ""), err
}
990
// loadManifest parses manifest text and populates dn with the
// corresponding directory tree. txt must be a sequence of
// newline-terminated streams; each stream is a space-separated list
// of an escaped stream name, one or more block locators, and one or
// more file segment tokens ("offset:length:name").
func (dn *dirnode) loadManifest(txt string) error {
	var dirname string
	streams := strings.Split(txt, "\n")
	if streams[len(streams)-1] != "" {
		return fmt.Errorf("line %d: no trailing newline", len(streams))
	}
	streams = streams[:len(streams)-1]
	// segments is reset (length 0) and reused for each stream to
	// avoid reallocating.
	segments := []storedSegment{}
	for i, stream := range streams {
		lineno := i + 1
		var anyFileTokens bool
		// pos is the stream offset at the start of
		// segments[segIdx]. File tokens usually arrive in
		// increasing-offset order, so each token's scan can
		// resume where the previous one stopped.
		var pos int64
		var segIdx int
		segments = segments[:0]
		for i, token := range strings.Split(stream, " ") {
			if i == 0 {
				// First token is the stream (directory) name.
				dirname = manifestUnescape(token)
				continue
			}
			if !strings.Contains(token, ":") {
				// Block locator token (no ":"). All
				// locators must precede file tokens.
				if anyFileTokens {
					return fmt.Errorf("line %d: bad file segment %q", lineno, token)
				}
				toks := strings.SplitN(token, "+", 3)
				if len(toks) < 2 {
					return fmt.Errorf("line %d: bad locator %q", lineno, token)
				}
				length, err := strconv.ParseInt(toks[1], 10, 32)
				if err != nil || length < 0 {
					return fmt.Errorf("line %d: bad locator %q", lineno, token)
				}
				segments = append(segments, storedSegment{
					locator: token,
					size:    int(length),
					offset:  0,
					length:  int(length),
				})
				continue
			} else if len(segments) == 0 {
				return fmt.Errorf("line %d: bad locator %q", lineno, token)
			}

			// File segment token: offset:length:name.
			toks := strings.SplitN(token, ":", 3)
			if len(toks) != 3 {
				return fmt.Errorf("line %d: bad file segment %q", lineno, token)
			}
			anyFileTokens = true

			offset, err := strconv.ParseInt(toks[0], 10, 64)
			if err != nil || offset < 0 {
				return fmt.Errorf("line %d: bad file segment %q", lineno, token)
			}
			length, err := strconv.ParseInt(toks[1], 10, 64)
			if err != nil || length < 0 {
				return fmt.Errorf("line %d: bad file segment %q", lineno, token)
			}
			name := dirname + "/" + manifestUnescape(toks[2])
			fnode, err := dn.createFileAndParents(name)
			if fnode == nil && err == nil && length == 0 {
				// Special case: an empty file used as
				// a marker to preserve an otherwise
				// empty directory in a manifest.
				continue
			}
			if err != nil || (fnode == nil && length != 0) {
				return fmt.Errorf("line %d: cannot use path %q with length %d: %s", lineno, name, length, err)
			}
			// Map the stream offset/range coordinates to
			// block/offset/range coordinates and add
			// corresponding storedSegments to the filenode
			if pos > offset {
				// Can't continue where we left off.
				// TODO: binary search instead of
				// rewinding all the way (but this
				// situation might be rare anyway)
				segIdx, pos = 0, 0
			}
			for next := int64(0); segIdx < len(segments); segIdx++ {
				seg := segments[segIdx]
				next = pos + int64(seg.Len())
				if next <= offset || seg.Len() == 0 {
					// Block ends before the requested
					// range begins; skip it.
					pos = next
					continue
				}
				if pos >= offset+length {
					break
				}
				// Trim the block to the portion that
				// overlaps [offset, offset+length).
				var blkOff int
				if pos < offset {
					blkOff = int(offset - pos)
				}
				blkLen := seg.Len() - blkOff
				if pos+int64(blkOff+blkLen) > offset+length {
					blkLen = int(offset + length - pos - int64(blkOff))
				}
				fnode.appendSegment(storedSegment{
					kc:      dn.fs,
					locator: seg.locator,
					size:    seg.size,
					offset:  blkOff,
					length:  blkLen,
				})
				if next > offset+length {
					break
				} else {
					pos = next
				}
			}
			if segIdx == len(segments) && pos < offset+length {
				// Ran out of blocks before covering the
				// requested byte range.
				return fmt.Errorf("line %d: invalid segment in %d-byte stream: %q", lineno, pos, token)
			}
		}
		if !anyFileTokens {
			return fmt.Errorf("line %d: no file segments", lineno)
		} else if len(segments) == 0 {
			return fmt.Errorf("line %d: no locators", lineno)
		} else if dirname == "" {
			return fmt.Errorf("line %d: no stream name", lineno)
		}
	}
	return nil
}
1113
// createFileAndParents walks path from dn, creating intermediate
// directories as needed, and returns the *filenode at the final
// path component (creating it if it doesn't exist).
//
// only safe to call from loadManifest -- no locking.
//
// If path is a "parent directory exists" marker (the last path
// component is "."), the returned values are both nil.
func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
	var node inode = dn
	names := strings.Split(path, "/")
	basename := names[len(names)-1]
	for _, name := range names[:len(names)-1] {
		switch name {
		case "", ".":
			// No-op path components.
			continue
		case "..":
			if node == dn {
				// can't be sure parent will be a *dirnode
				return nil, ErrInvalidArgument
			}
			node = node.Parent()
			continue
		}
		// Descend into the named directory, creating it if absent;
		// fail if an existing child is not a directory.
		node, err = node.Child(name, func(child inode) (inode, error) {
			if child == nil {
				child, err := node.FS().newNode(name, 0755|os.ModeDir, node.Parent().FileInfo().ModTime())
				if err != nil {
					return nil, err
				}
				child.SetParent(node, name)
				return child, nil
			} else if !child.IsDir() {
				return child, ErrFileExists
			} else {
				return child, nil
			}
		})
		if err != nil {
			return
		}
	}
	if basename == "." {
		// Empty-directory marker: parents now exist; no file node.
		return
	} else if !permittedName(basename) {
		err = fmt.Errorf("invalid file part %q in path %q", basename, path)
		return
	}
	// Create or reuse the file node itself (fn and err are the
	// named return values, set inside the callback).
	_, err = node.Child(basename, func(child inode) (inode, error) {
		switch child := child.(type) {
		case nil:
			child, err = node.FS().newNode(basename, 0755, node.FileInfo().ModTime())
			if err != nil {
				return nil, err
			}
			child.SetParent(node, basename)
			fn = child.(*filenode)
			return child, nil
		case *filenode:
			fn = child
			return child, nil
		case *dirnode:
			return child, ErrIsDirectory
		default:
			return child, ErrInvalidArgument
		}
	})
	return
}
1179
1180 func (dn *dirnode) TreeSize() (bytes int64) {
1181         dn.RLock()
1182         defer dn.RUnlock()
1183         for _, i := range dn.inodes {
1184                 switch i := i.(type) {
1185                 case *filenode:
1186                         bytes += i.Size()
1187                 case *dirnode:
1188                         bytes += i.TreeSize()
1189                 }
1190         }
1191         return
1192 }
1193
// A segment is a contiguous portion of a file's data: either bytes
// buffered in memory (memSegment) or a reference to part of a block
// stored in Keep (storedSegment).
type segment interface {
	io.ReaderAt
	// Len returns the number of data bytes in the segment.
	Len() int
	// Return a new segment with a subsection of the data from this
	// one. length<0 means length=Len()-off.
	Slice(off int, length int) segment
}
1201
// A memSegment is file data that has been written but not yet
// committed to Keep, held in an in-memory buffer.
type memSegment struct {
	// buf holds the segment's data.
	buf []byte
	// If flushing is not nil, then a) buf is being shared by a
	// pruneMemSegments goroutine, and must be copied on write;
	// and b) the flushing channel will close when the goroutine
	// finishes, whether it succeeds or not.
	flushing <-chan struct{}
}
1210
// Len returns the number of data bytes currently in the segment.
func (me *memSegment) Len() int {
	return len(me.buf)
}
1214
1215 func (me *memSegment) Slice(off, length int) segment {
1216         if length < 0 {
1217                 length = len(me.buf) - off
1218         }
1219         buf := make([]byte, length)
1220         copy(buf, me.buf[off:])
1221         return &memSegment{buf: buf}
1222 }
1223
// Truncate resizes the segment to n bytes. If n exceeds the current
// capacity -- or exceeds the current length while buf is shared with
// a flush goroutine -- a fresh buffer is allocated (capacity grows
// by powers of 4 starting at 1024), leaving the old/shared buffer
// untouched, and the copy-on-write obligation is cleared. Otherwise
// buf is resliced in place, and any newly exposed bytes (when
// growing within capacity) are zeroed so stale data from an earlier
// shrink cannot reappear.
func (me *memSegment) Truncate(n int) {
	if n > cap(me.buf) || (me.flushing != nil && n > len(me.buf)) {
		newsize := 1024
		for newsize < n {
			newsize = newsize << 2
		}
		newbuf := make([]byte, n, newsize)
		copy(newbuf, me.buf)
		me.buf, me.flushing = newbuf, nil
	} else {
		// reclaim existing capacity, and zero reclaimed part
		oldlen := len(me.buf)
		me.buf = me.buf[:n]
		for i := oldlen; i < n; i++ {
			me.buf[i] = 0
		}
	}
}
1242
1243 func (me *memSegment) WriteAt(p []byte, off int) {
1244         if off+len(p) > len(me.buf) {
1245                 panic("overflowed segment")
1246         }
1247         if me.flushing != nil {
1248                 me.buf, me.flushing = append([]byte(nil), me.buf...), nil
1249         }
1250         copy(me.buf[off:], p)
1251 }
1252
1253 func (me *memSegment) ReadAt(p []byte, off int64) (n int, err error) {
1254         if off > int64(me.Len()) {
1255                 err = io.EOF
1256                 return
1257         }
1258         n = copy(p, me.buf[int(off):])
1259         if n < len(p) {
1260                 err = io.EOF
1261         }
1262         return
1263 }
1264
// A storedSegment is a reference to all or part of a block that has
// been written to Keep.
type storedSegment struct {
	kc      fsBackend // backend used by ReadAt to fetch block data
	locator string    // Keep locator of the stored block
	size    int       // size of stored block (also encoded in locator)
	offset  int       // position of segment within the stored block
	length  int       // bytes in this segment (offset + length <= size)
}
1272
// Len returns the number of data bytes in the segment (not the size
// of the underlying stored block).
func (se storedSegment) Len() int {
	return se.length
}
1276
1277 func (se storedSegment) Slice(n, size int) segment {
1278         se.offset += n
1279         se.length -= n
1280         if size >= 0 && se.length > size {
1281                 se.length = size
1282         }
1283         return se
1284 }
1285
1286 func (se storedSegment) ReadAt(p []byte, off int64) (n int, err error) {
1287         if off > int64(se.length) {
1288                 return 0, io.EOF
1289         }
1290         maxlen := se.length - int(off)
1291         if len(p) > maxlen {
1292                 p = p[:maxlen]
1293                 n, err = se.kc.ReadAt(se.locator, p, int(off)+se.offset)
1294                 if err == nil {
1295                         err = io.EOF
1296                 }
1297                 return
1298         }
1299         return se.kc.ReadAt(se.locator, p, int(off)+se.offset)
1300 }
1301
// canonicalName normalizes name into the form used internally for
// collection paths: "." for the root, otherwise a cleaned relative
// path beginning with "./".
func canonicalName(name string) string {
	cleaned := path.Clean("/" + name)
	switch {
	case cleaned == "/" || cleaned == "./":
		return "."
	case strings.HasPrefix(cleaned, "/"):
		return "." + cleaned
	default:
		return cleaned
	}
}
1311
// manifestEscapeSeq matches one escape sequence in a manifest token:
// a backslash followed by either three octal digits or another
// backslash.
var manifestEscapeSeq = regexp.MustCompile(`\\([0-7]{3}|\\)`)

// manifestUnescapeFunc decodes a single sequence matched by
// manifestEscapeSeq into the raw byte it represents. A sequence that
// doesn't fit in one byte (octal value > 0377) is returned
// unchanged.
func manifestUnescapeFunc(seq string) string {
	if seq == `\\` {
		return `\`
	}
	code, err := strconv.ParseUint(seq[1:], 8, 8)
	if err != nil {
		// Invalid escape sequence: can't unescape.
		return seq
	}
	// Build a 1-byte string directly (not string(rune(...)),
	// which would UTF-8-encode values >= 0x80 into two bytes).
	return string([]byte{byte(code)})
}

// manifestUnescape decodes all escape sequences in a manifest token.
func manifestUnescape(raw string) string {
	return manifestEscapeSeq.ReplaceAllStringFunc(raw, manifestUnescapeFunc)
}
1329
// manifestEscapedChar matches any byte that must be escaped in a
// manifest token: control characters, space, colon, whitespace, and
// backslash.
var manifestEscapedChar = regexp.MustCompile(`[\000-\040:\s\\]`)

// manifestEscapeFunc renders one matched byte as a three-digit octal
// escape sequence.
func manifestEscapeFunc(seq string) string {
	return fmt.Sprintf("\\%03o", seq[0])
}

// manifestEscape escapes s for use as a stream or file name in a
// manifest.
func manifestEscape(s string) string {
	return manifestEscapedChar.ReplaceAllStringFunc(s, manifestEscapeFunc)
}