15954: Fix process start/stop race.
[arvados.git] / lib / crunchrun / copier.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package crunchrun
6
7 import (
8         "encoding/json"
9         "errors"
10         "fmt"
11         "io"
12         "os"
13         "path/filepath"
14         "sort"
15         "strings"
16
17         "git.arvados.org/arvados.git/sdk/go/arvados"
18         "git.arvados.org/arvados.git/sdk/go/keepclient"
19         "git.arvados.org/arvados.git/sdk/go/manifest"
20 )
21
22 type printfer interface {
23         Printf(string, ...interface{})
24 }
25
26 var errTooManySymlinks = errors.New("too many symlinks, or symlink cycle")
27
28 const limitFollowSymlinks = 10
29
30 type filetodo struct {
31         src  string
32         dst  string
33         size int64
34 }
35
36 // copier copies data from a finished container's output path to a new
37 // Arvados collection.
38 //
39 // Regular files (and symlinks to regular files) in hostOutputDir are
40 // copied from the local filesystem.
41 //
42 // Symlinks to mounted collections, and any collections mounted under
43 // ctrOutputDir, are copied by transforming the relevant parts of the
44 // existing manifests, without moving any data around.
45 //
46 // Symlinks to other parts of the container's filesystem result in
47 // errors.
48 //
49 // Use:
50 //
51 //      manifest, err := (&copier{...}).Copy()
52 type copier struct {
53         client        *arvados.Client
54         arvClient     IArvadosClient
55         keepClient    IKeepClient
56         hostOutputDir string
57         ctrOutputDir  string
58         binds         []string
59         mounts        map[string]arvados.Mount
60         secretMounts  map[string]arvados.Mount
61         logger        printfer
62
63         dirs     []string
64         files    []filetodo
65         manifest string
66
67         manifestCache map[string]*manifest.Manifest
68 }
69
70 // Copy copies data as needed, and returns a new manifest.
71 func (cp *copier) Copy() (string, error) {
72         err := cp.walkMount("", cp.ctrOutputDir, limitFollowSymlinks, true)
73         if err != nil {
74                 return "", fmt.Errorf("error scanning files to copy to output: %v", err)
75         }
76         fs, err := (&arvados.Collection{ManifestText: cp.manifest}).FileSystem(cp.client, cp.keepClient)
77         if err != nil {
78                 return "", fmt.Errorf("error creating Collection.FileSystem: %v", err)
79         }
80         for _, d := range cp.dirs {
81                 err = fs.Mkdir(d, 0777)
82                 if err != nil && err != os.ErrExist {
83                         return "", fmt.Errorf("error making directory %q in output collection: %v", d, err)
84                 }
85         }
86         var unflushed int64
87         var lastparentdir string
88         for _, f := range cp.files {
89                 // If a dir has just had its last file added, do a
90                 // full Flush. Otherwise, do a partial Flush (write
91                 // full-size blocks, but leave the last short block
92                 // open so f's data can be packed with it).
93                 dir, _ := filepath.Split(f.dst)
94                 if dir != lastparentdir || unflushed > keepclient.BLOCKSIZE {
95                         if err := fs.Flush("/"+lastparentdir, dir != lastparentdir); err != nil {
96                                 return "", fmt.Errorf("error flushing output collection file data: %v", err)
97                         }
98                         unflushed = 0
99                 }
100                 lastparentdir = dir
101
102                 n, err := cp.copyFile(fs, f)
103                 if err != nil {
104                         return "", fmt.Errorf("error copying file %q into output collection: %v", f, err)
105                 }
106                 unflushed += n
107         }
108         return fs.MarshalManifest(".")
109 }
110
111 func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) (int64, error) {
112         cp.logger.Printf("copying %q (%d bytes)", f.dst, f.size)
113         dst, err := fs.OpenFile(f.dst, os.O_CREATE|os.O_WRONLY, 0666)
114         if err != nil {
115                 return 0, err
116         }
117         src, err := os.Open(f.src)
118         if err != nil {
119                 dst.Close()
120                 return 0, err
121         }
122         defer src.Close()
123         n, err := io.Copy(dst, src)
124         if err != nil {
125                 dst.Close()
126                 return n, err
127         }
128         return n, dst.Close()
129 }
130
131 // Append to cp.manifest, cp.files, and cp.dirs so as to copy src (an
132 // absolute path in the container's filesystem) to dest (an absolute
133 // path in the output collection, or "" for output root).
134 //
135 // src must be (or be a descendant of) a readonly "collection" mount,
136 // a writable collection mounted at ctrOutputPath, or a "tmp" mount.
137 //
138 // If walkMountsBelow is true, include contents of any collection
139 // mounted below src as well.
140 func (cp *copier) walkMount(dest, src string, maxSymlinks int, walkMountsBelow bool) error {
141         // srcRoot, srcMount indicate the innermost mount that
142         // contains src.
143         var srcRoot string
144         var srcMount arvados.Mount
145         for root, mnt := range cp.mounts {
146                 if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
147                         srcRoot, srcMount = root, mnt
148                 }
149         }
150         for root := range cp.secretMounts {
151                 if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
152                         // Silently omit secrets, and symlinks to
153                         // secrets.
154                         return nil
155                 }
156         }
157         if srcRoot == "" {
158                 return fmt.Errorf("cannot output file %q: not in any mount", src)
159         }
160
161         // srcRelPath is the path to the file/dir we are trying to
162         // copy, relative to its mount point -- ".", "./foo.txt", ...
163         srcRelPath := filepath.Join(".", srcMount.Path, src[len(srcRoot):])
164
165         switch {
166         case srcMount.ExcludeFromOutput:
167         case srcMount.Kind == "tmp":
168                 // Handle by walking the host filesystem.
169                 return cp.walkHostFS(dest, src, maxSymlinks, walkMountsBelow)
170         case srcMount.Kind != "collection":
171                 return fmt.Errorf("%q: unsupported mount %q in output (kind is %q)", src, srcRoot, srcMount.Kind)
172         case !srcMount.Writable:
173                 mft, err := cp.getManifest(srcMount.PortableDataHash)
174                 if err != nil {
175                         return err
176                 }
177                 cp.manifest += mft.Extract(srcRelPath, dest).Text
178         default:
179                 hostRoot, err := cp.hostRoot(srcRoot)
180                 if err != nil {
181                         return err
182                 }
183                 f, err := os.Open(filepath.Join(hostRoot, ".arvados#collection"))
184                 if err != nil {
185                         return err
186                 }
187                 defer f.Close()
188                 var coll arvados.Collection
189                 err = json.NewDecoder(f).Decode(&coll)
190                 if err != nil {
191                         return err
192                 }
193                 mft := manifest.Manifest{Text: coll.ManifestText}
194                 cp.manifest += mft.Extract(srcRelPath, dest).Text
195         }
196         if walkMountsBelow {
197                 return cp.walkMountsBelow(dest, src)
198         } else {
199                 return nil
200         }
201 }
202
203 func (cp *copier) walkMountsBelow(dest, src string) error {
204         for mnt, mntinfo := range cp.mounts {
205                 if !strings.HasPrefix(mnt, src+"/") {
206                         continue
207                 }
208                 if cp.copyRegularFiles(mntinfo) {
209                         // These got copied into the nearest parent
210                         // mount as regular files during setup, so
211                         // they get copied as regular files when we
212                         // process the parent. Output will reflect any
213                         // changes and deletions done by the
214                         // container.
215                         continue
216                 }
217                 // Example: we are processing dest=/foo src=/mnt1/dir1
218                 // (perhaps we followed a symlink /outdir/foo ->
219                 // /mnt1/dir1). Caller has already processed the
220                 // collection mounted at /mnt1, but now we find that
221                 // /mnt1/dir1/mnt2 is also a mount, so we need to copy
222                 // src=/mnt1/dir1/mnt2 to dest=/foo/mnt2.
223                 //
224                 // We handle all descendants of /mnt1/dir1 in this
225                 // loop instead of using recursion:
226                 // /mnt1/dir1/mnt2/mnt3 is a child of both /mnt1 and
227                 // /mnt1/dir1/mnt2, but we only want to walk it
228                 // once. (This simplification is safe because mounted
229                 // collections cannot contain symlinks.)
230                 err := cp.walkMount(dest+mnt[len(src):], mnt, 0, false)
231                 if err != nil {
232                         return err
233                 }
234         }
235         return nil
236 }
237
238 // Add entries to cp.dirs and cp.files so as to copy src (an absolute
239 // path in the container's filesystem which corresponds to a real file
240 // or directory in cp.hostOutputDir) to dest (an absolute path in the
241 // output collection, or "" for output root).
242 //
243 // Always follow symlinks.
244 //
245 // If includeMounts is true, include mounts at and below src.
246 // Otherwise, skip them.
247 func (cp *copier) walkHostFS(dest, src string, maxSymlinks int, includeMounts bool) error {
248         if includeMounts {
249                 err := cp.walkMountsBelow(dest, src)
250                 if err != nil {
251                         return err
252                 }
253         }
254
255         hostsrc := cp.hostOutputDir + src[len(cp.ctrOutputDir):]
256
257         // If src is a symlink, walk its target.
258         fi, err := os.Lstat(hostsrc)
259         if err != nil {
260                 return fmt.Errorf("lstat %q: %s", src, err)
261         }
262         if fi.Mode()&os.ModeSymlink != 0 {
263                 if maxSymlinks < 0 {
264                         return errTooManySymlinks
265                 }
266                 target, err := os.Readlink(hostsrc)
267                 if err != nil {
268                         return fmt.Errorf("readlink %q: %s", src, err)
269                 }
270                 if !strings.HasPrefix(target, "/") {
271                         target = filepath.Join(filepath.Dir(src), target)
272                 }
273                 return cp.walkMount(dest, target, maxSymlinks-1, true)
274         }
275
276         // If src is a regular directory, append it to cp.dirs and
277         // walk each of its children. (If there are no children,
278         // create an empty file "dest/.keep".)
279         if fi.Mode().IsDir() {
280                 if dest != "" {
281                         cp.dirs = append(cp.dirs, dest)
282                 }
283                 dir, err := os.Open(hostsrc)
284                 if err != nil {
285                         return fmt.Errorf("open %q: %s", src, err)
286                 }
287                 names, err := dir.Readdirnames(-1)
288                 dir.Close()
289                 if err != nil {
290                         return fmt.Errorf("readdirnames %q: %s", src, err)
291                 }
292                 if len(names) == 0 {
293                         if dest != "" {
294                                 cp.files = append(cp.files, filetodo{
295                                         src: os.DevNull,
296                                         dst: dest + "/.keep",
297                                 })
298                         }
299                         return nil
300                 }
301                 sort.Strings(names)
302                 for _, name := range names {
303                         dest, src := dest+"/"+name, src+"/"+name
304                         if _, isSecret := cp.secretMounts[src]; isSecret {
305                                 continue
306                         }
307                         if mntinfo, isMount := cp.mounts[src]; isMount && !cp.copyRegularFiles(mntinfo) {
308                                 // If a regular file/dir somehow
309                                 // exists at a path that's also a
310                                 // mount target, ignore the file --
311                                 // the mount has already been included
312                                 // with walkMountsBelow().
313                                 //
314                                 // (...except mount types that are
315                                 // handled as regular files.)
316                                 continue
317                         }
318                         err = cp.walkHostFS(dest, src, maxSymlinks, false)
319                         if err != nil {
320                                 return err
321                         }
322                 }
323                 return nil
324         }
325
326         // If src is a regular file, append it to cp.files.
327         if fi.Mode().IsRegular() {
328                 cp.files = append(cp.files, filetodo{
329                         src:  hostsrc,
330                         dst:  dest,
331                         size: fi.Size(),
332                 })
333                 return nil
334         }
335
336         return fmt.Errorf("Unsupported file type (mode %o) in output dir: %q", fi.Mode(), src)
337 }
338
339 // Return the host path that was mounted at the given path in the
340 // container.
341 func (cp *copier) hostRoot(ctrRoot string) (string, error) {
342         if ctrRoot == cp.ctrOutputDir {
343                 return cp.hostOutputDir, nil
344         }
345         for _, bind := range cp.binds {
346                 tokens := strings.Split(bind, ":")
347                 if len(tokens) >= 2 && tokens[1] == ctrRoot {
348                         return tokens[0], nil
349                 }
350         }
351         return "", fmt.Errorf("not bind-mounted: %q", ctrRoot)
352 }
353
354 func (cp *copier) copyRegularFiles(m arvados.Mount) bool {
355         return m.Kind == "text" || m.Kind == "json" || (m.Kind == "collection" && m.Writable)
356 }
357
358 func (cp *copier) getManifest(pdh string) (*manifest.Manifest, error) {
359         if mft, ok := cp.manifestCache[pdh]; ok {
360                 return mft, nil
361         }
362         var coll arvados.Collection
363         err := cp.arvClient.Get("collections", pdh, nil, &coll)
364         if err != nil {
365                 return nil, fmt.Errorf("error retrieving collection record for %q: %s", pdh, err)
366         }
367         mft := &manifest.Manifest{Text: coll.ManifestText}
368         if cp.manifestCache == nil {
369                 cp.manifestCache = map[string]*manifest.Manifest{pdh: mft}
370         } else {
371                 cp.manifestCache[pdh] = mft
372         }
373         return mft, nil
374 }