12452: Fix test
[arvados.git] / crunch_scripts / crunchutil / vwd.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import arvados
6 import os
7 import stat
8 import arvados.commands.run
9 import logging
10
11 # Implements "Virtual Working Directory"
12 # Provides a way of emulating a shared writable directory in Keep based
13 # on a "check out, edit, check in, merge" model.
14 # At the moment, this only permits adding new files, applications
15 # cannot modify or delete existing files.
16
17 # Create a symlink tree rooted at target_dir mirroring arv-mounted
18 # source_collection.  target_dir must be empty, and will be created if it
19 # doesn't exist.
20 def checkout(source_collection, target_dir, keepmount=None):
21     # create symlinks
22     if keepmount is None:
23         keepmount = os.environ['TASK_KEEPMOUNT']
24
25     if not os.path.exists(target_dir):
26         os.makedirs(target_dir)
27
28     l = os.listdir(target_dir)
29     if len(l) > 0:
30         raise Exception("target_dir must be empty before checkout, contains %s" % l)
31
32     stem = os.path.join(keepmount, source_collection)
33     for root, dirs, files in os.walk(os.path.join(keepmount, source_collection), topdown=True):
34         rel = root[len(stem)+1:]
35         for d in dirs:
36             os.mkdir(os.path.join(target_dir, rel, d))
37         for f in files:
38             os.symlink(os.path.join(root, f), os.path.join(target_dir, rel, f))
39
40 def checkin(target_dir):
41     """Write files in `target_dir` to Keep.
42
43     Regular files or symlinks to files outside the keep mount are written to
44     Keep as normal files (Keep does not support symlinks).
45
46     Symlinks to files in the keep mount will result in files in the new
47     collection which reference existing Keep blocks, no data copying necessary.
48
49     Returns a new Collection object, with data flushed but the collection record
50     not saved to the API.
51
52     """
53
54     outputcollection = arvados.collection.Collection(num_retries=5)
55
56     if target_dir[-1:] != '/':
57         target_dir += '/'
58
59     collections = {}
60
61     logger = logging.getLogger("arvados")
62
63     last_error = None
64     for root, dirs, files in os.walk(target_dir):
65         for f in files:
66             try:
67                 s = os.lstat(os.path.join(root, f))
68
69                 writeIt = False
70
71                 if stat.S_ISREG(s.st_mode):
72                     writeIt = True
73                 elif stat.S_ISLNK(s.st_mode):
74                     # 1. check if it is a link into a collection
75                     real = os.path.split(os.path.realpath(os.path.join(root, f)))
76                     (pdh, branch) = arvados.commands.run.is_in_collection(real[0], real[1])
77                     if pdh is not None:
78                         # 2. load collection
79                         if pdh not in collections:
80                             # 2.1 make sure it is flushed (see #5787 note 11)
81                             fd = os.open(real[0], os.O_RDONLY)
82                             os.fsync(fd)
83                             os.close(fd)
84
85                             # 2.2 get collection from API server
86                             collections[pdh] = arvados.collection.CollectionReader(pdh,
87                                                                                    api_client=outputcollection._my_api(),
88                                                                                    keep_client=outputcollection._my_keep(),
89                                                                                    num_retries=5)
90                         # 3. copy arvfile to new collection
91                         outputcollection.copy(branch, os.path.join(root[len(target_dir):], f), source_collection=collections[pdh])
92                     else:
93                         writeIt = True
94
95                 if writeIt:
96                     reldir = root[len(target_dir):]
97                     with outputcollection.open(os.path.join(reldir, f), "wb") as writer:
98                         with open(os.path.join(root, f), "rb") as reader:
99                             dat = reader.read(64*1024)
100                             while dat:
101                                 writer.write(dat)
102                                 dat = reader.read(64*1024)
103             except (IOError, OSError) as e:
104                 logger.error(e)
105                 last_error = e
106
107     return (outputcollection, last_error)