# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from future.utils import listitems, listvalues
from builtins import str
from builtins import object
import argparse
import arvados
import arvados.collection
import base64
import ciso8601
import copy
import datetime
import errno
import fcntl
import fnmatch
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading
import time
import traceback

from apiclient import errors as apiclient_errors
from arvados._version import __version__
from arvados.util import keep_locator_pattern

import arvados.commands._util as arv_cmd

api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. If path is a directory reference with a trailing
slash, then just upload the directory's contents; otherwise upload the
directory itself. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not save a Collection object in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--storage-classes', help="""
Specify a comma-separated list of storage classes to be used when saving data to Keep.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Note that using many
threads will increase the RAM requirements. Default is to use 2 threads.
On high-latency installations, using a greater number will improve
overall throughput.
""")

upload_opts.add_argument('--exclude', metavar='PATTERN', default=[],
                         action='append', help="""
Exclude files and directories whose names match the given glob pattern. When
using a path-like pattern like 'subdir/*.txt', all text files inside the
'subdir' directory (relative to the provided input dirs) will be excluded.
When using a filename pattern like '*.txt', any text file will be excluded
no matter where it is placed.
For the special case of needing to exclude only files or dirs directly below
the given input directory, you can use a pattern like './exclude_this.gif'.
You can specify multiple patterns by using this argument more than once.
""")

_group = upload_opts.add_mutually_exclusive_group()
_group.add_argument('--follow-links', action='store_true', default=True,
                    dest='follow_links', help="""
Follow file and directory symlinks (default).
""")
_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                    help="""
Do not follow file and directory symlinks.
""")


run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

run_opts.add_argument('--silent', action='store_true',
                      help="""
Do not print any debug messages to console. (Any error messages will
still be displayed.)
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

_group = upload_opts.add_mutually_exclusive_group()
_group.add_argument('--trash-at', metavar='YYYY-MM-DDTHH:MM', default=None,
                    help="""
Set the trash date of the resulting collection to an absolute date in the future.
The accepted format is defined by the ISO 8601 standard. Examples: 20090103, 2009-01-03, 20090103T181505, 2009-01-03T18:15:05.\n
Timezone information can be added. If not, the provided date/time is assumed to be in the local system's timezone.
""")
_group.add_argument('--trash-after', type=int, metavar='DAYS', default=None,
                    help="""
Set the trash date of the resulting collection to a number of days from the
date/time that the upload process finishes.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if args.filename and (len(args.paths) != 1 or os.path.isdir(args.paths[0])):
        arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress or args.silent)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    # Remove possible duplicate patterns
    if len(args.exclude) > 0:
        args.exclude = list(set(args.exclude))

    return args

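# A quick sketch of the normalization parse_arguments() performs
# (illustrative values; assumes stderr is not a tty, so --progress stays off):
#   args = parse_arguments(['/dev/stdin'])
#   args.paths      -> ['-']      # /dev/stdin is mapped to the stdin marker
#   args.filename   -> 'stdin'    # default name when reading from stdin
#   args.use_cache  -> False     # stdin uploads never use the resume cache
#   args.resume     -> False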

class PathDoesNotExistError(Exception):
    pass


class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass


class ResumeCacheInvalidError(Exception):
    pass


class ArvPutArgumentConflict(Exception):
    pass


class ArvPutUploadIsPending(Exception):
    pass


class ArvPutUploadNotPending(Exception):
    pass


class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)

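# In dry-run mode, the first attempt to queue a file signals pending work,
# e.g. (illustrative):
#   files = FileUploadList(dry_run=True)
#   files.append(('src.txt', 0, 'dest.txt'))   # raises ArvPutUploadIsPending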

# Appends the X-Request-Id to the log message when log level is ERROR or DEBUG
class ArvPutLogFormatter(logging.Formatter):
    std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
    err_fmtr = None
    request_id_informed = False

    def __init__(self, request_id):
        self.err_fmtr = logging.Formatter(
            arvados.log_format+' (X-Request-Id: {})'.format(request_id),
            arvados.log_date_format)

    def format(self, record):
        if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
            self.request_id_informed = True
            return self.err_fmtr.format(record)
        return self.std_fmtr.format(record)

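# A minimal sketch of wiring this formatter up, mirroring what main() does
# further below:
#   handler = logging.getLogger('arvados').handlers[0]
#   handler.setFormatter(ArvPutLogFormatter(arvados.util.new_request_id()))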

class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict(u"{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                self.restart()
        except ValueError:
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict):
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)


class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 name=None, owner_uuid=None, api_client=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None, filename=None,
                 update_time=60.0, update_collection=None, storage_classes=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
                 follow_links=True, exclude_paths=[], exclude_names=None,
                 trash_at=None):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        # This will be set to 0 before counting starts, if no special
        # files are going to be read.
        self.bytes_expected = None
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self.storage_classes = storage_classes
        self._api_client = api_client
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = set() # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # How many seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        self._checkpoint_before_quit = True
        self.follow_links = follow_links
        self.exclude_paths = exclude_paths
        self.exclude_names = exclude_names
        self._trash_at = trash_at

        if self._trash_at is not None:
            if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
                raise TypeError('trash_at should be None, timezone-naive datetime or timedelta')
            if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
                raise TypeError('provided trash_at datetime should be timezone-naive')

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

        # Build the upload file list, excluding requested files and counting the
        # bytes expected to be uploaded.
        self._build_upload_list()

    def _build_upload_list(self):
        """
        Scan the requested paths to count file sizes, excluding requested files
        and dirs and building the upload file list.
        """
        # If there are no special files to be read, reset the total bytes
        # count to zero to start counting.
        if not any([p for p in self.paths
                    if not (os.path.isfile(p) or os.path.isdir(p))]):
            self.bytes_expected = 0

        for path in self.paths:
            # Test for stdin first, in case some file named '-' exists
            if path == '-':
                if self.dry_run:
                    raise ArvPutUploadIsPending()
                self._write_stdin(self.filename or 'stdin')
            elif not os.path.exists(path):
                raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
            elif (not self.follow_links) and os.path.islink(path):
                continue
            elif os.path.isdir(path):
                # Use absolute paths on cache index so CWD doesn't interfere
                # with the caching logic.
                orig_path = path
                path = os.path.abspath(path)
                if orig_path[-1:] == os.sep:
                    # When passing a directory reference with a trailing slash,
                    # its contents should be uploaded directly to the
                    # collection's root.
                    prefixdir = path
                else:
                    # When passing a directory reference with no trailing slash,
                    # upload the directory to the collection's root.
                    prefixdir = os.path.dirname(path)
                prefixdir += os.sep
                for root, dirs, files in os.walk(path,
                                                 followlinks=self.follow_links):
                    root_relpath = os.path.relpath(root, path)
                    if root_relpath == '.':
                        root_relpath = ''
                    # Exclude files/dirs by full path matching pattern
                    if self.exclude_paths:
                        dirs[:] = [d for d in dirs
                                   if not any(pathname_match(
                                           os.path.join(root_relpath, d), pat)
                                              for pat in self.exclude_paths)]
                        files = [f for f in files
                                 if not any(pathname_match(
                                         os.path.join(root_relpath, f), pat)
                                            for pat in self.exclude_paths)]
                    # Exclude files/dirs by name matching pattern
                    if self.exclude_names is not None:
                        dirs[:] = [d for d in dirs
                                   if not self.exclude_names.match(d)]
                        files = [f for f in files
                                 if not self.exclude_names.match(f)]
                    # Make os.walk()'s dir traversing order deterministic
                    dirs.sort()
                    files.sort()
                    for f in files:
                        filepath = os.path.join(root, f)
                        # Add its size to the total bytes count (if applicable)
                        if self.follow_links or (not os.path.islink(filepath)):
                            if self.bytes_expected is not None:
                                self.bytes_expected += os.path.getsize(filepath)
                        self._check_file(filepath,
                                         os.path.join(root[len(prefixdir):], f))
            else:
                filepath = os.path.abspath(path)
                # Add its size to the total bytes count (if applicable)
                if self.follow_links or (not os.path.islink(filepath)):
                    if self.bytes_expected is not None:
                        self.bytes_expected += os.path.getsize(filepath)
                self._check_file(filepath,
                                 self.filename or os.path.basename(path))
        # If dry-run mode is on and we got this far, notify that there
        # aren't any files to upload.
        if self.dry_run:
            raise ArvPutUploadNotPending()
        # Remove local_collection's files that don't exist locally anymore, so the
        # bytes_written count is correct.
        for f in self.collection_file_paths(self._local_collection,
                                            path_prefix=""):
            if f != 'stdin' and f != self.filename and f not in self._file_paths:
                self._local_collection.remove(f)

    def start(self, save_collection):
        """
        Start supporting thread & file uploading
        """
        self._checkpointer.start()
        try:
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of
            # KeyboardInterrupt because we have a custom signal
            # handler in place that raises SystemExit with the caught
            # signal's code.
            if isinstance(e, PathDoesNotExistError):
                # We aren't interested in the traceback for this case
                pass
            elif not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(
                    traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()

    def _collection_trash_at(self):
        """
        Returns the trash date that the collection should use at save time.
        Takes into account absolute/relative trash_at values requested
        by the user.
        """
        if type(self._trash_at) == datetime.timedelta:
            # Get an absolute datetime for trash_at
            return datetime.datetime.utcnow() + self._trash_at
        return self._trash_at

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on remote collection, skip it.
                    pass
            self._remote_collection.save(storage_classes=self.storage_classes,
                                         num_retries=self.num_retries,
                                         trash_at=self._collection_trash_at())
        else:
            if self.storage_classes is None:
                self.storage_classes = ['default']
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                storage_classes=self.storage_classes,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries,
                trash_at=self._collection_trash_at())

    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in listvalues(collection):
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                try:
                    self._save_state()
                except Exception as e:
                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
            # Keep remote collection's trash_at attribute synced when using relative expire dates
            if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta:
                try:
                    self._api_client.collections().update(
                        uuid=self._remote_collection.manifest_locator(),
                        body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}
                    ).execute(num_retries=self.num_retries)
                except Exception as e:
                    self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e))
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'wb')
        self._write(sys.stdin.buffer, output)
        output.close()

    def _check_file(self, source, filename):
        """
        Check if this file needs to be uploaded
        """
        # Ignore symlinks when requested
        if (not self.follow_links) and os.path.islink(source):
            return
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If there's no previous cached data on this file, store it for an
            # eventual repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have an API for refreshing tokens.
                self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            try:
                self._files_to_upload.append((source, resume_offset, filename))
            except ArvPutUploadIsPending:
                # This could happen when running in dry-run mode; close the
                # cache file to avoid locking issues.
                self._cache_file.close()
                raise

    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'rb') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'ab')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'wb')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)
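    # Note: data is copied in arvados.config.KEEP_BLOCK_SIZE chunks (64 MiB
    # in the SDK's defaults) so reads stay aligned with Keep's block size.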

    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _get_cache_filepath(self):
        # Set up cache file name from input paths.
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in self.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if self.filename:
            md5.update(self.filename.encode())
        cache_filename = md5.hexdigest()
        cache_filepath = os.path.join(
            arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
            cache_filename)
        return cache_filepath

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(
                    update_collection,
                    api_client=self._api_client,
                    num_retries=self.num_retries)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            cache_filepath = self._get_cache_filepath()
            if self.resume and os.path.exists(cache_filepath):
                self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
            if not self._cached_manifest_valid():
                raise ResumeCacheInvalidError()
            # Load the previous manifest so we can check if files were modified remotely.
            self._local_collection = arvados.collection.Collection(
                self._state['manifest'],
                replication_desired=self.replication_desired,
                put_threads=self.put_threads,
                api_client=self._api_client,
                num_retries=self.num_retries)

    def _cached_manifest_valid(self):
        """
        Validate the oldest non-expired block signature to check if the cached
        manifest is usable: that is, that it was not created with a different
        Arvados account's credentials.
        """
        if self._state.get('manifest', None) is None:
            # No cached manifest yet, all good.
            return True
        now = datetime.datetime.utcnow()
        oldest_exp = None
        oldest_loc = None
        block_found = False
        for m in keep_locator_pattern.finditer(self._state['manifest']):
            loc = m.group(0)
            try:
                exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
            except IndexError:
                # Locator without signature
                continue
            block_found = True
            if exp > now and (oldest_exp is None or exp < oldest_exp):
                oldest_exp = exp
                oldest_loc = loc
        if not block_found:
            # No block signatures found => no invalid block signatures.
            return True
        if oldest_loc is None:
            # Locator signatures found, but all have expired.
            # Reset the cache and move on.
            self.logger.info('Cache expired, starting from scratch.')
            self._state['manifest'] = ''
            return True
        kc = arvados.KeepClient(api_client=self._api_client,
                                num_retries=self.num_retries)
        try:
            kc.head(oldest_loc)
        except arvados.errors.KeepRequestError:
            # Something is wrong, cached manifest is not valid.
            return False
        return True
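
    # A signed Keep locator looks roughly like (illustrative values):
    #   "acbd18db4cc2f85cedef654fccc4a4d8+3+A<40-hex signature>@625a5f4c"
    # The hex token after '@' is the signature's expiry timestamp, which is
    # what the loop above parses and compares against utcnow().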

    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively going through the entire collection `col`"""
        file_paths = []
        for name, item in listitems(col):
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict(u"{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        with self._state_lock:
            # We're not using copy.deepcopy() here because it's a lot slower
            # than json.dumps(), and we already need the state serialized as
            # JSON to save it on disk.
            state = json.dumps(self._state)
        try:
            new_cache = tempfile.NamedTemporaryFile(
                mode='w+',
                dir=os.path.dirname(self._cache_filename), delete=False)
            self._lock_file(new_cache)
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache.name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache.name)
            except NameError:  # NamedTemporaryFile() failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache

    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def collection_trash_at(self):
        return self._my_collection().get_trash_at()

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        pdh = self._my_collection().portable_data_hash()
        m = self._my_collection().stripped_manifest().encode()
        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
        if pdh != local_pdh:
            self.logger.warning("\n".join([
                "arv-put: API server provided PDH differs from local manifest.",
                "         This should not happen; showing API server version."]))
        return pdh

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in listvalues(item)]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
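
# For example, with argv[0] 'arv-put' and pid 1234 (hypothetical values),
# machine_progress(512, 1024) below yields "arv-put 1234: 512 written 1024 total\n".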

# Simulate glob.glob() matching behavior without the need to scan the filesystem
# Note: fnmatch() doesn't work correctly when used with pathnames. For example,
# the pattern 'tests/*.py' will match 'tests/run_test.py' and also
# 'tests/subdir/run_test.py', so instead we apply it to every path component.
def pathname_match(pathname, pattern):
    name = pathname.split(os.sep)
    # Fix patterns like 'some/subdir/' or 'some//subdir'
    pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
    if len(name) != len(pat):
        return False
    for i in range(len(name)):
        if not fnmatch.fnmatch(name[i], pat[i]):
            return False
    return True
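
# Illustrative behavior (component counts must match):
#   pathname_match('subdir/file.txt', 'subdir/*.txt')  -> True
#   pathname_match('subdir/file.txt', '*.txt')         -> False (1 vs 2 components)
#   pathname_match('a/b/c.txt', 'a/*/[cd].txt')        -> True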

def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)
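
# e.g. human_progress(5 << 20, 10 << 20) returns "\r5M / 10M 50.0% ", and
# human_progress(512, None) falls back to the raw byte count: "\r512 ".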

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']
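
# All of these resolve to a usable owner uuid (UUIDs are hypothetical):
#   desired_project_uuid(api, None, 3)                           # current user's Home project
#   desired_project_uuid(api, 'zzzzz-tpzed-0123456789abcde', 3)  # a user UUID
#   desired_project_uuid(api, 'zzzzz-j7d0g-0123456789abcde', 3)  # a group/project UUID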

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
         install_sig_handlers=True):
    global api_client

    args = parse_arguments(arguments)
    logger = logging.getLogger('arvados.arv_put')
    if args.silent:
        logger.setLevel(logging.WARNING)
    else:
        logger.setLevel(logging.INFO)
    status = 0

    request_id = arvados.util.new_request_id()

    formatter = ArvPutLogFormatter(request_id)
    logging.getLogger('arvados').handlers[0].setFormatter(formatter)

    if api_client is None:
        api_client = arvados.api('v1', request_id=request_id)

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

    # Trash arguments validation
    trash_at = None
    if args.trash_at is not None:
        # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we
        # make sure the user provides a complete YYYY-MM-DD date.
        if not re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}', args.trash_at):
            logger.error("--trash-at argument format invalid, use --help to see examples.")
            sys.exit(1)
        # Check if no time information was provided. In that case, assume end-of-day.
        if re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}$', args.trash_at):
            args.trash_at += 'T23:59:59'
        try:
            trash_at = ciso8601.parse_datetime(args.trash_at)
        except ValueError:
            logger.error("--trash-at argument format invalid, use --help to see examples.")
            sys.exit(1)
        else:
            if trash_at.tzinfo is not None:
                # Timezone-aware datetime provided.
                utcoffset = -trash_at.utcoffset()
            else:
                # Timezone-naive datetime provided. Assume it's local.
                if time.daylight:
                    utcoffset = datetime.timedelta(seconds=time.altzone)
                else:
                    utcoffset = datetime.timedelta(seconds=time.timezone)
            # Convert to UTC timezone-naive datetime.
            trash_at = trash_at.replace(tzinfo=None) + utcoffset

        if trash_at <= datetime.datetime.utcnow():
            logger.error("--trash-at argument must be set in the future")
            sys.exit(1)
    if args.trash_after is not None:
        if args.trash_after < 1:
            logger.error("--trash-after argument must be >= 1")
            sys.exit(1)
        trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60))
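    # e.g. --trash-after 7 becomes datetime.timedelta(seconds=604800); at save
    # time, _collection_trash_at() adds it to utcnow() to get an absolute date.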

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # Split storage-classes argument
    storage_classes = None
    if args.storage_classes:
        storage_classes = args.storage_classes.strip().split(',')
        if len(storage_classes) > 1:
            logger.error("Multiple storage classes are not supported currently.")
            sys.exit(1)

    # Set up exclude regex from all the --exclude arguments provided
    name_patterns = []
    exclude_paths = []
    exclude_names = None
    if len(args.exclude) > 0:
        # We're supporting 2 kinds of exclusion patterns:
        # 1)   --exclude '*.jpg'    (file/dir name patterns, will only match
        #                            the name, wherever the file is on the tree)
        # 2.1) --exclude 'foo/bar'  (file/dir path patterns, will match the
        #                            entire path, and should be relative to
        #                            any input dir argument)
        # 2.2) --exclude './*.jpg'  (Special case for excluding files/dirs
        #                            placed directly underneath the input dir)
        for p in args.exclude:
            # Only relative path patterns are allowed
            if p.startswith(os.sep):
                logger.error("Cannot use absolute paths with --exclude")
                sys.exit(1)
            if os.path.dirname(p):
                # We don't support path patterns with '..'
                p_parts = p.split(os.sep)
                if '..' in p_parts:
                    logger.error(
                        "Cannot use path patterns that include '..'")
                    sys.exit(1)
                # Path search pattern
                exclude_paths.append(p)
            else:
                # Name-only search pattern
                name_patterns.append(p)
        # For name-only matching, we can combine all patterns into a single
        # regexp, for better performance.
        exclude_names = re.compile('|'.join(
            [fnmatch.translate(p) for p in name_patterns]
        )) if len(name_patterns) > 0 else None
        # Show the user the patterns to be used, just in case they weren't
        # specified inside quotes and got changed by the shell expansion.
        logger.info("Exclude patterns: {}".format(args.exclude))

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 api_client = api_client,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 storage_classes=storage_classes,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links,
                                 exclude_paths=exclude_paths,
                                 exclude_names=exclude_names,
                                 trash_at=trash_at)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except ResumeCacheInvalidError:
        logger.error("\n".join([
            "arv-put: Resume cache contains invalid signature: it may have expired",
            "         or been created with another Arvados user's credentials.",
            "         Switch user or use one of the following options to restart upload:",
            "         --no-resume to start a new resume cache.",
            "         --no-cache to disable resume cache."]))
        sys.exit(1)
    except (CollectionUpdateError, PathDoesNotExistError) as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not(args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            expiration_notice = ""
            if writer.collection_trash_at() is not None:
                # Get the local timezone-naive version, and log it with timezone information.
                if time.daylight:
                    local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.altzone)
                else:
                    local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.timezone)
                expiration_notice = ". It will expire on {} {}.".format(
                    local_trash_at.strftime("%Y-%m-%d %H:%M:%S"), time.strftime("%z"))
            if args.update_collection:
                logger.info(u"Collection updated: '{}'{}".format(
                    writer.collection_name(), expiration_notice))
            else:
                logger.info(u"Collection saved as '{}'{}".format(
                    writer.collection_name(), expiration_notice))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    elif not args.silent:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    if install_sig_handlers:
        arv_cmd.restore_signal_handlers()

    if status != 0:
        sys.exit(status)

    # Success!
    return output


if __name__ == '__main__':
    main()