10701: Fix edge case: scheduling_parameters.partitions=[]
[arvados.git] / crunch_scripts / crunchutil / vwd.py
1 import arvados
2 import os
3 import stat
4 import arvados.commands.run
5 import logging
6
7 # Implements "Virtual Working Directory"
8 # Provides a way of emulating a shared writable directory in Keep based
9 # on a "check out, edit, check in, merge" model.
10 # At the moment, this only permits adding new files; applications
11 # cannot modify or delete existing files.
12
13 # Create a symlink tree rooted at target_dir mirroring arv-mounted
14 # source_collection.  target_dir must be empty, and will be created if it
15 # doesn't exist.
def checkout(source_collection, target_dir, keepmount=None):
    """Create a symlink tree in `target_dir` mirroring a mounted collection.

    Every directory found under `keepmount`/`source_collection` is recreated
    as a real directory inside `target_dir`, and every file is represented by
    a symlink pointing at the file inside the mount.

    Arguments:

    source_collection -- path of the collection relative to `keepmount`.
    target_dir -- directory to populate; created if it does not exist, and
      must be empty.
    keepmount -- root of the mount; defaults to the TASK_KEEPMOUNT
      environment variable (KeyError if that is unset).

    Raises Exception if `target_dir` already contains entries.
    """
    if keepmount is None:
        keepmount = os.environ['TASK_KEEPMOUNT']

    if not os.path.exists(target_dir):
        os.makedirs(target_dir)

    existing = os.listdir(target_dir)
    if existing:
        raise Exception("target_dir must be empty before checkout, contains %s" % existing)

    stem = os.path.join(keepmount, source_collection)
    # topdown=True guarantees a directory is created in target_dir before
    # anything is placed inside it.
    for root, dirs, files in os.walk(stem, topdown=True):
        # Use relpath rather than slicing by len(stem): slicing drops the
        # first character of every subdirectory when stem ends with a
        # path separator (e.g. source_collection="abc/").
        rel = os.path.relpath(root, stem)
        if rel == os.curdir:
            rel = ""
        for d in dirs:
            os.mkdir(os.path.join(target_dir, rel, d))
        for f in files:
            os.symlink(os.path.join(root, f), os.path.join(target_dir, rel, f))
35
def checkin(target_dir):
    """Write files in `target_dir` to Keep.

    Regular files or symlinks to files outside the keep mount are written to
    Keep as normal files (Keep does not support symlinks).

    Symlinks to files in the keep mount will result in files in the new
    collection which reference existing Keep blocks, no data copying necessary.

    Entries that are neither regular files nor symlinks are skipped.

    An IOError or OSError raised while handling a file is logged to the
    "arvados" logger and does not stop the walk; the remaining files are
    still processed.

    Returns a tuple `(collection, last_error)`.  `collection` is the new
    Collection object (per the original note: data flushed, but the
    collection record not saved to the API).  `last_error` is the most recent
    IOError/OSError caught, or None if none was caught.

    """

    outputcollection = arvados.collection.Collection(num_retries=5)

    # The trailing slash makes root[len(target_dir):] a clean relative path
    # (no leading separator) for every directory visited by os.walk.
    if target_dir[-1:] != '/':
        target_dir += '/'

    # Cache of CollectionReader objects keyed by portable data hash, so each
    # source collection is fetched only once.
    collections = {}

    logger = logging.getLogger("arvados")

    last_error = None
    for root, dirs, files in os.walk(target_dir):
        reldir = root[len(target_dir):]
        for f in files:
            localpath = os.path.join(root, f)
            destpath = os.path.join(reldir, f)
            try:
                s = os.lstat(localpath)

                write_it = False

                if stat.S_ISREG(s.st_mode):
                    write_it = True
                elif stat.S_ISLNK(s.st_mode):
                    # 1. check if it is a link into a collection
                    real = os.path.split(os.path.realpath(localpath))
                    (pdh, branch) = arvados.commands.run.is_in_collection(real[0], real[1])
                    if pdh is not None:
                        # 2. load collection
                        if pdh not in collections:
                            # 2.1 make sure it is flushed (see #5787 note 11)
                            fd = os.open(real[0], os.O_RDONLY)
                            try:
                                os.fsync(fd)
                            finally:
                                # Always release the descriptor: an fsync
                                # failure is caught below and the loop keeps
                                # going, so it would otherwise leak.
                                os.close(fd)

                            # 2.2 get collection from API server
                            collections[pdh] = arvados.collection.CollectionReader(
                                pdh,
                                api_client=outputcollection._my_api(),
                                keep_client=outputcollection._my_keep(),
                                num_retries=5)
                        # 3. copy arvfile to new collection
                        outputcollection.copy(branch, destpath, source_collection=collections[pdh])
                    else:
                        write_it = True

                if write_it:
                    # Stream the file into the collection in 64 KiB chunks.
                    with outputcollection.open(destpath, "wb") as writer:
                        with open(localpath, "rb") as reader:
                            dat = reader.read(64*1024)
                            while dat:
                                writer.write(dat)
                                dat = reader.read(64*1024)
            except (IOError, OSError) as e:
                logger.error(e)
                last_error = e

    return (outputcollection, last_error)