from __future__ import division
from future.utils import listitems, listvalues
from builtins import str
from builtins import object
import argparse
import arvados
import arvados.collection
import base64
import copy
import datetime
import errno
import fcntl
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading
import time
import traceback

from apiclient import errors as apiclient_errors
from arvados._version import __version__

import arvados.commands._util as arv_cmd

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Note that using many
threads increases RAM usage. Default is to use 2 threads.
On high-latency installations, a higher thread count can improve
overall throughput.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--follow-links', action='store_true', default=True,
                    dest='follow_links', help="""
Follow file and directory symlinks (default).
""")
_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                    help="""
Do not follow file and directory symlinks.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

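# Typical invocations, for illustration, based on the options defined above:
#
#   arv-put file1.dat dir1/                  # upload and save a Collection, print its UUID
#   arv-put --stream dir1/                   # print the manifest on stdout, save nothing
#   arv-put --raw < data.bin                 # print comma-separated data block locators
#   arv-put --update-collection UUID dir1/   # add new local files to an existing collection
#   arv-put --dry-run dir1/                  # exit 2 if anything would be uploaded
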
def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    return args

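# Note on stdin handling: reading from "-" (or "/dev/stdin", which is
# normalized to "-" above) disables both resume and the cache, since there is
# no stable file to fingerprint, and the manifest entry defaults to the name
# 'stdin' unless --filename is given.
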

class PathDoesNotExistError(Exception):
    pass


class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass


class ArvPutArgumentConflict(Exception):
    pass


class ArvPutUploadIsPending(Exception):
    pass


class ArvPutUploadNotPending(Exception):
    pass


class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)

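# FileUploadList is the sentinel behind --dry-run: the first attempt to queue
# a file for upload raises ArvPutUploadIsPending, which main() translates into
# exit code 2. A minimal sketch of the pattern (hypothetical values, for
# illustration only):
#
#   files = FileUploadList(dry_run=True)
#   try:
#       files.append(('src.txt', 0, 'src.txt'))  # (source, resume_offset, filename)
#   except ArvPutUploadIsPending:
#       pass  # at least one file would be uploaded
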
class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                self.restart()
        except ValueError:
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict):
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)

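# ResumeCache takes a non-blocking flock() on the cache file, so two arv-put
# processes fingerprinting the same inputs cannot clobber each other's state;
# the loser gets ResumeCacheConflict. A sketch of the intended use
# (illustrative):
#
#   cache_path = ResumeCache.make_path(args)  # md5 of API host + real paths
#   cache = ResumeCache(cache_path)           # raises ResumeCacheConflict if locked
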
class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 bytes_expected=None, name=None, owner_uuid=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None,
                 filename=None, update_time=60.0, update_collection=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
                 follow_links=True):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = set() # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # How many seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        self._checkpoint_before_quit = True
        self.follow_links = follow_links

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

    def start(self, save_collection):
        """
        Start supporting thread & file uploading
        """
        if not self.dry_run:
            self._checkpointer.start()
        try:
            for path in self.paths:
                # Test for stdin first, in case some file named '-' exists
                if path == '-':
                    if self.dry_run:
                        raise ArvPutUploadIsPending()
                    self._write_stdin(self.filename or 'stdin')
                elif not os.path.exists(path):
                    raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
                elif os.path.isdir(path):
                    # Use absolute paths on cache index so CWD doesn't interfere
                    # with the caching logic.
                    prefixdir = path = os.path.abspath(path)
                    if prefixdir != '/':
                        prefixdir += '/'
                    for root, dirs, files in os.walk(path, followlinks=self.follow_links):
                        # Make os.walk()'s dir traversing order deterministic
                        dirs.sort()
                        files.sort()
                        for f in files:
                            self._check_file(os.path.join(root, f),
                                             os.path.join(root[len(prefixdir):], f))
                else:
                    self._check_file(os.path.abspath(path),
                                     self.filename or os.path.basename(path))
            # If dry-run mode is on and we got up to this point, then we
            # should notify that there aren't any files to upload.
            if self.dry_run:
                raise ArvPutUploadNotPending()
            # Remove local_collection's files that don't exist locally anymore, so the
            # bytes_written count is correct.
            for f in self.collection_file_paths(self._local_collection,
                                                path_prefix=""):
                if f != 'stdin' and f != self.filename and f not in self._file_paths:
                    self._local_collection.remove(f)
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of
            # KeyboardInterrupt because we have a custom signal
            # handler in place that raises SystemExit with the caught
            # signal's code.
            if isinstance(e, PathDoesNotExistError):
                # We aren't interested in the traceback for this case
                pass
            elif not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(
                    traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on the remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on the remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on the remote collection, skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries)
        else:
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in listvalues(collection):
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                try:
                    self._save_state()
                except Exception as e:
                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'wb')
        # Read bytes (not text) from stdin: on Python 3 the binary stream is
        # sys.stdin.buffer, while Python 2's sys.stdin is already binary.
        self._write(getattr(sys.stdin, 'buffer', sys.stdin), output)
        output.close()

    def _check_file(self, source, filename):
        """
        Check if this file needs to be uploaded
        """
        # Ignore symlinks when requested
        if (not self.follow_links) and os.path.islink(source):
            return
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If there's no previous cached data on this file, store it for a
            # possible future resumed run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have an API for refreshing tokens.
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than the local version; re-uploading it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            self._files_to_upload.append((source, resume_offset, filename))

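    # Summary of the resume decision made by _check_file() above, keyed on the
    # cached (mtime, size) fingerprint versus the local collection contents:
    #
    #   no cache entry / --no-resume          -> full upload
    #   fingerprint matches, not in coll.     -> full upload
    #   fingerprint matches, token expired    -> full re-upload
    #   fingerprint matches, same size        -> skip (counted in bytes_skipped)
    #   fingerprint matches, coll. smaller    -> resume from the collection's size
    #   fingerprint matches, coll. bigger     -> inconsistent cache, full re-upload
    #   fingerprint differs                   -> full re-upload
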
    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'rb') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'ab')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'wb')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

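    # _write() copies data in KEEP_BLOCK_SIZE chunks (64 MiB in the SDK's
    # defaults) so each read lines up with one Keep block; resuming then only
    # needs to seek the source file to the collection's current size, as
    # _upload_files() does above.
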
    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update(b'\0'.join([p.encode() for p in realpaths]))
            if self.filename:
                md5.update(self.filename.encode())
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume and os.path.exists(cache_filepath):
                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self.logger.info("Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache is at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load the previous manifest so we can check if files were modified remotely.
            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)

    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively going through the entire collection `col`"""
        file_paths = []
        for name, item in listitems(col):
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        with self._state_lock:
            # We're not using copy.deepcopy() here because it's a lot slower
            # than json.dumps(), and we already need the state serialized as
            # JSON to save it on disk.
            state = json.dumps(self._state)
        try:
            new_cache = tempfile.NamedTemporaryFile(
                mode='w+',
                dir=os.path.dirname(self._cache_filename), delete=False)
            self._lock_file(new_cache)
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache.name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache.name)
            except NameError:  # NamedTemporaryFile creation failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache

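    # The write/flush/fsync/rename sequence above is the standard
    # atomic-replace pattern: readers of the cache file see either the old
    # complete state or the new complete state, never a partial write, because
    # rename(2) is atomic within a filesystem.
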
    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        pdh = self._my_collection().portable_data_hash()
        m = self._my_collection().stripped_manifest().encode()
        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
        if pdh != local_pdh:
            self.logger.warning("\n".join([
                "arv-put: API server provided PDH differs from local manifest.",
                "         This should not happen; showing API server version."]))
        return pdh

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            locator_lists = [self._datablocks_on_item(x) for x in listvalues(item)]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in locator_lists for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks

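# A minimal programmatic sketch of driving ArvPutUploadJob directly, mirroring
# what main() does below (illustrative only; the argument values are
# assumptions, not requirements):
#
#   writer = ArvPutUploadJob(paths=['data/'], resume=True, use_cache=True,
#                            name='my collection', replication_desired=2)
#   writer.start(save_collection=True)   # uploads, checkpoints, then saves
#   print(writer.manifest_locator())     # the new collection's UUID
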

def expected_bytes_for(pathlist, follow_links=True):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as percent
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for root, dirs, files in os.walk(path, followlinks=follow_links):
                # Sum file sizes
                for f in files:
                    filepath = os.path.join(root, f)
                    # Ignore symlinked files when requested
                    if (not follow_links) and os.path.islink(filepath):
                        continue
                    bytesum += os.path.getsize(filepath)
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum

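# Note that expected_bytes_for() returns None as soon as any path is neither a
# regular file nor a directory (e.g. '-' for stdin), which disables the
# percentage display in human_progress() below.
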
_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

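# The reporter passed to ArvPutUploadJob is any callable taking
# (bytes_written, bytes_expected). For example (illustrative):
#
#   reporter = progress_writer(human_progress)
#   reporter(1 << 20, 10 << 20)   # writes "\r1M / 10M 10.0% " to stderr
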
def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']

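# Note on signals: main() installs exit_signal_handler() for each signal in
# CAUGHT_SIGNALS, so SIGINT/SIGQUIT/SIGTERM raise SystemExit with a negative
# code (e.g. -2 for SIGINT); ArvPutUploadJob.start() relies on this to skip
# printing a traceback on Ctrl-C.
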
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    logger = logging.getLogger('arvados.arv_put')
    logger.setLevel(logging.INFO)
    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # If a human is running this and at least one directory is being
    # uploaded, the expected-bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    bytes_expected = expected_bytes_for(args.paths, follow_links=args.follow_links)

    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 bytes_expected = bytes_expected,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not (args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)
    except PathDoesNotExistError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    else:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in listitems(orig_signal_handlers):
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    # Success!
    return output


if __name__ == '__main__':
    main()