Merge branch '20937-arv-copy-http' refs #20937
[arvados.git] / lib / crunchrun / copier.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package crunchrun
6
7 import (
8         "encoding/json"
9         "errors"
10         "fmt"
11         "io"
12         "os"
13         "path/filepath"
14         "sort"
15         "strings"
16
17         "git.arvados.org/arvados.git/sdk/go/arvados"
18         "git.arvados.org/arvados.git/sdk/go/keepclient"
19         "git.arvados.org/arvados.git/sdk/go/manifest"
20 )
21
22 type printfer interface {
23         Printf(string, ...interface{})
24 }
25
26 var errTooManySymlinks = errors.New("too many symlinks, or symlink cycle")
27
28 const limitFollowSymlinks = 10
29
30 type filetodo struct {
31         src  string
32         dst  string
33         size int64
34 }
35
36 // copier copies data from a finished container's output path to a new
37 // Arvados collection.
38 //
39 // Regular files (and symlinks to regular files) in hostOutputDir are
40 // copied from the local filesystem.
41 //
42 // Symlinks to mounted collections, and any collections mounted under
43 // ctrOutputDir, are copied by transforming the relevant parts of the
44 // existing manifests, without moving any data around.
45 //
46 // Symlinks to other parts of the container's filesystem result in
47 // errors.
48 //
49 // Use:
50 //
51 //      manifest, err := (&copier{...}).Copy()
52 type copier struct {
53         client        *arvados.Client
54         keepClient    IKeepClient
55         hostOutputDir string
56         ctrOutputDir  string
57         bindmounts    map[string]bindmount
58         mounts        map[string]arvados.Mount
59         secretMounts  map[string]arvados.Mount
60         logger        printfer
61
62         dirs     []string
63         files    []filetodo
64         manifest string
65
66         manifestCache map[string]*manifest.Manifest
67 }
68
69 // Copy copies data as needed, and returns a new manifest.
70 func (cp *copier) Copy() (string, error) {
71         err := cp.walkMount("", cp.ctrOutputDir, limitFollowSymlinks, true)
72         if err != nil {
73                 return "", fmt.Errorf("error scanning files to copy to output: %v", err)
74         }
75         fs, err := (&arvados.Collection{ManifestText: cp.manifest}).FileSystem(cp.client, cp.keepClient)
76         if err != nil {
77                 return "", fmt.Errorf("error creating Collection.FileSystem: %v", err)
78         }
79         for _, d := range cp.dirs {
80                 err = fs.Mkdir(d, 0777)
81                 if err != nil && err != os.ErrExist {
82                         return "", fmt.Errorf("error making directory %q in output collection: %v", d, err)
83                 }
84         }
85         var unflushed int64
86         var lastparentdir string
87         for _, f := range cp.files {
88                 // If a dir has just had its last file added, do a
89                 // full Flush. Otherwise, do a partial Flush (write
90                 // full-size blocks, but leave the last short block
91                 // open so f's data can be packed with it).
92                 dir, _ := filepath.Split(f.dst)
93                 if dir != lastparentdir || unflushed > keepclient.BLOCKSIZE {
94                         if err := fs.Flush("/"+lastparentdir, dir != lastparentdir); err != nil {
95                                 return "", fmt.Errorf("error flushing output collection file data: %v", err)
96                         }
97                         unflushed = 0
98                 }
99                 lastparentdir = dir
100
101                 n, err := cp.copyFile(fs, f)
102                 if err != nil {
103                         return "", fmt.Errorf("error copying file %q into output collection: %v", f, err)
104                 }
105                 unflushed += n
106         }
107         return fs.MarshalManifest(".")
108 }
109
110 func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) (int64, error) {
111         cp.logger.Printf("copying %q (%d bytes)", strings.TrimLeft(f.dst, "/"), f.size)
112         dst, err := fs.OpenFile(f.dst, os.O_CREATE|os.O_WRONLY, 0666)
113         if err != nil {
114                 return 0, err
115         }
116         src, err := os.Open(f.src)
117         if err != nil {
118                 dst.Close()
119                 return 0, err
120         }
121         defer src.Close()
122         n, err := io.Copy(dst, src)
123         if err != nil {
124                 dst.Close()
125                 return n, err
126         }
127         return n, dst.Close()
128 }
129
130 // Append to cp.manifest, cp.files, and cp.dirs so as to copy src (an
131 // absolute path in the container's filesystem) to dest (an absolute
132 // path in the output collection, or "" for output root).
133 //
134 // src must be (or be a descendant of) a readonly "collection" mount,
135 // a writable collection mounted at ctrOutputPath, or a "tmp" mount.
136 //
137 // If walkMountsBelow is true, include contents of any collection
138 // mounted below src as well.
139 func (cp *copier) walkMount(dest, src string, maxSymlinks int, walkMountsBelow bool) error {
140         // srcRoot, srcMount indicate the innermost mount that
141         // contains src.
142         var srcRoot string
143         var srcMount arvados.Mount
144         for root, mnt := range cp.mounts {
145                 if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
146                         srcRoot, srcMount = root, mnt
147                 }
148         }
149         for root := range cp.secretMounts {
150                 if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
151                         // Silently omit secrets, and symlinks to
152                         // secrets.
153                         return nil
154                 }
155         }
156         if srcRoot == "" {
157                 return fmt.Errorf("cannot output file %q: not in any mount", src)
158         }
159
160         // srcRelPath is the path to the file/dir we are trying to
161         // copy, relative to its mount point -- ".", "./foo.txt", ...
162         srcRelPath := filepath.Join(".", srcMount.Path, src[len(srcRoot):])
163
164         // outputRelPath is the path relative in the output directory
165         // that corresponds to the path in the output collection where
166         // the file will go, for logging
167         var outputRelPath = ""
168         if strings.HasPrefix(src, cp.ctrOutputDir) {
169                 outputRelPath = strings.TrimPrefix(src[len(cp.ctrOutputDir):], "/")
170         }
171         if outputRelPath == "" {
172                 // blank means copy a whole directory, so replace it
173                 // with a wildcard to make it a little clearer what's
174                 // going on since outputRelPath is only used for logging
175                 outputRelPath = "*"
176         }
177
178         switch {
179         case srcMount.ExcludeFromOutput:
180         case srcMount.Kind == "tmp":
181                 // Handle by walking the host filesystem.
182                 return cp.walkHostFS(dest, src, maxSymlinks, walkMountsBelow)
183         case srcMount.Kind != "collection":
184                 return fmt.Errorf("%q: unsupported mount %q in output (kind is %q)", src, srcRoot, srcMount.Kind)
185         case !srcMount.Writable:
186                 cp.logger.Printf("copying %q from %v/%v", outputRelPath, srcMount.PortableDataHash, strings.TrimPrefix(srcRelPath, "./"))
187                 mft, err := cp.getManifest(srcMount.PortableDataHash)
188                 if err != nil {
189                         return err
190                 }
191                 cp.manifest += mft.Extract(srcRelPath, dest).Text
192         default:
193                 cp.logger.Printf("copying %q", outputRelPath)
194                 hostRoot, err := cp.hostRoot(srcRoot)
195                 if err != nil {
196                         return err
197                 }
198                 f, err := os.Open(filepath.Join(hostRoot, ".arvados#collection"))
199                 if err != nil {
200                         return err
201                 }
202                 defer f.Close()
203                 var coll arvados.Collection
204                 err = json.NewDecoder(f).Decode(&coll)
205                 if err != nil {
206                         return err
207                 }
208                 mft := manifest.Manifest{Text: coll.ManifestText}
209                 cp.manifest += mft.Extract(srcRelPath, dest).Text
210         }
211         if walkMountsBelow {
212                 return cp.walkMountsBelow(dest, src)
213         }
214         return nil
215 }
216
217 func (cp *copier) walkMountsBelow(dest, src string) error {
218         for mnt, mntinfo := range cp.mounts {
219                 if !strings.HasPrefix(mnt, src+"/") {
220                         continue
221                 }
222                 if cp.copyRegularFiles(mntinfo) {
223                         // These got copied into the nearest parent
224                         // mount as regular files during setup, so
225                         // they get copied as regular files when we
226                         // process the parent. Output will reflect any
227                         // changes and deletions done by the
228                         // container.
229                         continue
230                 }
231                 // Example: we are processing dest=/foo src=/mnt1/dir1
232                 // (perhaps we followed a symlink /outdir/foo ->
233                 // /mnt1/dir1). Caller has already processed the
234                 // collection mounted at /mnt1, but now we find that
235                 // /mnt1/dir1/mnt2 is also a mount, so we need to copy
236                 // src=/mnt1/dir1/mnt2 to dest=/foo/mnt2.
237                 //
238                 // We handle all descendants of /mnt1/dir1 in this
239                 // loop instead of using recursion:
240                 // /mnt1/dir1/mnt2/mnt3 is a child of both /mnt1 and
241                 // /mnt1/dir1/mnt2, but we only want to walk it
242                 // once. (This simplification is safe because mounted
243                 // collections cannot contain symlinks.)
244                 err := cp.walkMount(dest+mnt[len(src):], mnt, 0, false)
245                 if err != nil {
246                         return err
247                 }
248         }
249         return nil
250 }
251
252 // Add entries to cp.dirs and cp.files so as to copy src (an absolute
253 // path in the container's filesystem which corresponds to a real file
254 // or directory in cp.hostOutputDir) to dest (an absolute path in the
255 // output collection, or "" for output root).
256 //
257 // Always follow symlinks.
258 //
259 // If includeMounts is true, include mounts at and below src.
260 // Otherwise, skip them.
261 func (cp *copier) walkHostFS(dest, src string, maxSymlinks int, includeMounts bool) error {
262         if includeMounts {
263                 err := cp.walkMountsBelow(dest, src)
264                 if err != nil {
265                         return err
266                 }
267         }
268
269         hostsrc := cp.hostOutputDir + src[len(cp.ctrOutputDir):]
270
271         // If src is a symlink, walk its target.
272         fi, err := os.Lstat(hostsrc)
273         if err != nil {
274                 return fmt.Errorf("lstat %q: %s", src, err)
275         }
276         if fi.Mode()&os.ModeSymlink != 0 {
277                 if maxSymlinks < 0 {
278                         return errTooManySymlinks
279                 }
280                 target, err := os.Readlink(hostsrc)
281                 if err != nil {
282                         return fmt.Errorf("readlink %q: %s", src, err)
283                 }
284                 if !strings.HasPrefix(target, "/") {
285                         target = filepath.Join(filepath.Dir(src), target)
286                 }
287                 return cp.walkMount(dest, target, maxSymlinks-1, true)
288         }
289
290         // If src is a regular directory, append it to cp.dirs and
291         // walk each of its children. (If there are no children,
292         // create an empty file "dest/.keep".)
293         if fi.Mode().IsDir() {
294                 if dest != "" {
295                         cp.dirs = append(cp.dirs, dest)
296                 }
297                 dir, err := os.Open(hostsrc)
298                 if err != nil {
299                         return fmt.Errorf("open %q: %s", src, err)
300                 }
301                 names, err := dir.Readdirnames(-1)
302                 dir.Close()
303                 if err != nil {
304                         return fmt.Errorf("readdirnames %q: %s", src, err)
305                 }
306                 if len(names) == 0 {
307                         if dest != "" {
308                                 cp.files = append(cp.files, filetodo{
309                                         src: os.DevNull,
310                                         dst: dest + "/.keep",
311                                 })
312                         }
313                         return nil
314                 }
315                 sort.Strings(names)
316                 for _, name := range names {
317                         dest, src := dest+"/"+name, src+"/"+name
318                         if _, isSecret := cp.secretMounts[src]; isSecret {
319                                 continue
320                         }
321                         if mntinfo, isMount := cp.mounts[src]; isMount && !cp.copyRegularFiles(mntinfo) {
322                                 // If a regular file/dir somehow
323                                 // exists at a path that's also a
324                                 // mount target, ignore the file --
325                                 // the mount has already been included
326                                 // with walkMountsBelow().
327                                 //
328                                 // (...except mount types that are
329                                 // handled as regular files.)
330                                 continue
331                         }
332                         err = cp.walkHostFS(dest, src, maxSymlinks, false)
333                         if err != nil {
334                                 return err
335                         }
336                 }
337                 return nil
338         }
339
340         // If src is a regular file, append it to cp.files.
341         if fi.Mode().IsRegular() {
342                 cp.files = append(cp.files, filetodo{
343                         src:  hostsrc,
344                         dst:  dest,
345                         size: fi.Size(),
346                 })
347                 return nil
348         }
349         cp.logger.Printf("Skipping unsupported file type (mode %o) in output dir: %q", fi.Mode(), src)
350         return nil
351 }
352
353 // Return the host path that was mounted at the given path in the
354 // container.
355 func (cp *copier) hostRoot(ctrRoot string) (string, error) {
356         if ctrRoot == cp.ctrOutputDir {
357                 return cp.hostOutputDir, nil
358         }
359         if mnt, ok := cp.bindmounts[ctrRoot]; ok {
360                 return mnt.HostPath, nil
361         }
362         return "", fmt.Errorf("not bind-mounted: %q", ctrRoot)
363 }
364
365 func (cp *copier) copyRegularFiles(m arvados.Mount) bool {
366         return m.Kind == "text" || m.Kind == "json" || (m.Kind == "collection" && m.Writable)
367 }
368
369 func (cp *copier) getManifest(pdh string) (*manifest.Manifest, error) {
370         if mft, ok := cp.manifestCache[pdh]; ok {
371                 return mft, nil
372         }
373         var coll arvados.Collection
374         err := cp.client.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+pdh, nil, nil)
375         if err != nil {
376                 return nil, fmt.Errorf("error retrieving collection record for %q: %s", pdh, err)
377         }
378         mft := &manifest.Manifest{Text: coll.ManifestText}
379         if cp.manifestCache == nil {
380                 cp.manifestCache = map[string]*manifest.Manifest{pdh: mft}
381         } else {
382                 cp.manifestCache[pdh] = mft
383         }
384         return mft, nil
385 }