1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
17 "git.arvados.org/arvados.git/sdk/go/arvados"
18 "git.arvados.org/arvados.git/sdk/go/keepclient"
19 "git.arvados.org/arvados.git/sdk/go/manifest"
// printfer is a minimal logging interface: a single fmt-style Printf
// method. The copier methods in this file log through cp.logger.Printf;
// presumably logger is declared with this type (its declaration is not
// visible here -- TODO confirm).
22 type printfer interface {
23 Printf(string, ...interface{})
// errTooManySymlinks is returned by walkHostFS from its symlink-handling
// branch.
26 var errTooManySymlinks = errors.New("too many symlinks, or symlink cycle")
// limitFollowSymlinks is the symlink budget that Copy passes to walkMount
// as maxSymlinks. walkHostFS passes maxSymlinks-1 along each time it
// follows a symlink.
28 const limitFollowSymlinks = 10
// filetodo describes one regular file queued (in copier.files) for
// copying into the output collection. Its field declarations are not
// visible here; copyFile reads f.src (a path opened with os.Open), f.dst
// (the path created in the collection filesystem) and f.size (logged
// only).
30 type filetodo struct {
36 // copier copies data from a finished container's output path to a new
37 // Arvados collection.
39 // Regular files (and symlinks to regular files) in hostOutputDir are
40 // copied from the local filesystem.
42 // Symlinks to mounted collections, and any collections mounted under
43 // ctrOutputDir, are copied by transforming the relevant parts of the
44 // existing manifests, without moving any data around.
46 // Symlinks to other parts of the container's filesystem result in
// an error from walkMount ("cannot output file ...: not in any mount").
//
// Typical use:
51 // manifest, err := (&copier{...}).Copy()
// client and keepClient are handed to Collection.FileSystem in Copy to
// build the output collection filesystem.
53 client *arvados.Client
// arvClient is used by getManifest to fetch collection records.
54 arvClient IArvadosClient
55 keepClient IKeepClient
// bindmounts is keyed by container path; hostRoot returns the HostPath
// of the matching entry.
58 bindmounts map[string]bindmount
// mounts is keyed by the container path of each mount point; walkMount
// picks the longest key that is a path prefix of the source path.
59 mounts map[string]arvados.Mount
// secretMounts is keyed the same way. walkMount and walkHostFS check
// paths against it so that secrets are left out of the output (per the
// comment in walkMount).
60 secretMounts map[string]arvados.Mount
// manifestCache holds manifests already fetched by getManifest, keyed by
// portable data hash. It may be nil; getManifest creates it on demand.
67 manifestCache map[string]*manifest.Manifest
70 // Copy copies data as needed, and returns a new manifest.
//
// It works in four steps: walk ctrOutputDir with walkMount (which appends
// to cp.manifest, cp.files and cp.dirs), open a collection filesystem
// seeded with cp.manifest, create every directory listed in cp.dirs, then
// copy each entry of cp.files with copyFile. The returned manifest is
// fs.MarshalManifest("."). On failure the manifest string is "" and the
// error says which step failed.
71 func (cp *copier) Copy() (string, error) {
// Start at the output root ("" in the collection) with the full symlink
// budget, including collections mounted below the output directory.
72 err := cp.walkMount("", cp.ctrOutputDir, limitFollowSymlinks, true)
74 return "", fmt.Errorf("error scanning files to copy to output: %v", err)
// The filesystem starts out containing whatever walkMount extracted from
// existing manifests; locally copied files are added to it below.
76 fs, err := (&arvados.Collection{ManifestText: cp.manifest}).FileSystem(cp.client, cp.keepClient)
78 return "", fmt.Errorf("error creating Collection.FileSystem: %v", err)
80 for _, d := range cp.dirs {
81 err = fs.Mkdir(d, 0777)
// A directory that already exists is not treated as a failure.
// NOTE(review): this is an identity comparison, so an error that wraps
// os.ErrExist would still be reported -- confirm what fs.Mkdir returns.
82 if err != nil && err != os.ErrExist {
83 return "", fmt.Errorf("error making directory %q in output collection: %v", d, err)
// lastparentdir is compared with each file's parent directory below; its
// updates are not visible here.
87 var lastparentdir string
88 for _, f := range cp.files {
89 // If a dir has just had its last file added, do a
90 // full Flush. Otherwise, do a partial Flush (write
91 // full-size blocks, but leave the last short block
92 // open so f's data can be packed with it).
93 dir, _ := filepath.Split(f.dst)
// Flush when the parent directory changes or when unflushed exceeds
// keepclient.BLOCKSIZE (unflushed is presumably the number of bytes
// copied since the previous flush; its declaration and updates are not
// visible here -- TODO confirm).
94 if dir != lastparentdir || unflushed > keepclient.BLOCKSIZE {
// The second argument is true only when the directory changed, which
// selects the full flush described above.
95 if err := fs.Flush("/"+lastparentdir, dir != lastparentdir); err != nil {
96 return "", fmt.Errorf("error flushing output collection file data: %v", err)
// n is the number of bytes copyFile wrote for this file.
102 n, err := cp.copyFile(fs, f)
// NOTE(review): f is a filetodo struct, so %q formats the whole struct
// rather than a single path; f.dst or f.src may have been intended.
104 return "", fmt.Errorf("error copying file %q into output collection: %v", f, err)
108 return fs.MarshalManifest(".")
// copyFile copies the file at f.src (opened with os.Open) to f.dst in the
// collection filesystem fs, after logging the destination and f.size. It
// returns the byte count reported by io.Copy together with the error from
// closing the destination file. Handling of the intermediate errors is
// not visible here.
111 func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) (int64, error) {
// The log line shows the destination with any leading slashes removed.
112 cp.logger.Printf("copying %q (%d bytes)", strings.TrimLeft(f.dst, "/"), f.size)
// Create the destination if needed and open it write-only.
113 dst, err := fs.OpenFile(f.dst, os.O_CREATE|os.O_WRONLY, 0666)
117 src, err := os.Open(f.src)
123 n, err := io.Copy(dst, src)
// The error from closing dst is handed to the caller rather than dropped.
128 return n, dst.Close()
131 // Append to cp.manifest, cp.files, and cp.dirs so as to copy src (an
132 // absolute path in the container's filesystem) to dest (an absolute
133 // path in the output collection, or "" for output root).
135 // src must be (or be a descendant of) a readonly "collection" mount,
136 // a writable collection mounted at ctrOutputPath, or a "tmp" mount.
138 // If walkMountsBelow is true, include contents of any collection
139 // mounted below src as well.
//
// maxSymlinks is only passed through to walkHostFS (used for "tmp"
// mounts); the collection cases below do not consult it.
140 func (cp *copier) walkMount(dest, src string, maxSymlinks int, walkMountsBelow bool) error {
141 // srcRoot, srcMount indicate the innermost mount that
// contains src: the loop below keeps the longest mount root that is a
// path prefix of src.
144 var srcMount arvados.Mount
145 for root, mnt := range cp.mounts {
// Comparing with a trailing "/" on both sides makes the match
// component-wise, so "/mnt1" matches "/mnt1" and "/mnt1/x" but not
// "/mnt10".
146 if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
147 srcRoot, srcMount = root, mnt
150 for root := range cp.secretMounts {
// Same prefix test as above, against the secret mounts.
151 if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
152 // Silently omit secrets, and symlinks to
// secrets -- i.e. src lies at or under a secret mount that is more
// specific than srcRoot.
// An error naming src is returned when no mount contains it (the guard
// is not visible here).
158 return fmt.Errorf("cannot output file %q: not in any mount", src)
161 // srcRelPath is the path to the file/dir we are trying to
162 // copy, relative to its mount point -- ".", "./foo.txt", ...
// srcMount.Path is included in the join, between "." and the part of src
// below the mount root.
163 srcRelPath := filepath.Join(".", srcMount.Path, src[len(srcRoot):])
165 // outputRelPath is the path relative in the output directory
166 // that corresponds to the path in the output collection where
167 // the file will go, for logging
168 var outputRelPath = ""
// NOTE(review): this prefix test has no path separator, so a sibling
// path such as ctrOutputDir+"2" also matches. outputRelPath is only used
// for logging, so the effect is limited to the log text.
169 if strings.HasPrefix(src, cp.ctrOutputDir) {
170 outputRelPath = strings.TrimPrefix(src[len(cp.ctrOutputDir):], "/")
172 if outputRelPath == "" {
173 // blank means copy a whole directory, so replace it
174 // with a wildcard to make it a little clearer what's
175 // going on since outputRelPath is only used for logging
// Mount flagged ExcludeFromOutput: this case has no body, so the switch
// adds nothing for it.
180 case srcMount.ExcludeFromOutput:
181 case srcMount.Kind == "tmp":
182 // Handle by walking the host filesystem.
183 return cp.walkHostFS(dest, src, maxSymlinks, walkMountsBelow)
// Any kind other than "tmp" or "collection" cannot be part of the output.
184 case srcMount.Kind != "collection":
185 return fmt.Errorf("%q: unsupported mount %q in output (kind is %q)", src, srcRoot, srcMount.Kind)
// Read-only collection: fetch its manifest by portable data hash and
// splice the relevant part into cp.manifest at dest.
186 case !srcMount.Writable:
187 cp.logger.Printf("copying %q from %v/%v", outputRelPath, srcMount.PortableDataHash, strings.TrimPrefix(srcRelPath, "./"))
188 mft, err := cp.getManifest(srcMount.PortableDataHash)
192 cp.manifest += mft.Extract(srcRelPath, dest).Text
// Remaining case -- given the cases above, a writable collection
// (presumably the switch's default branch; the label is not visible
// here). Its manifest is read from the ".arvados#collection" file under
// the mount's host directory, decoded as JSON into an arvados.Collection.
194 cp.logger.Printf("copying %q", outputRelPath)
195 hostRoot, err := cp.hostRoot(srcRoot)
199 f, err := os.Open(filepath.Join(hostRoot, ".arvados#collection"))
204 var coll arvados.Collection
205 err = json.NewDecoder(f).Decode(&coll)
209 mft := manifest.Manifest{Text: coll.ManifestText}
210 cp.manifest += mft.Extract(srcRelPath, dest).Text
// Include collections mounted below src (presumably only when
// walkMountsBelow is true, as documented above; the guard is not visible
// here).
213 return cp.walkMountsBelow(dest, src)
// walkMountsBelow calls walkMount for the entries of cp.mounts whose path
// lies strictly below src (the mount path starts with src+"/"). Each such
// mount is copied to dest plus the mount path's suffix below src. Mounts
// for which copyRegularFiles reports true are treated differently, as the
// comment in the loop explains.
218 func (cp *copier) walkMountsBelow(dest, src string) error {
219 for mnt, mntinfo := range cp.mounts {
// Only mounts strictly below src are of interest; src itself does not
// match because of the added "/".
220 if !strings.HasPrefix(mnt, src+"/") {
223 if cp.copyRegularFiles(mntinfo) {
224 // These got copied into the nearest parent
225 // mount as regular files during setup, so
226 // they get copied as regular files when we
227 // process the parent. Output will reflect any
228 // changes and deletions done by the
232 // Example: we are processing dest=/foo src=/mnt1/dir1
233 // (perhaps we followed a symlink /outdir/foo ->
234 // /mnt1/dir1). Caller has already processed the
235 // collection mounted at /mnt1, but now we find that
236 // /mnt1/dir1/mnt2 is also a mount, so we need to copy
237 // src=/mnt1/dir1/mnt2 to dest=/foo/mnt2.
239 // We handle all descendants of /mnt1/dir1 in this
240 // loop instead of using recursion:
241 // /mnt1/dir1/mnt2/mnt3 is a child of both /mnt1 and
242 // /mnt1/dir1/mnt2, but we only want to walk it
243 // once. (This simplification is safe because mounted
244 // collections cannot contain symlinks.)
// mnt[len(src):] is the mount path's suffix below src, including its
// leading "/". walkMountsBelow=false stops walkMount from descending into
// nested mounts again, and the symlink budget is 0.
245 err := cp.walkMount(dest+mnt[len(src):], mnt, 0, false)
253 // Add entries to cp.dirs and cp.files so as to copy src (an absolute
254 // path in the container's filesystem which corresponds to a real file
255 // or directory in cp.hostOutputDir) to dest (an absolute path in the
256 // output collection, or "" for output root).
258 // Always follow symlinks.
260 // If includeMounts is true, include mounts at and below src.
261 // Otherwise, skip them.
//
// maxSymlinks is the number of further symlinks that may be followed; it
// is decremented on each hop through walkMount.
262 func (cp *copier) walkHostFS(dest, src string, maxSymlinks int, includeMounts bool) error {
// Mounts below src are handled first (presumably only when includeMounts
// is true; the guard is not visible here).
264 err := cp.walkMountsBelow(dest, src)
// Translate the container path into a host path by replacing the leading
// ctrOutputDir with hostOutputDir. The slice assumes src starts with
// ctrOutputDir.
270 hostsrc := cp.hostOutputDir + src[len(cp.ctrOutputDir):]
272 // If src is a symlink, walk its target.
// Lstat is used so that a symlink is seen as a symlink rather than as
// its target.
273 fi, err := os.Lstat(hostsrc)
275 return fmt.Errorf("lstat %q: %s", src, err)
277 if fi.Mode()&os.ModeSymlink != 0 {
279 return errTooManySymlinks
281 target, err := os.Readlink(hostsrc)
283 return fmt.Errorf("readlink %q: %s", src, err)
// A relative target is resolved against the directory containing src,
// using container paths rather than host paths.
285 if !strings.HasPrefix(target, "/") {
286 target = filepath.Join(filepath.Dir(src), target)
// Go back through walkMount so the target is matched against the mounts
// again, with one fewer symlink allowed.
288 return cp.walkMount(dest, target, maxSymlinks-1, true)
291 // If src is a regular directory, append it to cp.dirs and
292 // walk each of its children. (If there are no children,
293 // create an empty file "dest/.keep".)
294 if fi.Mode().IsDir() {
296 cp.dirs = append(cp.dirs, dest)
298 dir, err := os.Open(hostsrc)
300 return fmt.Errorf("open %q: %s", src, err)
// A negative count asks Readdirnames for every entry in one call.
302 names, err := dir.Readdirnames(-1)
305 return fmt.Errorf("readdirnames %q: %s", src, err)
309 cp.files = append(cp.files, filetodo{
311 dst: dest + "/.keep",
317 for _, name := range names {
// dest and src are shadowed here: inside the loop they name the child
// entry rather than the directory being walked.
318 dest, src := dest+"/"+name, src+"/"+name
// Children that are secret mount points are looked up here so they can
// be left out of the output.
319 if _, isSecret := cp.secretMounts[src]; isSecret {
322 if mntinfo, isMount := cp.mounts[src]; isMount && !cp.copyRegularFiles(mntinfo) {
323 // If a regular file/dir somehow
324 // exists at a path that's also a
325 // mount target, ignore the file --
326 // the mount has already been included
327 // with walkMountsBelow().
329 // (...except mount types that are
330 // handled as regular files.)
// Recurse with includeMounts=false: mounts under this directory were
// already covered by the walkMountsBelow call at the top.
333 err = cp.walkHostFS(dest, src, maxSymlinks, false)
341 // If src is a regular file, append it to cp.files.
342 if fi.Mode().IsRegular() {
343 cp.files = append(cp.files, filetodo{
// Anything else (not a symlink, directory or regular file) is logged and
// left out.
350 cp.logger.Printf("Skipping unsupported file type (mode %o) in output dir: %q", fi.Mode(), src)
354 // Return the host path that was mounted at the given path in the
// container. ctrOutputDir maps to hostOutputDir; any other ctrRoot must
// be a key of cp.bindmounts, in which case that entry's HostPath is
// returned. Otherwise the result is "" and a "not bind-mounted" error.
356 func (cp *copier) hostRoot(ctrRoot string) (string, error) {
// The output directory is checked before the bindmounts table.
357 if ctrRoot == cp.ctrOutputDir {
358 return cp.hostOutputDir, nil
360 if mnt, ok := cp.bindmounts[ctrRoot]; ok {
361 return mnt.HostPath, nil
363 return "", fmt.Errorf("not bind-mounted: %q", ctrRoot)
// copyRegularFiles reports whether m is a "text" mount, a "json" mount,
// or a writable "collection" mount. walkMountsBelow and walkHostFS use it
// to recognize mounts whose content is handled as regular files in the
// parent mount (see the comments at those call sites).
366 func (cp *copier) copyRegularFiles(m arvados.Mount) bool {
367 return m.Kind == "text" || m.Kind == "json" || (m.Kind == "collection" && m.Writable)
// getManifest returns the manifest of the collection identified by pdh.
// cp.manifestCache is consulted first; otherwise the collection record is
// fetched through cp.arvClient and its ManifestText is wrapped in a
// manifest.Manifest, which is stored in the cache under pdh. A failed
// fetch yields a nil manifest and an error naming pdh.
370 func (cp *copier) getManifest(pdh string) (*manifest.Manifest, error) {
// Looking up a key in a nil map is safe in Go, so this works before the
// cache has been created.
371 if mft, ok := cp.manifestCache[pdh]; ok {
374 var coll arvados.Collection
375 err := cp.arvClient.Get("collections", pdh, nil, &coll)
377 return nil, fmt.Errorf("error retrieving collection record for %q: %s", pdh, err)
379 mft := &manifest.Manifest{Text: coll.ManifestText}
// The cache map is created on first use, because writing to a nil map
// would panic.
380 if cp.manifestCache == nil {
381 cp.manifestCache = map[string]*manifest.Manifest{pdh: mft}
// Cache already exists (presumably the else branch; the line separating
// the two assignments is not visible here).
383 cp.manifestCache[pdh] = mft