21891: Skip extra serialization step.
[arvados.git] / lib / crunchrun / copier.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package crunchrun
6
7 import (
8         "encoding/json"
9         "errors"
10         "fmt"
11         "io"
12         "io/fs"
13         "os"
14         "path/filepath"
15         "sort"
16         "strings"
17
18         "git.arvados.org/arvados.git/sdk/go/arvados"
19         "git.arvados.org/arvados.git/sdk/go/keepclient"
20         "github.com/bmatcuk/doublestar/v4"
21 )
22
23 type printfer interface {
24         Printf(string, ...interface{})
25 }
26
27 var errTooManySymlinks = errors.New("too many symlinks, or symlink cycle")
28
29 const limitFollowSymlinks = 10
30
31 type filetodo struct {
32         src  string
33         dst  string
34         size int64
35 }
36
37 // copier copies data from a finished container's output path to a new
38 // Arvados collection.
39 //
40 // Regular files (and symlinks to regular files) in hostOutputDir are
41 // copied from the local filesystem.
42 //
43 // Symlinks to mounted collections, and any collections mounted under
44 // ctrOutputDir, are copied by reference, without moving any data
45 // around.
46 //
47 // Symlinks to other parts of the container's filesystem result in
48 // errors.
49 //
50 // Use:
51 //
52 //      manifest, err := (&copier{...}).Copy()
53 type copier struct {
54         client        *arvados.Client
55         keepClient    IKeepClient
56         hostOutputDir string
57         ctrOutputDir  string
58         globs         []string
59         bindmounts    map[string]bindmount
60         mounts        map[string]arvados.Mount
61         secretMounts  map[string]arvados.Mount
62         logger        printfer
63
64         dirs   []string
65         files  []filetodo
66         staged arvados.CollectionFileSystem
67
68         manifestCache map[string]string
69 }
70
71 // Copy copies data as needed, and returns a new manifest.
72 //
73 // Copy should not be called more than once.
74 func (cp *copier) Copy() (string, error) {
75         var err error
76         cp.staged, err = (&arvados.Collection{}).FileSystem(cp.client, cp.keepClient)
77         if err != nil {
78                 return "", fmt.Errorf("error creating Collection.FileSystem: %v", err)
79         }
80         err = cp.walkMount("", cp.ctrOutputDir, limitFollowSymlinks, true)
81         if err != nil {
82                 return "", fmt.Errorf("error scanning files to copy to output: %v", err)
83         }
84
85         // Remove files/dirs that don't match globs (the files/dirs
86         // that were added during cp.walkMount() by copying subtree
87         // manifests into cp.staged).
88         err = cp.applyGlobsToStaged()
89         if err != nil {
90                 return "", fmt.Errorf("error while removing non-matching files from output collection: %w", err)
91         }
92         // Remove files/dirs that don't match globs (the files/dirs
93         // that are stored on the local filesystem and would need to
94         // be copied in copyFile() below).
95         cp.applyGlobsToFilesAndDirs()
96         for _, d := range cp.dirs {
97                 err = cp.staged.Mkdir(d, 0777)
98                 if err != nil && err != os.ErrExist {
99                         return "", fmt.Errorf("error making directory %q in output collection: %v", d, err)
100                 }
101         }
102
103         var unflushed int64
104         var lastparentdir string
105         for _, f := range cp.files {
106                 // If a dir has just had its last file added, do a
107                 // full Flush. Otherwise, do a partial Flush (write
108                 // full-size blocks, but leave the last short block
109                 // open so f's data can be packed with it).
110                 dir, _ := filepath.Split(f.dst)
111                 if dir != lastparentdir || unflushed > keepclient.BLOCKSIZE {
112                         if err := cp.staged.Flush("/"+lastparentdir, dir != lastparentdir); err != nil {
113                                 return "", fmt.Errorf("error flushing output collection file data: %v", err)
114                         }
115                         unflushed = 0
116                 }
117                 lastparentdir = dir
118
119                 n, err := cp.copyFile(cp.staged, f)
120                 if err != nil {
121                         return "", fmt.Errorf("error copying file %q into output collection: %v", f, err)
122                 }
123                 unflushed += n
124         }
125         return cp.staged.MarshalManifest(".")
126 }
127
128 func (cp *copier) matchGlobs(path string, isDir bool) bool {
129         // An entry in the top level of the output directory looks
130         // like "/foo", but globs look like "foo", so we strip the
131         // leading "/" before matching.
132         path = strings.TrimLeft(path, "/")
133         for _, glob := range cp.globs {
134                 if !isDir && strings.HasSuffix(glob, "/**") {
135                         // doublestar.Match("f*/**", "ff") and
136                         // doublestar.Match("f*/**", "ff/gg") both
137                         // return true, but (to be compatible with
138                         // bash shopt) "ff" should match only if it is
139                         // a directory.
140                         //
141                         // To avoid errant matches, we add the file's
142                         // basename to the end of the pattern:
143                         //
144                         // Match("f*/**/ff", "ff") => false
145                         // Match("f*/**/gg", "ff/gg") => true
146                         //
147                         // Of course, we need to escape basename in
148                         // case it contains *, ?, \, etc.
149                         _, name := filepath.Split(path)
150                         escapedName := strings.TrimSuffix(strings.Replace(name, "", "\\", -1), "\\")
151                         if match, _ := doublestar.Match(glob+"/"+escapedName, path); match {
152                                 return true
153                         }
154                 } else if match, _ := doublestar.Match(glob, path); match {
155                         return true
156                 } else if isDir {
157                         // Workaround doublestar bug (v4.6.1).
158                         // "foo*/**" should match "foo", but does not,
159                         // because isZeroLengthPattern does not accept
160                         // "*/**" as a zero length pattern.
161                         if trunc := strings.TrimSuffix(glob, "*/**"); trunc != glob {
162                                 if match, _ := doublestar.Match(trunc, path); match {
163                                         return true
164                                 }
165                         }
166                 }
167         }
168         return false
169 }
170
171 // Delete entries from cp.files that do not match cp.globs.
172 //
173 // Delete entries from cp.dirs that do not match cp.globs.
174 //
175 // Ensure parent/ancestor directories of remaining cp.files and
176 // cp.dirs entries are still present in cp.dirs, even if they do not
177 // match cp.globs themselves.
178 func (cp *copier) applyGlobsToFilesAndDirs() {
179         if len(cp.globs) == 0 {
180                 return
181         }
182         keepdirs := make(map[string]bool)
183         for _, path := range cp.dirs {
184                 if cp.matchGlobs(path, true) {
185                         keepdirs[path] = true
186                 }
187         }
188         for path := range keepdirs {
189                 for i, c := range path {
190                         if i > 0 && c == '/' {
191                                 keepdirs[path[:i]] = true
192                         }
193                 }
194         }
195         var keepfiles []filetodo
196         for _, file := range cp.files {
197                 if cp.matchGlobs(file.dst, false) {
198                         keepfiles = append(keepfiles, file)
199                 }
200         }
201         for _, file := range keepfiles {
202                 for i, c := range file.dst {
203                         if i > 0 && c == '/' {
204                                 keepdirs[file.dst[:i]] = true
205                         }
206                 }
207         }
208         cp.dirs = nil
209         for path := range keepdirs {
210                 cp.dirs = append(cp.dirs, path)
211         }
212         sort.Strings(cp.dirs)
213         cp.files = keepfiles
214 }
215
216 // Delete files in cp.staged that do not match cp.globs.  Also delete
217 // directories that are empty (after deleting non-matching files) and
218 // do not match cp.globs themselves.
219 func (cp *copier) applyGlobsToStaged() error {
220         if len(cp.globs) == 0 {
221                 return nil
222         }
223         include := make(map[string]bool)
224         err := fs.WalkDir(arvados.FS(cp.staged), "", func(path string, ent fs.DirEntry, err error) error {
225                 if cp.matchGlobs(path, ent.IsDir()) {
226                         for i, c := range path {
227                                 if i > 0 && c == '/' {
228                                         include[path[:i]] = true
229                                 }
230                         }
231                         include[path] = true
232                 }
233                 return nil
234         })
235         if err != nil {
236                 return err
237         }
238         err = fs.WalkDir(arvados.FS(cp.staged), "", func(path string, ent fs.DirEntry, err error) error {
239                 if err != nil || path == "" {
240                         return err
241                 }
242                 if !include[path] {
243                         err := cp.staged.RemoveAll(path)
244                         if err != nil {
245                                 return err
246                         }
247                         if ent.IsDir() {
248                                 return fs.SkipDir
249                         }
250                 }
251                 return nil
252         })
253         return err
254 }
255
256 // Return true if it's possible for any descendant of the given path
257 // to match anything in cp.globs.  Used by walkMount to avoid loading
258 // collections that are mounted underneath ctrOutputPath but excluded
259 // by globs.
260 func (cp *copier) subtreeCouldMatch(path string) bool {
261         if len(cp.globs) == 0 {
262                 return true
263         }
264         pathdepth := 1 + strings.Count(path, "/")
265         for _, glob := range cp.globs {
266                 globdepth := 0
267                 lastsep := 0
268                 for i, c := range glob {
269                         if c != '/' || !doublestar.ValidatePattern(glob[:i]) {
270                                 // Escaped "/", or "/" in a character
271                                 // class, is not a path separator.
272                                 continue
273                         }
274                         if glob[lastsep:i] == "**" {
275                                 return true
276                         }
277                         lastsep = i + 1
278                         if globdepth++; globdepth == pathdepth {
279                                 if match, _ := doublestar.Match(glob[:i]+"/*", path+"/z"); match {
280                                         return true
281                                 }
282                                 break
283                         }
284                 }
285                 if globdepth < pathdepth && glob[lastsep:] == "**" {
286                         return true
287                 }
288         }
289         return false
290 }
291
292 func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) (int64, error) {
293         cp.logger.Printf("copying %q (%d bytes)", strings.TrimLeft(f.dst, "/"), f.size)
294         dst, err := fs.OpenFile(f.dst, os.O_CREATE|os.O_WRONLY, 0666)
295         if err != nil {
296                 return 0, err
297         }
298         src, err := os.Open(f.src)
299         if err != nil {
300                 dst.Close()
301                 return 0, err
302         }
303         defer src.Close()
304         n, err := io.Copy(dst, src)
305         if err != nil {
306                 dst.Close()
307                 return n, err
308         }
309         return n, dst.Close()
310 }
311
312 // Add to cp.staged, cp.files, and cp.dirs so as to copy src (an
313 // absolute path in the container's filesystem) to dest (an absolute
314 // path in the output collection, or "" for output root).
315 //
316 // src must be (or be a descendant of) a readonly "collection" mount,
317 // a writable collection mounted at ctrOutputPath, or a "tmp" mount.
318 //
319 // If walkMountsBelow is true, include contents of any collection
320 // mounted below src as well.
321 func (cp *copier) walkMount(dest, src string, maxSymlinks int, walkMountsBelow bool) error {
322         // srcRoot, srcMount indicate the innermost mount that
323         // contains src.
324         var srcRoot string
325         var srcMount arvados.Mount
326         for root, mnt := range cp.mounts {
327                 if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
328                         srcRoot, srcMount = root, mnt
329                 }
330         }
331         for root := range cp.secretMounts {
332                 if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
333                         // Silently omit secrets, and symlinks to
334                         // secrets.
335                         return nil
336                 }
337         }
338         if srcRoot == "" {
339                 return fmt.Errorf("cannot output file %q: not in any mount", src)
340         }
341
342         // srcRelPath is the path to the file/dir we are trying to
343         // copy, relative to its mount point -- ".", "./foo.txt", ...
344         srcRelPath := filepath.Join(".", srcMount.Path, src[len(srcRoot):])
345
346         // outputRelPath is the destination path relative to the
347         // output directory. Used for logging and glob matching.
348         var outputRelPath = ""
349         if strings.HasPrefix(src, cp.ctrOutputDir) {
350                 outputRelPath = strings.TrimPrefix(src[len(cp.ctrOutputDir):], "/")
351         }
352         if outputRelPath == "" {
353                 // blank means copy a whole directory, so replace it
354                 // with a wildcard to make it a little clearer what's
355                 // going on since outputRelPath is only used for logging
356                 outputRelPath = "*"
357         }
358
359         switch {
360         case srcMount.ExcludeFromOutput:
361         case outputRelPath != "*" && !cp.subtreeCouldMatch(outputRelPath):
362                 cp.logger.Printf("not copying %q because contents cannot match output globs", outputRelPath)
363                 return nil
364         case srcMount.Kind == "tmp":
365                 // Handle by walking the host filesystem.
366                 return cp.walkHostFS(dest, src, maxSymlinks, walkMountsBelow)
367         case srcMount.Kind != "collection":
368                 return fmt.Errorf("%q: unsupported mount %q in output (kind is %q)", src, srcRoot, srcMount.Kind)
369         case !srcMount.Writable:
370                 cp.logger.Printf("copying %q from %v/%v", outputRelPath, srcMount.PortableDataHash, strings.TrimPrefix(srcRelPath, "./"))
371                 mft, err := cp.getManifest(srcMount.PortableDataHash)
372                 if err != nil {
373                         return err
374                 }
375                 err = cp.copyFromCollection(dest, &arvados.Collection{ManifestText: mft}, srcRelPath)
376                 if err != nil {
377                         return err
378                 }
379         default:
380                 cp.logger.Printf("copying %q", outputRelPath)
381                 hostRoot, err := cp.hostRoot(srcRoot)
382                 if err != nil {
383                         return err
384                 }
385                 f, err := os.Open(filepath.Join(hostRoot, ".arvados#collection"))
386                 if err != nil {
387                         return err
388                 }
389                 defer f.Close()
390                 var coll arvados.Collection
391                 err = json.NewDecoder(f).Decode(&coll)
392                 if err != nil {
393                         return err
394                 }
395                 err = cp.copyFromCollection(dest, &coll, srcRelPath)
396                 if err != nil {
397                         return err
398                 }
399         }
400         if walkMountsBelow {
401                 return cp.walkMountsBelow(dest, src)
402         }
403         return nil
404 }
405
406 func (cp *copier) copyFromCollection(dest string, coll *arvados.Collection, srcRelPath string) error {
407         tmpfs, err := coll.FileSystem(cp.client, cp.keepClient)
408         if err != nil {
409                 return err
410         }
411         snap, err := arvados.Snapshot(tmpfs, srcRelPath)
412         if err != nil {
413                 return err
414         }
415         // Create ancestors of dest, if necessary.
416         for i, c := range dest {
417                 if i > 0 && c == '/' {
418                         err = cp.staged.Mkdir(dest[:i], 0777)
419                         if err != nil && !os.IsExist(err) {
420                                 return err
421                         }
422                 }
423         }
424         return arvados.Splice(cp.staged, dest, snap)
425 }
426
427 func (cp *copier) walkMountsBelow(dest, src string) error {
428         for mnt, mntinfo := range cp.mounts {
429                 if !strings.HasPrefix(mnt, src+"/") {
430                         continue
431                 }
432                 if cp.copyRegularFiles(mntinfo) {
433                         // These got copied into the nearest parent
434                         // mount as regular files during setup, so
435                         // they get copied as regular files when we
436                         // process the parent. Output will reflect any
437                         // changes and deletions done by the
438                         // container.
439                         continue
440                 }
441                 // Example: we are processing dest=/foo src=/mnt1/dir1
442                 // (perhaps we followed a symlink /outdir/foo ->
443                 // /mnt1/dir1). Caller has already processed the
444                 // collection mounted at /mnt1, but now we find that
445                 // /mnt1/dir1/mnt2 is also a mount, so we need to copy
446                 // src=/mnt1/dir1/mnt2 to dest=/foo/mnt2.
447                 //
448                 // We handle all descendants of /mnt1/dir1 in this
449                 // loop instead of using recursion:
450                 // /mnt1/dir1/mnt2/mnt3 is a child of both /mnt1 and
451                 // /mnt1/dir1/mnt2, but we only want to walk it
452                 // once. (This simplification is safe because mounted
453                 // collections cannot contain symlinks.)
454                 err := cp.walkMount(dest+mnt[len(src):], mnt, 0, false)
455                 if err != nil {
456                         return err
457                 }
458         }
459         return nil
460 }
461
462 // Add entries to cp.dirs and cp.files so as to copy src (an absolute
463 // path in the container's filesystem which corresponds to a real file
464 // or directory in cp.hostOutputDir) to dest (an absolute path in the
465 // output collection, or "" for output root).
466 //
467 // Always follow symlinks.
468 //
469 // If includeMounts is true, include mounts at and below src.
470 // Otherwise, skip them.
471 func (cp *copier) walkHostFS(dest, src string, maxSymlinks int, includeMounts bool) error {
472         if includeMounts {
473                 err := cp.walkMountsBelow(dest, src)
474                 if err != nil {
475                         return err
476                 }
477         }
478
479         hostsrc := cp.hostOutputDir + src[len(cp.ctrOutputDir):]
480
481         // If src is a symlink, walk its target.
482         fi, err := os.Lstat(hostsrc)
483         if err != nil {
484                 return fmt.Errorf("lstat %q: %s", src, err)
485         }
486         if fi.Mode()&os.ModeSymlink != 0 {
487                 if maxSymlinks < 0 {
488                         return errTooManySymlinks
489                 }
490                 target, err := os.Readlink(hostsrc)
491                 if err != nil {
492                         return fmt.Errorf("readlink %q: %s", src, err)
493                 }
494                 if !strings.HasPrefix(target, "/") {
495                         target = filepath.Join(filepath.Dir(src), target)
496                 }
497                 return cp.walkMount(dest, target, maxSymlinks-1, true)
498         }
499
500         // If src is a regular directory, append it to cp.dirs and
501         // walk each of its children. (If there are no children,
502         // create an empty file "dest/.keep".)
503         if fi.Mode().IsDir() {
504                 if dest != "" {
505                         cp.dirs = append(cp.dirs, dest)
506                 }
507                 dir, err := os.Open(hostsrc)
508                 if err != nil {
509                         return fmt.Errorf("open %q: %s", src, err)
510                 }
511                 names, err := dir.Readdirnames(-1)
512                 dir.Close()
513                 if err != nil {
514                         return fmt.Errorf("readdirnames %q: %s", src, err)
515                 }
516                 if len(names) == 0 {
517                         if dest != "" {
518                                 cp.files = append(cp.files, filetodo{
519                                         src: os.DevNull,
520                                         dst: dest + "/.keep",
521                                 })
522                         }
523                         return nil
524                 }
525                 sort.Strings(names)
526                 for _, name := range names {
527                         dest, src := dest+"/"+name, src+"/"+name
528                         if _, isSecret := cp.secretMounts[src]; isSecret {
529                                 continue
530                         }
531                         if mntinfo, isMount := cp.mounts[src]; isMount && !cp.copyRegularFiles(mntinfo) {
532                                 // If a regular file/dir somehow
533                                 // exists at a path that's also a
534                                 // mount target, ignore the file --
535                                 // the mount has already been included
536                                 // with walkMountsBelow().
537                                 //
538                                 // (...except mount types that are
539                                 // handled as regular files.)
540                                 continue
541                         } else if isMount && !cp.subtreeCouldMatch(src[len(cp.ctrOutputDir)+1:]) {
542                                 continue
543                         }
544                         err = cp.walkHostFS(dest, src, maxSymlinks, false)
545                         if err != nil {
546                                 return err
547                         }
548                 }
549                 return nil
550         }
551
552         // If src is a regular file, append it to cp.files.
553         if fi.Mode().IsRegular() {
554                 cp.files = append(cp.files, filetodo{
555                         src:  hostsrc,
556                         dst:  dest,
557                         size: fi.Size(),
558                 })
559                 return nil
560         }
561         cp.logger.Printf("Skipping unsupported file type (mode %o) in output dir: %q", fi.Mode(), src)
562         return nil
563 }
564
565 // Return the host path that was mounted at the given path in the
566 // container.
567 func (cp *copier) hostRoot(ctrRoot string) (string, error) {
568         if ctrRoot == cp.ctrOutputDir {
569                 return cp.hostOutputDir, nil
570         }
571         if mnt, ok := cp.bindmounts[ctrRoot]; ok {
572                 return mnt.HostPath, nil
573         }
574         return "", fmt.Errorf("not bind-mounted: %q", ctrRoot)
575 }
576
577 func (cp *copier) copyRegularFiles(m arvados.Mount) bool {
578         return m.Kind == "text" || m.Kind == "json" || (m.Kind == "collection" && m.Writable)
579 }
580
581 func (cp *copier) getManifest(pdh string) (string, error) {
582         if mft, ok := cp.manifestCache[pdh]; ok {
583                 return mft, nil
584         }
585         var coll arvados.Collection
586         err := cp.client.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+pdh, nil, nil)
587         if err != nil {
588                 return "", fmt.Errorf("error retrieving collection record for %q: %s", pdh, err)
589         }
590         if cp.manifestCache == nil {
591                 cp.manifestCache = make(map[string]string)
592         }
593         cp.manifestCache[pdh] = coll.ManifestText
594         return coll.ManifestText, nil
595 }