11308: Fix arvfile append mode: write() changes the file pointer.
[arvados.git] / sdk / python / arvados / commands / put.py
1 from __future__ import division
2 from future.utils import listitems, listvalues
3 from builtins import str
4 from builtins import object
5 import argparse
6 import arvados
7 import arvados.collection
8 import base64
9 import copy
10 import datetime
11 import errno
12 import fcntl
13 import hashlib
14 import json
15 import logging
16 import os
17 import pwd
18 import re
19 import signal
20 import socket
21 import sys
22 import tempfile
23 import threading
24 import time
25 import traceback
26
27 from apiclient import errors as apiclient_errors
28 from arvados._version import __version__
29
30 import arvados.commands._util as arv_cmd
31
# Signals this command wants to intercept. The handler is not in this part of
# the file; ArvPutUploadJob.start() expects it to raise SystemExit carrying the
# caught signal's code.
CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
# Module-level API client placeholder; starts out unset.
api_client = None
34
# Options describing WHAT to upload and how the result is stored/printed.
# Built with add_help=False so it can be used as a parent of arg_parser.
upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

# --max-manifest-depth, --normalize and --dry-run are mutually exclusive.
_group = upload_opts.add_mutually_exclusive_group()

# Hidden from --help (argparse.SUPPRESS) but still accepted on the command line.
_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

# Output mode: stream / manifest / raw (and their synonyms) are mutually
# exclusive. Synonyms share a destination through `dest`.
_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

# --use-filename writes to the same destination as --filename.
upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 2 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")
145
# Options describing HOW the upload runs: destination project/name, progress
# reporting and resume/cache behavior. Built with add_help=False so it can be
# used as a parent of arg_parser.
run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

# Progress reporting flavors are mutually exclusive.
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

# --resume / --no-resume both write to args.resume (default True).
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

# --cache / --no-cache both write to args.use_cache (default True).
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")
196
# The full command line parser: upload options + run options + the shared
# retry option provided by arvados.commands._util.
arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])
200
def parse_arguments(arguments):
    """Parse the arv-put command line and normalize the resulting namespace.

    Beyond plain argparse parsing this:
      * defaults the path list to ['-'] (stdin) and maps '/dev/stdin' to '-';
      * rejects --filename when a directory or several paths are given;
      * enables --progress when stderr is a tty and no other progress mode
        was requested;
      * disables resume when the cache is disabled;
      * when reading only from stdin, rejects --update-collection, disables
        resume and cache, and defaults the filename to 'stdin'.

    Returns the argparse namespace. Invalid combinations are reported through
    arg_parser.error().
    """
    args = arg_parser.parse_args(arguments)

    # No path at all means "read from standard input".
    if not args.paths:
        args.paths = ['-']

    # '/dev/stdin' is just another spelling of '-'.
    args.paths = ['-' if p == '/dev/stdin' else p for p in args.paths]

    # --filename only makes sense for exactly one non-directory path.
    single_file = len(args.paths) == 1 and not os.path.isdir(args.paths[0])
    if not single_file and args.filename:
        arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    other_progress_mode = args.batch_progress or args.no_progress
    if not other_progress_mode and os.isatty(sys.stderr.fileno()):
        args.progress = True

    # Without a cache there is nothing to resume from.
    if not args.use_cache:
        args.resume = False

    # Reading from stdin only: nothing can be cached, resumed or updated.
    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = args.use_cache = False
        args.filename = args.filename or 'stdin'

    return args
236
237
class CollectionUpdateError(Exception):
    """The collection requested for update could not be read or recognized."""
240
241
class ResumeCacheConflict(Exception):
    """The cache file could not be locked (another holder has it locked)."""
244
245
class ArvPutArgumentConflict(Exception):
    """Incompatible options were given (e.g. resume without a cache)."""
248
249
class ArvPutUploadIsPending(Exception):
    """Dry-run result: at least one file would have to be uploaded."""
252
253
class ArvPutUploadNotPending(Exception):
    """Dry-run result: no file needs to be uploaded."""
256
257
class FileUploadList(list):
    """List of pending uploads that refuses new entries in dry-run mode.

    With dry_run=True the first attempt to append() raises
    ArvPutUploadIsPending instead of storing the entry, which is how a dry run
    finds out that there is something to upload.
    """

    def __init__(self, dry_run=False):
        super(FileUploadList, self).__init__()
        self.dry_run = dry_run

    def append(self, other):
        """Add `other` to the list, or raise ArvPutUploadIsPending on dry run."""
        if not self.dry_run:
            list.append(self, other)
            return
        raise ArvPutUploadIsPending()
267
268
class ResumeCache(object):
    """JSON state file protected by an exclusive, non-blocking flock().

    The file is opened (and created if needed) and locked on construction;
    ResumeCacheConflict is raised when the lock cannot be obtained.
    """
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        """Open `file_spec` in append/read mode and lock it exclusively.

        Raises ResumeCacheConflict if the file is already locked; in that
        case the file handle is closed again instead of being leaked.
        """
        self.cache_file = open(file_spec, 'a+')
        try:
            self._lock_file(self.cache_file)
        except ResumeCacheConflict:
            self.cache_file.close()
            raise
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        """Return the cache file path for the given parsed arguments.

        The file name is the MD5 hex digest of the API host, the sorted real
        paths and either '-1' (when any path is a directory) or the requested
        filename. The directory is obtained from arv_cmd.make_home_conf_dir().
        """
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        """Take an exclusive non-blocking lock on a file object or descriptor.

        Raises ResumeCacheConflict when the lock is held elsewhere.
        """
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            # save() passes a raw integer descriptor, which has no `.name`;
            # fall back to the descriptor itself for the message.
            raise ResumeCacheConflict(
                "{} locked".format(getattr(fileobj, 'name', fileobj)))

    def load(self):
        """Return the JSON-decoded content of the cache file.

        Raises ValueError (from json) when the file is empty or invalid.
        """
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        """Verify the cached state still points at a reachable Keep block.

        Takes the first locator found in the cached state and issues a HEAD
        request for it; if anything goes wrong the cache is restarted. A cache
        file that cannot be decoded is silently ignored.
        """
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                # Deliberately broad: any failure means the cached state is
                # not usable, so start over with an empty cache.
                self.restart()
        except (ValueError):
            pass

    def save(self, data):
        """Atomically replace the cache content with `data` (JSON-encoded).

        The data is written to a locked temporary file in the same directory
        which is then renamed over the cache file. On I/O or locking failure
        the temporary file is closed and removed and the previous cache is
        left untouched (best effort, no exception is raised for those errors).
        """
        new_cache = None
        new_cache_fd = None
        new_cache_name = None
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            # Make sure the content is on the file before it becomes visible
            # under the final name.
            new_cache.flush()
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict):
            # Release the temporary descriptor, wrapped or not.
            if new_cache is not None:
                new_cache.close()
            elif new_cache_fd is not None:
                os.close(new_cache_fd)
            if new_cache_name is not None:  # None means mkstemp failed.
                try:
                    os.unlink(new_cache_name)
                except OSError as error:
                    if error.errno != errno.ENOENT:
                        raise
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        """Close the cache file (which also releases its lock)."""
        self.cache_file.close()

    def destroy(self):
        """Remove the cache file from disk and close it."""
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        """Throw the current cache away and start a fresh one at the same path."""
        self.destroy()
        self.__init__(self.filename)
349
350
class ArvPutUploadJob(object):
    """Upload a set of local paths (or stdin) into a collection.

    The job keeps a local collection with the uploaded data, optionally
    persists its progress (file list and manifest) in a locked cache file so
    an interrupted run can be resumed, and can merge the result into an
    existing remote collection when asked to update one.
    """
    # Cache directory passed to arv_cmd.make_home_conf_dir().
    CACHE_DIR = '.cache/arvados/arv-put'
    # Template of the cached state; deep-copied whenever a fresh state is needed.
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }
357
358     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
359                  bytes_expected=None, name=None, owner_uuid=None,
360                  ensure_unique_name=False, num_retries=None,
361                  put_threads=None, replication_desired=None,
362                  filename=None, update_time=60.0, update_collection=None,
363                  logger=logging.getLogger('arvados.arv_put'), dry_run=False):
364         self.paths = paths
365         self.resume = resume
366         self.use_cache = use_cache
367         self.update = False
368         self.reporter = reporter
369         self.bytes_expected = bytes_expected
370         self.bytes_written = 0
371         self.bytes_skipped = 0
372         self.name = name
373         self.owner_uuid = owner_uuid
374         self.ensure_unique_name = ensure_unique_name
375         self.num_retries = num_retries
376         self.replication_desired = replication_desired
377         self.put_threads = put_threads
378         self.filename = filename
379         self._state_lock = threading.Lock()
380         self._state = None # Previous run state (file list & manifest)
381         self._current_files = [] # Current run file list
382         self._cache_file = None
383         self._collection_lock = threading.Lock()
384         self._remote_collection = None # Collection being updated (if asked)
385         self._local_collection = None # Collection from previous run manifest
386         self._file_paths = set() # Files to be updated in remote collection
387         self._stop_checkpointer = threading.Event()
388         self._checkpointer = threading.Thread(target=self._update_task)
389         self._checkpointer.daemon = True
390         self._update_task_time = update_time  # How many seconds wait between update runs
391         self._files_to_upload = FileUploadList(dry_run=dry_run)
392         self._upload_started = False
393         self.logger = logger
394         self.dry_run = dry_run
395         self._checkpoint_before_quit = True
396
397         if not self.use_cache and self.resume:
398             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
399
400         # Check for obvious dry-run responses
401         if self.dry_run and (not self.use_cache or not self.resume):
402             raise ArvPutUploadIsPending()
403
404         # Load cached data if any and if needed
405         self._setup_state(update_collection)
406
    def start(self, save_collection):
        """
        Start supporting thread & file uploading

        Walks self.paths (stdin, directories, plain files), decides which
        files need uploading and uploads them. When `save_collection` is true
        and the run ends normally, the resulting collection is saved.

        In dry-run mode nothing is uploaded: ArvPutUploadIsPending is raised
        as soon as something would be uploaded, otherwise
        ArvPutUploadNotPending is raised once every path was checked.
        Any exception (including SystemExit) is re-raised after disabling the
        final checkpoint.
        """
        # The checkpointer thread is only needed when really uploading.
        if not self.dry_run:
            self._checkpointer.start()
        try:
            for path in self.paths:
                # Test for stdin first, in case some file named '-' exist
                if path == '-':
                    if self.dry_run:
                        raise ArvPutUploadIsPending()
                    self._write_stdin(self.filename or 'stdin')
                elif os.path.isdir(path):
                    # Use absolute paths on cache index so CWD doesn't interfere
                    # with the caching logic.
                    prefixdir = path = os.path.abspath(path)
                    if prefixdir != '/':
                        prefixdir += '/'
                    for root, dirs, files in os.walk(path):
                        # Make os.walk()'s dir traversing order deterministic
                        dirs.sort()
                        files.sort()
                        for f in files:
                            # Second argument: path relative to the uploaded
                            # directory (prefix stripped from `root`).
                            self._check_file(os.path.join(root, f),
                                             os.path.join(root[len(prefixdir):], f))
                else:
                    self._check_file(os.path.abspath(path),
                                     self.filename or os.path.basename(path))
            # If dry-mode is on, and got up to this point, then we should notify that
            # there aren't any file to upload.
            if self.dry_run:
                raise ArvPutUploadNotPending()
            # Remove local_collection's files that don't exist locally anymore, so the
            # bytes_written count is correct.
            for f in self.collection_file_paths(self._local_collection,
                                                path_prefix=""):
                if f != 'stdin' and f != self.filename and not f in self._file_paths:
                    self._local_collection.remove(f)
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            # Abnormal exit: skip the final checkpoint/save in `finally`.
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of KeyboardInterrupt because
            #   we have a custom signal handler in place that raises SystemExit with
            #   the caught signal's code.
            if not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(
                    traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()
475
476     def save_collection(self):
477         if self.update:
478             # Check if files should be updated on the remote collection.
479             for fp in self._file_paths:
480                 remote_file = self._remote_collection.find(fp)
481                 if not remote_file:
482                     # File don't exist on remote collection, copy it.
483                     self._remote_collection.copy(fp, fp, self._local_collection)
484                 elif remote_file != self._local_collection.find(fp):
485                     # A different file exist on remote collection, overwrite it.
486                     self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
487                 else:
488                     # The file already exist on remote collection, skip it.
489                     pass
490             self._remote_collection.save(num_retries=self.num_retries)
491         else:
492             self._local_collection.save_new(
493                 name=self.name, owner_uuid=self.owner_uuid,
494                 ensure_unique_name=self.ensure_unique_name,
495                 num_retries=self.num_retries)
496
497     def destroy_cache(self):
498         if self.use_cache:
499             try:
500                 os.unlink(self._cache_filename)
501             except OSError as error:
502                 # That's what we wanted anyway.
503                 if error.errno != errno.ENOENT:
504                     raise
505             self._cache_file.close()
506
507     def _collection_size(self, collection):
508         """
509         Recursively get the total size of the collection
510         """
511         size = 0
512         for item in listvalues(collection):
513             if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
514                 size += self._collection_size(item)
515             else:
516                 size += item.size()
517         return size
518
519     def _update_task(self):
520         """
521         Periodically called support task. File uploading is
522         asynchronous so we poll status from the collection.
523         """
524         while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
525             self._update()
526
527     def _update(self, final=False):
528         """
529         Update cached manifest text and report progress.
530         """
531         if self._upload_started:
532             with self._collection_lock:
533                 self.bytes_written = self._collection_size(self._local_collection)
534                 if self.use_cache:
535                     if final:
536                         manifest = self._local_collection.manifest_text()
537                     else:
538                         # Get the manifest text without comitting pending blocks
539                         manifest = self._local_collection.manifest_text(strip=False,
540                                                                         normalize=False,
541                                                                         only_committed=True)
542                     # Update cache
543                     with self._state_lock:
544                         self._state['manifest'] = manifest
545             if self.use_cache:
546                 try:
547                     self._save_state()
548                 except Exception as e:
549                     self.logger.error("Unexpected error trying to save cache file: {}".format(e))
550         else:
551             self.bytes_written = self.bytes_skipped
552         # Call the reporter, if any
553         self.report_progress()
554
555     def report_progress(self):
556         if self.reporter is not None:
557             self.reporter(self.bytes_written, self.bytes_expected)
558
559     def _write_stdin(self, filename):
560         output = self._local_collection.open(filename, 'wb')
561         self._write(sys.stdin, output)
562         output.close()
563
    def _check_file(self, source, filename):
        """Check if this file needs to be uploaded

        `source` is the local path (also the key in the cached file list) and
        `filename` the path of the file inside the collection. When an upload
        is needed a (source, resume_offset, filename) tuple is queued in
        self._files_to_upload; resume_offset is non-zero only when a partial
        upload can be continued. Bytes that don't need uploading are added to
        self.bytes_skipped.
        """
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If no previous cached data on this file, store it for an eventual
            # repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have a API for refreshing tokens.
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            # In dry-run mode this append raises ArvPutUploadIsPending
            # (see FileUploadList.append).
            self._files_to_upload.append((source, resume_offset, filename))
623
624     def _upload_files(self):
625         for source, resume_offset, filename in self._files_to_upload:
626             with open(source, 'rb') as source_fd:
627                 with self._state_lock:
628                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
629                     self._state['files'][source]['size'] = os.path.getsize(source)
630                 if resume_offset > 0:
631                     # Start upload where we left off
632                     output = self._local_collection.open(filename, 'ab')
633                     source_fd.seek(resume_offset)
634                 else:
635                     # Start from scratch
636                     output = self._local_collection.open(filename, 'wb')
637                 self._write(source_fd, output)
638                 output.close(flush=False)
639
640     def _write(self, source_fd, output):
641         while True:
642             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
643             if not data:
644                 break
645             output.write(data)
646
647     def _my_collection(self):
648         return self._remote_collection if self.update else self._local_collection
649
    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.

        Also loads the remote collection named by `update_collection` (when
        given) and builds the local collection from the cached manifest.

        Raises CollectionUpdateError when `update_collection` cannot be read
        or does not look like a collection UUID, and ResumeCacheConflict when
        the cache file is locked.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            # The name is the MD5 of the API host, the sorted real paths and
            # the requested filename (if any).
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update(b'\0'.join([p.encode() for p in realpaths]))
            if self.filename:
                md5.update(self.filename.encode())
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume and os.path.exists(cache_filepath):
                self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with a empty cache file.
                # NOTE(review): 'w+' truncates the file before the lock below
                # is attempted.
                self.logger.info("Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            # 'a+' may leave the position at the end; rewind before reading.
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load the previous manifest so we can check if files were modified remotely.
            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)
706
707     def collection_file_paths(self, col, path_prefix='.'):
708         """Return a list of file paths by recursively go through the entire collection `col`"""
709         file_paths = []
710         for name, item in listitems(col):
711             if isinstance(item, arvados.arvfile.ArvadosFile):
712                 file_paths.append(os.path.join(path_prefix, name))
713             elif isinstance(item, arvados.collection.Subcollection):
714                 new_prefix = os.path.join(path_prefix, name)
715                 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
716         return file_paths
717
718     def _lock_file(self, fileobj):
719         try:
720             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
721         except IOError:
722             raise ResumeCacheConflict("{} locked".format(fileobj.name))
723
724     def _save_state(self):
725         """
726         Atomically save current state into cache.
727         """
728         with self._state_lock:
729             # We're not using copy.deepcopy() here because it's a lot slower
730             # than json.dumps(), and we're already needing JSON format to be
731             # saved on disk.
732             state = json.dumps(self._state)
733         try:
734             new_cache = tempfile.NamedTemporaryFile(
735                 mode='w+',
736                 dir=os.path.dirname(self._cache_filename), delete=False)
737             self._lock_file(new_cache)
738             new_cache.write(state)
739             new_cache.flush()
740             os.fsync(new_cache)
741             os.rename(new_cache.name, self._cache_filename)
742         except (IOError, OSError, ResumeCacheConflict) as error:
743             self.logger.error("There was a problem while saving the cache file: {}".format(error))
744             try:
745                 os.unlink(new_cache_name)
746             except NameError:  # mkstemp failed.
747                 pass
748         else:
749             self._cache_file.close()
750             self._cache_file = new_cache
751
752     def collection_name(self):
753         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
754
755     def manifest_locator(self):
756         return self._my_collection().manifest_locator()
757
758     def portable_data_hash(self):
759         pdh = self._my_collection().portable_data_hash()
760         m = self._my_collection().stripped_manifest().encode()
761         local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
762         if pdh != local_pdh:
763             logger.warning("\n".join([
764                 "arv-put: API server provided PDH differs from local manifest.",
765                 "         This should not happen; showing API server version."]))
766         return pdh
767
768     def manifest_text(self, stream_name=".", strip=False, normalize=False):
769         return self._my_collection().manifest_text(stream_name, strip, normalize)
770
771     def _datablocks_on_item(self, item):
772         """
773         Return a list of datablock locators, recursively navigating
774         through subcollections
775         """
776         if isinstance(item, arvados.arvfile.ArvadosFile):
777             if item.size() == 0:
778                 # Empty file locator
779                 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
780             else:
781                 locators = []
782                 for segment in item.segments():
783                     loc = segment.locator
784                     locators.append(loc)
785                 return locators
786         elif isinstance(item, arvados.collection.Collection):
787             l = [self._datablocks_on_item(x) for x in listvalues(item)]
788             # Fast list flattener method taken from:
789             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
790             return [loc for sublist in l for loc in sublist]
791         else:
792             return None
793
794     def data_locators(self):
795         with self._collection_lock:
796             # Make sure all datablocks are flushed before getting the locators
797             self._my_collection().manifest_text()
798             datablocks = self._datablocks_on_item(self._my_collection())
799         return datablocks
800
801
def expected_bytes_for(pathlist):
    """Return the total size in bytes of everything named in `pathlist`.

    Directories are walked with arvados.util.listdir_recursive() and every
    listed file is stat'ed, so progress can be displayed as a percentage.
    Returns None as soon as a path is neither a directory nor a regular
    file.
    """
    total = 0
    for path in pathlist:
        if os.path.isdir(path):
            total += sum(
                os.path.getsize(os.path.join(path, relname))
                for relname in arvados.util.listdir_recursive(path))
        elif os.path.isfile(path):
            total += os.path.getsize(path)
        else:
            return None
    return total
815
# Template for machine-readable progress lines. The program name and pid are
# filled in once at import time; the two escaped fields are filled per report.
_machine_format = "{} {}: {{}} written {{}} total\n".format(
    sys.argv[0], os.getpid())

def machine_progress(bytes_written, bytes_expected):
    """Format a machine-readable progress line.

    An unknown total (`bytes_expected` is None) is reported as -1.
    """
    if bytes_expected is None:
        total = -1
    else:
        total = bytes_expected
    return _machine_format.format(bytes_written, total)
821
def human_progress(bytes_written, bytes_expected):
    """Format a human-readable progress string, prefixed with a carriage return.

    With a known, non-zero total the string shows written/expected sizes
    (each shifted right by 20 bits, i.e. whole MiB) and a percentage;
    otherwise it shows only the raw `bytes_written` value.
    """
    if not bytes_expected:
        return "\r{} ".format(bytes_written)
    written_mib = bytes_written >> 20
    expected_mib = bytes_expected >> 20
    fraction = float(bytes_written) / bytes_expected
    return "\r{}M / {}M {:.1%} ".format(written_mib, expected_mib, fraction)
829
def progress_writer(progress_func, outfile=sys.stderr):
    """Build a progress reporter that writes to `outfile`.

    The returned callable takes (bytes_written, bytes_expected), formats
    them with `progress_func` and writes the resulting text to `outfile`.
    """
    def write_progress(bytes_written, bytes_expected):
        message = progress_func(bytes_written, bytes_expected)
        outfile.write(message)
    return write_progress
834
def exit_signal_handler(sigcode, frame):
    """Signal handler that exits with the negated signal number as status.

    `frame` is accepted to satisfy the signal-handler signature and is unused.
    """
    exit_status = -sigcode
    raise SystemExit(exit_status)
837
def desired_project_uuid(api_client, project_uuid, num_retries):
    """Resolve the UUID of the owner to use for the upload.

    With no `project_uuid`, the current user is looked up. Otherwise the
    value must match the user or group UUID pattern and the matching
    record is fetched. The 'uuid' field of the API response is returned.

    Raises ValueError when `project_uuid` matches neither pattern.
    """
    if project_uuid:
        if arvados.util.user_uuid_pattern.match(project_uuid):
            request = api_client.users().get(uuid=project_uuid)
        elif arvados.util.group_uuid_pattern.match(project_uuid):
            request = api_client.groups().get(uuid=project_uuid)
        else:
            raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    else:
        request = api_client.users().current()
    response = request.execute(num_retries=num_retries)
    return response['uuid']
848
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    """Entry point of the arv-put command.

    Parses `arguments`, builds an ArvPutUploadJob, runs the upload and
    writes the result to `stdout`: the manifest text (--stream), the
    comma-separated datablock locators (--raw), or the saved collection's
    portable data hash / manifest locator.

    Parameters:
      arguments -- argument list handed to parse_arguments().
      stdout    -- file-like object receiving the final output line.
      stderr    -- accepted for interface compatibility; not used in this
                   function's body.

    Returns the output string on success.

    Exits via sys.exit() with:
      1 -- invalid option combination, project lookup failure, cache
           conflict, collection update error, API error, or no output;
      2 -- dry run found files pending for upload;
      0 -- dry run found nothing pending.

    The handlers installed for CAUGHT_SIGNALS are restored on every way out
    of the upload phase, including the sys.exit() paths and unexpected
    exceptions.
    """
    global api_client

    logger = logging.getLogger('arvados.arv_put')
    logger.setLevel(logging.INFO)
    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any(os.path.isdir(f) for f in args.paths):
        logger.info("Calculating upload size, this could take some time...")
    bytes_expected = expected_bytes_for(args.paths)

    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 bytes_expected = bytes_expected,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 logger=logger,
                                 dry_run=args.dry_run)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    # Everything that runs with our handlers installed lives inside this
    # try block, so the originals are put back however the block is left.
    try:
        if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
            logger.warning("\n".join([
                "arv-put: Resuming previous upload from last checkpoint.",
                "         Use the --no-resume option to start over."]))

        if not args.dry_run:
            writer.report_progress()
        output = None
        try:
            writer.start(save_collection=not(args.stream or args.raw))
        except arvados.errors.ApiError as error:
            logger.error("\n".join([
                "arv-put: %s" % str(error)]))
            sys.exit(1)
        except ArvPutUploadIsPending:
            # Dry run check successful, return proper exit code.
            sys.exit(2)
        except ArvPutUploadNotPending:
            # No files pending for upload
            sys.exit(0)

        if args.progress:  # Print newline to split stderr from stdout for humans.
            logger.info("\n")

        if args.stream:
            if args.normalize:
                output = writer.manifest_text(normalize=True)
            else:
                output = writer.manifest_text()
        elif args.raw:
            output = ','.join(writer.data_locators())
        else:
            try:
                if args.update_collection:
                    logger.info("Collection updated: '{}'".format(writer.collection_name()))
                else:
                    logger.info("Collection saved as '{}'".format(writer.collection_name()))
                if args.portable_data_hash:
                    output = writer.portable_data_hash()
                else:
                    output = writer.manifest_locator()
            except apiclient_errors.Error as error:
                logger.error(
                    "arv-put: Error creating Collection on project: {}.".format(
                        error))
                status = 1

        # Print the locator (uuid) of the new collection.
        if output is None:
            status = status or 1
        else:
            stdout.write(output)
            if not output.endswith('\n'):
                stdout.write('\n')
    finally:
        for sigcode, orig_handler in listitems(orig_signal_handlers):
            signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    # Success!
    return output
999
1000
# Run the command only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()