Merge branch '21846-rightclick-context-menu'
[arvados.git] / sdk / python / arvados / commands / put.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import argparse
6 import arvados
7 import arvados.collection
8 import base64
9 import ciso8601
10 import copy
11 import datetime
12 import errno
13 import fcntl
14 import fnmatch
15 import hashlib
16 import json
17 import logging
18 import os
19 import pwd
20 import re
21 import signal
22 import socket
23 import sys
24 import tempfile
25 import threading
26 import time
27 import traceback
28
29 from pathlib import Path
30
31 from apiclient import errors as apiclient_errors
32 from arvados._version import __version__
33
34 import arvados.util
35 import arvados.commands._util as arv_cmd
36
# Module-wide API client handle; nothing in this section assigns it.
api_client = None

# Options describing WHAT gets uploaded and HOW the result is stored and
# reported. Kept as a help-less parser so it can be used as a parent parser.
upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument(
    '--version',
    action='version',
    version="%s %s" % (sys.argv[0], __version__),
    help='Print version and exit.')
upload_opts.add_argument(
    'paths',
    metavar='path',
    type=str,
    nargs='*',
    help="""
Local file or directory. If path is a directory reference with a trailing
slash, then just upload the directory's contents; otherwise upload the
directory itself. Default: read from standard input.
""")

# --max-manifest-depth (hidden from help), --normalize and --dry-run
# cannot be combined with each other.
_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument(
    '--max-manifest-depth',
    type=int,
    metavar='N',
    default=-1,
    help=argparse.SUPPRESS)

_group.add_argument(
    '--normalize',
    action='store_true',
    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument(
    '--dry-run',
    action='store_true',
    default=False,
    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

# Output mode: stream / manifest / raw (plus their synonyms) exclude each
# other.
_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument(
    '--as-stream',
    action='store_true',
    dest='stream',
    help="""
Synonym for --stream.
""")

_group.add_argument(
    '--stream',
    action='store_true',
    help="""
Store the file content and display the resulting manifest on
stdout. Do not save a Collection object in Arvados.
""")

_group.add_argument(
    '--as-manifest',
    action='store_true',
    dest='manifest',
    help="""
Synonym for --manifest.
""")

_group.add_argument(
    '--in-manifest',
    action='store_true',
    dest='manifest',
    help="""
Synonym for --manifest.
""")

_group.add_argument(
    '--manifest',
    action='store_true',
    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument(
    '--as-raw',
    action='store_true',
    dest='raw',
    help="""
Synonym for --raw.
""")

_group.add_argument(
    '--raw',
    action='store_true',
    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

# Free-standing upload options.
upload_opts.add_argument(
    '--update-collection',
    type=str,
    default=None,
    dest='update_collection',
    metavar="UUID",
    help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument(
    '--use-filename',
    type=str,
    default=None,
    dest='filename',
    help="""
Synonym for --filename.
""")

upload_opts.add_argument(
    '--filename',
    type=str,
    default=None,
    help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument(
    '--portable-data-hash',
    action='store_true',
    help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument(
    '--replication',
    type=int,
    metavar='N',
    default=None,
    help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument(
    '--storage-classes',
    help="""
Specify comma separated list of storage classes to be used when saving data to Keep.
""")

upload_opts.add_argument(
    '--threads',
    type=int,
    metavar='N',
    default=None,
    help="""
Set the number of upload threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 2 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")

# May be given several times; every occurrence is appended to the list.
upload_opts.add_argument(
    '--exclude',
    metavar='PATTERN',
    default=[],
    action='append',
    help="""
Exclude files and directories whose names match the given glob pattern. When
using a path-like pattern like 'subdir/*.txt', all text files inside 'subdir'
directory, relative to the provided input dirs will be excluded.
When using a filename pattern like '*.txt', any text file will be excluded
no matter where it is placed.
For the special case of needing to exclude only files or dirs directly below
the given input directory, you can use a pattern like './exclude_this.gif'.
You can specify multiple patterns by using this argument more than once.
""")

# Both switches write to the same 'follow_links' destination (default True).
_group = upload_opts.add_mutually_exclusive_group()
_group.add_argument(
    '--follow-links',
    action='store_true',
    default=True,
    dest='follow_links',
    help="""
Follow file and directory symlinks (default).
""")
_group.add_argument(
    '--no-follow-links',
    action='store_false',
    dest='follow_links',
    help="""
Ignore file and directory symlinks. Even paths given explicitly on the
command line will be skipped if they are symlinks.
""")
177
178
# Options describing how a run behaves (destination, progress reporting,
# resume/cache handling). Help-less so it can be used as a parent parser.
run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument(
    '--project-uuid',
    metavar='UUID',
    help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument(
    '--name',
    help="""
Save the collection with the specified name.
""")

# Only one progress reporting mode may be requested.
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument(
    '--progress',
    action='store_true',
    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument(
    '--no-progress',
    action='store_true',
    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument(
    '--batch-progress',
    action='store_true',
    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

run_opts.add_argument(
    '--silent',
    action='store_true',
    help="""
Do not print any debug messages to console. (Any error messages will
still be displayed.)
""")

run_opts.add_argument(
    '--batch',
    action='store_true',
    default=False,
    help="""
Retries with '--no-resume --no-cache' if cached state contains invalid/expired
block signatures.
""")

# --resume / --no-resume share the 'resume' destination (default True).
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument(
    '--resume',
    action='store_true',
    default=True,
    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument(
    '--no-resume',
    action='store_false',
    dest='resume',
    help="""
Do not continue interrupted uploads from cached state.
""")

# --cache / --no-cache share the 'use_cache' destination (default True).
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument(
    '--cache',
    action='store_true',
    dest='use_cache',
    default=True,
    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument(
    '--no-cache',
    action='store_false',
    dest='use_cache',
    help="""
Do not save upload state in a cache file for resuming.
""")
241
# Trash date of the resulting collection: either an absolute date or a
# number of days, never both. Registered on upload_opts.
_group = upload_opts.add_mutually_exclusive_group()
_group.add_argument(
    '--trash-at',
    metavar='YYYY-MM-DDTHH:MM',
    default=None,
    help="""
Set the trash date of the resulting collection to an absolute date in the future.
The accepted format is defined by the ISO 8601 standard. Examples: 20090103, 2009-01-03, 20090103T181505, 2009-01-03T18:15:05.\n
Timezone information can be added. If not, the provided date/time is assumed as being in the local system's timezone.
""")
_group.add_argument(
    '--trash-after',
    type=int,
    metavar='DAYS',
    default=None,
    help="""
Set the trash date of the resulting collection to an amount of days from the
date/time that the upload process finishes.
""")
254
# Complete command-line parser: upload options, run options and the shared
# retry option, in that order.
arg_parser = argparse.ArgumentParser(
    parents=[upload_opts, run_opts, arv_cmd.retry_opt],
    description='Copy data from the local filesystem to Keep.',
)
258
def parse_arguments(arguments):
    """Parse the command line and normalize interdependent options.

    Arguments:
      arguments: list of command-line strings handed to ``arg_parser``.

    Returns the parsed namespace after these adjustments:
      * no paths, or '/dev/stdin', are rewritten to '-';
      * --progress is switched on when stderr is a tty and no other
        progress/silent option was given;
      * --no-cache implies --no-resume;
      * reading from stdin disables resume and cache and defaults the
        filename to 'stdin';
      * duplicated --exclude patterns are dropped, keeping the order in
        which they were first given.

    Invalid combinations are reported through ``arg_parser.error()``, which
    terminates the program.
    """
    args = arg_parser.parse_args(arguments)

    if not args.paths:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if args.filename and (len(args.paths) != 1 or os.path.isdir(args.paths[0])):
        arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress or args.silent)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    # Remove possible duplicated patterns. dict.fromkeys() keeps the first
    # occurrence of each pattern in command-line order, so the resulting list
    # is deterministic (a set would reorder it differently on every run).
    if args.exclude:
        args.exclude = list(dict.fromkeys(args.exclude))

    return args
297
298
class PathDoesNotExistError(Exception):
    """A path requested for upload does not exist on the local filesystem."""
301
302
class CollectionUpdateError(Exception):
    """Error updating a collection (raised outside the code shown here)."""
305
306
class ResumeCacheConflict(Exception):
    """The resume cache file could not be locked for exclusive use."""
309
310
class ResumeCacheInvalidError(Exception):
    """The resume cache is not usable (raised outside the code shown here)."""
313
class ArvPutArgumentConflict(Exception):
    """Two upload job arguments contradict each other."""
316
317
class ArvPutUploadIsPending(Exception):
    """Dry-run outcome: at least one file would have to be uploaded."""
320
321
class ArvPutUploadNotPending(Exception):
    """Dry-run outcome: no file needs to be uploaded."""
324
325
class FileUploadList(list):
    """
    List of files waiting to be uploaded.

    When created with dry_run=True nothing can be appended: the first
    append() raises ArvPutUploadIsPending instead, signalling that an upload
    would be needed.
    """

    def __init__(self, dry_run=False):
        super().__init__()
        self.dry_run = dry_run

    def append(self, other):
        """
        Add 'other' to the list, or raise ArvPutUploadIsPending in dry-run mode.
        """
        if self.dry_run:
            raise ArvPutUploadIsPending()
        list.append(self, other)
335
336
337 # Appends the X-Request-Id to the log message when log level is ERROR or DEBUG
class ArvPutLogFormatter(logging.Formatter):
    """
    Log formatter that adds the X-Request-Id to one log message.

    The first DEBUG or ERROR record formatted by an instance is rendered with
    the request id appended; every other record uses the standard Arvados
    log format.
    """
    # Shared formatter for regular records.
    std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
    # Per-instance formatter carrying the request id; built in __init__.
    err_fmtr = None
    # Becomes True (per instance) once the request id has been reported.
    request_id_informed = False

    def __init__(self, request_id):
        # Initialize the base class so inherited helpers (usesTime(),
        # formatTime(), formatMessage(), ...) work on this instance too,
        # instead of failing on missing attributes.
        super().__init__(arvados.log_format, arvados.log_date_format)
        self.err_fmtr = logging.Formatter(
            arvados.log_format+' (X-Request-Id: {})'.format(request_id),
            arvados.log_date_format)

    def format(self, record):
        """
        Return the formatted text for 'record', mentioning the request id
        only on the first DEBUG/ERROR record seen by this instance.
        """
        if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
            self.request_id_informed = True
            return self.err_fmtr.format(record)
        return self.std_fmtr.format(record)
353
354
class ResumeCache(object):
    """
    JSON state file, held under an exclusive non-blocking flock, used to
    resume an interrupted upload.

    Attributes:
      cache_file: open, locked file object backing the cache.
      filename: path of the cache file.
    """
    CACHE_DIR = 'arv-put'

    def __init__(self, file_spec):
        """
        Open (creating it if needed) and lock the cache file at 'file_spec'.

        Raises ResumeCacheConflict when the file is locked elsewhere.
        """
        self.cache_file = open(file_spec, 'a+')
        try:
            self._lock_file(self.cache_file)
        except ResumeCacheConflict:
            # Don't leak the descriptor when the cache is in use elsewhere.
            self.cache_file.close()
            raise
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        """
        Return the cache file path for the given parsed arguments.

        The file name is an MD5 digest of the API host, the sorted real
        paths being uploaded and either a directory marker or the requested
        filename.
        """
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        cache_path = Path(cls.CACHE_DIR)
        if len(cache_path.parts) == 1:
            cache_path = arvados.util._BaseDirectories('CACHE').storage_path(cache_path)
        else:
            # Note this is a noop if cache_path is absolute, which is what we want.
            cache_path = Path.home() / cache_path
            cache_path.mkdir(parents=True, exist_ok=True, mode=0o700)
        return str(cache_path / md5.hexdigest())

    def _lock_file(self, fileobj):
        """
        Take an exclusive, non-blocking lock on 'fileobj' (a file object or
        a raw descriptor). Raises ResumeCacheConflict if it is already locked.
        """
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            # Raw descriptors have no .name; report the descriptor itself.
            raise ResumeCacheConflict(
                u"{} locked".format(getattr(fileobj, 'name', fileobj)))

    def load(self):
        """
        Return the JSON-decoded cache content. Raises ValueError when the
        file does not hold valid JSON (e.g. it is empty).
        """
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        """
        Probe the first block locator recorded in the cached state with a
        Keep HEAD request; on any failure the cache is restarted from
        scratch. A cache that cannot be decoded is left untouched.
        """
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                # Deliberately broad: any problem with the cached state or
                # the probe means the cache can't be trusted.
                self.restart()
        except (ValueError):
            pass

    def save(self, data):
        """
        Best-effort atomic replacement of the cache content with 'data'.

        The JSON is written to a locked temporary file in the same
        directory, flushed, and renamed over the cache file. I/O and locking
        errors are swallowed, leaving the previous cache in place.
        """
        new_cache = None
        new_cache_name = None
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            # Wrap the descriptor first so every failure path below can
            # release it by closing the file object.
            new_cache = os.fdopen(new_cache_fd, 'r+')
            self._lock_file(new_cache)
            json.dump(data, new_cache)
            # Make sure the content reached the file before it replaces the
            # previous cache; otherwise a crash could leave it truncated.
            new_cache.flush()
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict):
            if new_cache is not None:
                new_cache.close()
            if new_cache_name is not None:
                try:
                    os.unlink(new_cache_name)
                except OSError:
                    pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        """
        Close the cache file (which also drops its lock).
        """
        self.cache_file.close()

    def destroy(self):
        """
        Delete the cache file from disk and close it. A file that is
        already gone is not an error.
        """
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        """
        Throw the current cache away and start over with an empty one at
        the same path.
        """
        self.destroy()
        self.__init__(self.filename)
440
441
442 class ArvPutUploadJob(object):
443     CACHE_DIR = 'arv-put'
444     EMPTY_STATE = {
445         'manifest' : None, # Last saved manifest checkpoint
446         'files' : {} # Previous run file list: {path : {size, mtime}}
447     }
448
449     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
450                  name=None, owner_uuid=None, api_client=None, batch_mode=False,
451                  ensure_unique_name=False, num_retries=None,
452                  put_threads=None, replication_desired=None, filename=None,
453                  update_time=60.0, update_collection=None, storage_classes=None,
454                  logger=logging.getLogger('arvados.arv_put'), dry_run=False,
455                  follow_links=True, exclude_paths=[], exclude_names=None,
456                  trash_at=None):
457         self.paths = paths
458         self.resume = resume
459         self.use_cache = use_cache
460         self.batch_mode = batch_mode
461         self.update = False
462         self.reporter = reporter
463         # This will set to 0 before start counting, if no special files are going
464         # to be read.
465         self.bytes_expected = None
466         self.bytes_written = 0
467         self.bytes_skipped = 0
468         self.name = name
469         self.owner_uuid = owner_uuid
470         self.ensure_unique_name = ensure_unique_name
471         self.num_retries = num_retries
472         self.replication_desired = replication_desired
473         self.put_threads = put_threads
474         self.filename = filename
475         self.storage_classes = storage_classes
476         self._api_client = api_client
477         self._state_lock = threading.Lock()
478         self._state = None # Previous run state (file list & manifest)
479         self._current_files = [] # Current run file list
480         self._cache_file = None
481         self._collection_lock = threading.Lock()
482         self._remote_collection = None # Collection being updated (if asked)
483         self._local_collection = None # Collection from previous run manifest
484         self._file_paths = set() # Files to be updated in remote collection
485         self._stop_checkpointer = threading.Event()
486         self._checkpointer = threading.Thread(target=self._update_task)
487         self._checkpointer.daemon = True
488         self._update_task_time = update_time  # How many seconds wait between update runs
489         self._files_to_upload = FileUploadList(dry_run=dry_run)
490         self._upload_started = False
491         self.logger = logger
492         self.dry_run = dry_run
493         self._checkpoint_before_quit = True
494         self.follow_links = follow_links
495         self.exclude_paths = exclude_paths
496         self.exclude_names = exclude_names
497         self._trash_at = trash_at
498
499         if self._trash_at is not None:
500             if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
501                 raise TypeError('trash_at should be None, timezone-naive datetime or timedelta')
502             if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
503                 raise TypeError('provided trash_at datetime should be timezone-naive')
504
505         if not self.use_cache and self.resume:
506             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
507
508         # Check for obvious dry-run responses
509         if self.dry_run and (not self.use_cache or not self.resume):
510             raise ArvPutUploadIsPending()
511
512         # Load cached data if any and if needed
513         self._setup_state(update_collection)
514
515         # Build the upload file list, excluding requested files and counting the
516         # bytes expected to be uploaded.
517         self._build_upload_list()
518
519     def _build_upload_list(self):
520         """
521         Scan the requested paths to count file sizes, excluding requested files
522         and dirs and building the upload file list.
523         """
524         # If there aren't special files to be read, reset total bytes count to zero
525         # to start counting.
526         if not any([p for p in self.paths
527                     if not (os.path.isfile(p) or os.path.isdir(p))]):
528             self.bytes_expected = 0
529
530         for path in self.paths:
531             # Test for stdin first, in case some file named '-' exist
532             if path == '-':
533                 if self.dry_run:
534                     raise ArvPutUploadIsPending()
535                 self._write_stdin(self.filename or 'stdin')
536             elif not os.path.exists(path):
537                  raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
538             elif (not self.follow_links) and os.path.islink(path):
539                 self.logger.warning("Skipping symlink '{}'".format(path))
540                 continue
541             elif os.path.isdir(path):
542                 # Use absolute paths on cache index so CWD doesn't interfere
543                 # with the caching logic.
544                 orig_path = path
545                 path = os.path.abspath(path)
546                 if orig_path[-1:] == os.sep:
547                     # When passing a directory reference with a trailing slash,
548                     # its contents should be uploaded directly to the
549                     # collection's root.
550                     prefixdir = path
551                 else:
552                     # When passing a directory reference with no trailing slash,
553                     # upload the directory to the collection's root.
554                     prefixdir = os.path.dirname(path)
555                 prefixdir += os.sep
556                 for root, dirs, files in os.walk(path,
557                                                  followlinks=self.follow_links):
558                     root_relpath = os.path.relpath(root, path)
559                     if root_relpath == '.':
560                         root_relpath = ''
561                     # Exclude files/dirs by full path matching pattern
562                     if self.exclude_paths:
563                         dirs[:] = [d for d in dirs
564                                    if not any(pathname_match(
565                                            os.path.join(root_relpath, d), pat)
566                                               for pat in self.exclude_paths)]
567                         files = [f for f in files
568                                  if not any(pathname_match(
569                                          os.path.join(root_relpath, f), pat)
570                                             for pat in self.exclude_paths)]
571                     # Exclude files/dirs by name matching pattern
572                     if self.exclude_names is not None:
573                         dirs[:] = [d for d in dirs
574                                    if not self.exclude_names.match(d)]
575                         files = [f for f in files
576                                  if not self.exclude_names.match(f)]
577                     # Make os.walk()'s dir traversing order deterministic
578                     dirs.sort()
579                     files.sort()
580                     for f in files:
581                         filepath = os.path.join(root, f)
582                         if not os.path.isfile(filepath):
583                             self.logger.warning("Skipping non-regular file '{}'".format(filepath))
584                             continue
585                         # Add its size to the total bytes count (if applicable)
586                         if self.follow_links or (not os.path.islink(filepath)):
587                             if self.bytes_expected is not None:
588                                 self.bytes_expected += os.path.getsize(filepath)
589                         self._check_file(filepath,
590                                          os.path.join(root[len(prefixdir):], f))
591             else:
592                 filepath = os.path.abspath(path)
593                 # Add its size to the total bytes count (if applicable)
594                 if self.follow_links or (not os.path.islink(filepath)):
595                     if self.bytes_expected is not None:
596                         self.bytes_expected += os.path.getsize(filepath)
597                 self._check_file(filepath,
598                                  self.filename or os.path.basename(path))
599         # If dry-mode is on, and got up to this point, then we should notify that
600         # there aren't any file to upload.
601         if self.dry_run:
602             raise ArvPutUploadNotPending()
603         # Remove local_collection's files that don't exist locally anymore, so the
604         # bytes_written count is correct.
605         for f in self.collection_file_paths(self._local_collection,
606                                             path_prefix=""):
607             if f != 'stdin' and f != self.filename and not f in self._file_paths:
608                 self._local_collection.remove(f)
609
610     def start(self, save_collection):
611         """
612         Start supporting thread & file uploading
613         """
614         self._checkpointer.start()
615         try:
616             # Update bytes_written from current local collection and
617             # report initial progress.
618             self._update()
619             # Actual file upload
620             self._upload_started = True # Used by the update thread to start checkpointing
621             self._upload_files()
622         except (SystemExit, Exception) as e:
623             self._checkpoint_before_quit = False
624             # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
625             # Note: We're expecting SystemExit instead of
626             # KeyboardInterrupt because we have a custom signal
627             # handler in place that raises SystemExit with the catched
628             # signal's code.
629             if isinstance(e, PathDoesNotExistError):
630                 # We aren't interested in the traceback for this case
631                 pass
632             elif not isinstance(e, SystemExit) or e.code != -2:
633                 self.logger.warning("Abnormal termination:\n{}".format(
634                     traceback.format_exc()))
635             raise
636         finally:
637             if not self.dry_run:
638                 # Stop the thread before doing anything else
639                 self._stop_checkpointer.set()
640                 self._checkpointer.join()
641                 if self._checkpoint_before_quit:
642                     # Commit all pending blocks & one last _update()
643                     self._local_collection.manifest_text()
644                     self._update(final=True)
645                     if save_collection:
646                         self.save_collection()
647             if self.use_cache:
648                 self._cache_file.close()
649
650     def _collection_trash_at(self):
651         """
652         Returns the trash date that the collection should use at save time.
653         Takes into account absolute/relative trash_at values requested
654         by the user.
655         """
656         if type(self._trash_at) == datetime.timedelta:
657             # Get an absolute datetime for trash_at
658             return datetime.datetime.utcnow() + self._trash_at
659         return self._trash_at
660
661     def save_collection(self):
662         if self.update:
663             # Check if files should be updated on the remote collection.
664             for fp in self._file_paths:
665                 remote_file = self._remote_collection.find(fp)
666                 if not remote_file:
667                     # File don't exist on remote collection, copy it.
668                     self._remote_collection.copy(fp, fp, self._local_collection)
669                 elif remote_file != self._local_collection.find(fp):
670                     # A different file exist on remote collection, overwrite it.
671                     self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
672                 else:
673                     # The file already exist on remote collection, skip it.
674                     pass
675             self._remote_collection.save(num_retries=self.num_retries,
676                                          trash_at=self._collection_trash_at())
677         else:
678             if len(self._local_collection) == 0:
679                 self.logger.warning("No files were uploaded, skipping collection creation.")
680                 return
681             self._local_collection.save_new(
682                 name=self.name, owner_uuid=self.owner_uuid,
683                 ensure_unique_name=self.ensure_unique_name,
684                 num_retries=self.num_retries,
685                 trash_at=self._collection_trash_at())
686
687     def destroy_cache(self):
688         if self.use_cache:
689             try:
690                 os.unlink(self._cache_filename)
691             except OSError as error:
692                 # That's what we wanted anyway.
693                 if error.errno != errno.ENOENT:
694                     raise
695             self._cache_file.close()
696
697     def _collection_size(self, collection):
698         """
699         Recursively get the total size of the collection
700         """
701         size = 0
702         for item in collection.values():
703             if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
704                 size += self._collection_size(item)
705             else:
706                 size += item.size()
707         return size
708
709     def _update_task(self):
710         """
711         Periodically called support task. File uploading is
712         asynchronous so we poll status from the collection.
713         """
714         while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
715             self._update()
716
717     def _update(self, final=False):
718         """
719         Update cached manifest text and report progress.
720         """
721         if self._upload_started:
722             with self._collection_lock:
723                 self.bytes_written = self._collection_size(self._local_collection)
724                 if self.use_cache:
725                     if final:
726                         manifest = self._local_collection.manifest_text()
727                     else:
728                         # Get the manifest text without comitting pending blocks
729                         manifest = self._local_collection.manifest_text(strip=False,
730                                                                         normalize=False,
731                                                                         only_committed=True)
732                     # Update cache
733                     with self._state_lock:
734                         self._state['manifest'] = manifest
735             if self.use_cache:
736                 try:
737                     self._save_state()
738                 except Exception as e:
739                     self.logger.error("Unexpected error trying to save cache file: {}".format(e))
740             # Keep remote collection's trash_at attribute synced when using relative expire dates
741             if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta:
742                 try:
743                     self._api_client.collections().update(
744                         uuid=self._remote_collection.manifest_locator(),
745                         body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}
746                     ).execute(num_retries=self.num_retries)
747                 except Exception as e:
748                     self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e))
749         else:
750             self.bytes_written = self.bytes_skipped
751         # Call the reporter, if any
752         self.report_progress()
753
754     def report_progress(self):
755         if self.reporter is not None:
756             self.reporter(self.bytes_written, self.bytes_expected)
757
758     def _write_stdin(self, filename):
759         output = self._local_collection.open(filename, 'wb')
760         self._write(sys.stdin.buffer, output)
761         output.close()
762
763     def _check_file(self, source, filename):
764         """
765         Check if this file needs to be uploaded
766         """
767         # Ignore symlinks when requested
768         if (not self.follow_links) and os.path.islink(source):
769             return
770         resume_offset = 0
771         should_upload = False
772         new_file_in_cache = False
773         # Record file path for updating the remote collection before exiting
774         self._file_paths.add(filename)
775
776         with self._state_lock:
777             # If no previous cached data on this file, store it for an eventual
778             # repeated run.
779             if source not in self._state['files']:
780                 self._state['files'][source] = {
781                     'mtime': os.path.getmtime(source),
782                     'size' : os.path.getsize(source)
783                 }
784                 new_file_in_cache = True
785             cached_file_data = self._state['files'][source]
786
787         # Check if file was already uploaded (at least partially)
788         file_in_local_collection = self._local_collection.find(filename)
789
790         # If not resuming, upload the full file.
791         if not self.resume:
792             should_upload = True
793         # New file detected from last run, upload it.
794         elif new_file_in_cache:
795             should_upload = True
796         # Local file didn't change from last run.
797         elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
798             if not file_in_local_collection:
799                 # File not uploaded yet, upload it completely
800                 should_upload = True
801             elif file_in_local_collection.permission_expired():
802                 # Permission token expired, re-upload file. This will change whenever
803                 # we have a API for refreshing tokens.
804                 self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
805                 should_upload = True
806                 self._local_collection.remove(filename)
807             elif cached_file_data['size'] == file_in_local_collection.size():
808                 # File already there, skip it.
809                 self.bytes_skipped += cached_file_data['size']
810             elif cached_file_data['size'] > file_in_local_collection.size():
811                 # File partially uploaded, resume!
812                 resume_offset = file_in_local_collection.size()
813                 self.bytes_skipped += resume_offset
814                 should_upload = True
815             else:
816                 # Inconsistent cache, re-upload the file
817                 should_upload = True
818                 self._local_collection.remove(filename)
819                 self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
820         # Local file differs from cached data, re-upload it.
821         else:
822             if file_in_local_collection:
823                 self._local_collection.remove(filename)
824             should_upload = True
825
826         if should_upload:
827             try:
828                 self._files_to_upload.append((source, resume_offset, filename))
829             except ArvPutUploadIsPending:
830                 # This could happen when running on dry-mode, close cache file to
831                 # avoid locking issues.
832                 self._cache_file.close()
833                 raise
834
835     def _upload_files(self):
836         for source, resume_offset, filename in self._files_to_upload:
837             with open(source, 'rb') as source_fd:
838                 with self._state_lock:
839                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
840                     self._state['files'][source]['size'] = os.path.getsize(source)
841                 if resume_offset > 0:
842                     # Start upload where we left off
843                     output = self._local_collection.open(filename, 'ab')
844                     source_fd.seek(resume_offset)
845                 else:
846                     # Start from scratch
847                     output = self._local_collection.open(filename, 'wb')
848                 self._write(source_fd, output)
849                 output.close(flush=False)
850
851     def _write(self, source_fd, output):
852         while True:
853             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
854             if not data:
855                 break
856             output.write(data)
857
858     def _my_collection(self):
859         return self._remote_collection if self.update else self._local_collection
860
861     def _get_cache_filepath(self):
862         # Set up cache file name from input paths.
863         md5 = hashlib.md5()
864         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
865         realpaths = sorted(os.path.realpath(path) for path in self.paths)
866         md5.update(b'\0'.join([p.encode() for p in realpaths]))
867         if self.filename:
868             md5.update(self.filename.encode())
869         cache_path = Path(self.CACHE_DIR)
870         if len(cache_path.parts) == 1:
871             cache_path = arvados.util._BaseDirectories('CACHE').storage_path(cache_path)
872         else:
873             # Note this is a noop if cache_path is absolute, which is what we want.
874             cache_path = Path.home() / cache_path
875             cache_path.mkdir(parents=True, exist_ok=True, mode=0o700)
876         return str(cache_path / md5.hexdigest())
877
878     def _setup_state(self, update_collection):
879         """
880         Create a new cache file or load a previously existing one.
881         """
882         # Load an already existing collection for update
883         if update_collection and re.match(arvados.util.collection_uuid_pattern,
884                                           update_collection):
885             try:
886                 self._remote_collection = arvados.collection.Collection(
887                     update_collection,
888                     api_client=self._api_client,
889                     storage_classes_desired=self.storage_classes,
890                     num_retries=self.num_retries)
891             except arvados.errors.ApiError as error:
892                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
893             else:
894                 self.update = True
895         elif update_collection:
896             # Collection locator provided, but unknown format
897             raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
898
899         if self.use_cache:
900             cache_filepath = self._get_cache_filepath()
901             if self.resume and os.path.exists(cache_filepath):
902                 self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
903                 self._cache_file = open(cache_filepath, 'a+')
904             else:
905                 # --no-resume means start with a empty cache file.
906                 self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
907                 self._cache_file = open(cache_filepath, 'w+')
908             self._cache_filename = self._cache_file.name
909             self._lock_file(self._cache_file)
910             self._cache_file.seek(0)
911
912         with self._state_lock:
913             if self.use_cache:
914                 try:
915                     self._state = json.load(self._cache_file)
916                     if not set(['manifest', 'files']).issubset(set(self._state.keys())):
917                         # Cache at least partially incomplete, set up new cache
918                         self._state = copy.deepcopy(self.EMPTY_STATE)
919                 except ValueError:
920                     # Cache file empty, set up new cache
921                     self._state = copy.deepcopy(self.EMPTY_STATE)
922             else:
923                 self.logger.info("No cache usage requested for this run.")
924                 # No cache file, set empty state
925                 self._state = copy.deepcopy(self.EMPTY_STATE)
926             if not self._cached_manifest_valid():
927                 if not self.batch_mode:
928                     raise ResumeCacheInvalidError()
929                 else:
930                     self.logger.info("Invalid signatures on cache file '{}' while being run in 'batch mode' -- continuing anyways.".format(self._cache_file.name))
931                     self.use_cache = False # Don't overwrite preexisting cache file.
932                     self._state = copy.deepcopy(self.EMPTY_STATE)
933             # Load the previous manifest so we can check if files were modified remotely.
934             self._local_collection = arvados.collection.Collection(
935                 self._state['manifest'],
936                 replication_desired=self.replication_desired,
937                 storage_classes_desired=self.storage_classes,
938                 put_threads=self.put_threads,
939                 api_client=self._api_client,
940                 num_retries=self.num_retries)
941
942     def _cached_manifest_valid(self):
943         """
944         Validate the oldest non-expired block signature to check if cached manifest
945         is usable: checking if the cached manifest was not created with a different
946         arvados account.
947         """
948         if self._state.get('manifest', None) is None:
949             # No cached manifest yet, all good.
950             return True
951         now = datetime.datetime.utcnow()
952         oldest_exp = None
953         oldest_loc = None
954         block_found = False
955         for m in arvados.util.keep_locator_pattern.finditer(self._state['manifest']):
956             loc = m.group(0)
957             try:
958                 exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
959             except IndexError:
960                 # Locator without signature
961                 continue
962             block_found = True
963             if exp > now and (oldest_exp is None or exp < oldest_exp):
964                 oldest_exp = exp
965                 oldest_loc = loc
966         if not block_found:
967             # No block signatures found => no invalid block signatures.
968             return True
969         if oldest_loc is None:
970             # Locator signatures found, but all have expired.
971             # Reset the cache and move on.
972             self.logger.info('Cache expired, starting from scratch.')
973             self._state['manifest'] = ''
974             return True
975         kc = arvados.KeepClient(api_client=self._api_client,
976                                 num_retries=self.num_retries)
977         try:
978             kc.head(oldest_loc)
979         except arvados.errors.KeepRequestError:
980             # Something is wrong, cached manifest is not valid.
981             return False
982         return True
983
984     def collection_file_paths(self, col, path_prefix='.'):
985         """Return a list of file paths by recursively go through the entire collection `col`"""
986         file_paths = []
987         for name, item in col.items():
988             if isinstance(item, arvados.arvfile.ArvadosFile):
989                 file_paths.append(os.path.join(path_prefix, name))
990             elif isinstance(item, arvados.collection.Subcollection):
991                 new_prefix = os.path.join(path_prefix, name)
992                 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
993         return file_paths
994
995     def _lock_file(self, fileobj):
996         try:
997             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
998         except IOError:
999             raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
1000
1001     def _save_state(self):
1002         """
1003         Atomically save current state into cache.
1004         """
1005         with self._state_lock:
1006             # We're not using copy.deepcopy() here because it's a lot slower
1007             # than json.dumps(), and we're already needing JSON format to be
1008             # saved on disk.
1009             state = json.dumps(self._state)
1010         try:
1011             new_cache = tempfile.NamedTemporaryFile(
1012                 mode='w+',
1013                 dir=os.path.dirname(self._cache_filename), delete=False)
1014             self._lock_file(new_cache)
1015             new_cache.write(state)
1016             new_cache.flush()
1017             os.fsync(new_cache)
1018             os.rename(new_cache.name, self._cache_filename)
1019         except (IOError, OSError, ResumeCacheConflict) as error:
1020             self.logger.error("There was a problem while saving the cache file: {}".format(error))
1021             try:
1022                 os.unlink(new_cache_name)
1023             except NameError:  # mkstemp failed.
1024                 pass
1025         else:
1026             self._cache_file.close()
1027             self._cache_file = new_cache
1028
1029     def collection_name(self):
1030         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
1031
1032     def collection_trash_at(self):
1033         return self._my_collection().get_trash_at()
1034
1035     def manifest_locator(self):
1036         return self._my_collection().manifest_locator()
1037
1038     def portable_data_hash(self):
1039         pdh = self._my_collection().portable_data_hash()
1040         m = self._my_collection().stripped_manifest().encode()
1041         local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
1042         if pdh != local_pdh:
1043             self.logger.warning("\n".join([
1044                 "arv-put: API server provided PDH differs from local manifest.",
1045                 "         This should not happen; showing API server version."]))
1046         return pdh
1047
1048     def manifest_text(self, stream_name=".", strip=False, normalize=False):
1049         return self._my_collection().manifest_text(stream_name, strip, normalize)
1050
1051     def _datablocks_on_item(self, item):
1052         """
1053         Return a list of datablock locators, recursively navigating
1054         through subcollections
1055         """
1056         if isinstance(item, arvados.arvfile.ArvadosFile):
1057             if item.size() == 0:
1058                 # Empty file locator
1059                 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
1060             else:
1061                 locators = []
1062                 for segment in item.segments():
1063                     loc = segment.locator
1064                     locators.append(loc)
1065                 return locators
1066         elif isinstance(item, arvados.collection.Collection):
1067             l = [self._datablocks_on_item(x) for x in item.values()]
1068             # Fast list flattener method taken from:
1069             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
1070             return [loc for sublist in l for loc in sublist]
1071         else:
1072             return None
1073
1074     def data_locators(self):
1075         with self._collection_lock:
1076             # Make sure all datablocks are flushed before getting the locators
1077             self._my_collection().manifest_text()
1078             datablocks = self._datablocks_on_item(self._my_collection())
1079         return datablocks
1080
# Template for machine-readable progress lines. The program name and PID are
# filled in here, once; the two remaining '{}' placeholders take the written
# and total byte counts.
_machine_format = "{} {}: {{}} written {{}} total\n".format(
    sys.argv[0], os.getpid())
1083
# Simulate glob.glob() matching behavior without the need to scan the filesystem
# Note: fnmatch() doesn't work correctly when used with pathnames. For example the
# pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
# so instead we're using it on every path component.
def pathname_match(pathname, pattern):
    """Return True when every component of `pathname` matches the matching component of `pattern`.

    Both are split on os.sep. Empty and '.' components of the pattern are
    dropped first, which fixes patterns like 'some/subdir/' or 'some//subdir'.
    A different number of components never matches.
    """
    name_parts = pathname.split(os.sep)
    pattern_parts = [part for part in pattern.split(os.sep) if part not in ('', '.')]
    if len(name_parts) != len(pattern_parts):
        return False
    return all(
        fnmatch.fnmatch(name_part, pattern_part)
        for name_part, pattern_part in zip(name_parts, pattern_parts))
1098
def machine_progress(bytes_written, bytes_expected):
    """Return a machine-readable progress line; an unknown total (None) is reported as -1."""
    total = bytes_expected
    if total is None:
        total = -1
    return _machine_format.format(bytes_written, total)
1102
def human_progress(bytes_written, bytes_expected):
    """Return a human-readable progress string that starts with a carriage return.

    With a known, non-zero total the string shows written and total sizes in
    whole mebibytes plus a percentage. Otherwise only the raw written byte
    count is shown.
    """
    if not bytes_expected:
        return "\r{} ".format(bytes_written)
    written_mib = bytes_written >> 20
    expected_mib = bytes_expected >> 20
    fraction_done = float(bytes_written) / bytes_expected
    return "\r{}M / {}M {:.1%} ".format(written_mib, expected_mib, fraction_done)
1110
def progress_writer(progress_func, outfile=sys.stderr):
    """Return a reporter that formats progress with `progress_func` and writes it to `outfile`.

    The returned callable takes (bytes_written, bytes_expected).
    """
    def write_progress(bytes_written, bytes_expected):
        message = progress_func(bytes_written, bytes_expected)
        outfile.write(message)
    return write_progress
1115
def desired_project_uuid(api_client, project_uuid, num_retries):
    """Return the UUID of the user or group that `project_uuid` refers to.

    A false `project_uuid` selects the current user. Otherwise the value
    must match the user or the group UUID pattern and the corresponding
    record is fetched through the API.

    Raises ValueError when `project_uuid` matches neither pattern.
    """
    if project_uuid:
        if arvados.util.user_uuid_pattern.match(project_uuid):
            query = api_client.users().get(uuid=project_uuid)
        elif arvados.util.group_uuid_pattern.match(project_uuid):
            query = api_client.groups().get(uuid=project_uuid)
        else:
            raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    else:
        query = api_client.users().current()
    response = query.execute(num_retries=num_retries)
    return response['uuid']
1126
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
         install_sig_handlers=True):
    """
    Run arv-put: validate the arguments, upload the data and print the result.

    Arguments:

    * arguments --- Argument list handed to parse_arguments().
    * stdout --- File-like object that receives the result (collection
      locator, portable data hash, manifest text or block locators) unless
      --silent was given.
    * stderr --- Accepted for interface compatibility; not used in this
      function's body.
    * install_sig_handlers --- When true, the arv_cmd signal handlers are
      installed at the start and restored at the end of a completed run.

    Returns the string that was (or, with --silent, would have been) written
    to stdout. Leaves through sys.exit() with status 1 on errors, 2 when a
    dry run finds files pending for upload and 0 when a dry run finds none.
    """
    global api_client

    args = parse_arguments(arguments)
    logger = logging.getLogger('arvados.arv_put')
    if args.silent:
        logger.setLevel(logging.WARNING)
    else:
        logger.setLevel(logging.INFO)
    status = 0

    request_id = arvados.util.new_request_id()

    formatter = ArvPutLogFormatter(request_id)
    logging.getLogger('arvados').handlers[0].setFormatter(formatter)

    if api_client is None:
        api_client = arvados.api('v1', request_id=request_id, num_retries=args.retries)

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

    # Trash arguments validation
    trash_at = None
    if args.trash_at is not None:
        # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we
        # make sure the user provides a complete YYYY-MM-DD date.
        if not re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}', args.trash_at):
            logger.error("--trash-at argument format invalid, use --help to see examples.")
            sys.exit(1)
        # Check if no time information was provided. In that case, assume end-of-day.
        if re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}$', args.trash_at):
            args.trash_at += 'T23:59:59'
        try:
            trash_at = ciso8601.parse_datetime(args.trash_at)
        except Exception:
            # Any parsing problem is a user error; KeyboardInterrupt and
            # SystemExit are deliberately left alone.
            logger.error("--trash-at argument format invalid, use --help to see examples.")
            sys.exit(1)
        else:
            # Convert to UTC timezone naive datetime. astimezone() treats a
            # timezone naive value as local time and applies the UTC offset
            # in effect on that date, so daylight saving time is handled for
            # the requested date rather than guessed from time.daylight
            # (which only tells whether the local zone defines DST at all).
            trash_at = trash_at.astimezone(datetime.timezone.utc).replace(tzinfo=None)

        if trash_at <= datetime.datetime.utcnow():
            logger.error("--trash-at argument must be set in the future")
            sys.exit(1)
    if args.trash_after is not None:
        if args.trash_after < 1:
            logger.error("--trash-after argument must be >= 1")
            sys.exit(1)
        # A relative expiration is kept as a timedelta (days -> seconds).
        trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60))

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    #  Split storage-classes argument
    storage_classes = None
    if args.storage_classes:
        storage_classes = args.storage_classes.strip().replace(' ', '').split(',')

    # Setup exclude regex from all the --exclude arguments provided
    name_patterns = []
    exclude_paths = []
    exclude_names = None
    if len(args.exclude) > 0:
        # We're supporting 2 kinds of exclusion patterns:
        # 1)   --exclude '*.jpg'    (file/dir name patterns, will only match
        #                            the name, wherever the file is on the tree)
        # 2.1) --exclude 'foo/bar'  (file/dir path patterns, will match the
        #                            entire path, and should be relative to
        #                            any input dir argument)
        # 2.2) --exclude './*.jpg'  (Special case for excluding files/dirs
        #                            placed directly underneath the input dir)
        for p in args.exclude:
            # Only relative paths patterns allowed
            if p.startswith(os.sep):
                logger.error("Cannot use absolute paths with --exclude")
                sys.exit(1)
            if os.path.dirname(p):
                # We don't support path patterns with '..'
                p_parts = p.split(os.sep)
                if '..' in p_parts:
                    logger.error(
                        "Cannot use path patterns that include '..'")
                    sys.exit(1)
                # Path search pattern
                exclude_paths.append(p)
            else:
                # Name-only search pattern
                name_patterns.append(p)
        # For name only matching, we can combine all patterns into a single
        # regexp, for better performance.
        exclude_names = re.compile('|'.join(
            [fnmatch.translate(p) for p in name_patterns]
        )) if len(name_patterns) > 0 else None
        # Show the user the patterns to be used, just in case they weren't
        # specified inside quotes and got changed by the shell expansion.
        logger.info("Exclude patterns: {}".format(args.exclude))

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 batch_mode= args.batch,
                                 filename = args.filename,
                                 reporter = reporter,
                                 api_client = api_client,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 storage_classes=storage_classes,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links,
                                 exclude_paths=exclude_paths,
                                 exclude_names=exclude_names,
                                 trash_at=trash_at)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except ResumeCacheInvalidError:
        logger.error("\n".join([
            "arv-put: Resume cache contains invalid signature: it may have expired",
            "         or been created with another Arvados user's credentials.",
            "         Switch user or use one of the following options to restart upload:",
            "         --no-resume to start a new resume cache.",
            "         --no-cache to disable resume cache.",
            "         --batch to ignore the resume cache if invalid."]))
        sys.exit(1)
    except (CollectionUpdateError, PathDoesNotExistError) as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not(args.stream or args.raw))
    except (arvados.errors.ApiError, arvados.errors.KeepWriteError) as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    elif writer.manifest_locator() is not None:
        try:
            expiration_notice = ""
            collection_trash_at = writer.collection_trash_at()
            if collection_trash_at is not None:
                # The collection's trash_at wall-clock value is taken as UTC
                # and shown in local time, together with the UTC offset in
                # effect on that date.
                local_trash_at = collection_trash_at.replace(
                    tzinfo=datetime.timezone.utc).astimezone()
                expiration_notice = ". It will expire on {}.".format(
                    local_trash_at.strftime("%Y-%m-%d %H:%M:%S %z"))
            if args.update_collection:
                logger.info(u"Collection updated: '{}'{}".format(
                    writer.collection_name(), expiration_notice))
            else:
                logger.info(u"Collection saved as '{}'{}".format(
                    writer.collection_name(), expiration_notice))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1
    else:
        status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    elif not args.silent:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    if install_sig_handlers:
        arv_cmd.restore_signal_handlers()

    if status != 0:
        sys.exit(status)

    # Success!
    return output
1387
1388
# Run the uploader only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    main()