21891: Use collectionfs Snapshot/Splice instead of Extract.
[arvados.git] / lib / crunchrun / copier.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package crunchrun
6
7 import (
8         "encoding/json"
9         "errors"
10         "fmt"
11         "io"
12         "io/fs"
13         "os"
14         "path/filepath"
15         "sort"
16         "strings"
17
18         "git.arvados.org/arvados.git/sdk/go/arvados"
19         "git.arvados.org/arvados.git/sdk/go/keepclient"
20         "github.com/bmatcuk/doublestar/v4"
21 )
22
23 type printfer interface {
24         Printf(string, ...interface{})
25 }
26
27 var errTooManySymlinks = errors.New("too many symlinks, or symlink cycle")
28
29 const limitFollowSymlinks = 10
30
31 type filetodo struct {
32         src  string
33         dst  string
34         size int64
35 }
36
37 // copier copies data from a finished container's output path to a new
38 // Arvados collection.
39 //
40 // Regular files (and symlinks to regular files) in hostOutputDir are
41 // copied from the local filesystem.
42 //
43 // Symlinks to mounted collections, and any collections mounted under
44 // ctrOutputDir, are copied by transforming the relevant parts of the
45 // existing manifests, without moving any data around.
46 //
47 // Symlinks to other parts of the container's filesystem result in
48 // errors.
49 //
50 // Use:
51 //
52 //      manifest, err := (&copier{...}).Copy()
53 type copier struct {
54         client        *arvados.Client
55         keepClient    IKeepClient
56         hostOutputDir string
57         ctrOutputDir  string
58         globs         []string
59         bindmounts    map[string]bindmount
60         mounts        map[string]arvados.Mount
61         secretMounts  map[string]arvados.Mount
62         logger        printfer
63
64         dirs     []string
65         files    []filetodo
66         manifest string
67
68         manifestCache map[string]string
69 }
70
71 // Copy copies data as needed, and returns a new manifest.
72 func (cp *copier) Copy() (string, error) {
73         err := cp.walkMount("", cp.ctrOutputDir, limitFollowSymlinks, true)
74         if err != nil {
75                 return "", fmt.Errorf("error scanning files to copy to output: %v", err)
76         }
77         collfs, err := (&arvados.Collection{ManifestText: cp.manifest}).FileSystem(cp.client, cp.keepClient)
78         if err != nil {
79                 return "", fmt.Errorf("error creating Collection.FileSystem: %v", err)
80         }
81
82         // Remove files/dirs that don't match globs (the ones that
83         // were added during cp.walkMount() by copying subtree
84         // manifests into cp.manifest).
85         err = cp.applyGlobsToCollectionFS(collfs)
86         if err != nil {
87                 return "", fmt.Errorf("error while removing non-matching files from output collection: %w", err)
88         }
89         // Remove files/dirs that don't match globs (the ones that are
90         // stored on the local filesystem and would need to be copied
91         // in copyFile() below).
92         cp.applyGlobsToFilesAndDirs()
93         for _, d := range cp.dirs {
94                 err = collfs.Mkdir(d, 0777)
95                 if err != nil && err != os.ErrExist {
96                         return "", fmt.Errorf("error making directory %q in output collection: %v", d, err)
97                 }
98         }
99
100         var unflushed int64
101         var lastparentdir string
102         for _, f := range cp.files {
103                 // If a dir has just had its last file added, do a
104                 // full Flush. Otherwise, do a partial Flush (write
105                 // full-size blocks, but leave the last short block
106                 // open so f's data can be packed with it).
107                 dir, _ := filepath.Split(f.dst)
108                 if dir != lastparentdir || unflushed > keepclient.BLOCKSIZE {
109                         if err := collfs.Flush("/"+lastparentdir, dir != lastparentdir); err != nil {
110                                 return "", fmt.Errorf("error flushing output collection file data: %v", err)
111                         }
112                         unflushed = 0
113                 }
114                 lastparentdir = dir
115
116                 n, err := cp.copyFile(collfs, f)
117                 if err != nil {
118                         return "", fmt.Errorf("error copying file %q into output collection: %v", f, err)
119                 }
120                 unflushed += n
121         }
122         return collfs.MarshalManifest(".")
123 }
124
125 func (cp *copier) matchGlobs(path string, isDir bool) bool {
126         // An entry in the top level of the output directory looks
127         // like "/foo", but globs look like "foo", so we strip the
128         // leading "/" before matching.
129         path = strings.TrimLeft(path, "/")
130         for _, glob := range cp.globs {
131                 if !isDir && strings.HasSuffix(glob, "/**") {
132                         // doublestar.Match("f*/**", "ff") and
133                         // doublestar.Match("f*/**", "ff/gg") both
134                         // return true, but (to be compatible with
135                         // bash shopt) "ff" should match only if it is
136                         // a directory.
137                         //
138                         // To avoid errant matches, we add the file's
139                         // basename to the end of the pattern:
140                         //
141                         // Match("f*/**/ff", "ff") => false
142                         // Match("f*/**/gg", "ff/gg") => true
143                         //
144                         // Of course, we need to escape basename in
145                         // case it contains *, ?, \, etc.
146                         _, name := filepath.Split(path)
147                         escapedName := strings.TrimSuffix(strings.Replace(name, "", "\\", -1), "\\")
148                         if match, _ := doublestar.Match(glob+"/"+escapedName, path); match {
149                                 return true
150                         }
151                 } else if match, _ := doublestar.Match(glob, path); match {
152                         return true
153                 } else if isDir {
154                         // Workaround doublestar bug (v4.6.1).
155                         // "foo*/**" should match "foo", but does not,
156                         // because isZeroLengthPattern does not accept
157                         // "*/**" as a zero length pattern.
158                         if trunc := strings.TrimSuffix(glob, "*/**"); trunc != glob {
159                                 if match, _ := doublestar.Match(trunc, path); match {
160                                         return true
161                                 }
162                         }
163                 }
164         }
165         return false
166 }
167
168 // Delete entries from cp.files that do not match cp.globs.
169 //
170 // Delete entries from cp.dirs that do not match cp.globs.
171 //
172 // Ensure parent/ancestor directories of remaining cp.files and
173 // cp.dirs entries are still present in cp.dirs, even if they do not
174 // match cp.globs themselves.
175 func (cp *copier) applyGlobsToFilesAndDirs() {
176         if len(cp.globs) == 0 {
177                 return
178         }
179         keepdirs := make(map[string]bool)
180         for _, path := range cp.dirs {
181                 if cp.matchGlobs(path, true) {
182                         keepdirs[path] = true
183                 }
184         }
185         for path := range keepdirs {
186                 for i, c := range path {
187                         if i > 0 && c == '/' {
188                                 keepdirs[path[:i]] = true
189                         }
190                 }
191         }
192         var keepfiles []filetodo
193         for _, file := range cp.files {
194                 if cp.matchGlobs(file.dst, false) {
195                         keepfiles = append(keepfiles, file)
196                 }
197         }
198         for _, file := range keepfiles {
199                 for i, c := range file.dst {
200                         if i > 0 && c == '/' {
201                                 keepdirs[file.dst[:i]] = true
202                         }
203                 }
204         }
205         cp.dirs = nil
206         for path := range keepdirs {
207                 cp.dirs = append(cp.dirs, path)
208         }
209         sort.Strings(cp.dirs)
210         cp.files = keepfiles
211 }
212
213 // Delete files in collfs that do not match cp.globs.  Also delete
214 // directories that are empty (after deleting non-matching files) and
215 // do not match cp.globs themselves.
216 func (cp *copier) applyGlobsToCollectionFS(collfs arvados.CollectionFileSystem) error {
217         if len(cp.globs) == 0 {
218                 return nil
219         }
220         include := make(map[string]bool)
221         err := fs.WalkDir(arvados.FS(collfs), "", func(path string, ent fs.DirEntry, err error) error {
222                 if cp.matchGlobs(path, ent.IsDir()) {
223                         for i, c := range path {
224                                 if i > 0 && c == '/' {
225                                         include[path[:i]] = true
226                                 }
227                         }
228                         include[path] = true
229                 }
230                 return nil
231         })
232         if err != nil {
233                 return err
234         }
235         err = fs.WalkDir(arvados.FS(collfs), "", func(path string, ent fs.DirEntry, err error) error {
236                 if err != nil || path == "" {
237                         return err
238                 }
239                 if !include[path] {
240                         err := collfs.RemoveAll(path)
241                         if err != nil {
242                                 return err
243                         }
244                         if ent.IsDir() {
245                                 return fs.SkipDir
246                         }
247                 }
248                 return nil
249         })
250         return err
251 }
252
253 // Return true if it's possible for any descendant of the given path
254 // to match anything in cp.globs.  Used by walkMount to avoid loading
255 // collections that are mounted underneath ctrOutputPath but excluded
256 // by globs.
257 func (cp *copier) subtreeCouldMatch(path string) bool {
258         if len(cp.globs) == 0 {
259                 return true
260         }
261         pathdepth := 1 + strings.Count(path, "/")
262         for _, glob := range cp.globs {
263                 globdepth := 0
264                 lastsep := 0
265                 for i, c := range glob {
266                         if c != '/' || !doublestar.ValidatePattern(glob[:i]) {
267                                 // Escaped "/", or "/" in a character
268                                 // class, is not a path separator.
269                                 continue
270                         }
271                         if glob[lastsep:i] == "**" {
272                                 return true
273                         }
274                         lastsep = i + 1
275                         if globdepth++; globdepth == pathdepth {
276                                 if match, _ := doublestar.Match(glob[:i]+"/*", path+"/z"); match {
277                                         return true
278                                 }
279                                 break
280                         }
281                 }
282                 if globdepth < pathdepth && glob[lastsep:] == "**" {
283                         return true
284                 }
285         }
286         return false
287 }
288
289 func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) (int64, error) {
290         cp.logger.Printf("copying %q (%d bytes)", strings.TrimLeft(f.dst, "/"), f.size)
291         dst, err := fs.OpenFile(f.dst, os.O_CREATE|os.O_WRONLY, 0666)
292         if err != nil {
293                 return 0, err
294         }
295         src, err := os.Open(f.src)
296         if err != nil {
297                 dst.Close()
298                 return 0, err
299         }
300         defer src.Close()
301         n, err := io.Copy(dst, src)
302         if err != nil {
303                 dst.Close()
304                 return n, err
305         }
306         return n, dst.Close()
307 }
308
309 // Append to cp.manifest, cp.files, and cp.dirs so as to copy src (an
310 // absolute path in the container's filesystem) to dest (an absolute
311 // path in the output collection, or "" for output root).
312 //
313 // src must be (or be a descendant of) a readonly "collection" mount,
314 // a writable collection mounted at ctrOutputPath, or a "tmp" mount.
315 //
316 // If walkMountsBelow is true, include contents of any collection
317 // mounted below src as well.
318 func (cp *copier) walkMount(dest, src string, maxSymlinks int, walkMountsBelow bool) error {
319         // srcRoot, srcMount indicate the innermost mount that
320         // contains src.
321         var srcRoot string
322         var srcMount arvados.Mount
323         for root, mnt := range cp.mounts {
324                 if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
325                         srcRoot, srcMount = root, mnt
326                 }
327         }
328         for root := range cp.secretMounts {
329                 if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
330                         // Silently omit secrets, and symlinks to
331                         // secrets.
332                         return nil
333                 }
334         }
335         if srcRoot == "" {
336                 return fmt.Errorf("cannot output file %q: not in any mount", src)
337         }
338
339         // srcRelPath is the path to the file/dir we are trying to
340         // copy, relative to its mount point -- ".", "./foo.txt", ...
341         srcRelPath := filepath.Join(".", srcMount.Path, src[len(srcRoot):])
342
343         // outputRelPath is the destination path relative to the
344         // output directory. Used for logging and glob matching.
345         var outputRelPath = ""
346         if strings.HasPrefix(src, cp.ctrOutputDir) {
347                 outputRelPath = strings.TrimPrefix(src[len(cp.ctrOutputDir):], "/")
348         }
349         if outputRelPath == "" {
350                 // blank means copy a whole directory, so replace it
351                 // with a wildcard to make it a little clearer what's
352                 // going on since outputRelPath is only used for logging
353                 outputRelPath = "*"
354         }
355
356         switch {
357         case srcMount.ExcludeFromOutput:
358         case outputRelPath != "*" && !cp.subtreeCouldMatch(outputRelPath):
359                 cp.logger.Printf("not copying %q because contents cannot match output globs", outputRelPath)
360                 return nil
361         case srcMount.Kind == "tmp":
362                 // Handle by walking the host filesystem.
363                 return cp.walkHostFS(dest, src, maxSymlinks, walkMountsBelow)
364         case srcMount.Kind != "collection":
365                 return fmt.Errorf("%q: unsupported mount %q in output (kind is %q)", src, srcRoot, srcMount.Kind)
366         case !srcMount.Writable:
367                 cp.logger.Printf("copying %q from %v/%v", outputRelPath, srcMount.PortableDataHash, strings.TrimPrefix(srcRelPath, "./"))
368                 mft, err := cp.getManifest(srcMount.PortableDataHash)
369                 if err != nil {
370                         return err
371                 }
372                 err = cp.copyFromCollection(dest, &arvados.Collection{ManifestText: mft}, srcRelPath)
373                 if err != nil {
374                         return err
375                 }
376         default:
377                 cp.logger.Printf("copying %q", outputRelPath)
378                 hostRoot, err := cp.hostRoot(srcRoot)
379                 if err != nil {
380                         return err
381                 }
382                 f, err := os.Open(filepath.Join(hostRoot, ".arvados#collection"))
383                 if err != nil {
384                         return err
385                 }
386                 defer f.Close()
387                 var coll arvados.Collection
388                 err = json.NewDecoder(f).Decode(&coll)
389                 if err != nil {
390                         return err
391                 }
392                 err = cp.copyFromCollection(dest, &coll, srcRelPath)
393                 if err != nil {
394                         return err
395                 }
396         }
397         if walkMountsBelow {
398                 return cp.walkMountsBelow(dest, src)
399         }
400         return nil
401 }
402
403 func (cp *copier) copyFromCollection(dest string, coll *arvados.Collection, srcRelPath string) error {
404         tmpfs, err := coll.FileSystem(cp.client, cp.keepClient)
405         if err != nil {
406                 return err
407         }
408         snap, err := arvados.Snapshot(tmpfs, srcRelPath)
409         if err != nil {
410                 return err
411         }
412         tmpfs, err = (&arvados.Collection{}).FileSystem(cp.client, cp.keepClient)
413         if err != nil {
414                 return err
415         }
416         // Create ancestors of dest, if necessary.
417         for i, c := range dest {
418                 if i > 0 && c == '/' {
419                         err = tmpfs.Mkdir(dest[:i], 0777)
420                         if err != nil && !os.IsExist(err) {
421                                 return err
422                         }
423                 }
424         }
425         err = arvados.Splice(tmpfs, dest, snap)
426         if err != nil {
427                 return err
428         }
429         mtxt, err := tmpfs.MarshalManifest(".")
430         if err != nil {
431                 return err
432         }
433         cp.manifest += mtxt
434         return nil
435 }
436
437 func (cp *copier) walkMountsBelow(dest, src string) error {
438         for mnt, mntinfo := range cp.mounts {
439                 if !strings.HasPrefix(mnt, src+"/") {
440                         continue
441                 }
442                 if cp.copyRegularFiles(mntinfo) {
443                         // These got copied into the nearest parent
444                         // mount as regular files during setup, so
445                         // they get copied as regular files when we
446                         // process the parent. Output will reflect any
447                         // changes and deletions done by the
448                         // container.
449                         continue
450                 }
451                 // Example: we are processing dest=/foo src=/mnt1/dir1
452                 // (perhaps we followed a symlink /outdir/foo ->
453                 // /mnt1/dir1). Caller has already processed the
454                 // collection mounted at /mnt1, but now we find that
455                 // /mnt1/dir1/mnt2 is also a mount, so we need to copy
456                 // src=/mnt1/dir1/mnt2 to dest=/foo/mnt2.
457                 //
458                 // We handle all descendants of /mnt1/dir1 in this
459                 // loop instead of using recursion:
460                 // /mnt1/dir1/mnt2/mnt3 is a child of both /mnt1 and
461                 // /mnt1/dir1/mnt2, but we only want to walk it
462                 // once. (This simplification is safe because mounted
463                 // collections cannot contain symlinks.)
464                 err := cp.walkMount(dest+mnt[len(src):], mnt, 0, false)
465                 if err != nil {
466                         return err
467                 }
468         }
469         return nil
470 }
471
472 // Add entries to cp.dirs and cp.files so as to copy src (an absolute
473 // path in the container's filesystem which corresponds to a real file
474 // or directory in cp.hostOutputDir) to dest (an absolute path in the
475 // output collection, or "" for output root).
476 //
477 // Always follow symlinks.
478 //
479 // If includeMounts is true, include mounts at and below src.
480 // Otherwise, skip them.
481 func (cp *copier) walkHostFS(dest, src string, maxSymlinks int, includeMounts bool) error {
482         if includeMounts {
483                 err := cp.walkMountsBelow(dest, src)
484                 if err != nil {
485                         return err
486                 }
487         }
488
489         hostsrc := cp.hostOutputDir + src[len(cp.ctrOutputDir):]
490
491         // If src is a symlink, walk its target.
492         fi, err := os.Lstat(hostsrc)
493         if err != nil {
494                 return fmt.Errorf("lstat %q: %s", src, err)
495         }
496         if fi.Mode()&os.ModeSymlink != 0 {
497                 if maxSymlinks < 0 {
498                         return errTooManySymlinks
499                 }
500                 target, err := os.Readlink(hostsrc)
501                 if err != nil {
502                         return fmt.Errorf("readlink %q: %s", src, err)
503                 }
504                 if !strings.HasPrefix(target, "/") {
505                         target = filepath.Join(filepath.Dir(src), target)
506                 }
507                 return cp.walkMount(dest, target, maxSymlinks-1, true)
508         }
509
510         // If src is a regular directory, append it to cp.dirs and
511         // walk each of its children. (If there are no children,
512         // create an empty file "dest/.keep".)
513         if fi.Mode().IsDir() {
514                 if dest != "" {
515                         cp.dirs = append(cp.dirs, dest)
516                 }
517                 dir, err := os.Open(hostsrc)
518                 if err != nil {
519                         return fmt.Errorf("open %q: %s", src, err)
520                 }
521                 names, err := dir.Readdirnames(-1)
522                 dir.Close()
523                 if err != nil {
524                         return fmt.Errorf("readdirnames %q: %s", src, err)
525                 }
526                 if len(names) == 0 {
527                         if dest != "" {
528                                 cp.files = append(cp.files, filetodo{
529                                         src: os.DevNull,
530                                         dst: dest + "/.keep",
531                                 })
532                         }
533                         return nil
534                 }
535                 sort.Strings(names)
536                 for _, name := range names {
537                         dest, src := dest+"/"+name, src+"/"+name
538                         if _, isSecret := cp.secretMounts[src]; isSecret {
539                                 continue
540                         }
541                         if mntinfo, isMount := cp.mounts[src]; isMount && !cp.copyRegularFiles(mntinfo) {
542                                 // If a regular file/dir somehow
543                                 // exists at a path that's also a
544                                 // mount target, ignore the file --
545                                 // the mount has already been included
546                                 // with walkMountsBelow().
547                                 //
548                                 // (...except mount types that are
549                                 // handled as regular files.)
550                                 continue
551                         } else if isMount && !cp.subtreeCouldMatch(src[len(cp.ctrOutputDir)+1:]) {
552                                 continue
553                         }
554                         err = cp.walkHostFS(dest, src, maxSymlinks, false)
555                         if err != nil {
556                                 return err
557                         }
558                 }
559                 return nil
560         }
561
562         // If src is a regular file, append it to cp.files.
563         if fi.Mode().IsRegular() {
564                 cp.files = append(cp.files, filetodo{
565                         src:  hostsrc,
566                         dst:  dest,
567                         size: fi.Size(),
568                 })
569                 return nil
570         }
571         cp.logger.Printf("Skipping unsupported file type (mode %o) in output dir: %q", fi.Mode(), src)
572         return nil
573 }
574
575 // Return the host path that was mounted at the given path in the
576 // container.
577 func (cp *copier) hostRoot(ctrRoot string) (string, error) {
578         if ctrRoot == cp.ctrOutputDir {
579                 return cp.hostOutputDir, nil
580         }
581         if mnt, ok := cp.bindmounts[ctrRoot]; ok {
582                 return mnt.HostPath, nil
583         }
584         return "", fmt.Errorf("not bind-mounted: %q", ctrRoot)
585 }
586
587 func (cp *copier) copyRegularFiles(m arvados.Mount) bool {
588         return m.Kind == "text" || m.Kind == "json" || (m.Kind == "collection" && m.Writable)
589 }
590
591 func (cp *copier) getManifest(pdh string) (string, error) {
592         if mft, ok := cp.manifestCache[pdh]; ok {
593                 return mft, nil
594         }
595         var coll arvados.Collection
596         err := cp.client.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+pdh, nil, nil)
597         if err != nil {
598                 return "", fmt.Errorf("error retrieving collection record for %q: %s", pdh, err)
599         }
600         if cp.manifestCache == nil {
601                 cp.manifestCache = make(map[string]string)
602         }
603         cp.manifestCache[pdh] = coll.ManifestText
604         return coll.ManifestText, nil
605 }