Merge branch '21766-disk-cache-size'
[arvados.git] / lib / crunchrun / copier.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package crunchrun
6
7 import (
8         "encoding/json"
9         "errors"
10         "fmt"
11         "io"
12         "io/fs"
13         "os"
14         "path/filepath"
15         "sort"
16         "strings"
17
18         "git.arvados.org/arvados.git/sdk/go/arvados"
19         "git.arvados.org/arvados.git/sdk/go/keepclient"
20         "git.arvados.org/arvados.git/sdk/go/manifest"
21         "github.com/bmatcuk/doublestar/v4"
22 )
23
24 type printfer interface {
25         Printf(string, ...interface{})
26 }
27
28 var errTooManySymlinks = errors.New("too many symlinks, or symlink cycle")
29
30 const limitFollowSymlinks = 10
31
32 type filetodo struct {
33         src  string
34         dst  string
35         size int64
36 }
37
38 // copier copies data from a finished container's output path to a new
39 // Arvados collection.
40 //
41 // Regular files (and symlinks to regular files) in hostOutputDir are
42 // copied from the local filesystem.
43 //
44 // Symlinks to mounted collections, and any collections mounted under
45 // ctrOutputDir, are copied by transforming the relevant parts of the
46 // existing manifests, without moving any data around.
47 //
48 // Symlinks to other parts of the container's filesystem result in
49 // errors.
50 //
51 // Use:
52 //
53 //      manifest, err := (&copier{...}).Copy()
54 type copier struct {
55         client        *arvados.Client
56         keepClient    IKeepClient
57         hostOutputDir string
58         ctrOutputDir  string
59         globs         []string
60         bindmounts    map[string]bindmount
61         mounts        map[string]arvados.Mount
62         secretMounts  map[string]arvados.Mount
63         logger        printfer
64
65         dirs     []string
66         files    []filetodo
67         manifest string
68
69         manifestCache map[string]*manifest.Manifest
70 }
71
72 // Copy copies data as needed, and returns a new manifest.
73 func (cp *copier) Copy() (string, error) {
74         err := cp.walkMount("", cp.ctrOutputDir, limitFollowSymlinks, true)
75         if err != nil {
76                 return "", fmt.Errorf("error scanning files to copy to output: %v", err)
77         }
78         collfs, err := (&arvados.Collection{ManifestText: cp.manifest}).FileSystem(cp.client, cp.keepClient)
79         if err != nil {
80                 return "", fmt.Errorf("error creating Collection.FileSystem: %v", err)
81         }
82
83         // Remove files/dirs that don't match globs (the ones that
84         // were added during cp.walkMount() by copying subtree
85         // manifests into cp.manifest).
86         err = cp.applyGlobsToCollectionFS(collfs)
87         if err != nil {
88                 return "", fmt.Errorf("error while removing non-matching files from output collection: %w", err)
89         }
90         // Remove files/dirs that don't match globs (the ones that are
91         // stored on the local filesystem and would need to be copied
92         // in copyFile() below).
93         cp.applyGlobsToFilesAndDirs()
94         for _, d := range cp.dirs {
95                 err = collfs.Mkdir(d, 0777)
96                 if err != nil && err != os.ErrExist {
97                         return "", fmt.Errorf("error making directory %q in output collection: %v", d, err)
98                 }
99         }
100
101         var unflushed int64
102         var lastparentdir string
103         for _, f := range cp.files {
104                 // If a dir has just had its last file added, do a
105                 // full Flush. Otherwise, do a partial Flush (write
106                 // full-size blocks, but leave the last short block
107                 // open so f's data can be packed with it).
108                 dir, _ := filepath.Split(f.dst)
109                 if dir != lastparentdir || unflushed > keepclient.BLOCKSIZE {
110                         if err := collfs.Flush("/"+lastparentdir, dir != lastparentdir); err != nil {
111                                 return "", fmt.Errorf("error flushing output collection file data: %v", err)
112                         }
113                         unflushed = 0
114                 }
115                 lastparentdir = dir
116
117                 n, err := cp.copyFile(collfs, f)
118                 if err != nil {
119                         return "", fmt.Errorf("error copying file %q into output collection: %v", f, err)
120                 }
121                 unflushed += n
122         }
123         return collfs.MarshalManifest(".")
124 }
125
126 func (cp *copier) matchGlobs(path string, isDir bool) bool {
127         // An entry in the top level of the output directory looks
128         // like "/foo", but globs look like "foo", so we strip the
129         // leading "/" before matching.
130         path = strings.TrimLeft(path, "/")
131         for _, glob := range cp.globs {
132                 if !isDir && strings.HasSuffix(glob, "/**") {
133                         // doublestar.Match("f*/**", "ff") and
134                         // doublestar.Match("f*/**", "ff/gg") both
135                         // return true, but (to be compatible with
136                         // bash shopt) "ff" should match only if it is
137                         // a directory.
138                         //
139                         // To avoid errant matches, we add the file's
140                         // basename to the end of the pattern:
141                         //
142                         // Match("f*/**/ff", "ff") => false
143                         // Match("f*/**/gg", "ff/gg") => true
144                         //
145                         // Of course, we need to escape basename in
146                         // case it contains *, ?, \, etc.
147                         _, name := filepath.Split(path)
148                         escapedName := strings.TrimSuffix(strings.Replace(name, "", "\\", -1), "\\")
149                         if match, _ := doublestar.Match(glob+"/"+escapedName, path); match {
150                                 return true
151                         }
152                 } else if match, _ := doublestar.Match(glob, path); match {
153                         return true
154                 } else if isDir {
155                         // Workaround doublestar bug (v4.6.1).
156                         // "foo*/**" should match "foo", but does not,
157                         // because isZeroLengthPattern does not accept
158                         // "*/**" as a zero length pattern.
159                         if trunc := strings.TrimSuffix(glob, "*/**"); trunc != glob {
160                                 if match, _ := doublestar.Match(trunc, path); match {
161                                         return true
162                                 }
163                         }
164                 }
165         }
166         return false
167 }
168
169 // Delete entries from cp.files that do not match cp.globs.
170 //
171 // Delete entries from cp.dirs that do not match cp.globs.
172 //
173 // Ensure parent/ancestor directories of remaining cp.files and
174 // cp.dirs entries are still present in cp.dirs, even if they do not
175 // match cp.globs themselves.
176 func (cp *copier) applyGlobsToFilesAndDirs() {
177         if len(cp.globs) == 0 {
178                 return
179         }
180         keepdirs := make(map[string]bool)
181         for _, path := range cp.dirs {
182                 if cp.matchGlobs(path, true) {
183                         keepdirs[path] = true
184                 }
185         }
186         for path := range keepdirs {
187                 for i, c := range path {
188                         if i > 0 && c == '/' {
189                                 keepdirs[path[:i]] = true
190                         }
191                 }
192         }
193         var keepfiles []filetodo
194         for _, file := range cp.files {
195                 if cp.matchGlobs(file.dst, false) {
196                         keepfiles = append(keepfiles, file)
197                 }
198         }
199         for _, file := range keepfiles {
200                 for i, c := range file.dst {
201                         if i > 0 && c == '/' {
202                                 keepdirs[file.dst[:i]] = true
203                         }
204                 }
205         }
206         cp.dirs = nil
207         for path := range keepdirs {
208                 cp.dirs = append(cp.dirs, path)
209         }
210         sort.Strings(cp.dirs)
211         cp.files = keepfiles
212 }
213
214 // Delete files in collfs that do not match cp.globs.  Also delete
215 // directories that are empty (after deleting non-matching files) and
216 // do not match cp.globs themselves.
217 func (cp *copier) applyGlobsToCollectionFS(collfs arvados.CollectionFileSystem) error {
218         if len(cp.globs) == 0 {
219                 return nil
220         }
221         include := make(map[string]bool)
222         err := fs.WalkDir(arvados.FS(collfs), "", func(path string, ent fs.DirEntry, err error) error {
223                 if cp.matchGlobs(path, ent.IsDir()) {
224                         for i, c := range path {
225                                 if i > 0 && c == '/' {
226                                         include[path[:i]] = true
227                                 }
228                         }
229                         include[path] = true
230                 }
231                 return nil
232         })
233         if err != nil {
234                 return err
235         }
236         err = fs.WalkDir(arvados.FS(collfs), "", func(path string, ent fs.DirEntry, err error) error {
237                 if err != nil || path == "" {
238                         return err
239                 }
240                 if !include[path] {
241                         err := collfs.RemoveAll(path)
242                         if err != nil {
243                                 return err
244                         }
245                         if ent.IsDir() {
246                                 return fs.SkipDir
247                         }
248                 }
249                 return nil
250         })
251         return err
252 }
253
254 // Return true if it's possible for any descendant of the given path
255 // to match anything in cp.globs.  Used by walkMount to avoid loading
256 // collections that are mounted underneath ctrOutputPath but excluded
257 // by globs.
258 func (cp *copier) subtreeCouldMatch(path string) bool {
259         if len(cp.globs) == 0 {
260                 return true
261         }
262         pathdepth := 1 + strings.Count(path, "/")
263         for _, glob := range cp.globs {
264                 globdepth := 0
265                 lastsep := 0
266                 for i, c := range glob {
267                         if c != '/' || !doublestar.ValidatePattern(glob[:i]) {
268                                 // Escaped "/", or "/" in a character
269                                 // class, is not a path separator.
270                                 continue
271                         }
272                         if glob[lastsep:i] == "**" {
273                                 return true
274                         }
275                         lastsep = i + 1
276                         if globdepth++; globdepth == pathdepth {
277                                 if match, _ := doublestar.Match(glob[:i]+"/*", path+"/z"); match {
278                                         return true
279                                 }
280                                 break
281                         }
282                 }
283                 if globdepth < pathdepth && glob[lastsep:] == "**" {
284                         return true
285                 }
286         }
287         return false
288 }
289
290 func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) (int64, error) {
291         cp.logger.Printf("copying %q (%d bytes)", strings.TrimLeft(f.dst, "/"), f.size)
292         dst, err := fs.OpenFile(f.dst, os.O_CREATE|os.O_WRONLY, 0666)
293         if err != nil {
294                 return 0, err
295         }
296         src, err := os.Open(f.src)
297         if err != nil {
298                 dst.Close()
299                 return 0, err
300         }
301         defer src.Close()
302         n, err := io.Copy(dst, src)
303         if err != nil {
304                 dst.Close()
305                 return n, err
306         }
307         return n, dst.Close()
308 }
309
310 // Append to cp.manifest, cp.files, and cp.dirs so as to copy src (an
311 // absolute path in the container's filesystem) to dest (an absolute
312 // path in the output collection, or "" for output root).
313 //
314 // src must be (or be a descendant of) a readonly "collection" mount,
315 // a writable collection mounted at ctrOutputPath, or a "tmp" mount.
316 //
317 // If walkMountsBelow is true, include contents of any collection
318 // mounted below src as well.
319 func (cp *copier) walkMount(dest, src string, maxSymlinks int, walkMountsBelow bool) error {
320         // srcRoot, srcMount indicate the innermost mount that
321         // contains src.
322         var srcRoot string
323         var srcMount arvados.Mount
324         for root, mnt := range cp.mounts {
325                 if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
326                         srcRoot, srcMount = root, mnt
327                 }
328         }
329         for root := range cp.secretMounts {
330                 if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
331                         // Silently omit secrets, and symlinks to
332                         // secrets.
333                         return nil
334                 }
335         }
336         if srcRoot == "" {
337                 return fmt.Errorf("cannot output file %q: not in any mount", src)
338         }
339
340         // srcRelPath is the path to the file/dir we are trying to
341         // copy, relative to its mount point -- ".", "./foo.txt", ...
342         srcRelPath := filepath.Join(".", srcMount.Path, src[len(srcRoot):])
343
344         // outputRelPath is the destination path relative to the
345         // output directory. Used for logging and glob matching.
346         var outputRelPath = ""
347         if strings.HasPrefix(src, cp.ctrOutputDir) {
348                 outputRelPath = strings.TrimPrefix(src[len(cp.ctrOutputDir):], "/")
349         }
350         if outputRelPath == "" {
351                 // blank means copy a whole directory, so replace it
352                 // with a wildcard to make it a little clearer what's
353                 // going on since outputRelPath is only used for logging
354                 outputRelPath = "*"
355         }
356
357         switch {
358         case srcMount.ExcludeFromOutput:
359         case outputRelPath != "*" && !cp.subtreeCouldMatch(outputRelPath):
360                 cp.logger.Printf("not copying %q because contents cannot match output globs", outputRelPath)
361                 return nil
362         case srcMount.Kind == "tmp":
363                 // Handle by walking the host filesystem.
364                 return cp.walkHostFS(dest, src, maxSymlinks, walkMountsBelow)
365         case srcMount.Kind != "collection":
366                 return fmt.Errorf("%q: unsupported mount %q in output (kind is %q)", src, srcRoot, srcMount.Kind)
367         case !srcMount.Writable:
368                 cp.logger.Printf("copying %q from %v/%v", outputRelPath, srcMount.PortableDataHash, strings.TrimPrefix(srcRelPath, "./"))
369                 mft, err := cp.getManifest(srcMount.PortableDataHash)
370                 if err != nil {
371                         return err
372                 }
373                 cp.manifest += mft.Extract(srcRelPath, dest).Text
374         default:
375                 cp.logger.Printf("copying %q", outputRelPath)
376                 hostRoot, err := cp.hostRoot(srcRoot)
377                 if err != nil {
378                         return err
379                 }
380                 f, err := os.Open(filepath.Join(hostRoot, ".arvados#collection"))
381                 if err != nil {
382                         return err
383                 }
384                 defer f.Close()
385                 var coll arvados.Collection
386                 err = json.NewDecoder(f).Decode(&coll)
387                 if err != nil {
388                         return err
389                 }
390                 mft := manifest.Manifest{Text: coll.ManifestText}
391                 cp.manifest += mft.Extract(srcRelPath, dest).Text
392         }
393         if walkMountsBelow {
394                 return cp.walkMountsBelow(dest, src)
395         }
396         return nil
397 }
398
399 func (cp *copier) walkMountsBelow(dest, src string) error {
400         for mnt, mntinfo := range cp.mounts {
401                 if !strings.HasPrefix(mnt, src+"/") {
402                         continue
403                 }
404                 if cp.copyRegularFiles(mntinfo) {
405                         // These got copied into the nearest parent
406                         // mount as regular files during setup, so
407                         // they get copied as regular files when we
408                         // process the parent. Output will reflect any
409                         // changes and deletions done by the
410                         // container.
411                         continue
412                 }
413                 // Example: we are processing dest=/foo src=/mnt1/dir1
414                 // (perhaps we followed a symlink /outdir/foo ->
415                 // /mnt1/dir1). Caller has already processed the
416                 // collection mounted at /mnt1, but now we find that
417                 // /mnt1/dir1/mnt2 is also a mount, so we need to copy
418                 // src=/mnt1/dir1/mnt2 to dest=/foo/mnt2.
419                 //
420                 // We handle all descendants of /mnt1/dir1 in this
421                 // loop instead of using recursion:
422                 // /mnt1/dir1/mnt2/mnt3 is a child of both /mnt1 and
423                 // /mnt1/dir1/mnt2, but we only want to walk it
424                 // once. (This simplification is safe because mounted
425                 // collections cannot contain symlinks.)
426                 err := cp.walkMount(dest+mnt[len(src):], mnt, 0, false)
427                 if err != nil {
428                         return err
429                 }
430         }
431         return nil
432 }
433
434 // Add entries to cp.dirs and cp.files so as to copy src (an absolute
435 // path in the container's filesystem which corresponds to a real file
436 // or directory in cp.hostOutputDir) to dest (an absolute path in the
437 // output collection, or "" for output root).
438 //
439 // Always follow symlinks.
440 //
441 // If includeMounts is true, include mounts at and below src.
442 // Otherwise, skip them.
443 func (cp *copier) walkHostFS(dest, src string, maxSymlinks int, includeMounts bool) error {
444         if includeMounts {
445                 err := cp.walkMountsBelow(dest, src)
446                 if err != nil {
447                         return err
448                 }
449         }
450
451         hostsrc := cp.hostOutputDir + src[len(cp.ctrOutputDir):]
452
453         // If src is a symlink, walk its target.
454         fi, err := os.Lstat(hostsrc)
455         if err != nil {
456                 return fmt.Errorf("lstat %q: %s", src, err)
457         }
458         if fi.Mode()&os.ModeSymlink != 0 {
459                 if maxSymlinks < 0 {
460                         return errTooManySymlinks
461                 }
462                 target, err := os.Readlink(hostsrc)
463                 if err != nil {
464                         return fmt.Errorf("readlink %q: %s", src, err)
465                 }
466                 if !strings.HasPrefix(target, "/") {
467                         target = filepath.Join(filepath.Dir(src), target)
468                 }
469                 return cp.walkMount(dest, target, maxSymlinks-1, true)
470         }
471
472         // If src is a regular directory, append it to cp.dirs and
473         // walk each of its children. (If there are no children,
474         // create an empty file "dest/.keep".)
475         if fi.Mode().IsDir() {
476                 if dest != "" {
477                         cp.dirs = append(cp.dirs, dest)
478                 }
479                 dir, err := os.Open(hostsrc)
480                 if err != nil {
481                         return fmt.Errorf("open %q: %s", src, err)
482                 }
483                 names, err := dir.Readdirnames(-1)
484                 dir.Close()
485                 if err != nil {
486                         return fmt.Errorf("readdirnames %q: %s", src, err)
487                 }
488                 if len(names) == 0 {
489                         if dest != "" {
490                                 cp.files = append(cp.files, filetodo{
491                                         src: os.DevNull,
492                                         dst: dest + "/.keep",
493                                 })
494                         }
495                         return nil
496                 }
497                 sort.Strings(names)
498                 for _, name := range names {
499                         dest, src := dest+"/"+name, src+"/"+name
500                         if _, isSecret := cp.secretMounts[src]; isSecret {
501                                 continue
502                         }
503                         if mntinfo, isMount := cp.mounts[src]; isMount && !cp.copyRegularFiles(mntinfo) {
504                                 // If a regular file/dir somehow
505                                 // exists at a path that's also a
506                                 // mount target, ignore the file --
507                                 // the mount has already been included
508                                 // with walkMountsBelow().
509                                 //
510                                 // (...except mount types that are
511                                 // handled as regular files.)
512                                 continue
513                         } else if isMount && !cp.subtreeCouldMatch(src[len(cp.ctrOutputDir)+1:]) {
514                                 continue
515                         }
516                         err = cp.walkHostFS(dest, src, maxSymlinks, false)
517                         if err != nil {
518                                 return err
519                         }
520                 }
521                 return nil
522         }
523
524         // If src is a regular file, append it to cp.files.
525         if fi.Mode().IsRegular() {
526                 cp.files = append(cp.files, filetodo{
527                         src:  hostsrc,
528                         dst:  dest,
529                         size: fi.Size(),
530                 })
531                 return nil
532         }
533         cp.logger.Printf("Skipping unsupported file type (mode %o) in output dir: %q", fi.Mode(), src)
534         return nil
535 }
536
537 // Return the host path that was mounted at the given path in the
538 // container.
539 func (cp *copier) hostRoot(ctrRoot string) (string, error) {
540         if ctrRoot == cp.ctrOutputDir {
541                 return cp.hostOutputDir, nil
542         }
543         if mnt, ok := cp.bindmounts[ctrRoot]; ok {
544                 return mnt.HostPath, nil
545         }
546         return "", fmt.Errorf("not bind-mounted: %q", ctrRoot)
547 }
548
549 func (cp *copier) copyRegularFiles(m arvados.Mount) bool {
550         return m.Kind == "text" || m.Kind == "json" || (m.Kind == "collection" && m.Writable)
551 }
552
553 func (cp *copier) getManifest(pdh string) (*manifest.Manifest, error) {
554         if mft, ok := cp.manifestCache[pdh]; ok {
555                 return mft, nil
556         }
557         var coll arvados.Collection
558         err := cp.client.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+pdh, nil, nil)
559         if err != nil {
560                 return nil, fmt.Errorf("error retrieving collection record for %q: %s", pdh, err)
561         }
562         mft := &manifest.Manifest{Text: coll.ManifestText}
563         if cp.manifestCache == nil {
564                 cp.manifestCache = map[string]*manifest.Manifest{pdh: mft}
565         } else {
566                 cp.manifestCache[pdh] = mft
567         }
568         return mft, nil
569 }