#!/usr/bin/env python

# TODO:
# --md5sum - display md5 of each file as read from disk

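# Illustrative command lines (all options are defined below):
#   arv-put file1.txt dir1/                    # upload, save a new collection
#   arv-put --stream big.dat                   # print the manifest, don't save
#   arv-put --update-collection <uuid> dir1/   # add new local files to a collection
#   arv-put < data.bin                         # read stdin, stored as 'stdin'
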
import argparse
import arvados
import arvados.collection
import base64
import copy
import datetime
import errno
import fcntl
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading
import time
from apiclient import errors as apiclient_errors
from arvados._version import __version__

import arvados.commands._util as arv_cmd

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to use. Note that using many threads
increases RAM usage. Default is 2 threads.
On high-latency installations, more threads can improve overall
throughput.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    return args


class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass


class ArvPutArgumentConflict(Exception):
    pass


class ArvPutUploadIsPending(Exception):
    pass


class ArvPutUploadNotPending(Exception):
    pass


class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)
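
# Illustrative dry-run behavior: the first append aborts the file scan.
#   files = FileUploadList(dry_run=True)
#   files.append(('file.txt', 0, 'file.txt'))  # raises ArvPutUploadIsPending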


class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update("-1")
        elif args.filename:
            md5.update(args.filename)
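        # The cache file lives under CACHE_DIR in the user's home
        # configuration directory, keyed by the hex digest computed above.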
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        """Sanity-check the cached state: if a cached block locator is no
        longer readable in Keep, discard the cache and start over."""
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                self.restart()
        except ValueError:
            # Cache file is empty or not valid JSON; nothing to check.
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # ENOENT means it was already gone.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)


class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }
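    # Example of a populated state (values illustrative):
    #   {
    #     "manifest": ". 781e5e245d69b566979b86e28d23f2c7+10 0:10:file.txt\n",
    #     "files": {"/tmp/file.txt": {"mtime": 1468000000.0, "size": 10}}
    #   }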

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 bytes_expected=None, name=None, owner_uuid=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None,
                 filename=None, update_time=60.0, update_collection=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = set() # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # Seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

    def start(self, save_collection):
        """
        Start the supporting thread and upload files.
        """
        if not self.dry_run:
            self._checkpointer.start()
        try:
            for path in self.paths:
                # Test for stdin first, in case some file named '-' exists.
                if path == '-':
                    if self.dry_run:
                        raise ArvPutUploadIsPending()
                    self._write_stdin(self.filename or 'stdin')
                elif os.path.isdir(path):
                    # Use absolute paths on cache index so CWD doesn't interfere
                    # with the caching logic.
                    prefixdir = path = os.path.abspath(path)
                    if prefixdir != '/':
                        prefixdir += '/'
                    for root, dirs, files in os.walk(path):
                        # Make os.walk()'s directory traversal order deterministic.
                        dirs.sort()
                        files.sort()
                        for f in files:
                            self._check_file(os.path.join(root, f),
                                             os.path.join(root[len(prefixdir):], f))
                else:
                    self._check_file(os.path.abspath(path),
                                     self.filename or os.path.basename(path))
            # If dry-run mode is on and we got this far, there are no files
            # pending upload, so notify the caller.
            if self.dry_run:
                raise ArvPutUploadNotPending()
            # Remove local_collection's files that don't exist locally anymore,
            # so the bytes_written count is correct.
            for f in self.collection_file_paths(self._local_collection,
                                                path_prefix=""):
                if f != 'stdin' and f != self.filename and f not in self._file_paths:
                    self._local_collection.remove(f)
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True
            self._upload_files()
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                # Commit all pending blocks & one last _update()
                self._local_collection.manifest_text()
                self._update(final=True)
                if save_collection:
                    self.save_collection()
            if self.use_cache:
                self._cache_file.close()

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on the remote collection; copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on the remote collection; overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on the remote collection; skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries)
        else:
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # ENOENT means the file was already gone, which is fine.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in collection.values():
            if isinstance(item, (arvados.collection.Collection, arvados.collection.Subcollection)):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
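        # Poll every second until the upload actually starts, so early
        # checkpoints happen promptly; after that, fall back to the
        # configured interval (update_time, 60 seconds by default).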
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                self._save_state()
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'w')
        self._write(sys.stdin, output)
        output.close()

    def _check_file(self, source, filename):
        """Check if this file needs to be uploaded"""
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If there's no previous cached data on this file, store it for a
            # possible future resumed run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change
                # whenever we have an API for refreshing tokens.
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            self._files_to_upload.append((source, resume_offset, filename))

    def _upload_files(self):
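        # Each queued entry is (source_path, resume_offset, target_filename);
        # a nonzero offset means we append to a partial upload from a
        # previous run instead of starting over.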
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'r') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'a')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'w')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update('\0'.join(realpaths))
            if self.filename:
                md5.update(self.filename)
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume:
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load the previous manifest so we can check if files were modified remotely.
            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)

    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively going through the entire collection `col`."""
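        # Example result with the default prefix: ['./a.txt', './sub/b.txt'].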
        file_paths = []
        for name, item in col.items():
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache: serialize to a locked
        temporary file, then rename it over the old cache file.
        """
        try:
            with self._state_lock:
                # We're not using copy.deepcopy() here because it's a lot slower
                # than json.dumps(), and we already need the state serialized as
                # JSON to save it on disk.
                state = json.dumps(self._state)
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self._cache_filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache_name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache

    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        return self._my_collection().portable_data_hash()

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in item.values()]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks


def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as a percentage. Returns None when any
    # path is not a regular file or directory (e.g. '-' for stdin), which
    # disables the percentage display.
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

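# Illustrative formatter outputs (program name and pid vary):
#   machine_progress(1048576, 2097152) -> "arv-put 1234: 1048576 written 2097152 total\n"
#   human_progress(1048576, 2097152)   -> "\r1M / 2M 50.0% "
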
def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    # Exit with the negated signal number so callers can tell the exit
    # was triggered by a signal.
    sys.exit(-sigcode)

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']

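# Accepted --project-uuid values (UUIDs illustrative):
#   None                          -> the current user's Home project
#   zzzzz-tpzed-xxxxxxxxxxxxxxx   -> a user UUID (that user's Home project)
#   zzzzz-j7d0g-xxxxxxxxxxxxxxx   -> a group/project UUID
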
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    logger = logging.getLogger('arvados.arv_put')
    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    bytes_expected = expected_bytes_for(args.paths)

    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 bytes_expected = bytes_expected,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 logger=logger,
                                 dry_run=args.dry_run)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("arv-put: %s" % str(error))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not (args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("arv-put: %s" % str(error))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    else:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    # Success!
    return output


if __name__ == '__main__':
    main()