1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
17 "git.curoverse.com/arvados.git/sdk/go/arvados"
18 "git.curoverse.com/arvados.git/sdk/go/manifest"
21 type printfer interface {
22 Printf(string, ...interface{})
25 var errTooManySymlinks = errors.New("too many symlinks, or symlink cycle")
27 const limitFollowSymlinks = 10
29 type filetodo struct {
35 // copier copies data from a finished container's output path to a new
36 // Arvados collection.
38 // Regular files (and symlinks to regular files) in hostOutputDir are
39 // copied from the local filesystem.
41 // Symlinks to mounted collections, and any collections mounted under
42 // ctrOutputDir, are copied by transforming the relevant parts of the
43 // existing manifests, without moving any data around.
45 // Symlinks to other parts of the container's filesystem result in
50 // manifest, err := (&copier{...}).Copy()
52 client *arvados.Client
53 arvClient IArvadosClient
54 keepClient IKeepClient
58 mounts map[string]arvados.Mount
59 secretMounts map[string]arvados.Mount
66 manifestCache map[string]*manifest.Manifest
69 // Copy copies data as needed, and returns a new manifest.
70 func (cp *copier) Copy() (string, error) {
71 err := cp.walkMount("", cp.ctrOutputDir, limitFollowSymlinks, true)
75 fs, err := (&arvados.Collection{ManifestText: cp.manifest}).FileSystem(cp.client, cp.keepClient)
79 for _, d := range cp.dirs {
80 err = fs.Mkdir(d, 0777)
85 for _, f := range cp.files {
86 err = cp.copyFile(fs, f)
91 return fs.MarshalManifest(".")
94 func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) error {
95 cp.logger.Printf("copying %q (%d bytes)", f.dst, f.size)
96 dst, err := fs.OpenFile(f.dst, os.O_CREATE|os.O_WRONLY, 0666)
100 src, err := os.Open(f.src)
106 _, err = io.Copy(dst, src)
114 // Append to cp.manifest, cp.files, and cp.dirs so as to copy src (an
115 // absolute path in the container's filesystem) to dest (an absolute
116 // path in the output collection, or "" for output root).
118 // src must be (or be a descendant of) a readonly "collection" mount,
119 // a writable collection mounted at ctrOutputPath, or a "tmp" mount.
121 // If walkMountsBelow is true, include contents of any collection
122 // mounted below src as well.
123 func (cp *copier) walkMount(dest, src string, maxSymlinks int, walkMountsBelow bool) error {
124 // srcRoot, srcMount indicate the innermost mount that
127 var srcMount arvados.Mount
128 for root, mnt := range cp.mounts {
129 if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
130 srcRoot, srcMount = root, mnt
133 for root := range cp.secretMounts {
134 if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
135 // Silently omit secrets, and symlinks to
141 return fmt.Errorf("cannot output file %q: not in any mount", src)
144 // srcRelPath is the path to the file/dir we are trying to
145 // copy, relative to its mount point -- ".", "./foo.txt", ...
146 srcRelPath := filepath.Join(".", srcMount.Path, src[len(srcRoot):])
149 case srcMount.ExcludeFromOutput:
150 case srcMount.Kind == "tmp":
151 // Handle by walking the host filesystem.
152 return cp.walkHostFS(dest, src, maxSymlinks, walkMountsBelow)
153 case srcMount.Kind != "collection":
154 return fmt.Errorf("%q: unsupported mount %q in output (kind is %q)", src, srcRoot, srcMount.Kind)
155 case !srcMount.Writable:
156 mft, err := cp.getManifest(srcMount.PortableDataHash)
160 cp.manifest += mft.Extract(srcRelPath, dest).Text
162 hostRoot, err := cp.hostRoot(srcRoot)
166 f, err := os.Open(filepath.Join(hostRoot, ".arvados#collection"))
171 var coll arvados.Collection
172 err = json.NewDecoder(f).Decode(&coll)
176 mft := manifest.Manifest{Text: coll.ManifestText}
177 cp.manifest += mft.Extract(srcRelPath, dest).Text
180 return cp.walkMountsBelow(dest, src)
186 func (cp *copier) walkMountsBelow(dest, src string) error {
187 for mnt, mntinfo := range cp.mounts {
188 if !strings.HasPrefix(mnt, src+"/") {
191 if cp.copyRegularFiles(mntinfo) {
192 // These got copied into the nearest parent
193 // mount as regular files during setup, so
194 // they get copied as regular files when we
195 // process the parent. Output will reflect any
196 // changes and deletions done by the
200 // Example: we are processing dest=/foo src=/mnt1/dir1
201 // (perhaps we followed a symlink /outdir/foo ->
202 // /mnt1/dir1). Caller has already processed the
203 // collection mounted at /mnt1, but now we find that
204 // /mnt1/dir1/mnt2 is also a mount, so we need to copy
205 // src=/mnt1/dir1/mnt2 to dest=/foo/mnt2.
207 // We handle all descendants of /mnt1/dir1 in this
208 // loop instead of using recursion:
209 // /mnt1/dir1/mnt2/mnt3 is a child of both /mnt1 and
210 // /mnt1/dir1/mnt2, but we only want to walk it
211 // once. (This simplification is safe because mounted
212 // collections cannot contain symlinks.)
213 err := cp.walkMount(dest+mnt[len(src):], mnt, 0, false)
221 // Add entries to cp.dirs and cp.files so as to copy src (an absolute
222 // path in the container's filesystem which corresponds to a real file
223 // or directory in cp.hostOutputDir) to dest (an absolute path in the
224 // output collection, or "" for output root).
226 // Always follow symlinks.
228 // If includeMounts is true, include mounts at and below src.
229 // Otherwise, skip them.
230 func (cp *copier) walkHostFS(dest, src string, maxSymlinks int, includeMounts bool) error {
232 err := cp.walkMountsBelow(dest, src)
238 hostsrc := cp.hostOutputDir + src[len(cp.ctrOutputDir):]
240 // If src is a symlink, walk its target.
241 fi, err := os.Lstat(hostsrc)
243 return fmt.Errorf("lstat %q: %s", src, err)
245 if fi.Mode()&os.ModeSymlink != 0 {
247 return errTooManySymlinks
249 target, err := os.Readlink(hostsrc)
251 return fmt.Errorf("readlink %q: %s", src, err)
253 if !strings.HasPrefix(target, "/") {
254 target = filepath.Join(filepath.Dir(src), target)
256 return cp.walkMount(dest, target, maxSymlinks-1, true)
259 // If src is a regular directory, append it to cp.dirs and
260 // walk each of its children. (If there are no children,
261 // create an empty file "dest/.keep".)
262 if fi.Mode().IsDir() {
264 cp.dirs = append(cp.dirs, dest)
266 dir, err := os.Open(hostsrc)
268 return fmt.Errorf("open %q: %s", src, err)
270 names, err := dir.Readdirnames(-1)
273 return fmt.Errorf("readdirnames %q: %s", src, err)
277 cp.files = append(cp.files, filetodo{
279 dst: dest + "/.keep",
285 for _, name := range names {
286 dest, src := dest+"/"+name, src+"/"+name
287 if _, isSecret := cp.secretMounts[src]; isSecret {
290 if mntinfo, isMount := cp.mounts[src]; isMount && !cp.copyRegularFiles(mntinfo) {
291 // If a regular file/dir somehow
292 // exists at a path that's also a
293 // mount target, ignore the file --
294 // the mount has already been included
295 // with walkMountsBelow().
297 // (...except mount types that are
298 // handled as regular files.)
301 err = cp.walkHostFS(dest, src, maxSymlinks, false)
309 // If src is a regular file, append it to cp.files.
310 if fi.Mode().IsRegular() {
311 cp.files = append(cp.files, filetodo{
319 return fmt.Errorf("Unsupported file type (mode %o) in output dir: %q", fi.Mode(), src)
322 // Return the host path that was mounted at the given path in the
324 func (cp *copier) hostRoot(ctrRoot string) (string, error) {
325 if ctrRoot == cp.ctrOutputDir {
326 return cp.hostOutputDir, nil
328 for _, bind := range cp.binds {
329 tokens := strings.Split(bind, ":")
330 if len(tokens) >= 2 && tokens[1] == ctrRoot {
331 return tokens[0], nil
334 return "", fmt.Errorf("not bind-mounted: %q", ctrRoot)
337 func (cp *copier) copyRegularFiles(m arvados.Mount) bool {
338 return m.Kind == "text" || m.Kind == "json" || (m.Kind == "collection" && m.Writable)
341 func (cp *copier) getManifest(pdh string) (*manifest.Manifest, error) {
342 if mft, ok := cp.manifestCache[pdh]; ok {
345 var coll arvados.Collection
346 err := cp.arvClient.Get("collections", pdh, nil, &coll)
348 return nil, fmt.Errorf("error retrieving collection record for %q: %s", pdh, err)
350 mft := &manifest.Manifest{Text: coll.ManifestText}
351 if cp.manifestCache == nil {
352 cp.manifestCache = map[string]*manifest.Manifest{pdh: mft}
354 cp.manifestCache[pdh] = mft