38e43496149ca2786580c257f729242695071c36
[arvados.git] / sdk / python / arvados / commands / put.py
1 #!/usr/bin/env python
2
3 # TODO:
4 # --md5sum - display md5 of each file as read from disk
5
6 import argparse
7 import arvados
8 import arvados.collection
9 import base64
10 import copy
11 import datetime
12 import errno
13 import fcntl
14 import hashlib
15 import json
16 import logging
17 import os
18 import pwd
19 import re
20 import signal
21 import socket
22 import sys
23 import tempfile
24 import threading
25 import time
26 from apiclient import errors as apiclient_errors
27 from arvados._version import __version__
28
29 import arvados.commands._util as arv_cmd
30
31 CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
32 api_client = None
33
34 upload_opts = argparse.ArgumentParser(add_help=False)
35
36 upload_opts.add_argument('--version', action='version',
37                          version="%s %s" % (sys.argv[0], __version__),
38                          help='Print version and exit.')
39 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
40                          help="""
41 Local file or directory. Default: read from standard input.
42 """)
43
44 _group = upload_opts.add_mutually_exclusive_group()
45
46 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
47                     default=-1, help=argparse.SUPPRESS)
48
49 _group.add_argument('--normalize', action='store_true',
50                     help="""
51 Normalize the manifest by re-ordering files and streams after writing
52 data.
53 """)
54
55 _group.add_argument('--dry-run', action='store_true', default=False,
56                     help="""
57 Don't actually upload files; only check whether any files would be
58 uploaded. Exit with code 2 when files are pending upload.
59 """)
60
61 _group = upload_opts.add_mutually_exclusive_group()
62
63 _group.add_argument('--as-stream', action='store_true', dest='stream',
64                     help="""
65 Synonym for --stream.
66 """)
67
68 _group.add_argument('--stream', action='store_true',
69                     help="""
70 Store the file content and display the resulting manifest on
71 stdout. Do not write the manifest to Keep or save a Collection object
72 in Arvados.
73 """)
74
75 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
76                     help="""
77 Synonym for --manifest.
78 """)
79
80 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
81                     help="""
82 Synonym for --manifest.
83 """)
84
85 _group.add_argument('--manifest', action='store_true',
86                     help="""
87 Store the file data and resulting manifest in Keep, save a Collection
88 object in Arvados, and display the manifest locator (Collection uuid)
89 on stdout. This is the default behavior.
90 """)
91
92 _group.add_argument('--as-raw', action='store_true', dest='raw',
93                     help="""
94 Synonym for --raw.
95 """)
96
97 _group.add_argument('--raw', action='store_true',
98                     help="""
99 Store the file content and display the data block locators on stdout,
100 separated by commas, with a trailing newline. Do not store a
101 manifest.
102 """)
103
104 upload_opts.add_argument('--update-collection', type=str, default=None,
105                          dest='update_collection', metavar="UUID", help="""
106 Update an existing collection identified by the given Arvados collection
107 UUID. All new local files will be uploaded.
108 """)
109
110 upload_opts.add_argument('--use-filename', type=str, default=None,
111                          dest='filename', help="""
112 Synonym for --filename.
113 """)
114
115 upload_opts.add_argument('--filename', type=str, default=None,
116                          help="""
117 Use the given filename in the manifest, instead of the name of the
118 local file. This is useful when "-" or "/dev/stdin" is given as an
119 input file. It can be used only if there is exactly one path given and
120 it is not a directory. Implies --manifest.
121 """)
122
123 upload_opts.add_argument('--portable-data-hash', action='store_true',
124                          help="""
125 Print the portable data hash instead of the Arvados UUID for the collection
126 created by the upload.
127 """)
128
129 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
130                          help="""
131 Set the replication level for the new collection: how many different
132 physical storage devices (e.g., disks) should have a copy of each data
133 block. Default is to use the server-provided default (if any) or 2.
134 """)
135
136 run_opts = argparse.ArgumentParser(add_help=False)
137
138 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
139 Store the collection in the specified project, instead of your Home
140 project.
141 """)
142
143 run_opts.add_argument('--name', help="""
144 Save the collection with the specified name.
145 """)
146
147 _group = run_opts.add_mutually_exclusive_group()
148 _group.add_argument('--progress', action='store_true',
149                     help="""
150 Display human-readable progress on stderr (bytes and, if possible,
151 percentage of total data size). This is the default behavior when
152 stderr is a tty.
153 """)
154
155 _group.add_argument('--no-progress', action='store_true',
156                     help="""
157 Do not display human-readable progress on stderr, even if stderr is a
158 tty.
159 """)
160
161 _group.add_argument('--batch-progress', action='store_true',
162                     help="""
163 Display machine-readable progress on stderr (bytes and, if known,
164 total data size).
165 """)
166
167 _group = run_opts.add_mutually_exclusive_group()
168 _group.add_argument('--resume', action='store_true', default=True,
169                     help="""
170 Continue interrupted uploads from cached state (default).
171 """)
172 _group.add_argument('--no-resume', action='store_false', dest='resume',
173                     help="""
174 Do not continue interrupted uploads from cached state.
175 """)
176
177 _group = run_opts.add_mutually_exclusive_group()
178 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
179                     help="""
180 Save upload state in a cache file for resuming (default).
181 """)
182 _group.add_argument('--no-cache', action='store_false', dest='use_cache',
183                     help="""
184 Do not save upload state in a cache file for resuming.
185 """)
186
187 arg_parser = argparse.ArgumentParser(
188     description='Copy data from the local filesystem to Keep.',
189     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
190
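# Typical invocations, sketched for illustration (paths and the project UUID
# below are placeholders; the flags are the ones defined above):
#
#   arv-put file1.dat dir1/                  # upload and save a new Collection
#   arv-put --name "my data" --replication 3 dir1/
#   arv-put --project-uuid zzzzz-j7d0g-xxxxxxxxxxxxxxx --no-resume dir1/
#   arv-put --dry-run dir1/                  # exit with code 2 if uploads are pending
#   cat file1.dat | arv-put                  # read from stdin
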
191 def parse_arguments(arguments):
192     args = arg_parser.parse_args(arguments)
193
194     if len(args.paths) == 0:
195         args.paths = ['-']
196
197     args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)
198
199     if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
200         if args.filename:
201             arg_parser.error("""
202     --filename argument cannot be used when storing a directory or
203     multiple files.
204     """)
205
206     # Turn on --progress by default if stderr is a tty.
207     if (not (args.batch_progress or args.no_progress)
208         and os.isatty(sys.stderr.fileno())):
209         args.progress = True
210
211     # Turn off --resume (default) if --no-cache is used.
212     if not args.use_cache:
213         args.resume = False
214
215     if args.paths == ['-']:
216         if args.update_collection:
217             arg_parser.error("""
218     --update-collection cannot be used when reading from stdin.
219     """)
220         args.resume = False
221         args.use_cache = False
222         if not args.filename:
223             args.filename = 'stdin'
224
225     return args
226
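# A rough sketch of the stdin defaults applied above, assuming no other flags:
# parse_arguments([]) yields args.paths == ['-'], args.filename == 'stdin', and
# both args.resume and args.use_cache set to False; a path of '/dev/stdin' is
# treated the same as '-'.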
227
228 class CollectionUpdateError(Exception):
229     pass
230
231
232 class ResumeCacheConflict(Exception):
233     pass
234
235
236 class ArvPutArgumentConflict(Exception):
237     pass
238
239
240 class ArvPutUploadIsPending(Exception):
241     pass
242
243
244 class ArvPutUploadNotPending(Exception):
245     pass
246
247
248 class FileUploadList(list):
249     def __init__(self, dry_run=False):
250         list.__init__(self)
251         self.dry_run = dry_run
252
253     def append(self, other):
254         if self.dry_run:
255             raise ArvPutUploadIsPending()
256         super(FileUploadList, self).append(other)
257
258
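# How --dry-run is wired up, in short: with dry_run=True the first append() to
# a FileUploadList raises ArvPutUploadIsPending, which main() below turns into
# exit code 2. For example (illustrative only):
#
#   files = FileUploadList(dry_run=True)
#   files.append(('some/file', 0, 'some/file'))  # raises ArvPutUploadIsPending
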
259 class ResumeCache(object):
260     CACHE_DIR = '.cache/arvados/arv-put'
261
262     def __init__(self, file_spec):
263         self.cache_file = open(file_spec, 'a+')
264         self._lock_file(self.cache_file)
265         self.filename = self.cache_file.name
266
267     @classmethod
268     def make_path(cls, args):
269         md5 = hashlib.md5()
270         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
271         realpaths = sorted(os.path.realpath(path) for path in args.paths)
272         md5.update('\0'.join(realpaths))
273         if any(os.path.isdir(path) for path in realpaths):
274             md5.update("-1")
275         elif args.filename:
276             md5.update(args.filename)
277         return os.path.join(
278             arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
279             md5.hexdigest())
280
281     def _lock_file(self, fileobj):
282         try:
283             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
284         except IOError:
285             raise ResumeCacheConflict("{} locked".format(fileobj.name))
286
287     def load(self):
288         self.cache_file.seek(0)
289         return json.load(self.cache_file)
290
291     def check_cache(self, api_client=None, num_retries=0):
292         try:
293             state = self.load()
294             locator = None
295             try:
296                 if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
297                     locator = state["_finished_streams"][0][1][0]
298                 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
299                     locator = state["_current_stream_locators"][0]
300                 if locator is not None:
301                     kc = arvados.keep.KeepClient(api_client=api_client)
302                     kc.head(locator, num_retries=num_retries)
303             except Exception as e:
304                 self.restart()
305         except ValueError:
306             pass
307
308     def save(self, data):
309         try:
310             new_cache_fd, new_cache_name = tempfile.mkstemp(
311                 dir=os.path.dirname(self.filename))
312             self._lock_file(new_cache_fd)
313             new_cache = os.fdopen(new_cache_fd, 'r+')
314             json.dump(data, new_cache)
315             os.rename(new_cache_name, self.filename)
316         except (IOError, OSError, ResumeCacheConflict) as error:
317             try:
318                 os.unlink(new_cache_name)
319             except NameError:  # mkstemp failed.
320                 pass
321         else:
322             self.cache_file.close()
323             self.cache_file = new_cache
324
325     def close(self):
326         self.cache_file.close()
327
328     def destroy(self):
329         try:
330             os.unlink(self.filename)
331         except OSError as error:
332             if error.errno != errno.ENOENT:  # That's what we wanted anyway.
333                 raise
334         self.close()
335
336     def restart(self):
337         self.destroy()
338         self.__init__(self.filename)
339
340
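# Sketch of how the resume cache behaves, based on the class above: the cache
# file name is the MD5 of the API host plus the sorted real paths (and the
# --filename override, if any), stored under the per-user CACHE_DIR; flock()
# keeps two arv-put processes from sharing one cache, and save() writes a
# temporary file and rename()s it over the old one, so the cache is replaced
# atomically. For example (illustrative only):
#
#   cache = ResumeCache(ResumeCache.make_path(args))
#   state = cache.load()  # raises ValueError if the cache file is empty
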
341 class ArvPutUploadJob(object):
342     CACHE_DIR = '.cache/arvados/arv-put'
343     EMPTY_STATE = {
344         'manifest' : None, # Last saved manifest checkpoint
345         'files' : {} # Previous run file list: {path : {size, mtime}}
346     }
347
348     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
349                  bytes_expected=None, name=None, owner_uuid=None,
350                  ensure_unique_name=False, num_retries=None, replication_desired=None,
351                  filename=None, update_time=20.0, update_collection=None,
352                  logger=logging.getLogger('arvados.arv_put'), dry_run=False):
353         self.paths = paths
354         self.resume = resume
355         self.use_cache = use_cache
356         self.update = False
357         self.reporter = reporter
358         self.bytes_expected = bytes_expected
359         self.bytes_written = 0
360         self.bytes_skipped = 0
361         self.name = name
362         self.owner_uuid = owner_uuid
363         self.ensure_unique_name = ensure_unique_name
364         self.num_retries = num_retries
365         self.replication_desired = replication_desired
366         self.filename = filename
367         self._state_lock = threading.Lock()
368         self._state = None # Previous run state (file list & manifest)
369         self._current_files = [] # Current run file list
370         self._cache_file = None
371         self._collection_lock = threading.Lock()
372         self._remote_collection = None # Collection being updated (if asked)
373         self._local_collection = None # Collection from previous run manifest
374         self._file_paths = [] # Files to be updated in remote collection
375         self._stop_checkpointer = threading.Event()
376         self._checkpointer = threading.Thread(target=self._update_task)
377         self._checkpointer.daemon = True
378         self._update_task_time = update_time  # How many seconds to wait between update runs
379         self._files_to_upload = FileUploadList(dry_run=dry_run)
380         self.logger = logger
381         self.dry_run = dry_run
382
383         if not self.use_cache and self.resume:
384             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
385
386         # Check for obvious dry-run responses
387         if self.dry_run and (not self.use_cache or not self.resume):
388             raise ArvPutUploadIsPending()
389
390         # Load cached data if any and if needed
391         self._setup_state(update_collection)
392
393     def start(self, save_collection):
394         """
395         Start the supporting thread and upload files
396         """
397         if not self.dry_run:
398             self._checkpointer.start()
399         try:
400             for path in self.paths:
401                 # Test for stdin first, in case some file named '-' exists
402                 if path == '-':
403                     if self.dry_run:
404                         raise ArvPutUploadIsPending()
405                     self._write_stdin(self.filename or 'stdin')
406                 elif os.path.isdir(path):
407                     # Use absolute paths in the cache index so the CWD doesn't
408                     # interfere with the caching logic.
409                     prefixdir = path = os.path.abspath(path)
410                     if prefixdir != '/':
411                         prefixdir += '/'
412                     for root, dirs, files in os.walk(path):
413                         # Make os.walk()'s dir traversing order deterministic
414                         dirs.sort()
415                         files.sort()
416                         for f in files:
417                             self._check_file(os.path.join(root, f),
418                                              os.path.join(root[len(prefixdir):], f))
419                 else:
420                     self._check_file(os.path.abspath(path),
421                                      self.filename or os.path.basename(path))
422             # If dry-run mode is on and we got to this point, then we should
423             # notify that there aren't any files to upload.
424             if self.dry_run:
425                 raise ArvPutUploadNotPending()
426             # Remove local_collection's files that don't exist locally anymore, so the
427             # bytes_written count is correct.
428             for f in self.collection_file_paths(self._local_collection,
429                                                 path_prefix=""):
430                 if f != 'stdin' and f not in self._file_paths:
431                     self._local_collection.remove(f)
432             # Update bytes_written from current local collection and
433             # report initial progress.
434             self._update()
435             # Actual file upload
436             self._upload_files()
437         finally:
438             if not self.dry_run:
439                 # Stop the thread before doing anything else
440                 self._stop_checkpointer.set()
441                 self._checkpointer.join()
442                 # Commit all pending blocks and do one last _update()
443                 self._local_collection.manifest_text()
444                 self._update(final=True)
445                 if save_collection:
446                     self.save_collection()
447             if self.use_cache:
448                 self._cache_file.close()
449
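    # Rough outline of start() above: a daemon "checkpointer" thread calls
    # _update() every `update_time` seconds to cache the partial manifest and
    # report progress, while the main thread walks the paths, decides per file
    # whether to upload via _check_file(), and finally runs _upload_files().
    # A minimal driver, as done in main() below (illustrative only):
    #
    #   writer = ArvPutUploadJob(paths=['dir1/'], reporter=None)
    #   writer.start(save_collection=True)
    #   print writer.manifest_locator()
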
450     def save_collection(self):
451         if self.update:
452             # Check if files should be updated on the remote collection.
453             for fp in self._file_paths:
454                 remote_file = self._remote_collection.find(fp)
455                 if not remote_file:
456                     # File doesn't exist on the remote collection, copy it.
457                     self._remote_collection.copy(fp, fp, self._local_collection)
458                 elif remote_file != self._local_collection.find(fp):
459                     # A different file exists on the remote collection, overwrite it.
460                     self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
461                 else:
462                     # The file already exists on the remote collection, skip it.
463                     pass
464             self._remote_collection.save(num_retries=self.num_retries)
465         else:
466             self._local_collection.save_new(
467                 name=self.name, owner_uuid=self.owner_uuid,
468                 ensure_unique_name=self.ensure_unique_name,
469                 num_retries=self.num_retries)
470
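    # Note on save_collection() above: in update mode each uploaded path is
    # copied into the remote collection (overwriting remote files that differ),
    # and the remote collection is saved; otherwise a brand new collection is
    # created with save_new() under the requested name and owner project.
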
471     def destroy_cache(self):
472         if self.use_cache:
473             try:
474                 os.unlink(self._cache_filename)
475             except OSError as error:
476                 # That's what we wanted anyway.
477                 if error.errno != errno.ENOENT:
478                     raise
479             self._cache_file.close()
480
481     def _collection_size(self, collection):
482         """
483         Recursively get the total size of the collection
484         """
485         size = 0
486         for item in collection.values():
487             if isinstance(item, (arvados.collection.Collection, arvados.collection.Subcollection)):
488                 size += self._collection_size(item)
489             else:
490                 size += item.size()
491         return size
492
493     def _update_task(self):
494         """
495         Periodically called support task. File uploading is
496         asynchronous so we poll status from the collection.
497         """
498         while not self._stop_checkpointer.wait(self._update_task_time):
499             self._update()
500
501     def _update(self, final=False):
502         """
503         Update cached manifest text and report progress.
504         """
505         with self._collection_lock:
506             self.bytes_written = self._collection_size(self._local_collection)
507             if self.use_cache:
508                 # Update cache
509                 with self._state_lock:
510                     if final:
511                         self._state['manifest'] = self._local_collection.manifest_text()
512                     else:
513                         # Get the manifest text without committing pending blocks
514                         self._state['manifest'] = self._local_collection._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
515                 self._save_state()
516         # Call the reporter, if any
517         self.report_progress()
518
519     def report_progress(self):
520         if self.reporter is not None:
521             self.reporter(self.bytes_written, self.bytes_expected)
522
523     def _write_stdin(self, filename):
524         output = self._local_collection.open(filename, 'w')
525         self._write(sys.stdin, output)
526         output.close()
527
528     def _check_file(self, source, filename):
529         """Check if this file needs to be uploaded"""
530         resume_offset = 0
531         should_upload = False
532         new_file_in_cache = False
533         # Record file path for updating the remote collection before exiting
534         self._file_paths.append(filename)
535
536         with self._state_lock:
537             # If there's no previous cached data for this file, store it for an
538             # eventual repeated run.
539             if source not in self._state['files']:
540                 self._state['files'][source] = {
541                     'mtime': os.path.getmtime(source),
542                     'size' : os.path.getsize(source)
543                 }
544                 new_file_in_cache = True
545             cached_file_data = self._state['files'][source]
546
547         # Check if file was already uploaded (at least partially)
548         file_in_local_collection = self._local_collection.find(filename)
549
550         # If not resuming, upload the full file.
551         if not self.resume:
552             should_upload = True
553         # New file detected since the last run, upload it.
554         elif new_file_in_cache:
555             should_upload = True
556         # Local file didn't change since the last run.
557         elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
558             if not file_in_local_collection:
559                 # File not uploaded yet, upload it completely
560                 should_upload = True
561             elif file_in_local_collection.permission_expired():
562                 # Permission token expired, re-upload the file. This will change
563                 # whenever we have an API for refreshing tokens.
564                 should_upload = True
565                 self._local_collection.remove(filename)
566             elif cached_file_data['size'] == file_in_local_collection.size():
567                 # File already there, skip it.
568                 self.bytes_skipped += cached_file_data['size']
569             elif cached_file_data['size'] > file_in_local_collection.size():
570                 # File partially uploaded, resume!
571                 resume_offset = file_in_local_collection.size()
572                 self.bytes_skipped += resume_offset
573                 should_upload = True
574             else:
575                 # Inconsistent cache, re-upload the file
576                 should_upload = True
577                 self._local_collection.remove(filename)
578                 self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
579         # Local file differs from cached data, re-upload it.
580         else:
581             if file_in_local_collection:
582                 self._local_collection.remove(filename)
583             should_upload = True
584
585         if should_upload:
586             self._files_to_upload.append((source, resume_offset, filename))
587
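    # Summary of the _check_file() decisions above for a cached file whose size
    # and mtime are unchanged: fully present in the local collection -> skip;
    # partially present -> resume from the already-uploaded size; bigger in the
    # collection than on disk -> re-upload from scratch. Any cache miss,
    # size/mtime change, expired permission token, or --no-resume forces a full
    # upload.
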
588     def _upload_files(self):
589         for source, resume_offset, filename in self._files_to_upload:
590             with open(source, 'r') as source_fd:
591                 with self._state_lock:
592                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
593                     self._state['files'][source]['size'] = os.path.getsize(source)
594                 if resume_offset > 0:
595                     # Start upload where we left off
596                     output = self._local_collection.open(filename, 'a')
597                     source_fd.seek(resume_offset)
598                 else:
599                     # Start from scratch
600                     output = self._local_collection.open(filename, 'w')
601                 self._write(source_fd, output)
602                 output.close(flush=False)
603
604     def _write(self, source_fd, output):
605         while True:
606             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
607             if not data:
608                 break
609             output.write(data)
610
611     def _my_collection(self):
612         return self._remote_collection if self.update else self._local_collection
613
614     def _setup_state(self, update_collection):
615         """
616         Create a new cache file or load a previously existing one.
617         """
618         # Load an already existing collection for update
619         if update_collection and re.match(arvados.util.collection_uuid_pattern,
620                                           update_collection):
621             try:
622                 self._remote_collection = arvados.collection.Collection(update_collection)
623             except arvados.errors.ApiError as error:
624                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
625             else:
626                 self.update = True
627         elif update_collection:
628             # Collection locator provided, but unknown format
629             raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
630
631         if self.use_cache:
632             # Set up cache file name from input paths.
633             md5 = hashlib.md5()
634             md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
635             realpaths = sorted(os.path.realpath(path) for path in self.paths)
636             md5.update('\0'.join(realpaths))
637             if self.filename:
638                 md5.update(self.filename)
639             cache_filename = md5.hexdigest()
640             cache_filepath = os.path.join(
641                 arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
642                 cache_filename)
643             if self.resume:
644                 self._cache_file = open(cache_filepath, 'a+')
645             else:
646                 # --no-resume means start with an empty cache file.
647                 self._cache_file = open(cache_filepath, 'w+')
648             self._cache_filename = self._cache_file.name
649             self._lock_file(self._cache_file)
650             self._cache_file.seek(0)
651
652         with self._state_lock:
653             if self.use_cache:
654                 try:
655                     self._state = json.load(self._cache_file)
656                     if not set(['manifest', 'files']).issubset(set(self._state.keys())):
657                         # Cache is at least partially incomplete, set up a new cache
658                         self._state = copy.deepcopy(self.EMPTY_STATE)
659                 except ValueError:
660                     # Cache file empty, set up new cache
661                     self._state = copy.deepcopy(self.EMPTY_STATE)
662             else:
663                 # No cache file, set empty state
664                 self._state = copy.deepcopy(self.EMPTY_STATE)
665             # Load the previous manifest so we can check if files were modified remotely.
666             self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired)
667
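    # The cached state written by _save_state() mirrors EMPTY_STATE; a rough
    # example of its JSON shape (values are illustrative only):
    #
    #   {
    #     "manifest": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n",
    #     "files": {"/tmp/data/empty.txt": {"mtime": 1400000000.0, "size": 0}}
    #   }
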
668     def collection_file_paths(self, col, path_prefix='.'):
669         """Return a list of file paths by recursively going through the entire collection `col`."""
670         file_paths = []
671         for name, item in col.items():
672             if isinstance(item, arvados.arvfile.ArvadosFile):
673                 file_paths.append(os.path.join(path_prefix, name))
674             elif isinstance(item, arvados.collection.Subcollection):
675                 new_prefix = os.path.join(path_prefix, name)
676                 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
677         return file_paths
678
679     def _lock_file(self, fileobj):
680         try:
681             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
682         except IOError:
683             raise ResumeCacheConflict("{} locked".format(fileobj.name))
684
685     def _save_state(self):
686         """
687         Atomically save current state into cache.
688         """
689         try:
690             with self._state_lock:
691                 state = copy.deepcopy(self._state)
692             new_cache_fd, new_cache_name = tempfile.mkstemp(
693                 dir=os.path.dirname(self._cache_filename))
694             self._lock_file(new_cache_fd)
695             new_cache = os.fdopen(new_cache_fd, 'r+')
696             json.dump(state, new_cache)
697             new_cache.flush()
698             os.fsync(new_cache)
699             os.rename(new_cache_name, self._cache_filename)
700         except (IOError, OSError, ResumeCacheConflict) as error:
701             self.logger.error("There was a problem while saving the cache file: {}".format(error))
702             try:
703                 os.unlink(new_cache_name)
704             except NameError:  # mkstemp failed.
705                 pass
706         else:
707             self._cache_file.close()
708             self._cache_file = new_cache
709
710     def collection_name(self):
711         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
712
713     def manifest_locator(self):
714         return self._my_collection().manifest_locator()
715
716     def portable_data_hash(self):
717         return self._my_collection().portable_data_hash()
718
719     def manifest_text(self, stream_name=".", strip=False, normalize=False):
720         return self._my_collection().manifest_text(stream_name, strip, normalize)
721
722     def _datablocks_on_item(self, item):
723         """
724         Return a list of datablock locators, recursively navigating
725         through subcollections
726         """
727         if isinstance(item, arvados.arvfile.ArvadosFile):
728             if item.size() == 0:
729                 # Empty file locator
730                 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
731             else:
732                 locators = []
733                 for segment in item.segments():
734                     loc = segment.locator
735                     locators.append(loc)
736                 return locators
737         elif isinstance(item, arvados.collection.Collection):
738             l = [self._datablocks_on_item(x) for x in item.values()]
739             # Fast list flattener method taken from:
740             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
741             return [loc for sublist in l for loc in sublist]
742         else:
743             return None
744
745     def data_locators(self):
746         with self._collection_lock:
747             # Make sure all datablocks are flushed before getting the locators
748             self._my_collection().manifest_text()
749             datablocks = self._datablocks_on_item(self._my_collection())
750         return datablocks
751
752
753 def expected_bytes_for(pathlist):
754     # Walk the given directory trees and stat files, adding up file sizes,
755     # so we can display progress as a percentage
756     bytesum = 0
757     for path in pathlist:
758         if os.path.isdir(path):
759             for filename in arvados.util.listdir_recursive(path):
760                 bytesum += os.path.getsize(os.path.join(path, filename))
761         elif not os.path.isfile(path):
762             return None
763         else:
764             bytesum += os.path.getsize(path)
765     return bytesum
766
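# Note on expected_bytes_for() above: it returns None as soon as any path is
# neither a regular file nor a directory (e.g. '-' for stdin), in which case
# the progress reporters only show bytes written. For example,
# expected_bytes_for(['-']) is None, while expected_bytes_for(['./README'])
# would be that file's size in bytes.
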
767 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
768                                                             os.getpid())
769 def machine_progress(bytes_written, bytes_expected):
770     return _machine_format.format(
771         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
772
773 def human_progress(bytes_written, bytes_expected):
774     if bytes_expected:
775         return "\r{}M / {}M {:.1%} ".format(
776             bytes_written >> 20, bytes_expected >> 20,
777             float(bytes_written) / bytes_expected)
778     else:
779         return "\r{} ".format(bytes_written)
780
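# Roughly what the two reporters above emit (values are illustrative; the
# machine format also embeds this script's name and PID):
#
#   machine_progress(10485760, 52428800) -> "arv-put 1234: 10485760 written 52428800 total\n"
#   human_progress(10485760, 52428800)   -> "\r10M / 50M 20.0% "
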
781 def progress_writer(progress_func, outfile=sys.stderr):
782     def write_progress(bytes_written, bytes_expected):
783         outfile.write(progress_func(bytes_written, bytes_expected))
784     return write_progress
785
786 def exit_signal_handler(sigcode, frame):
787     sys.exit(-sigcode)
788
789 def desired_project_uuid(api_client, project_uuid, num_retries):
790     if not project_uuid:
791         query = api_client.users().current()
792     elif arvados.util.user_uuid_pattern.match(project_uuid):
793         query = api_client.users().get(uuid=project_uuid)
794     elif arvados.util.group_uuid_pattern.match(project_uuid):
795         query = api_client.groups().get(uuid=project_uuid)
796     else:
797         raise ValueError("Not a valid project UUID: {}".format(project_uuid))
798     return query.execute(num_retries=num_retries)['uuid']
799
800 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
801     global api_client
802
803     logger = logging.getLogger('arvados.arv_put')
804     args = parse_arguments(arguments)
805     status = 0
806     if api_client is None:
807         api_client = arvados.api('v1')
808
809     # Determine the name to use
810     if args.name:
811         if args.stream or args.raw:
812             logger.error("Cannot use --name with --stream or --raw")
813             sys.exit(1)
814         elif args.update_collection:
815             logger.error("Cannot use --name with --update-collection")
816             sys.exit(1)
817         collection_name = args.name
818     else:
819         collection_name = "Saved at {} by {}@{}".format(
820             datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
821             pwd.getpwuid(os.getuid()).pw_name,
822             socket.gethostname())
823
824     if args.project_uuid and (args.stream or args.raw):
825         logger.error("Cannot use --project-uuid with --stream or --raw")
826         sys.exit(1)
827
828     # Determine the parent project
829     try:
830         project_uuid = desired_project_uuid(api_client, args.project_uuid,
831                                             args.retries)
832     except (apiclient_errors.Error, ValueError) as error:
833         logger.error(error)
834         sys.exit(1)
835
836     if args.progress:
837         reporter = progress_writer(human_progress)
838     elif args.batch_progress:
839         reporter = progress_writer(machine_progress)
840     else:
841         reporter = None
842
843     bytes_expected = expected_bytes_for(args.paths)
844
845     try:
846         writer = ArvPutUploadJob(paths = args.paths,
847                                  resume = args.resume,
848                                  use_cache = args.use_cache,
849                                  filename = args.filename,
850                                  reporter = reporter,
851                                  bytes_expected = bytes_expected,
852                                  num_retries = args.retries,
853                                  replication_desired = args.replication,
854                                  name = collection_name,
855                                  owner_uuid = project_uuid,
856                                  ensure_unique_name = True,
857                                  update_collection = args.update_collection,
858                                  logger=logger,
859                                  dry_run=args.dry_run)
860     except ResumeCacheConflict:
861         logger.error("\n".join([
862             "arv-put: Another process is already uploading this data.",
863             "         Use --no-cache if this is really what you want."]))
864         sys.exit(1)
865     except CollectionUpdateError as error:
866         logger.error("\n".join([
867             "arv-put: %s" % str(error)]))
868         sys.exit(1)
869     except ArvPutUploadIsPending:
870         # Dry run check successful, return proper exit code.
871         sys.exit(2)
872     except ArvPutUploadNotPending:
873         # No files pending for upload
874         sys.exit(0)
875
876     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
877     # the originals.
878     orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
879                             for sigcode in CAUGHT_SIGNALS}
880
881     if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
882         logger.warning("\n".join([
883             "arv-put: Resuming previous upload from last checkpoint.",
884             "         Use the --no-resume option to start over."]))
885
886     if not args.dry_run:
887         writer.report_progress()
888     output = None
889     try:
890         writer.start(save_collection=not(args.stream or args.raw))
891     except arvados.errors.ApiError as error:
892         logger.error("\n".join([
893             "arv-put: %s" % str(error)]))
894         sys.exit(1)
895     except ArvPutUploadIsPending:
896         # Dry run check successful, return proper exit code.
897         sys.exit(2)
898     except ArvPutUploadNotPending:
899         # No files pending for upload
900         sys.exit(0)
901
902     if args.progress:  # Print newline to split stderr from stdout for humans.
903         logger.info("\n")
904
905     if args.stream:
906         if args.normalize:
907             output = writer.manifest_text(normalize=True)
908         else:
909             output = writer.manifest_text()
910     elif args.raw:
911         output = ','.join(writer.data_locators())
912     else:
913         try:
914             if args.update_collection:
915                 logger.info("Collection updated: '{}'".format(writer.collection_name()))
916             else:
917                 logger.info("Collection saved as '{}'".format(writer.collection_name()))
918             if args.portable_data_hash:
919                 output = writer.portable_data_hash()
920             else:
921                 output = writer.manifest_locator()
922         except apiclient_errors.Error as error:
923             logger.error(
924                 "arv-put: Error creating Collection on project: {}.".format(
925                     error))
926             status = 1
927
928     # Print the requested output: the new collection's locator or portable
929     # data hash, the manifest text, or the data block locators.
929     if output is None:
930         status = status or 1
931     else:
932         stdout.write(output)
933         if not output.endswith('\n'):
934             stdout.write('\n')
935
936     for sigcode, orig_handler in orig_signal_handlers.items():
937         signal.signal(sigcode, orig_handler)
938
939     if status != 0:
940         sys.exit(status)
941
942     # Success!
943     return output
944
945
946 if __name__ == '__main__':
947     main()
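
# Exit status summary, as implemented above: 0 on success (including a
# --dry-run that finds nothing to upload), 2 for a --dry-run with uploads
# pending, 1 on errors, and sys.exit(-signal) when interrupted by one of
# CAUGHT_SIGNALS.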