20937: Improve error handling
[arvados.git] / sdk / python / arvados / commands / put.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from future.utils import listitems, listvalues
7 from builtins import str
8 from builtins import object
9 import argparse
10 import arvados
11 import arvados.collection
12 import base64
13 import ciso8601
14 import copy
15 import datetime
16 import errno
17 import fcntl
18 import fnmatch
19 import hashlib
20 import json
21 import logging
22 import os
23 import pwd
24 import re
25 import signal
26 import socket
27 import sys
28 import tempfile
29 import threading
30 import time
31 import traceback
32
33 from apiclient import errors as apiclient_errors
34 from arvados._version import __version__
35 from arvados.util import keep_locator_pattern
36
37 import arvados.commands._util as arv_cmd
38
39 api_client = None
40
# Options describing what gets uploaded and how the result is stored.
# Built with add_help=False so it can be used as a parent parser.
upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument(
    '--version',
    action='version',
    version="%s %s" % (sys.argv[0], __version__),
    help='Print version and exit.')
upload_opts.add_argument(
    'paths',
    metavar='path',
    type=str,
    nargs='*',
    help="""
Local file or directory. If path is a directory reference with a trailing
slash, then just upload the directory's contents; otherwise upload the
directory itself. Default: read from standard input.
""")

# --max-manifest-depth, --normalize and --dry-run exclude each other.
_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument(
    '--max-manifest-depth',
    type=int,
    metavar='N',
    default=-1,
    help=argparse.SUPPRESS)

_group.add_argument(
    '--normalize',
    action='store_true',
    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument(
    '--dry-run',
    action='store_true',
    default=False,
    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

# Output mode: stream, manifest or raw (plus their synonyms), at most one.
_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument(
    '--as-stream',
    action='store_true',
    dest='stream',
    help="""
Synonym for --stream.
""")

_group.add_argument(
    '--stream',
    action='store_true',
    help="""
Store the file content and display the resulting manifest on
stdout. Do not save a Collection object in Arvados.
""")

_group.add_argument(
    '--as-manifest',
    action='store_true',
    dest='manifest',
    help="""
Synonym for --manifest.
""")

_group.add_argument(
    '--in-manifest',
    action='store_true',
    dest='manifest',
    help="""
Synonym for --manifest.
""")

_group.add_argument(
    '--manifest',
    action='store_true',
    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument(
    '--as-raw',
    action='store_true',
    dest='raw',
    help="""
Synonym for --raw.
""")

_group.add_argument(
    '--raw',
    action='store_true',
    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument(
    '--update-collection',
    type=str,
    default=None,
    dest='update_collection',
    metavar="UUID",
    help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument(
    '--use-filename',
    type=str,
    default=None,
    dest='filename',
    help="""
Synonym for --filename.
""")

upload_opts.add_argument(
    '--filename',
    type=str,
    default=None,
    help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument(
    '--portable-data-hash',
    action='store_true',
    help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument(
    '--replication',
    type=int,
    metavar='N',
    default=None,
    help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument(
    '--storage-classes',
    help="""
Specify comma separated list of storage classes to be used when saving data to Keep.
""")

upload_opts.add_argument(
    '--threads',
    type=int,
    metavar='N',
    default=None,
    help="""
Set the number of upload threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 2 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")

upload_opts.add_argument(
    '--exclude',
    metavar='PATTERN',
    default=[],
    action='append',
    help="""
Exclude files and directories whose names match the given glob pattern. When
using a path-like pattern like 'subdir/*.txt', all text files inside 'subdir'
directory, relative to the provided input dirs will be excluded.
When using a filename pattern like '*.txt', any text file will be excluded
no matter where it is placed.
For the special case of needing to exclude only files or dirs directly below
the given input directory, you can use a pattern like './exclude_this.gif'.
You can specify multiple patterns by using this argument more than once.
""")

# Both switches write to args.follow_links, which defaults to True.
_group = upload_opts.add_mutually_exclusive_group()
_group.add_argument(
    '--follow-links',
    action='store_true',
    default=True,
    dest='follow_links',
    help="""
Follow file and directory symlinks (default).
""")
_group.add_argument(
    '--no-follow-links',
    action='store_false',
    dest='follow_links',
    help="""
Ignore file and directory symlinks. Even paths given explicitly on the
command line will be skipped if they are symlinks.
""")
179
180
# Options controlling how an upload run behaves (destination, progress
# reporting, resume/cache handling). Also a help-less parent parser.
run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument(
    '--project-uuid',
    metavar='UUID',
    help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument(
    '--name',
    help="""
Save the collection with the specified name.
""")

# Progress reporting style: at most one of the three may be given.
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument(
    '--progress',
    action='store_true',
    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument(
    '--no-progress',
    action='store_true',
    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument(
    '--batch-progress',
    action='store_true',
    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

run_opts.add_argument(
    '--silent',
    action='store_true',
    help="""
Do not print any debug messages to console. (Any error messages will
still be displayed.)
""")

run_opts.add_argument(
    '--batch',
    action='store_true',
    default=False,
    help="""
Retries with '--no-resume --no-cache' if cached state contains invalid/expired
block signatures.
""")

# Both switches write to args.resume, which defaults to True.
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument(
    '--resume',
    action='store_true',
    default=True,
    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument(
    '--no-resume',
    action='store_false',
    dest='resume',
    help="""
Do not continue interrupted uploads from cached state.
""")

# Both switches write to args.use_cache, which defaults to True.
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument(
    '--cache',
    action='store_true',
    dest='use_cache',
    default=True,
    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument(
    '--no-cache',
    action='store_false',
    dest='use_cache',
    help="""
Do not save upload state in a cache file for resuming.
""")
243
# Trash date of the resulting collection: either an absolute date or a
# number of days, never both. Note this group is attached to upload_opts.
_group = upload_opts.add_mutually_exclusive_group()
_group.add_argument(
    '--trash-at',
    metavar='YYYY-MM-DDTHH:MM',
    default=None,
    help="""
Set the trash date of the resulting collection to an absolute date in the future.
The accepted format is defined by the ISO 8601 standard. Examples: 20090103, 2009-01-03, 20090103T181505, 2009-01-03T18:15:05.\n
Timezone information can be added. If not, the provided date/time is assumed as being in the local system's timezone.
""")
_group.add_argument(
    '--trash-after',
    type=int,
    metavar='DAYS',
    default=None,
    help="""
Set the trash date of the resulting collection to an amount of days from the
date/time that the upload process finishes.
""")

# The full command line parser: upload options + run options + retry option.
arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt],
)
260
def parse_arguments(arguments):
    """Parse the command line and normalize the resulting namespace.

    After parsing with ``arg_parser`` this applies a few adjustments:
    an empty path list and '/dev/stdin' both become '-', --filename is
    rejected for directories or multiple paths, progress reporting is
    turned on when stderr is a tty, resuming is switched off whenever
    the cache is disabled or the input is stdin, and duplicated
    --exclude patterns are dropped.

    Invalid combinations are reported through ``arg_parser.error()``.
    """
    args = arg_parser.parse_args(arguments)

    # No path at all means "read from standard input".
    if not args.paths:
        args.paths = ['-']

    args.paths = ["-" if p == "/dev/stdin" else p for p in args.paths]

    if args.filename:
        single_file = (len(args.paths) == 1
                       and not os.path.isdir(args.paths[0]))
        if not single_file:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    progress_overridden = (args.batch_progress or args.no_progress
                           or args.silent)
    if not progress_overridden and os.isatty(sys.stderr.fileno()):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    reading_stdin = (args.paths == ['-'])
    if reading_stdin:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        # Uploads from stdin are never cached nor resumed.
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    # Remove possible duplicated patterns
    if args.exclude:
        args.exclude = list(set(args.exclude))

    return args
299
300
class PathDoesNotExistError(Exception):
    """A path requested for upload does not exist on the local filesystem."""
303
304
class CollectionUpdateError(Exception):
    """Error related to updating an existing collection.

    NOTE(review): no raise site is visible in this part of the file;
    confirm the exact conditions against the callers.
    """
307
308
class ResumeCacheConflict(Exception):
    """The resume cache file could not be locked."""
311
312
class ResumeCacheInvalidError(Exception):
    """The cached resume state cannot be used.

    NOTE(review): no raise site is visible in this part of the file;
    confirm the exact conditions against the callers.
    """
315
class ArvPutArgumentConflict(Exception):
    """Incompatible options were requested (e.g. resume without cache)."""
318
319
class ArvPutUploadIsPending(Exception):
    """Dry-run outcome: at least one file would have to be uploaded."""
322
323
class ArvPutUploadNotPending(Exception):
    """Dry-run outcome: there is nothing left to upload."""
326
327
class FileUploadList(list):
    """List of files queued for upload.

    When created with ``dry_run=True`` the list refuses any addition:
    the first ``append()`` raises ArvPutUploadIsPending, which signals
    that there is something pending to be uploaded.
    """

    def __init__(self, dry_run=False):
        super(FileUploadList, self).__init__()
        self.dry_run = dry_run

    def append(self, other):
        """Queue ``other``, or raise ArvPutUploadIsPending in dry-run mode."""
        if not self.dry_run:
            list.append(self, other)
            return
        raise ArvPutUploadIsPending()
337
338
339 # Appends the X-Request-Id to the log message when log level is ERROR or DEBUG
class ArvPutLogFormatter(logging.Formatter):
    """Log formatter that reports the X-Request-Id once.

    The first record whose level is DEBUG or ERROR is formatted with
    the request id appended to the message; every other record (and
    all later ones) uses the standard Arvados log format.
    """
    # Shared formatter for regular records.
    std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
    # Per-instance formatter carrying the request id; set in __init__.
    err_fmtr = None
    # Becomes True (per instance) once the request id has been reported.
    request_id_informed = False

    def __init__(self, request_id):
        """Build the formatter for the given request id string."""
        # Initialize the base class so inherited Formatter methods find
        # the state they expect on this instance.
        logging.Formatter.__init__(
            self, arvados.log_format, arvados.log_date_format)
        self.err_fmtr = logging.Formatter(
            arvados.log_format+' (X-Request-Id: {})'.format(request_id),
            arvados.log_date_format)

    def format(self, record):
        """Format ``record``, adding the request id the first time it applies."""
        wants_request_id = record.levelno in (logging.DEBUG, logging.ERROR)
        if wants_request_id and not self.request_id_informed:
            self.request_id_informed = True
            return self.err_fmtr.format(record)
        return self.std_fmtr.format(record)
355
356
class ResumeCache(object):
    """Upload state stored as JSON in an exclusively locked cache file.

    The cache file is opened and locked (non-blocking ``flock``) at
    construction time. ``save()`` replaces it atomically by writing a
    locked temporary file in the same directory and renaming it over
    the old one.
    """
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        """Open (creating if needed) and lock the cache file at ``file_spec``.

        Raises ResumeCacheConflict if the file is already locked.
        """
        self.cache_file = open(file_spec, 'a+')
        try:
            self._lock_file(self.cache_file)
        except Exception:
            # Don't leak the handle when the lock can't be obtained.
            self.cache_file.close()
            raise
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        """Return the cache file path corresponding to the given arguments.

        The file name is an MD5 digest of the API host, the sorted real
        paths to upload and either a directory marker or the requested
        filename.
        """
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        """Take an exclusive, non-blocking lock on ``fileobj``.

        ``fileobj`` may be a file object or a raw file descriptor.
        Raises ResumeCacheConflict if the lock is held elsewhere.
        """
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            # Raw descriptors (ints) have no .name; report the fd itself.
            name = getattr(fileobj, 'name', fileobj)
            raise ResumeCacheConflict(u"{} locked".format(name))

    def load(self):
        """Return the state decoded from the cache file's JSON content."""
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        """Restart the cache if its first recorded block can't be verified.

        A cache file without valid JSON content is left untouched.
        """
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                # Any failure while checking means the cached state
                # can't be trusted: start over with an empty cache.
                self.restart()
        except (ValueError):
            pass

    def save(self, data):
        """Atomically replace the cache content with ``data`` (as JSON).

        On I/O or locking failure the previous cache file is kept and
        the temporary file is cleaned up.
        """
        new_cache_fd = None
        new_cache_name = None
        new_cache = None
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict):
            # Release the temporary descriptor before removing the file.
            if new_cache is not None:
                new_cache.close()
            elif new_cache_fd is not None:
                os.close(new_cache_fd)
            if new_cache_name is not None:  # None means mkstemp failed.
                os.unlink(new_cache_name)
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        """Close the cache file (which also releases its lock)."""
        self.cache_file.close()

    def destroy(self):
        """Delete the cache file from disk and close it."""
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        """Destroy the current cache file and start again with an empty one."""
        self.destroy()
        self.__init__(self.filename)
437
438
439 class ArvPutUploadJob(object):
440     CACHE_DIR = '.cache/arvados/arv-put'
441     EMPTY_STATE = {
442         'manifest' : None, # Last saved manifest checkpoint
443         'files' : {} # Previous run file list: {path : {size, mtime}}
444     }
445
446     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
447                  name=None, owner_uuid=None, api_client=None, batch_mode=False,
448                  ensure_unique_name=False, num_retries=None,
449                  put_threads=None, replication_desired=None, filename=None,
450                  update_time=60.0, update_collection=None, storage_classes=None,
451                  logger=logging.getLogger('arvados.arv_put'), dry_run=False,
452                  follow_links=True, exclude_paths=[], exclude_names=None,
453                  trash_at=None):
454         self.paths = paths
455         self.resume = resume
456         self.use_cache = use_cache
457         self.batch_mode = batch_mode
458         self.update = False
459         self.reporter = reporter
460         # This will set to 0 before start counting, if no special files are going
461         # to be read.
462         self.bytes_expected = None
463         self.bytes_written = 0
464         self.bytes_skipped = 0
465         self.name = name
466         self.owner_uuid = owner_uuid
467         self.ensure_unique_name = ensure_unique_name
468         self.num_retries = num_retries
469         self.replication_desired = replication_desired
470         self.put_threads = put_threads
471         self.filename = filename
472         self.storage_classes = storage_classes
473         self._api_client = api_client
474         self._state_lock = threading.Lock()
475         self._state = None # Previous run state (file list & manifest)
476         self._current_files = [] # Current run file list
477         self._cache_file = None
478         self._collection_lock = threading.Lock()
479         self._remote_collection = None # Collection being updated (if asked)
480         self._local_collection = None # Collection from previous run manifest
481         self._file_paths = set() # Files to be updated in remote collection
482         self._stop_checkpointer = threading.Event()
483         self._checkpointer = threading.Thread(target=self._update_task)
484         self._checkpointer.daemon = True
485         self._update_task_time = update_time  # How many seconds wait between update runs
486         self._files_to_upload = FileUploadList(dry_run=dry_run)
487         self._upload_started = False
488         self.logger = logger
489         self.dry_run = dry_run
490         self._checkpoint_before_quit = True
491         self.follow_links = follow_links
492         self.exclude_paths = exclude_paths
493         self.exclude_names = exclude_names
494         self._trash_at = trash_at
495
496         if self._trash_at is not None:
497             if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
498                 raise TypeError('trash_at should be None, timezone-naive datetime or timedelta')
499             if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
500                 raise TypeError('provided trash_at datetime should be timezone-naive')
501
502         if not self.use_cache and self.resume:
503             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
504
505         # Check for obvious dry-run responses
506         if self.dry_run and (not self.use_cache or not self.resume):
507             raise ArvPutUploadIsPending()
508
509         # Load cached data if any and if needed
510         self._setup_state(update_collection)
511
512         # Build the upload file list, excluding requested files and counting the
513         # bytes expected to be uploaded.
514         self._build_upload_list()
515
516     def _build_upload_list(self):
517         """
518         Scan the requested paths to count file sizes, excluding requested files
519         and dirs and building the upload file list.
520         """
521         # If there aren't special files to be read, reset total bytes count to zero
522         # to start counting.
523         if not any([p for p in self.paths
524                     if not (os.path.isfile(p) or os.path.isdir(p))]):
525             self.bytes_expected = 0
526
527         for path in self.paths:
528             # Test for stdin first, in case some file named '-' exist
529             if path == '-':
530                 if self.dry_run:
531                     raise ArvPutUploadIsPending()
532                 self._write_stdin(self.filename or 'stdin')
533             elif not os.path.exists(path):
534                  raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
535             elif (not self.follow_links) and os.path.islink(path):
536                 self.logger.warning("Skipping symlink '{}'".format(path))
537                 continue
538             elif os.path.isdir(path):
539                 # Use absolute paths on cache index so CWD doesn't interfere
540                 # with the caching logic.
541                 orig_path = path
542                 path = os.path.abspath(path)
543                 if orig_path[-1:] == os.sep:
544                     # When passing a directory reference with a trailing slash,
545                     # its contents should be uploaded directly to the
546                     # collection's root.
547                     prefixdir = path
548                 else:
549                     # When passing a directory reference with no trailing slash,
550                     # upload the directory to the collection's root.
551                     prefixdir = os.path.dirname(path)
552                 prefixdir += os.sep
553                 for root, dirs, files in os.walk(path,
554                                                  followlinks=self.follow_links):
555                     root_relpath = os.path.relpath(root, path)
556                     if root_relpath == '.':
557                         root_relpath = ''
558                     # Exclude files/dirs by full path matching pattern
559                     if self.exclude_paths:
560                         dirs[:] = [d for d in dirs
561                                    if not any(pathname_match(
562                                            os.path.join(root_relpath, d), pat)
563                                               for pat in self.exclude_paths)]
564                         files = [f for f in files
565                                  if not any(pathname_match(
566                                          os.path.join(root_relpath, f), pat)
567                                             for pat in self.exclude_paths)]
568                     # Exclude files/dirs by name matching pattern
569                     if self.exclude_names is not None:
570                         dirs[:] = [d for d in dirs
571                                    if not self.exclude_names.match(d)]
572                         files = [f for f in files
573                                  if not self.exclude_names.match(f)]
574                     # Make os.walk()'s dir traversing order deterministic
575                     dirs.sort()
576                     files.sort()
577                     for f in files:
578                         filepath = os.path.join(root, f)
579                         if not os.path.isfile(filepath):
580                             self.logger.warning("Skipping non-regular file '{}'".format(filepath))
581                             continue
582                         # Add its size to the total bytes count (if applicable)
583                         if self.follow_links or (not os.path.islink(filepath)):
584                             if self.bytes_expected is not None:
585                                 self.bytes_expected += os.path.getsize(filepath)
586                         self._check_file(filepath,
587                                          os.path.join(root[len(prefixdir):], f))
588             else:
589                 filepath = os.path.abspath(path)
590                 # Add its size to the total bytes count (if applicable)
591                 if self.follow_links or (not os.path.islink(filepath)):
592                     if self.bytes_expected is not None:
593                         self.bytes_expected += os.path.getsize(filepath)
594                 self._check_file(filepath,
595                                  self.filename or os.path.basename(path))
596         # If dry-mode is on, and got up to this point, then we should notify that
597         # there aren't any file to upload.
598         if self.dry_run:
599             raise ArvPutUploadNotPending()
600         # Remove local_collection's files that don't exist locally anymore, so the
601         # bytes_written count is correct.
602         for f in self.collection_file_paths(self._local_collection,
603                                             path_prefix=""):
604             if f != 'stdin' and f != self.filename and not f in self._file_paths:
605                 self._local_collection.remove(f)
606
607     def start(self, save_collection):
608         """
609         Start supporting thread & file uploading
610         """
611         self._checkpointer.start()
612         try:
613             # Update bytes_written from current local collection and
614             # report initial progress.
615             self._update()
616             # Actual file upload
617             self._upload_started = True # Used by the update thread to start checkpointing
618             self._upload_files()
619         except (SystemExit, Exception) as e:
620             self._checkpoint_before_quit = False
621             # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
622             # Note: We're expecting SystemExit instead of
623             # KeyboardInterrupt because we have a custom signal
624             # handler in place that raises SystemExit with the catched
625             # signal's code.
626             if isinstance(e, PathDoesNotExistError):
627                 # We aren't interested in the traceback for this case
628                 pass
629             elif not isinstance(e, SystemExit) or e.code != -2:
630                 self.logger.warning("Abnormal termination:\n{}".format(
631                     traceback.format_exc()))
632             raise
633         finally:
634             if not self.dry_run:
635                 # Stop the thread before doing anything else
636                 self._stop_checkpointer.set()
637                 self._checkpointer.join()
638                 if self._checkpoint_before_quit:
639                     # Commit all pending blocks & one last _update()
640                     self._local_collection.manifest_text()
641                     self._update(final=True)
642                     if save_collection:
643                         self.save_collection()
644             if self.use_cache:
645                 self._cache_file.close()
646
647     def _collection_trash_at(self):
648         """
649         Returns the trash date that the collection should use at save time.
650         Takes into account absolute/relative trash_at values requested
651         by the user.
652         """
653         if type(self._trash_at) == datetime.timedelta:
654             # Get an absolute datetime for trash_at
655             return datetime.datetime.utcnow() + self._trash_at
656         return self._trash_at
657
658     def save_collection(self):
659         if self.update:
660             # Check if files should be updated on the remote collection.
661             for fp in self._file_paths:
662                 remote_file = self._remote_collection.find(fp)
663                 if not remote_file:
664                     # File don't exist on remote collection, copy it.
665                     self._remote_collection.copy(fp, fp, self._local_collection)
666                 elif remote_file != self._local_collection.find(fp):
667                     # A different file exist on remote collection, overwrite it.
668                     self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
669                 else:
670                     # The file already exist on remote collection, skip it.
671                     pass
672             self._remote_collection.save(num_retries=self.num_retries,
673                                          trash_at=self._collection_trash_at())
674         else:
675             if len(self._local_collection) == 0:
676                 self.logger.warning("No files were uploaded, skipping collection creation.")
677                 return
678             self._local_collection.save_new(
679                 name=self.name, owner_uuid=self.owner_uuid,
680                 ensure_unique_name=self.ensure_unique_name,
681                 num_retries=self.num_retries,
682                 trash_at=self._collection_trash_at())
683
684     def destroy_cache(self):
685         if self.use_cache:
686             try:
687                 os.unlink(self._cache_filename)
688             except OSError as error:
689                 # That's what we wanted anyway.
690                 if error.errno != errno.ENOENT:
691                     raise
692             self._cache_file.close()
693
694     def _collection_size(self, collection):
695         """
696         Recursively get the total size of the collection
697         """
698         size = 0
699         for item in listvalues(collection):
700             if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
701                 size += self._collection_size(item)
702             else:
703                 size += item.size()
704         return size
705
706     def _update_task(self):
707         """
708         Periodically called support task. File uploading is
709         asynchronous so we poll status from the collection.
710         """
711         while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
712             self._update()
713
714     def _update(self, final=False):
715         """
716         Update cached manifest text and report progress.
717         """
718         if self._upload_started:
719             with self._collection_lock:
720                 self.bytes_written = self._collection_size(self._local_collection)
721                 if self.use_cache:
722                     if final:
723                         manifest = self._local_collection.manifest_text()
724                     else:
725                         # Get the manifest text without comitting pending blocks
726                         manifest = self._local_collection.manifest_text(strip=False,
727                                                                         normalize=False,
728                                                                         only_committed=True)
729                     # Update cache
730                     with self._state_lock:
731                         self._state['manifest'] = manifest
732             if self.use_cache:
733                 try:
734                     self._save_state()
735                 except Exception as e:
736                     self.logger.error("Unexpected error trying to save cache file: {}".format(e))
737             # Keep remote collection's trash_at attribute synced when using relative expire dates
738             if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta:
739                 try:
740                     self._api_client.collections().update(
741                         uuid=self._remote_collection.manifest_locator(),
742                         body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}
743                     ).execute(num_retries=self.num_retries)
744                 except Exception as e:
745                     self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e))
746         else:
747             self.bytes_written = self.bytes_skipped
748         # Call the reporter, if any
749         self.report_progress()
750
751     def report_progress(self):
752         if self.reporter is not None:
753             self.reporter(self.bytes_written, self.bytes_expected)
754
755     def _write_stdin(self, filename):
756         output = self._local_collection.open(filename, 'wb')
757         self._write(sys.stdin.buffer, output)
758         output.close()
759
    def _check_file(self, source, filename):
        """
        Check if this file needs to be uploaded

        Compares the local file `source` against the cached state and the
        local collection, and queues it on self._files_to_upload when (part
        of) it still has to be sent.

        :source:
          Path of the file on the local filesystem; also the key used in
          self._state['files'].
        :filename:
          Path of the file inside the collection.

        Side effects: adds `filename` to self._file_paths, may create the
        cache entry for `source`, may remove a stale copy from the local
        collection and may increase self.bytes_skipped.
        """
        # Ignore symlinks when requested
        if (not self.follow_links) and os.path.islink(source):
            return
        # Offset inside `source` where the upload should start (0 = from scratch).
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If no previous cached data on this file, store it for an eventual
            # repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have a API for refreshing tokens.
                self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            try:
                # NOTE(review): the append itself is expected to raise
                # ArvPutUploadIsPending in some mode (see below), so
                # self._files_to_upload is presumably not a plain list --
                # confirm against its definition.
                self._files_to_upload.append((source, resume_offset, filename))
            except ArvPutUploadIsPending:
                # This could happen when running on dry-mode, close cache file to
                # avoid locking issues.
                self._cache_file.close()
                raise
831
832     def _upload_files(self):
833         for source, resume_offset, filename in self._files_to_upload:
834             with open(source, 'rb') as source_fd:
835                 with self._state_lock:
836                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
837                     self._state['files'][source]['size'] = os.path.getsize(source)
838                 if resume_offset > 0:
839                     # Start upload where we left off
840                     output = self._local_collection.open(filename, 'ab')
841                     source_fd.seek(resume_offset)
842                 else:
843                     # Start from scratch
844                     output = self._local_collection.open(filename, 'wb')
845                 self._write(source_fd, output)
846                 output.close(flush=False)
847
848     def _write(self, source_fd, output):
849         while True:
850             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
851             if not data:
852                 break
853             output.write(data)
854
855     def _my_collection(self):
856         return self._remote_collection if self.update else self._local_collection
857
858     def _get_cache_filepath(self):
859         # Set up cache file name from input paths.
860         md5 = hashlib.md5()
861         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
862         realpaths = sorted(os.path.realpath(path) for path in self.paths)
863         md5.update(b'\0'.join([p.encode() for p in realpaths]))
864         if self.filename:
865             md5.update(self.filename.encode())
866         cache_filename = md5.hexdigest()
867         cache_filepath = os.path.join(
868             arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
869             cache_filename)
870         return cache_filepath
871
    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.

        Also loads the remote collection when `update_collection` is given,
        and builds self._local_collection from the cached manifest.

        :update_collection:
          Collection UUID to update, or a false value for a new upload.

        Raises CollectionUpdateError when `update_collection` does not look
        like a collection UUID or the collection cannot be read, and
        ResumeCacheInvalidError when the cached manifest fails validation
        outside of batch mode. self._lock_file() is called on the cache file,
        so its locking error can propagate from here as well.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(
                    update_collection,
                    api_client=self._api_client,
                    storage_classes_desired=self.storage_classes,
                    num_retries=self.num_retries)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            cache_filepath = self._get_cache_filepath()
            if self.resume and os.path.exists(cache_filepath):
                self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
                # 'a+' keeps the existing content so it can be read back below.
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with a empty cache file.
                self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            # Rewind so json.load() below reads from the start of the file.
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
            if not self._cached_manifest_valid():
                if not self.batch_mode:
                    raise ResumeCacheInvalidError()
                else:
                    self.logger.info("Invalid signatures on cache file '{}' while being run in 'batch mode' -- continuing anyways.".format(self._cache_file.name))
                    self.use_cache = False # Don't overwrite preexisting cache file.
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load the previous manifest so we can check if files were modified remotely.
            self._local_collection = arvados.collection.Collection(
                self._state['manifest'],
                replication_desired=self.replication_desired,
                storage_classes_desired=self.storage_classes,
                put_threads=self.put_threads,
                api_client=self._api_client,
                num_retries=self.num_retries)
935
936     def _cached_manifest_valid(self):
937         """
938         Validate the oldest non-expired block signature to check if cached manifest
939         is usable: checking if the cached manifest was not created with a different
940         arvados account.
941         """
942         if self._state.get('manifest', None) is None:
943             # No cached manifest yet, all good.
944             return True
945         now = datetime.datetime.utcnow()
946         oldest_exp = None
947         oldest_loc = None
948         block_found = False
949         for m in keep_locator_pattern.finditer(self._state['manifest']):
950             loc = m.group(0)
951             try:
952                 exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
953             except IndexError:
954                 # Locator without signature
955                 continue
956             block_found = True
957             if exp > now and (oldest_exp is None or exp < oldest_exp):
958                 oldest_exp = exp
959                 oldest_loc = loc
960         if not block_found:
961             # No block signatures found => no invalid block signatures.
962             return True
963         if oldest_loc is None:
964             # Locator signatures found, but all have expired.
965             # Reset the cache and move on.
966             self.logger.info('Cache expired, starting from scratch.')
967             self._state['manifest'] = ''
968             return True
969         kc = arvados.KeepClient(api_client=self._api_client,
970                                 num_retries=self.num_retries)
971         try:
972             kc.head(oldest_loc)
973         except arvados.errors.KeepRequestError:
974             # Something is wrong, cached manifest is not valid.
975             return False
976         return True
977
978     def collection_file_paths(self, col, path_prefix='.'):
979         """Return a list of file paths by recursively go through the entire collection `col`"""
980         file_paths = []
981         for name, item in listitems(col):
982             if isinstance(item, arvados.arvfile.ArvadosFile):
983                 file_paths.append(os.path.join(path_prefix, name))
984             elif isinstance(item, arvados.collection.Subcollection):
985                 new_prefix = os.path.join(path_prefix, name)
986                 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
987         return file_paths
988
989     def _lock_file(self, fileobj):
990         try:
991             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
992         except IOError:
993             raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
994
995     def _save_state(self):
996         """
997         Atomically save current state into cache.
998         """
999         with self._state_lock:
1000             # We're not using copy.deepcopy() here because it's a lot slower
1001             # than json.dumps(), and we're already needing JSON format to be
1002             # saved on disk.
1003             state = json.dumps(self._state)
1004         try:
1005             new_cache = tempfile.NamedTemporaryFile(
1006                 mode='w+',
1007                 dir=os.path.dirname(self._cache_filename), delete=False)
1008             self._lock_file(new_cache)
1009             new_cache.write(state)
1010             new_cache.flush()
1011             os.fsync(new_cache)
1012             os.rename(new_cache.name, self._cache_filename)
1013         except (IOError, OSError, ResumeCacheConflict) as error:
1014             self.logger.error("There was a problem while saving the cache file: {}".format(error))
1015             try:
1016                 os.unlink(new_cache_name)
1017             except NameError:  # mkstemp failed.
1018                 pass
1019         else:
1020             self._cache_file.close()
1021             self._cache_file = new_cache
1022
1023     def collection_name(self):
1024         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
1025
1026     def collection_trash_at(self):
1027         return self._my_collection().get_trash_at()
1028
1029     def manifest_locator(self):
1030         return self._my_collection().manifest_locator()
1031
1032     def portable_data_hash(self):
1033         pdh = self._my_collection().portable_data_hash()
1034         m = self._my_collection().stripped_manifest().encode()
1035         local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
1036         if pdh != local_pdh:
1037             self.logger.warning("\n".join([
1038                 "arv-put: API server provided PDH differs from local manifest.",
1039                 "         This should not happen; showing API server version."]))
1040         return pdh
1041
1042     def manifest_text(self, stream_name=".", strip=False, normalize=False):
1043         return self._my_collection().manifest_text(stream_name, strip, normalize)
1044
1045     def _datablocks_on_item(self, item):
1046         """
1047         Return a list of datablock locators, recursively navigating
1048         through subcollections
1049         """
1050         if isinstance(item, arvados.arvfile.ArvadosFile):
1051             if item.size() == 0:
1052                 # Empty file locator
1053                 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
1054             else:
1055                 locators = []
1056                 for segment in item.segments():
1057                     loc = segment.locator
1058                     locators.append(loc)
1059                 return locators
1060         elif isinstance(item, arvados.collection.Collection):
1061             l = [self._datablocks_on_item(x) for x in listvalues(item)]
1062             # Fast list flattener method taken from:
1063             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
1064             return [loc for sublist in l for loc in sublist]
1065         else:
1066             return None
1067
1068     def data_locators(self):
1069         with self._collection_lock:
1070             # Make sure all datablocks are flushed before getting the locators
1071             self._my_collection().manifest_text()
1072             datablocks = self._datablocks_on_item(self._my_collection())
1073         return datablocks
1074
# Template for machine readable progress lines. The program name and PID are
# filled in once here; the two escaped placeholders are filled in later with
# the written and total byte counts.
_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0], os.getpid())
1077
1078 # Simulate glob.glob() matching behavior without the need to scan the filesystem
1079 # Note: fnmatch() doesn't work correctly when used with pathnames. For example the
1080 # pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
1081 # so instead we're using it on every path component.
def pathname_match(pathname, pattern):
    """
    Match `pathname` against `pattern` one path component at a time.

    Empty and '.' components of the pattern are dropped first, so patterns
    like 'some/subdir/' or 'some//subdir' still work. The match succeeds only
    when both have the same number of components and every component of the
    path matches the corresponding pattern component with fnmatch.
    """
    path_parts = pathname.split(os.sep)
    pattern_parts = [part for part in pattern.split(os.sep) if part not in ('', '.')]
    if len(path_parts) != len(pattern_parts):
        return False
    return all(fnmatch.fnmatch(part, pat)
               for part, pat in zip(path_parts, pattern_parts))
1092
def machine_progress(bytes_written, bytes_expected):
    """Format a machine readable progress line; an unknown total (None) is reported as -1."""
    if bytes_expected is None:
        bytes_expected = -1
    return _machine_format.format(bytes_written, bytes_expected)
1096
def human_progress(bytes_written, bytes_expected):
    """
    Format a progress line meant for a person watching a terminal.

    With a known, non-zero total the line shows written and total size in
    whole MiB plus the percentage done. Otherwise only the number of bytes
    written is shown. The line starts with a carriage return so it
    overwrites the previous one.
    """
    if not bytes_expected:
        return "\r{} ".format(bytes_written)
    written_mib = bytes_written >> 20
    expected_mib = bytes_expected >> 20
    fraction = float(bytes_written) / bytes_expected
    return "\r{}M / {}M {:.1%} ".format(written_mib, expected_mib, fraction)
1104
def progress_writer(progress_func, outfile=sys.stderr):
    """
    Build a reporter callback that writes formatted progress to `outfile`.

    The returned function takes (bytes_written, bytes_expected), formats them
    with `progress_func` and writes the resulting text to `outfile`.
    """
    def write_progress(bytes_written, bytes_expected):
        message = progress_func(bytes_written, bytes_expected)
        outfile.write(message)
    return write_progress
1109
def desired_project_uuid(api_client, project_uuid, num_retries):
    """
    Resolve the UUID of the owner for the new collection.

    Without a `project_uuid` the current user is looked up. A user UUID or a
    group UUID is fetched through the matching API resource. Anything else
    raises ValueError. The 'uuid' field of the API response is returned.
    """
    if project_uuid:
        if arvados.util.user_uuid_pattern.match(project_uuid):
            request = api_client.users().get(uuid=project_uuid)
        elif arvados.util.group_uuid_pattern.match(project_uuid):
            request = api_client.groups().get(uuid=project_uuid)
        else:
            raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    else:
        request = api_client.users().current()
    response = request.execute(num_retries=num_retries)
    return response['uuid']
1120
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
         install_sig_handlers=True):
    """
    Command line entry point: validate options, upload, and print the result.

    :arguments:
      Argument list handed to parse_arguments().
    :stdout:
      File object the resulting locator, hash or manifest is written to
      (unless --silent was given).
    :stderr:
      Accepted for interface compatibility; not referenced in this function.
    :install_sig_handlers:
      When true, install the arv_cmd signal handlers before uploading and
      restore the previous ones before returning.

    Returns the text that was (or would have been) printed. Invalid options
    and upload errors end the process through sys.exit() with a non-zero
    status; a dry run exits with status 2 when there is something to upload
    and 0 when there is not.
    """
    global api_client

    args = parse_arguments(arguments)
    logger = logging.getLogger('arvados.arv_put')
    if args.silent:
        logger.setLevel(logging.WARNING)
    else:
        logger.setLevel(logging.INFO)
    status = 0

    request_id = arvados.util.new_request_id()

    formatter = ArvPutLogFormatter(request_id)
    logging.getLogger('arvados').handlers[0].setFormatter(formatter)

    if api_client is None:
        api_client = arvados.api('v1', request_id=request_id, num_retries=args.retries)

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

    # Trash arguments validation
    trash_at = None
    if args.trash_at is not None:
        # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we
        # make sure the user provides a complete YYYY-MM-DD date.
        if not re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}', args.trash_at):
            logger.error("--trash-at argument format invalid, use --help to see examples.")
            sys.exit(1)
        # Check if no time information was provided. In that case, assume end-of-day.
        if re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}$', args.trash_at):
            args.trash_at += 'T23:59:59'
        try:
            trash_at = ciso8601.parse_datetime(args.trash_at)
        except Exception:
            # Deliberately not a bare "except:": that would also turn
            # KeyboardInterrupt and SystemExit into a format error.
            logger.error("--trash-at argument format invalid, use --help to see examples.")
            sys.exit(1)
        else:
            if trash_at.tzinfo is not None:
                # Timezone aware datetime provided.
                utcoffset = -trash_at.utcoffset()
            else:
                # Timezone naive datetime provided. Assume is local.
                # NOTE(review): time.daylight only says that the local zone
                # defines a DST rule, not that DST is in effect at the given
                # date -- confirm this is the intended offset.
                if time.daylight:
                    utcoffset = datetime.timedelta(seconds=time.altzone)
                else:
                    utcoffset = datetime.timedelta(seconds=time.timezone)
            # Convert to UTC timezone naive datetime.
            trash_at = trash_at.replace(tzinfo=None) + utcoffset

        if trash_at <= datetime.datetime.utcnow():
            logger.error("--trash-at argument must be set in the future")
            sys.exit(1)
    if args.trash_after is not None:
        if args.trash_after < 1:
            logger.error("--trash-after argument must be >= 1")
            sys.exit(1)
        # A relative expire date is kept as a timedelta (days -> seconds).
        trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60))

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    #  Split storage-classes argument
    storage_classes = None
    if args.storage_classes:
        storage_classes = args.storage_classes.strip().replace(' ', '').split(',')

    # Setup exclude regex from all the --exclude arguments provided
    name_patterns = []
    exclude_paths = []
    exclude_names = None
    if len(args.exclude) > 0:
        # We're supporting 2 kinds of exclusion patterns:
        # 1)   --exclude '*.jpg'    (file/dir name patterns, will only match
        #                            the name, wherever the file is on the tree)
        # 2.1) --exclude 'foo/bar'  (file/dir path patterns, will match the
        #                            entire path, and should be relative to
        #                            any input dir argument)
        # 2.2) --exclude './*.jpg'  (Special case for excluding files/dirs
        #                            placed directly underneath the input dir)
        for p in args.exclude:
            # Only relative paths patterns allowed
            if p.startswith(os.sep):
                logger.error("Cannot use absolute paths with --exclude")
                sys.exit(1)
            if os.path.dirname(p):
                # We don't support path patterns with '..'
                p_parts = p.split(os.sep)
                if '..' in p_parts:
                    logger.error(
                        "Cannot use path patterns that include or '..'")
                    sys.exit(1)
                # Path search pattern
                exclude_paths.append(p)
            else:
                # Name-only search pattern
                name_patterns.append(p)
        # For name only matching, we can combine all patterns into a single
        # regexp, for better performance.
        exclude_names = re.compile('|'.join(
            [fnmatch.translate(p) for p in name_patterns]
        )) if len(name_patterns) > 0 else None
        # Show the user the patterns to be used, just in case they weren't
        # specified inside quotes and got changed by the shell expansion.
        logger.info("Exclude patterns: {}".format(args.exclude))

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 batch_mode= args.batch,
                                 filename = args.filename,
                                 reporter = reporter,
                                 api_client = api_client,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 storage_classes=storage_classes,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links,
                                 exclude_paths=exclude_paths,
                                 exclude_names=exclude_names,
                                 trash_at=trash_at)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except ResumeCacheInvalidError:
        logger.error("\n".join([
            "arv-put: Resume cache contains invalid signature: it may have expired",
            "         or been created with another Arvados user's credentials.",
            "         Switch user or use one of the following options to restart upload:",
            "         --no-resume to start a new resume cache.",
            "         --no-cache to disable resume cache.",
            "         --batch to ignore the resume cache if invalid."]))
        sys.exit(1)
    except (CollectionUpdateError, PathDoesNotExistError) as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        # --stream and --raw only print data; they don't save a collection.
        writer.start(save_collection=not(args.stream or args.raw))
    except (arvados.errors.ApiError, arvados.errors.KeepWriteError) as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    elif writer.manifest_locator() is not None:
        try:
            expiration_notice = ""
            if writer.collection_trash_at() is not None:
                # Get the local timezone-naive version, and log it with timezone information.
                if time.daylight:
                    local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.altzone)
                else:
                    local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.timezone)
                expiration_notice = ". It will expire on {} {}.".format(
                    local_trash_at.strftime("%Y-%m-%d %H:%M:%S"), time.strftime("%z"))
            if args.update_collection:
                logger.info(u"Collection updated: '{}'{}".format(
                    writer.collection_name(), expiration_notice))
            else:
                logger.info(u"Collection saved as '{}'{}".format(
                    writer.collection_name(), expiration_notice))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1
    else:
        status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    elif not args.silent:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    if install_sig_handlers:
        arv_cmd.restore_signal_handlers()

    if status != 0:
        sys.exit(status)

    # Success!
    return output
1381
1382
# Run the command line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == '__main__':
    main()