1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
17 "git.curoverse.com/arvados.git/sdk/go/arvados"
18 "git.curoverse.com/arvados.git/sdk/go/manifest"
21 type printfer interface { // minimal logging interface; satisfied by e.g. *log.Logger
22 Printf(string, ...interface{})
25 var errTooManySymlinks = errors.New("too many symlinks, or symlink cycle") // returned by walkHostFS when it meets a symlink after the maxSymlinks budget is used up (exact check is on an elided line — confirm)
27 const limitFollowSymlinks = 10 // initial maxSymlinks budget that Copy passes to walkMount; each followed symlink decrements it
29 type filetodo struct { // one queued local file: src is a host path (os.Open), dst is a collection path (fs.OpenFile), size is logged in bytes
35 // copier copies data from a finished container's output path to a new
36 // Arvados collection.
38 // Regular files (and symlinks to regular files) in hostOutputDir are
39 // copied from the local filesystem.
41 // Symlinks to mounted collections, and any collections mounted under
42 // ctrOutputDir, are copied by transforming the relevant parts of the
43 // existing manifests, without moving any data around.
45 // Symlinks to other parts of the container's filesystem result in
50 // manifest, err := (&copier{...}).Copy()
52 client *arvados.Client // used with keepClient to open the output collection filesystem in Copy
53 arvClient IArvadosClient // used by getManifest to fetch collection records by PDH
54 keepClient IKeepClient
57 mounts map[string]arvados.Mount // keyed by mount point (absolute path in the container); innermost match wins in walkMount
58 secretMounts map[string]arvados.Mount // same keying as mounts; these paths are omitted from the output
65 manifestCache map[string]*manifest.Manifest // PDH -> parsed manifest; allocated lazily by getManifest
68 // Copy copies data as needed, and returns a new manifest.
69 func (cp *copier) Copy() (string, error) { // phase 1: walk and queue; phase 2: create dirs and copy queued files
70 err := cp.walkMount("", cp.ctrOutputDir, limitFollowSymlinks, true) // "" means output root; fills cp.manifest, cp.dirs, cp.files
74 fs, err := (&arvados.Collection{ManifestText: cp.manifest}).FileSystem(cp.client, cp.keepClient) // start from the manifest fragments spliced in by walkMount
78 for _, d := range cp.dirs {
79 err = fs.Mkdir(d, 0777)
84 for _, f := range cp.files {
85 err = cp.copyFile(fs, f)
90 return fs.MarshalManifest(".") // serialize the whole resulting collection to manifest text
93 func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) error { // stream one queued host file into the collection filesystem
94 cp.logger.Printf("copying %q (%d bytes)", f.dst, f.size)
95 dst, err := fs.OpenFile(f.dst, os.O_CREATE|os.O_WRONLY, 0666) // create (or open for writing) the destination inside the collection
99 src, err := os.Open(f.src)
105 _, err = io.Copy(dst, src)
113 // Append to cp.manifest, cp.files, and cp.dirs so as to copy src (an
114 // absolute path in the container's filesystem) to dest (an absolute
115 // path in the output collection, or "" for output root).
117 // src must be (or be a descendant of) a readonly "collection" mount,
118 // a writable collection mounted at ctrOutputPath, or a "tmp" mount.
120 // If walkMountsBelow is true, include contents of any collection
121 // mounted below src as well.
122 func (cp *copier) walkMount(dest, src string, maxSymlinks int, walkMountsBelow bool) error {
123 // srcRoot, srcMount indicate the innermost mount that
126 var srcMount arvados.Mount
127 for root, mnt := range cp.mounts {
128 if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") { // keep the longest (innermost) mount point containing src
129 srcRoot, srcMount = root, mnt
132 for root := range cp.secretMounts {
133 if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
134 // Silently omit secrets, and symlinks to
140 return fmt.Errorf("cannot output file %q: not in any mount", src)
143 // srcRelPath is the path to the file/dir we are trying to
144 // copy, relative to its mount point -- ".", "./foo.txt", ...
145 srcRelPath := filepath.Join(".", srcMount.Path, src[len(srcRoot):])
148 case srcMount.ExcludeFromOutput: // empty case: nothing from this mount is copied
149 case srcMount.Kind == "tmp":
150 // Handle by walking the host filesystem.
151 return cp.walkHostFS(dest, src, maxSymlinks, walkMountsBelow)
152 case srcMount.Kind != "collection":
153 return fmt.Errorf("%q: unsupported mount %q in output (kind is %q)", src, srcRoot, srcMount.Kind)
154 case !srcMount.Writable: // read-only collection: splice the relevant part of its existing manifest, no data copied
155 mft, err := cp.getManifest(srcMount.PortableDataHash)
159 cp.manifest += mft.Extract(srcRelPath, dest).Text
160 case srcRoot == cp.ctrOutputDir: // writable collection at the output dir: its manifest is read from the host-side .arvados#collection file
161 f, err := os.Open(filepath.Join(cp.hostOutputDir, ".arvados#collection"))
166 var coll arvados.Collection
167 err = json.NewDecoder(f).Decode(&coll)
171 mft := manifest.Manifest{Text: coll.ManifestText}
172 cp.manifest += mft.Extract(srcRelPath, dest).Text
174 return fmt.Errorf("cannot output %q as %q: writable collection mounted at %q", src, dest, srcRoot)
177 return cp.walkMountsBelow(dest, src)
183 func (cp *copier) walkMountsBelow(dest, src string) error {
184 for mnt, mntinfo := range cp.mounts {
185 if !strings.HasPrefix(mnt, src+"/") {
188 if cp.copyRegularFiles(mntinfo) {
189 // These got copied into the nearest parent
190 // mount as regular files during setup, so
191 // they get copied as regular files when we
192 // process the parent. Output will reflect any
193 // changes and deletions done by the
197 // Example: we are processing dest=/foo src=/mnt1/dir1
198 // (perhaps we followed a symlink /outdir/foo ->
199 // /mnt1/dir1). Caller has already processed the
200 // collection mounted at /mnt1, but now we find that
201 // /mnt1/dir1/mnt2 is also a mount, so we need to copy
202 // src=/mnt1/dir1/mnt2 to dest=/foo/mnt2.
204 // We handle all descendants of /mnt1/dir1 in this
205 // loop instead of using recursion:
206 // /mnt1/dir1/mnt2/mnt3 is a child of both /mnt1 and
207 // /mnt1/dir1/mnt2, but we only want to walk it
208 // once. (This simplification is safe because mounted
209 // collections cannot contain symlinks.)
210 err := cp.walkMount(dest+mnt[len(src):], mnt, 0, false) // dest mirrors mnt's position relative to src
218 // Add entries to cp.dirs and cp.files so as to copy src (an absolute
219 // path in the container's filesystem which corresponds to a real file
220 // or directory in cp.hostOutputDir) to dest (an absolute path in the
221 // output collection, or "" for output root).
223 // Always follow symlinks.
225 // If includeMounts is true, include mounts at and below src.
226 // Otherwise, skip them.
227 func (cp *copier) walkHostFS(dest, src string, maxSymlinks int, includeMounts bool) error {
229 err := cp.walkMountsBelow(dest, src)
235 hostsrc := cp.hostOutputDir + src[len(cp.ctrOutputDir):] // translate the container path to the corresponding host path
237 // If src is a symlink, walk its target.
238 fi, err := os.Lstat(hostsrc)
240 return fmt.Errorf("lstat %q: %s", src, err)
242 if fi.Mode()&os.ModeSymlink != 0 {
244 return errTooManySymlinks
246 target, err := os.Readlink(hostsrc)
248 return fmt.Errorf("readlink %q: %s", src, err)
250 if !strings.HasPrefix(target, "/") {
251 target = filepath.Join(filepath.Dir(src), target) // relative target: resolve against the symlink's own directory (container path)
253 return cp.walkMount(dest, target, maxSymlinks-1, true)
256 // If src is a regular directory, append it to cp.dirs and
257 // walk each of its children. (If there are no children,
258 // create an empty file "dest/.keep".)
259 if fi.Mode().IsDir() {
261 cp.dirs = append(cp.dirs, dest)
263 dir, err := os.Open(hostsrc)
265 return fmt.Errorf("open %q: %s", src, err)
267 names, err := dir.Readdirnames(-1)
270 return fmt.Errorf("readdirnames %q: %s", src, err)
274 cp.files = append(cp.files, filetodo{
276 dst: dest + "/.keep",
282 for _, name := range names {
283 dest, src := dest+"/"+name, src+"/"+name // shadows the outer dest/src for this child only
284 if _, isSecret := cp.secretMounts[src]; isSecret {
287 if mntinfo, isMount := cp.mounts[src]; isMount && !cp.copyRegularFiles(mntinfo) {
288 // If a regular file/dir somehow
289 // exists at a path that's also a
290 // mount target, ignore the file --
291 // the mount has already been included
292 // with walkMountsBelow().
294 // (...except mount types that are
295 // handled as regular files.)
298 err = cp.walkHostFS(dest, src, maxSymlinks, false)
306 // If src is a regular file, append it to cp.files.
307 if fi.Mode().IsRegular() {
308 cp.files = append(cp.files, filetodo{
316 return fmt.Errorf("Unsupported file type (mode %o) in output dir: %q", fi.Mode(), src)
319 func (cp *copier) copyRegularFiles(m arvados.Mount) bool { // reports whether this mount's contents exist as regular host files and are copied as such rather than via manifests
320 return m.Kind == "text" || m.Kind == "json" || (m.Kind == "collection" && m.Writable)
323 func (cp *copier) getManifest(pdh string) (*manifest.Manifest, error) { // fetch the manifest for a portable data hash, memoized in cp.manifestCache
324 if mft, ok := cp.manifestCache[pdh]; ok {
327 var coll arvados.Collection
328 err := cp.arvClient.Get("collections", pdh, nil, &coll)
330 return nil, fmt.Errorf("error retrieving collection record for %q: %s", pdh, err)
332 mft := &manifest.Manifest{Text: coll.ManifestText}
333 if cp.manifestCache == nil { // cache map is allocated lazily on first miss
334 cp.manifestCache = map[string]*manifest.Manifest{pdh: mft}
336 cp.manifestCache[pdh] = mft