]> git.arvados.org - arvados.git/blob - lib/crunchrun/copier.go
Merge branch '23009-multiselect-bug' into main. Closes #23009
[arvados.git] / lib / crunchrun / copier.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package crunchrun
6
7 import (
8         "encoding/json"
9         "errors"
10         "fmt"
11         "io"
12         "io/fs"
13         "os"
14         "path/filepath"
15         "sort"
16         "strings"
17
18         "git.arvados.org/arvados.git/sdk/go/arvados"
19         "git.arvados.org/arvados.git/sdk/go/keepclient"
20         "github.com/bmatcuk/doublestar/v4"
21 )
22
23 type printfer interface {
24         Printf(string, ...interface{})
25 }
26
27 var errTooManySymlinks = errors.New("too many symlinks, or symlink cycle")
28
29 const limitFollowSymlinks = 10
30
31 type filetodo struct {
32         src  string
33         dst  string
34         size int64
35 }
36
37 // copier copies data from a finished container's output path to a new
38 // Arvados collection.
39 //
40 // Regular files (and symlinks to regular files) in hostOutputDir are
41 // copied from the local filesystem.
42 //
43 // Symlinks to mounted collections, and any collections mounted under
44 // ctrOutputDir, are copied by reference, without moving any data
45 // around.
46 //
47 // Symlinks to other parts of the container's filesystem result in
48 // errors.
49 //
50 // Use:
51 //
52 //      manifest, err := (&copier{...}).Copy()
53 type copier struct {
54         client        *arvados.Client
55         keepClient    IKeepClient
56         hostOutputDir string
57         ctrOutputDir  string
58         globs         []string
59         bindmounts    map[string]bindmount
60         mounts        map[string]arvados.Mount
61         secretMounts  map[string]arvados.Mount
62         logger        printfer
63
64         dirs   []string
65         files  []filetodo
66         staged arvados.CollectionFileSystem
67
68         manifestCache map[string]string
69
70         // tmpfs is the filesystem representation of the source
71         // collection that was most recently handled in
72         // copyFromCollection.  This improves performance slightly in
73         // the special case where many mounts reference the same
74         // source collection.
75         tmpfs             arvados.CollectionFileSystem
76         tmpfsManifestText string
77 }
78
79 // Copy copies data as needed, and returns a new manifest.
80 //
81 // Copy should not be called more than once.
82 func (cp *copier) Copy() (string, error) {
83         var err error
84         cp.staged, err = (&arvados.Collection{}).FileSystem(cp.client, cp.keepClient)
85         if err != nil {
86                 return "", fmt.Errorf("error creating Collection.FileSystem: %v", err)
87         }
88         err = cp.walkMount("", cp.ctrOutputDir, limitFollowSymlinks, true)
89         if err != nil {
90                 return "", fmt.Errorf("error scanning files to copy to output: %v", err)
91         }
92
93         // Remove files/dirs that don't match globs (the files/dirs
94         // that were added during cp.walkMount() by copying subtree
95         // manifests into cp.staged).
96         err = cp.applyGlobsToStaged()
97         if err != nil {
98                 return "", fmt.Errorf("error while removing non-matching files from output collection: %w", err)
99         }
100         // Remove files/dirs that don't match globs (the files/dirs
101         // that are stored on the local filesystem and would need to
102         // be copied in copyFile() below).
103         cp.applyGlobsToFilesAndDirs()
104         for _, d := range cp.dirs {
105                 err = cp.staged.Mkdir(d, 0777)
106                 if err != nil && err != os.ErrExist {
107                         return "", fmt.Errorf("error making directory %q in output collection: %v", d, err)
108                 }
109         }
110
111         var unflushed int64
112         var lastparentdir string
113         for _, f := range cp.files {
114                 // If a dir has just had its last file added, do a
115                 // full Flush. Otherwise, do a partial Flush (write
116                 // full-size blocks, but leave the last short block
117                 // open so f's data can be packed with it).
118                 dir, _ := filepath.Split(f.dst)
119                 if dir != lastparentdir || unflushed > keepclient.BLOCKSIZE {
120                         if err := cp.staged.Flush("/"+lastparentdir, dir != lastparentdir); err != nil {
121                                 return "", fmt.Errorf("error flushing output collection file data: %v", err)
122                         }
123                         unflushed = 0
124                 }
125                 lastparentdir = dir
126
127                 n, err := cp.copyFile(cp.staged, f)
128                 if err != nil {
129                         return "", fmt.Errorf("error copying file %q into output collection: %v", f, err)
130                 }
131                 unflushed += n
132         }
133         return cp.staged.MarshalManifest(".")
134 }
135
136 func (cp *copier) matchGlobs(path string, isDir bool) bool {
137         // An entry in the top level of the output directory looks
138         // like "/foo", but globs look like "foo", so we strip the
139         // leading "/" before matching.
140         path = strings.TrimLeft(path, "/")
141         for _, glob := range cp.globs {
142                 if !isDir && strings.HasSuffix(glob, "/**") {
143                         // doublestar.Match("f*/**", "ff") and
144                         // doublestar.Match("f*/**", "ff/gg") both
145                         // return true, but (to be compatible with
146                         // bash shopt) "ff" should match only if it is
147                         // a directory.
148                         //
149                         // To avoid errant matches, we add the file's
150                         // basename to the end of the pattern:
151                         //
152                         // Match("f*/**/ff", "ff") => false
153                         // Match("f*/**/gg", "ff/gg") => true
154                         //
155                         // Of course, we need to escape basename in
156                         // case it contains *, ?, \, etc.
157                         _, name := filepath.Split(path)
158                         escapedName := strings.TrimSuffix(strings.Replace(name, "", "\\", -1), "\\")
159                         if match, _ := doublestar.Match(glob+"/"+escapedName, path); match {
160                                 return true
161                         }
162                 } else if match, _ := doublestar.Match(glob, path); match {
163                         return true
164                 } else if isDir {
165                         // Workaround doublestar bug (v4.6.1).
166                         // "foo*/**" should match "foo", but does not,
167                         // because isZeroLengthPattern does not accept
168                         // "*/**" as a zero length pattern.
169                         if trunc := strings.TrimSuffix(glob, "*/**"); trunc != glob {
170                                 if match, _ := doublestar.Match(trunc, path); match {
171                                         return true
172                                 }
173                         }
174                 }
175         }
176         return false
177 }
178
179 // Delete entries from cp.files that do not match cp.globs.
180 //
181 // Delete entries from cp.dirs that do not match cp.globs.
182 //
183 // Ensure parent/ancestor directories of remaining cp.files and
184 // cp.dirs entries are still present in cp.dirs, even if they do not
185 // match cp.globs themselves.
186 func (cp *copier) applyGlobsToFilesAndDirs() {
187         if len(cp.globs) == 0 {
188                 return
189         }
190         keepdirs := make(map[string]bool)
191         for _, path := range cp.dirs {
192                 if cp.matchGlobs(path, true) {
193                         keepdirs[path] = true
194                 }
195         }
196         for path := range keepdirs {
197                 for i, c := range path {
198                         if i > 0 && c == '/' {
199                                 keepdirs[path[:i]] = true
200                         }
201                 }
202         }
203         var keepfiles []filetodo
204         for _, file := range cp.files {
205                 if cp.matchGlobs(file.dst, false) {
206                         keepfiles = append(keepfiles, file)
207                 }
208         }
209         for _, file := range keepfiles {
210                 for i, c := range file.dst {
211                         if i > 0 && c == '/' {
212                                 keepdirs[file.dst[:i]] = true
213                         }
214                 }
215         }
216         cp.dirs = nil
217         for path := range keepdirs {
218                 cp.dirs = append(cp.dirs, path)
219         }
220         sort.Strings(cp.dirs)
221         cp.files = keepfiles
222 }
223
224 // Delete files in cp.staged that do not match cp.globs.  Also delete
225 // directories that are empty (after deleting non-matching files) and
226 // do not match cp.globs themselves.
227 func (cp *copier) applyGlobsToStaged() error {
228         if len(cp.globs) == 0 {
229                 return nil
230         }
231         include := make(map[string]bool)
232         err := fs.WalkDir(arvados.FS(cp.staged), "", func(path string, ent fs.DirEntry, err error) error {
233                 if cp.matchGlobs(path, ent.IsDir()) {
234                         for i, c := range path {
235                                 if i > 0 && c == '/' {
236                                         include[path[:i]] = true
237                                 }
238                         }
239                         include[path] = true
240                 }
241                 return nil
242         })
243         if err != nil {
244                 return err
245         }
246         err = fs.WalkDir(arvados.FS(cp.staged), "", func(path string, ent fs.DirEntry, err error) error {
247                 if err != nil || path == "" {
248                         return err
249                 }
250                 if !include[path] {
251                         err := cp.staged.RemoveAll(path)
252                         if err != nil {
253                                 return err
254                         }
255                         if ent.IsDir() {
256                                 return fs.SkipDir
257                         }
258                 }
259                 return nil
260         })
261         return err
262 }
263
264 // Return true if it's possible for any descendant of the given path
265 // to match anything in cp.globs.  Used by walkMount to avoid loading
266 // collections that are mounted underneath ctrOutputPath but excluded
267 // by globs.
268 func (cp *copier) subtreeCouldMatch(path string) bool {
269         if len(cp.globs) == 0 {
270                 return true
271         }
272         pathdepth := 1 + strings.Count(path, "/")
273         for _, glob := range cp.globs {
274                 globdepth := 0
275                 lastsep := 0
276                 for i, c := range glob {
277                         if c != '/' || !doublestar.ValidatePattern(glob[:i]) {
278                                 // Escaped "/", or "/" in a character
279                                 // class, is not a path separator.
280                                 continue
281                         }
282                         if glob[lastsep:i] == "**" {
283                                 return true
284                         }
285                         lastsep = i + 1
286                         if globdepth++; globdepth == pathdepth {
287                                 if match, _ := doublestar.Match(glob[:i]+"/*", path+"/z"); match {
288                                         return true
289                                 }
290                                 break
291                         }
292                 }
293                 if globdepth < pathdepth && glob[lastsep:] == "**" {
294                         return true
295                 }
296         }
297         return false
298 }
299
300 func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) (int64, error) {
301         cp.logger.Printf("copying %q (%d bytes)", strings.TrimLeft(f.dst, "/"), f.size)
302         dst, err := fs.OpenFile(f.dst, os.O_CREATE|os.O_WRONLY, 0666)
303         if err != nil {
304                 return 0, err
305         }
306         src, err := os.Open(f.src)
307         if err != nil {
308                 dst.Close()
309                 return 0, err
310         }
311         defer src.Close()
312         n, err := io.Copy(dst, src)
313         if err != nil {
314                 dst.Close()
315                 return n, err
316         }
317         return n, dst.Close()
318 }
319
320 // Add to cp.staged, cp.files, and cp.dirs so as to copy src (an
321 // absolute path in the container's filesystem) to dest (an absolute
322 // path in the output collection, or "" for output root).
323 //
324 // src must be (or be a descendant of) a readonly "collection" mount,
325 // a writable collection mounted at ctrOutputPath, or a "tmp" mount.
326 //
327 // If walkMountsBelow is true, include contents of any collection
328 // mounted below src as well.
329 func (cp *copier) walkMount(dest, src string, maxSymlinks int, walkMountsBelow bool) error {
330         // srcRoot, srcMount indicate the innermost mount that
331         // contains src.
332         var srcRoot string
333         var srcMount arvados.Mount
334         for root, mnt := range cp.mounts {
335                 if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
336                         srcRoot, srcMount = root, mnt
337                 }
338         }
339         for root := range cp.secretMounts {
340                 if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
341                         // Silently omit secrets, and symlinks to
342                         // secrets.
343                         return nil
344                 }
345         }
346         if srcRoot == "" {
347                 return fmt.Errorf("cannot output file %q: not in any mount", src)
348         }
349
350         // srcRelPath is the path to the file/dir we are trying to
351         // copy, relative to its mount point -- ".", "./foo.txt", ...
352         srcRelPath := filepath.Join(".", srcMount.Path, src[len(srcRoot):])
353
354         // outputRelPath is the destination path relative to the
355         // output directory. Used for logging and glob matching.
356         var outputRelPath = ""
357         if strings.HasPrefix(src, cp.ctrOutputDir) {
358                 outputRelPath = strings.TrimPrefix(src[len(cp.ctrOutputDir):], "/")
359         }
360         if outputRelPath == "" {
361                 // blank means copy a whole directory, so replace it
362                 // with a wildcard to make it a little clearer what's
363                 // going on since outputRelPath is only used for logging
364                 outputRelPath = "*"
365         }
366
367         switch {
368         case srcMount.ExcludeFromOutput:
369         case outputRelPath != "*" && !cp.subtreeCouldMatch(outputRelPath):
370                 cp.logger.Printf("not copying %q because contents cannot match output globs", outputRelPath)
371                 return nil
372         case srcMount.Kind == "tmp":
373                 // Handle by walking the host filesystem.
374                 return cp.walkHostFS(dest, src, maxSymlinks, walkMountsBelow)
375         case srcMount.Kind != "collection":
376                 return fmt.Errorf("%q: unsupported mount %q in output (kind is %q)", src, srcRoot, srcMount.Kind)
377         case !srcMount.Writable:
378                 cp.logger.Printf("copying %q from %v/%v", outputRelPath, srcMount.PortableDataHash, strings.TrimPrefix(srcRelPath, "./"))
379                 mft, err := cp.getManifest(srcMount.PortableDataHash)
380                 if err != nil {
381                         return err
382                 }
383                 err = cp.copyFromCollection(dest, &arvados.Collection{ManifestText: mft}, srcRelPath)
384                 if err != nil {
385                         return err
386                 }
387         default:
388                 cp.logger.Printf("copying %q", outputRelPath)
389                 hostRoot, err := cp.hostRoot(srcRoot)
390                 if err != nil {
391                         return err
392                 }
393                 f, err := os.Open(filepath.Join(hostRoot, ".arvados#collection"))
394                 if err != nil {
395                         return err
396                 }
397                 defer f.Close()
398                 var coll arvados.Collection
399                 err = json.NewDecoder(f).Decode(&coll)
400                 if err != nil {
401                         return err
402                 }
403                 err = cp.copyFromCollection(dest, &coll, srcRelPath)
404                 if err != nil {
405                         return err
406                 }
407         }
408         cp.tmpfs = nil
409         cp.tmpfsManifestText = ""
410         if walkMountsBelow {
411                 return cp.walkMountsBelow(dest, src)
412         }
413         return nil
414 }
415
416 func (cp *copier) copyFromCollection(dest string, coll *arvados.Collection, srcRelPath string) error {
417         if coll.ManifestText == "" || coll.ManifestText != cp.tmpfsManifestText {
418                 tmpfs, err := coll.FileSystem(cp.client, cp.keepClient)
419                 if err != nil {
420                         return err
421                 }
422                 cp.tmpfs = tmpfs
423                 cp.tmpfsManifestText = coll.ManifestText
424         }
425         snap, err := arvados.Snapshot(cp.tmpfs, srcRelPath)
426         if err != nil {
427                 return err
428         }
429         // Create ancestors of dest, if necessary.
430         for i, c := range dest {
431                 if i > 0 && c == '/' {
432                         err = cp.staged.Mkdir(dest[:i], 0777)
433                         if err != nil && !os.IsExist(err) {
434                                 return err
435                         }
436                 }
437         }
438         return arvados.Splice(cp.staged, dest, snap)
439 }
440
441 func (cp *copier) walkMountsBelow(dest, src string) error {
442         for mnt, mntinfo := range cp.mounts {
443                 if !strings.HasPrefix(mnt, src+"/") {
444                         continue
445                 }
446                 if cp.copyRegularFiles(mntinfo) {
447                         // These got copied into the nearest parent
448                         // mount as regular files during setup, so
449                         // they get copied as regular files when we
450                         // process the parent. Output will reflect any
451                         // changes and deletions done by the
452                         // container.
453                         continue
454                 }
455                 // Example: we are processing dest=/foo src=/mnt1/dir1
456                 // (perhaps we followed a symlink /outdir/foo ->
457                 // /mnt1/dir1). Caller has already processed the
458                 // collection mounted at /mnt1, but now we find that
459                 // /mnt1/dir1/mnt2 is also a mount, so we need to copy
460                 // src=/mnt1/dir1/mnt2 to dest=/foo/mnt2.
461                 //
462                 // We handle all descendants of /mnt1/dir1 in this
463                 // loop instead of using recursion:
464                 // /mnt1/dir1/mnt2/mnt3 is a child of both /mnt1 and
465                 // /mnt1/dir1/mnt2, but we only want to walk it
466                 // once. (This simplification is safe because mounted
467                 // collections cannot contain symlinks.)
468                 err := cp.walkMount(dest+mnt[len(src):], mnt, 0, false)
469                 if err != nil {
470                         return err
471                 }
472         }
473         return nil
474 }
475
476 // Add entries to cp.dirs and cp.files so as to copy src (an absolute
477 // path in the container's filesystem which corresponds to a real file
478 // or directory in cp.hostOutputDir) to dest (an absolute path in the
479 // output collection, or "" for output root).
480 //
481 // Always follow symlinks.
482 //
483 // If includeMounts is true, include mounts at and below src.
484 // Otherwise, skip them.
485 func (cp *copier) walkHostFS(dest, src string, maxSymlinks int, includeMounts bool) error {
486         if includeMounts {
487                 err := cp.walkMountsBelow(dest, src)
488                 if err != nil {
489                         return err
490                 }
491         }
492
493         hostsrc := cp.hostOutputDir + src[len(cp.ctrOutputDir):]
494
495         // If src is a symlink, walk its target.
496         fi, err := os.Lstat(hostsrc)
497         if err != nil {
498                 return fmt.Errorf("lstat %q: %s", src, err)
499         }
500         if fi.Mode()&os.ModeSymlink != 0 {
501                 if maxSymlinks < 0 {
502                         return errTooManySymlinks
503                 }
504                 target, err := os.Readlink(hostsrc)
505                 if err != nil {
506                         return fmt.Errorf("readlink %q: %s", src, err)
507                 }
508                 if !strings.HasPrefix(target, "/") {
509                         target = filepath.Join(filepath.Dir(src), target)
510                 }
511                 return cp.walkMount(dest, target, maxSymlinks-1, true)
512         }
513
514         // If src is a regular directory, append it to cp.dirs and
515         // walk each of its children. (If there are no children,
516         // create an empty file "dest/.keep".)
517         if fi.Mode().IsDir() {
518                 if dest != "" {
519                         cp.dirs = append(cp.dirs, dest)
520                 }
521                 dir, err := os.Open(hostsrc)
522                 if err != nil {
523                         return fmt.Errorf("open %q: %s", src, err)
524                 }
525                 names, err := dir.Readdirnames(-1)
526                 dir.Close()
527                 if err != nil {
528                         return fmt.Errorf("readdirnames %q: %s", src, err)
529                 }
530                 if len(names) == 0 {
531                         if dest != "" {
532                                 cp.files = append(cp.files, filetodo{
533                                         src: os.DevNull,
534                                         dst: dest + "/.keep",
535                                 })
536                         }
537                         return nil
538                 }
539                 sort.Strings(names)
540                 for _, name := range names {
541                         dest, src := dest+"/"+name, src+"/"+name
542                         if _, isSecret := cp.secretMounts[src]; isSecret {
543                                 continue
544                         }
545                         if mntinfo, isMount := cp.mounts[src]; isMount && !cp.copyRegularFiles(mntinfo) {
546                                 // If a regular file/dir somehow
547                                 // exists at a path that's also a
548                                 // mount target, ignore the file --
549                                 // the mount has already been included
550                                 // with walkMountsBelow().
551                                 //
552                                 // (...except mount types that are
553                                 // handled as regular files.)
554                                 continue
555                         } else if isMount && !cp.subtreeCouldMatch(src[len(cp.ctrOutputDir)+1:]) {
556                                 continue
557                         }
558                         err = cp.walkHostFS(dest, src, maxSymlinks, false)
559                         if err != nil {
560                                 return err
561                         }
562                 }
563                 return nil
564         }
565
566         // If src is a regular file, append it to cp.files.
567         if fi.Mode().IsRegular() {
568                 cp.files = append(cp.files, filetodo{
569                         src:  hostsrc,
570                         dst:  dest,
571                         size: fi.Size(),
572                 })
573                 return nil
574         }
575         cp.logger.Printf("Skipping unsupported file type (mode %o) in output dir: %q", fi.Mode(), src)
576         return nil
577 }
578
579 // Return the host path that was mounted at the given path in the
580 // container.
581 func (cp *copier) hostRoot(ctrRoot string) (string, error) {
582         if ctrRoot == cp.ctrOutputDir {
583                 return cp.hostOutputDir, nil
584         }
585         if mnt, ok := cp.bindmounts[ctrRoot]; ok {
586                 return mnt.HostPath, nil
587         }
588         return "", fmt.Errorf("not bind-mounted: %q", ctrRoot)
589 }
590
591 func (cp *copier) copyRegularFiles(m arvados.Mount) bool {
592         return m.Kind == "text" || m.Kind == "json" || (m.Kind == "collection" && m.Writable)
593 }
594
595 func (cp *copier) getManifest(pdh string) (string, error) {
596         if mft, ok := cp.manifestCache[pdh]; ok {
597                 return mft, nil
598         }
599         var coll arvados.Collection
600         err := cp.client.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+pdh, nil, nil)
601         if err != nil {
602                 return "", fmt.Errorf("error retrieving collection record for %q: %s", pdh, err)
603         }
604         if cp.manifestCache == nil {
605                 cp.manifestCache = make(map[string]string)
606         }
607         cp.manifestCache[pdh] = coll.ManifestText
608         return coll.ManifestText, nil
609 }