1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from future.utils import listitems, listvalues
7 from builtins import str
8 from builtins import object
9 import argparse
10 import arvados
11 import arvados.collection
12 import base64
13 import ciso8601
14 import copy
15 import datetime
16 import errno
17 import fcntl
18 import fnmatch
19 import hashlib
20 import json
21 import logging
22 import os
23 import pwd
24 import re
25 import signal
26 import socket
27 import sys
28 import tempfile
29 import threading
30 import time
31 import traceback
32
33 from apiclient import errors as apiclient_errors
34 from arvados._version import __version__
35 from arvados.util import keep_locator_pattern
36
37 import arvados.commands._util as arv_cmd
38
39 api_client = None
40
41 upload_opts = argparse.ArgumentParser(add_help=False)
42
43 upload_opts.add_argument('--version', action='version',
44                          version="%s %s" % (sys.argv[0], __version__),
45                          help='Print version and exit.')
46 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
47                          help="""
48 Local file or directory. If path is a directory reference with a trailing
49 slash, then just upload the directory's contents; otherwise upload the
50 directory itself. Default: read from standard input.
51 """)
52
53 _group = upload_opts.add_mutually_exclusive_group()
54
55 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
56                     default=-1, help=argparse.SUPPRESS)
57
58 _group.add_argument('--normalize', action='store_true',
59                     help="""
60 Normalize the manifest by re-ordering files and streams after writing
61 data.
62 """)
63
64 _group.add_argument('--dry-run', action='store_true', default=False,
65                     help="""
66 Don't actually upload files, but only check if any file should be
67 uploaded. Exit with code=2 when files are pending for upload.
68 """)
69
70 _group = upload_opts.add_mutually_exclusive_group()
71
72 _group.add_argument('--as-stream', action='store_true', dest='stream',
73                     help="""
74 Synonym for --stream.
75 """)
76
77 _group.add_argument('--stream', action='store_true',
78                     help="""
79 Store the file content and display the resulting manifest on
80 stdout. Do not save a Collection object in Arvados.
81 """)
82
83 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
84                     help="""
85 Synonym for --manifest.
86 """)
87
88 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
89                     help="""
90 Synonym for --manifest.
91 """)
92
93 _group.add_argument('--manifest', action='store_true',
94                     help="""
95 Store the file data and resulting manifest in Keep, save a Collection
96 object in Arvados, and display the manifest locator (Collection uuid)
97 on stdout. This is the default behavior.
98 """)
99
100 _group.add_argument('--as-raw', action='store_true', dest='raw',
101                     help="""
102 Synonym for --raw.
103 """)
104
105 _group.add_argument('--raw', action='store_true',
106                     help="""
107 Store the file content and display the data block locators on stdout,
108 separated by commas, with a trailing newline. Do not store a
109 manifest.
110 """)
111
112 upload_opts.add_argument('--update-collection', type=str, default=None,
113                          dest='update_collection', metavar="UUID", help="""
114 Update an existing collection identified by the given Arvados collection
115 UUID. All new local files will be uploaded.
116 """)
117
118 upload_opts.add_argument('--use-filename', type=str, default=None,
119                          dest='filename', help="""
120 Synonym for --filename.
121 """)
122
123 upload_opts.add_argument('--filename', type=str, default=None,
124                          help="""
125 Use the given filename in the manifest, instead of the name of the
126 local file. This is useful when "-" or "/dev/stdin" is given as an
127 input file. It can be used only if there is exactly one path given and
128 it is not a directory. Implies --manifest.
129 """)
130
131 upload_opts.add_argument('--portable-data-hash', action='store_true',
132                          help="""
133 Print the portable data hash instead of the Arvados UUID for the collection
134 created by the upload.
135 """)
136
137 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
138                          help="""
139 Set the replication level for the new collection: how many different
140 physical storage devices (e.g., disks) should have a copy of each data
141 block. Default is to use the server-provided default (if any) or 2.
142 """)
143
144 upload_opts.add_argument('--storage-classes', help="""
145 Specify a comma-separated list of storage classes to be used when saving data to Keep.
146 """)
147
148 upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
149                          help="""
150 Set the number of upload threads to be used. Take into account that
151 using lots of threads will increase the RAM requirements. Default is
152 to use 2 threads.
153 On high latency installations, using a greater number will improve
154 overall throughput.
155 """)
156
157 upload_opts.add_argument('--exclude', metavar='PATTERN', default=[],
158                       action='append', help="""
159 Exclude files and directories whose names match the given glob pattern. When
160 using a path-like pattern like 'subdir/*.txt', all text files inside the
161 'subdir' directory (relative to the provided input dirs) will be excluded.
162 When using a filename pattern like '*.txt', any text file will be excluded
163 no matter where it is placed.
164 For the special case of needing to exclude only files or dirs directly below
165 the given input directory, you can use a pattern like './exclude_this.gif'.
166 You can specify multiple patterns by using this argument more than once.
167 """)
168
169 _group = upload_opts.add_mutually_exclusive_group()
170 _group.add_argument('--follow-links', action='store_true', default=True,
171                     dest='follow_links', help="""
172 Follow file and directory symlinks (default).
173 """)
174 _group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
175                     help="""
176 Do not follow file and directory symlinks.
177 """)
178
179
180 run_opts = argparse.ArgumentParser(add_help=False)
181
182 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
183 Store the collection in the specified project, instead of your Home
184 project.
185 """)
186
187 run_opts.add_argument('--name', help="""
188 Save the collection with the specified name.
189 """)
190
191 _group = run_opts.add_mutually_exclusive_group()
192 _group.add_argument('--progress', action='store_true',
193                     help="""
194 Display human-readable progress on stderr (bytes and, if possible,
195 percentage of total data size). This is the default behavior when
196 stderr is a tty.
197 """)
198
199 _group.add_argument('--no-progress', action='store_true',
200                     help="""
201 Do not display human-readable progress on stderr, even if stderr is a
202 tty.
203 """)
204
205 _group.add_argument('--batch-progress', action='store_true',
206                     help="""
207 Display machine-readable progress on stderr (bytes and, if known,
208 total data size).
209 """)
210
211 run_opts.add_argument('--silent', action='store_true',
212                       help="""
213 Do not print any debug messages to console. (Any error messages will
214 still be displayed.)
215 """)
216
217 _group = run_opts.add_mutually_exclusive_group()
218 _group.add_argument('--resume', action='store_true', default=True,
219                     help="""
220 Continue interrupted uploads from cached state (default).
221 """)
222 _group.add_argument('--no-resume', action='store_false', dest='resume',
223                     help="""
224 Do not continue interrupted uploads from cached state.
225 """)
226
227 _group = run_opts.add_mutually_exclusive_group()
228 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
229                     help="""
230 Save upload state in a cache file for resuming (default).
231 """)
232 _group.add_argument('--no-cache', action='store_false', dest='use_cache',
233                     help="""
234 Do not save upload state in a cache file for resuming.
235 """)
236
237 _group = upload_opts.add_mutually_exclusive_group()
238 _group.add_argument('--trash-at', metavar='YYYY-MM-DDTHH:MM', default=None,
239                     help="""
240 Set the trash date of the resulting collection to an absolute date in the future.
241 The accepted format is defined by the ISO 8601 standard. Examples: 20090103, 2009-01-03, 20090103T181505, 2009-01-03T18:15:05.
242 Timezone information can be added. If not, the provided date/time is assumed to be in the local system's timezone.
243 """)
244 _group.add_argument('--trash-after', type=int, metavar='DAYS', default=None,
245                     help="""
246 Set the trash date of the resulting collection to the given number of days
247 after the date/time that the upload process finishes.
248 """)
249
250 arg_parser = argparse.ArgumentParser(
251     description='Copy data from the local filesystem to Keep.',
252     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
253
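# Typical command-line invocations (illustrative examples only):
#   arv-put ./data_dir/                          # upload the directory's contents
#   arv-put --name "My dataset" file1 file2      # upload files into a named collection
#   arv-put --storage-classes=default --exclude '*.tmp' ./data_dir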
254 def parse_arguments(arguments):
255     args = arg_parser.parse_args(arguments)
256
257     if len(args.paths) == 0:
258         args.paths = ['-']
259
260     args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
261
262     if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
263         if args.filename:
264             arg_parser.error("""
265     --filename argument cannot be used when storing a directory or
266     multiple files.
267     """)
268
269     # Turn on --progress by default if stderr is a tty.
270     if (not (args.batch_progress or args.no_progress or args.silent)
271         and os.isatty(sys.stderr.fileno())):
272         args.progress = True
273
274     # Turn off --resume (default) if --no-cache is used.
275     if not args.use_cache:
276         args.resume = False
277
278     if args.paths == ['-']:
279         if args.update_collection:
280             arg_parser.error("""
281     --update-collection cannot be used when reading from stdin.
282     """)
283         args.resume = False
284         args.use_cache = False
285         if not args.filename:
286             args.filename = 'stdin'
287
288     # Remove possible duplicated patterns
289     if len(args.exclude) > 0:
290         args.exclude = list(set(args.exclude))
291
292     return args
293
294
295 class PathDoesNotExistError(Exception):
296     pass
297
298
299 class CollectionUpdateError(Exception):
300     pass
301
302
303 class ResumeCacheConflict(Exception):
304     pass
305
306
307 class ResumeCacheInvalidError(Exception):
308     pass
309
310 class ArvPutArgumentConflict(Exception):
311     pass
312
313
314 class ArvPutUploadIsPending(Exception):
315     pass
316
317
318 class ArvPutUploadNotPending(Exception):
319     pass
320
321
322 class FileUploadList(list):
323     def __init__(self, dry_run=False):
324         list.__init__(self)
325         self.dry_run = dry_run
326
327     def append(self, other):
328         if self.dry_run:
329             raise ArvPutUploadIsPending()
330         super(FileUploadList, self).append(other)
331
332
333 # Appends the X-Request-Id to the log message when log level is ERROR or DEBUG
334 class ArvPutLogFormatter(logging.Formatter):
335     std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
336     err_fmtr = None
337     request_id_informed = False
338
339     def __init__(self, request_id):
340         self.err_fmtr = logging.Formatter(
341             arvados.log_format+' (X-Request-Id: {})'.format(request_id),
342             arvados.log_date_format)
343
344     def format(self, record):
345         if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
346             self.request_id_informed = True
347             return self.err_fmtr.format(record)
348         return self.std_fmtr.format(record)
349
350
351 class ResumeCache(object):
352     CACHE_DIR = '.cache/arvados/arv-put'
353
354     def __init__(self, file_spec):
355         self.cache_file = open(file_spec, 'a+')
356         self._lock_file(self.cache_file)
357         self.filename = self.cache_file.name
358
359     @classmethod
360     def make_path(cls, args):
361         md5 = hashlib.md5()
362         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
363         realpaths = sorted(os.path.realpath(path) for path in args.paths)
364         md5.update(b'\0'.join([p.encode() for p in realpaths]))
365         if any(os.path.isdir(path) for path in realpaths):
366             md5.update(b'-1')
367         elif args.filename:
368             md5.update(args.filename.encode())
369         return os.path.join(
370             arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
371             md5.hexdigest())
372
373     def _lock_file(self, fileobj):
374         try:
375             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
376         except IOError:
377             raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
378
379     def load(self):
380         self.cache_file.seek(0)
381         return json.load(self.cache_file)
382
383     def check_cache(self, api_client=None, num_retries=0):
384         try:
385             state = self.load()
386             locator = None
387             try:
388                 if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
389                     locator = state["_finished_streams"][0][1][0]
390                 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
391                     locator = state["_current_stream_locators"][0]
392                 if locator is not None:
393                     kc = arvados.keep.KeepClient(api_client=api_client)
394                     kc.head(locator, num_retries=num_retries)
395             except Exception as e:
396                 self.restart()
397         except (ValueError):
398             pass
399
400     def save(self, data):
401         try:
402             new_cache_fd, new_cache_name = tempfile.mkstemp(
403                 dir=os.path.dirname(self.filename))
404             self._lock_file(new_cache_fd)
405             new_cache = os.fdopen(new_cache_fd, 'r+')
406             json.dump(data, new_cache)
407             os.rename(new_cache_name, self.filename)
408         except (IOError, OSError, ResumeCacheConflict):
409             try:
410                 os.unlink(new_cache_name)
411             except NameError:  # mkstemp failed.
412                 pass
413         else:
414             self.cache_file.close()
415             self.cache_file = new_cache
416
417     def close(self):
418         self.cache_file.close()
419
420     def destroy(self):
421         try:
422             os.unlink(self.filename)
423         except OSError as error:
424             if error.errno != errno.ENOENT:  # That's what we wanted anyway.
425                 raise
426         self.close()
427
428     def restart(self):
429         self.destroy()
430         self.__init__(self.filename)
431
432
433 class ArvPutUploadJob(object):
434     CACHE_DIR = '.cache/arvados/arv-put'
435     EMPTY_STATE = {
436         'manifest' : None, # Last saved manifest checkpoint
437         'files' : {} # Previous run file list: {path : {size, mtime}}
438     }
439
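    # Minimal programmatic usage (an illustrative sketch, not a supported API
    # contract):
    #   job = ArvPutUploadJob(['./data_dir'], num_retries=3)
    #   job.start(save_collection=True)
    #   print(job.manifest_locator())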
440     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
441                  name=None, owner_uuid=None, api_client=None,
442                  ensure_unique_name=False, num_retries=None,
443                  put_threads=None, replication_desired=None, filename=None,
444                  update_time=60.0, update_collection=None, storage_classes=None,
445                  logger=logging.getLogger('arvados.arv_put'), dry_run=False,
446                  follow_links=True, exclude_paths=[], exclude_names=None,
447                  trash_at=None):
448         self.paths = paths
449         self.resume = resume
450         self.use_cache = use_cache
451         self.update = False
452         self.reporter = reporter
453         # This will be set to 0 before counting starts, if no special files are
454         # going to be read.
455         self.bytes_expected = None
456         self.bytes_written = 0
457         self.bytes_skipped = 0
458         self.name = name
459         self.owner_uuid = owner_uuid
460         self.ensure_unique_name = ensure_unique_name
461         self.num_retries = num_retries
462         self.replication_desired = replication_desired
463         self.put_threads = put_threads
464         self.filename = filename
465         self.storage_classes = storage_classes
466         self._api_client = api_client
467         self._state_lock = threading.Lock()
468         self._state = None # Previous run state (file list & manifest)
469         self._current_files = [] # Current run file list
470         self._cache_file = None
471         self._collection_lock = threading.Lock()
472         self._remote_collection = None # Collection being updated (if asked)
473         self._local_collection = None # Collection from previous run manifest
474         self._file_paths = set() # Files to be updated in remote collection
475         self._stop_checkpointer = threading.Event()
476         self._checkpointer = threading.Thread(target=self._update_task)
477         self._checkpointer.daemon = True
478         self._update_task_time = update_time  # How many seconds to wait between update runs
479         self._files_to_upload = FileUploadList(dry_run=dry_run)
480         self._upload_started = False
481         self.logger = logger
482         self.dry_run = dry_run
483         self._checkpoint_before_quit = True
484         self.follow_links = follow_links
485         self.exclude_paths = exclude_paths
486         self.exclude_names = exclude_names
487         self._trash_at = trash_at
488
489         if self._trash_at is not None:
490             if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
491                 raise TypeError('trash_at should be None, timezone-naive datetime or timedelta')
492             if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
493                 raise TypeError('provided trash_at datetime should be timezone-naive')
494
495         if not self.use_cache and self.resume:
496             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
497
498         # Check for obvious dry-run responses
499         if self.dry_run and (not self.use_cache or not self.resume):
500             raise ArvPutUploadIsPending()
501
502         # Load cached data if any and if needed
503         self._setup_state(update_collection)
504
505         # Build the upload file list, excluding requested files and counting the
506         # bytes expected to be uploaded.
507         self._build_upload_list()
508
509     def _build_upload_list(self):
510         """
511         Scan the requested paths to count file sizes, excluding requested files
512         and dirs and building the upload file list.
513         """
514         # If there aren't any special files to be read, reset the total bytes
515         # count to zero so counting can start.
516         if not any([p for p in self.paths
517                     if not (os.path.isfile(p) or os.path.isdir(p))]):
518             self.bytes_expected = 0
519
520         for path in self.paths:
521             # Test for stdin first, in case some file named '-' exists.
522             if path == '-':
523                 if self.dry_run:
524                     raise ArvPutUploadIsPending()
525                 self._write_stdin(self.filename or 'stdin')
526             elif not os.path.exists(path):
527                 raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
528             elif os.path.isdir(path):
529                 # Use absolute paths on cache index so CWD doesn't interfere
530                 # with the caching logic.
531                 orig_path = path
532                 path = os.path.abspath(path)
533                 if orig_path[-1:] == os.sep:
534                     # When passing a directory reference with a trailing slash,
535                     # its contents should be uploaded directly to the
536                     # collection's root.
537                     prefixdir = path
538                 else:
539                     # When passing a directory reference with no trailing slash,
540                     # upload the directory to the collection's root.
541                     prefixdir = os.path.dirname(path)
542                 prefixdir += os.sep
543                 for root, dirs, files in os.walk(path,
544                                                  followlinks=self.follow_links):
545                     root_relpath = os.path.relpath(root, path)
546                     if root_relpath == '.':
547                         root_relpath = ''
548                     # Exclude files/dirs by full path matching pattern
549                     if self.exclude_paths:
550                         dirs[:] = [d for d in dirs
551                                    if not any(pathname_match(
552                                            os.path.join(root_relpath, d), pat)
553                                               for pat in self.exclude_paths)]
554                         files = [f for f in files
555                                  if not any(pathname_match(
556                                          os.path.join(root_relpath, f), pat)
557                                             for pat in self.exclude_paths)]
558                     # Exclude files/dirs by name matching pattern
559                     if self.exclude_names is not None:
560                         dirs[:] = [d for d in dirs
561                                    if not self.exclude_names.match(d)]
562                         files = [f for f in files
563                                  if not self.exclude_names.match(f)]
564                     # Make os.walk()'s dir traversing order deterministic
565                     dirs.sort()
566                     files.sort()
567                     for f in files:
568                         filepath = os.path.join(root, f)
569                         # Add its size to the total bytes count (if applicable)
570                         if self.follow_links or (not os.path.islink(filepath)):
571                             if self.bytes_expected is not None:
572                                 self.bytes_expected += os.path.getsize(filepath)
573                         self._check_file(filepath,
574                                          os.path.join(root[len(prefixdir):], f))
575             else:
576                 filepath = os.path.abspath(path)
577                 # Add its size to the total bytes count (if applicable)
578                 if self.follow_links or (not os.path.islink(filepath)):
579                     if self.bytes_expected is not None:
580                         self.bytes_expected += os.path.getsize(filepath)
581                 self._check_file(filepath,
582                                  self.filename or os.path.basename(path))
583         # If dry-run mode is on and we got up to this point, then we should
584         # notify that there aren't any files to upload.
585         if self.dry_run:
586             raise ArvPutUploadNotPending()
587         # Remove local_collection's files that don't exist locally anymore, so the
588         # bytes_written count is correct.
589         for f in self.collection_file_paths(self._local_collection,
590                                             path_prefix=""):
591             if f != 'stdin' and f != self.filename and not f in self._file_paths:
592                 self._local_collection.remove(f)
593
594     def start(self, save_collection):
595         """
596         Start supporting thread & file uploading
597         """
598         self._checkpointer.start()
599         try:
600             # Update bytes_written from current local collection and
601             # report initial progress.
602             self._update()
603             # Actual file upload
604             self._upload_started = True # Used by the update thread to start checkpointing
605             self._upload_files()
606         except (SystemExit, Exception) as e:
607             self._checkpoint_before_quit = False
608             # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
609             # Note: We're expecting SystemExit instead of
610             # KeyboardInterrupt because we have a custom signal
611             # handler in place that raises SystemExit with the catched
612             # handler in place that raises SystemExit with the caught
613             if isinstance(e, PathDoesNotExistError):
614                 # We aren't interested in the traceback for this case
615                 pass
616             elif not isinstance(e, SystemExit) or e.code != -2:
617                 self.logger.warning("Abnormal termination:\n{}".format(
618                     traceback.format_exc()))
619             raise
620         finally:
621             if not self.dry_run:
622                 # Stop the thread before doing anything else
623                 self._stop_checkpointer.set()
624                 self._checkpointer.join()
625                 if self._checkpoint_before_quit:
626                     # Commit all pending blocks & one last _update()
627                     self._local_collection.manifest_text()
628                     self._update(final=True)
629                     if save_collection:
630                         self.save_collection()
631             if self.use_cache:
632                 self._cache_file.close()
633
634     def _collection_trash_at(self):
635         """
636         Returns the trash date that the collection should use at save time.
637         Takes into account absolute/relative trash_at values requested
638         by the user.
639         """
640         if type(self._trash_at) == datetime.timedelta:
641             # Get an absolute datetime for trash_at
642             return datetime.datetime.utcnow() + self._trash_at
643         return self._trash_at
644
645     def save_collection(self):
646         if self.update:
647             # Check if files should be updated on the remote collection.
648             for fp in self._file_paths:
649                 remote_file = self._remote_collection.find(fp)
650                 if not remote_file:
651                     # File doesn't exist on the remote collection, copy it.
652                     self._remote_collection.copy(fp, fp, self._local_collection)
653                 elif remote_file != self._local_collection.find(fp):
654                     # A different file exists on the remote collection, overwrite it.
655                     self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
656                 else:
657                     # The file already exists on the remote collection, skip it.
658                     pass
659             self._remote_collection.save(num_retries=self.num_retries,
660                                          trash_at=self._collection_trash_at())
661         else:
662             self._local_collection.save_new(
663                 name=self.name, owner_uuid=self.owner_uuid,
664                 ensure_unique_name=self.ensure_unique_name,
665                 num_retries=self.num_retries,
666                 trash_at=self._collection_trash_at())
667
668     def destroy_cache(self):
669         if self.use_cache:
670             try:
671                 os.unlink(self._cache_filename)
672             except OSError as error:
673                 # That's what we wanted anyway.
674                 if error.errno != errno.ENOENT:
675                     raise
676             self._cache_file.close()
677
678     def _collection_size(self, collection):
679         """
680         Recursively get the total size of the collection
681         """
682         size = 0
683         for item in listvalues(collection):
684             if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
685                 size += self._collection_size(item)
686             else:
687                 size += item.size()
688         return size
689
690     def _update_task(self):
691         """
692         Periodically called support task. File uploading is
693         asynchronous so we poll status from the collection.
694         """
695         while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
696             self._update()
697
698     def _update(self, final=False):
699         """
700         Update cached manifest text and report progress.
701         """
702         if self._upload_started:
703             with self._collection_lock:
704                 self.bytes_written = self._collection_size(self._local_collection)
705                 if self.use_cache:
706                     if final:
707                         manifest = self._local_collection.manifest_text()
708                     else:
709                         # Get the manifest text without committing pending blocks
710                         manifest = self._local_collection.manifest_text(strip=False,
711                                                                         normalize=False,
712                                                                         only_committed=True)
713                     # Update cache
714                     with self._state_lock:
715                         self._state['manifest'] = manifest
716             if self.use_cache:
717                 try:
718                     self._save_state()
719                 except Exception as e:
720                     self.logger.error("Unexpected error trying to save cache file: {}".format(e))
721             # Keep remote collection's trash_at attribute synced when using relative expire dates
722             if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta:
723                 try:
724                     self._api_client.collections().update(
725                         uuid=self._remote_collection.manifest_locator(),
726                         body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}
727                     ).execute(num_retries=self.num_retries)
728                 except Exception as e:
729                     self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e))
730         else:
731             self.bytes_written = self.bytes_skipped
732         # Call the reporter, if any
733         self.report_progress()
734
735     def report_progress(self):
736         if self.reporter is not None:
737             self.reporter(self.bytes_written, self.bytes_expected)
738
739     def _write_stdin(self, filename):
740         output = self._local_collection.open(filename, 'wb')
741         self._write(sys.stdin.buffer, output)
742         output.close()
743
744     def _check_file(self, source, filename):
745         """
746         Check if this file needs to be uploaded
747         """
748         # Ignore symlinks when requested
749         if (not self.follow_links) and os.path.islink(source):
750             return
751         resume_offset = 0
752         should_upload = False
753         new_file_in_cache = False
754         # Record file path for updating the remote collection before exiting
755         self._file_paths.add(filename)
756
757         with self._state_lock:
758             # If there's no previous cached data for this file, store it for a
759             # possible repeated run.
760             if source not in self._state['files']:
761                 self._state['files'][source] = {
762                     'mtime': os.path.getmtime(source),
763                     'size' : os.path.getsize(source)
764                 }
765                 new_file_in_cache = True
766             cached_file_data = self._state['files'][source]
767
768         # Check if file was already uploaded (at least partially)
769         file_in_local_collection = self._local_collection.find(filename)
770
771         # If not resuming, upload the full file.
772         if not self.resume:
773             should_upload = True
774         # New file detected from last run, upload it.
775         elif new_file_in_cache:
776             should_upload = True
777         # Local file didn't change from last run.
778         elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
779             if not file_in_local_collection:
780                 # File not uploaded yet, upload it completely
781                 should_upload = True
782             elif file_in_local_collection.permission_expired():
783                 # Permission token expired, re-upload file. This will change whenever
784                 # we have an API for refreshing tokens.
785                 self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
786                 should_upload = True
787                 self._local_collection.remove(filename)
788             elif cached_file_data['size'] == file_in_local_collection.size():
789                 # File already there, skip it.
790                 self.bytes_skipped += cached_file_data['size']
791             elif cached_file_data['size'] > file_in_local_collection.size():
792                 # File partially uploaded, resume!
793                 resume_offset = file_in_local_collection.size()
794                 self.bytes_skipped += resume_offset
795                 should_upload = True
796             else:
797                 # Inconsistent cache, re-upload the file
798                 should_upload = True
799                 self._local_collection.remove(filename)
800                 self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
801         # Local file differs from cached data, re-upload it.
802         else:
803             if file_in_local_collection:
804                 self._local_collection.remove(filename)
805             should_upload = True
806
807         if should_upload:
808             try:
809                 self._files_to_upload.append((source, resume_offset, filename))
810             except ArvPutUploadIsPending:
811                 # This could happen when running in dry-run mode; close the cache
812                 # file to avoid locking issues.
813                 self._cache_file.close()
814                 raise
815
816     def _upload_files(self):
817         for source, resume_offset, filename in self._files_to_upload:
818             with open(source, 'rb') as source_fd:
819                 with self._state_lock:
820                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
821                     self._state['files'][source]['size'] = os.path.getsize(source)
822                 if resume_offset > 0:
823                     # Start upload where we left off
824                     output = self._local_collection.open(filename, 'ab')
825                     source_fd.seek(resume_offset)
826                 else:
827                     # Start from scratch
828                     output = self._local_collection.open(filename, 'wb')
829                 self._write(source_fd, output)
830                 output.close(flush=False)
831
832     def _write(self, source_fd, output):
833         while True:
834             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
835             if not data:
836                 break
837             output.write(data)
838
839     def _my_collection(self):
840         return self._remote_collection if self.update else self._local_collection
841
842     def _get_cache_filepath(self):
843         # Set up cache file name from input paths.
844         md5 = hashlib.md5()
845         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
846         realpaths = sorted(os.path.realpath(path) for path in self.paths)
847         md5.update(b'\0'.join([p.encode() for p in realpaths]))
848         if self.filename:
849             md5.update(self.filename.encode())
850         cache_filename = md5.hexdigest()
851         cache_filepath = os.path.join(
852             arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
853             cache_filename)
854         return cache_filepath
855
856     def _setup_state(self, update_collection):
857         """
858         Create a new cache file or load a previously existing one.
859         """
860         # Load an already existing collection for update
861         if update_collection and re.match(arvados.util.collection_uuid_pattern,
862                                           update_collection):
863             try:
864                 self._remote_collection = arvados.collection.Collection(
865                     update_collection,
866                     api_client=self._api_client,
867                     storage_classes_desired=self.storage_classes,
868                     num_retries=self.num_retries)
869             except arvados.errors.ApiError as error:
870                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
871             else:
872                 self.update = True
873         elif update_collection:
874             # Collection locator provided, but unknown format
875             raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
876
877         if self.use_cache:
878             cache_filepath = self._get_cache_filepath()
879             if self.resume and os.path.exists(cache_filepath):
880                 self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
881                 self._cache_file = open(cache_filepath, 'a+')
882             else:
883                 # --no-resume means start with an empty cache file.
884                 self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
885                 self._cache_file = open(cache_filepath, 'w+')
886             self._cache_filename = self._cache_file.name
887             self._lock_file(self._cache_file)
888             self._cache_file.seek(0)
889
890         with self._state_lock:
891             if self.use_cache:
892                 try:
893                     self._state = json.load(self._cache_file)
894                     if not set(['manifest', 'files']).issubset(set(self._state.keys())):
895                         # Cache at least partially incomplete, set up new cache
896                         self._state = copy.deepcopy(self.EMPTY_STATE)
897                 except ValueError:
898                     # Cache file empty, set up new cache
899                     self._state = copy.deepcopy(self.EMPTY_STATE)
900             else:
901                 self.logger.info("No cache usage requested for this run.")
902                 # No cache file, set empty state
903                 self._state = copy.deepcopy(self.EMPTY_STATE)
904             if not self._cached_manifest_valid():
905                 raise ResumeCacheInvalidError()
906             # Load the previous manifest so we can check if files were modified remotely.
907             self._local_collection = arvados.collection.Collection(
908                 self._state['manifest'],
909                 replication_desired=self.replication_desired,
910                 storage_classes_desired=(self.storage_classes or ['default']),
911                 put_threads=self.put_threads,
912                 api_client=self._api_client,
913                 num_retries=self.num_retries)
914
915     def _cached_manifest_valid(self):
916         """
917         Validate the oldest non-expired block signature to check whether the cached
918         manifest is usable, i.e. that it was not created with a different Arvados
919         account.
920         """
921         if self._state.get('manifest', None) is None:
922             # No cached manifest yet, all good.
923             return True
924         now = datetime.datetime.utcnow()
925         oldest_exp = None
926         oldest_loc = None
927         block_found = False
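        # Signed block locators look roughly like
        # "<md5sum>+<size>+A<signature>@<expiry>", where the token after '@'
        # is the signature's expiry time as a hexadecimal Unix timestamp.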
928         for m in keep_locator_pattern.finditer(self._state['manifest']):
929             loc = m.group(0)
930             try:
931                 exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
932             except IndexError:
933                 # Locator without signature
934                 continue
935             block_found = True
936             if exp > now and (oldest_exp is None or exp < oldest_exp):
937                 oldest_exp = exp
938                 oldest_loc = loc
939         if not block_found:
940             # No block signatures found => no invalid block signatures.
941             return True
942         if oldest_loc is None:
943             # Locator signatures found, but all have expired.
944             # Reset the cache and move on.
945             self.logger.info('Cache expired, starting from scratch.')
946             self._state['manifest'] = ''
947             return True
948         kc = arvados.KeepClient(api_client=self._api_client,
949                                 num_retries=self.num_retries)
950         try:
951             kc.head(oldest_loc)
952         except arvados.errors.KeepRequestError:
953             # Something is wrong, cached manifest is not valid.
954             return False
955         return True
956
957     def collection_file_paths(self, col, path_prefix='.'):
958         """Return a list of file paths by recursively going through the entire collection `col`."""
959         file_paths = []
960         for name, item in listitems(col):
961             if isinstance(item, arvados.arvfile.ArvadosFile):
962                 file_paths.append(os.path.join(path_prefix, name))
963             elif isinstance(item, arvados.collection.Subcollection):
964                 new_prefix = os.path.join(path_prefix, name)
965                 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
966         return file_paths
967
968     def _lock_file(self, fileobj):
969         try:
970             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
971         except IOError:
972             raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
973
974     def _save_state(self):
975         """
976         Atomically save current state into cache.
977         """
978         with self._state_lock:
979             # We're not using copy.deepcopy() here because it's a lot slower
980             # than json.dumps(), and we're already needing JSON format to be
981             # saved on disk.
982             state = json.dumps(self._state)
983         try:
984             new_cache = tempfile.NamedTemporaryFile(
985                 mode='w+',
986                 dir=os.path.dirname(self._cache_filename), delete=False)
987             self._lock_file(new_cache)
988             new_cache.write(state)
989             new_cache.flush()
990             os.fsync(new_cache)
991             os.rename(new_cache.name, self._cache_filename)
992         except (IOError, OSError, ResumeCacheConflict) as error:
993             self.logger.error("There was a problem while saving the cache file: {}".format(error))
994             try:
995                 os.unlink(new_cache.name)
996             except NameError:  # NamedTemporaryFile creation failed.
997                 pass
998         else:
999             self._cache_file.close()
1000             self._cache_file = new_cache
1001
1002     def collection_name(self):
1003         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
1004
1005     def collection_trash_at(self):
1006         return self._my_collection().get_trash_at()
1007
1008     def manifest_locator(self):
1009         return self._my_collection().manifest_locator()
1010
1011     def portable_data_hash(self):
1012         pdh = self._my_collection().portable_data_hash()
1013         m = self._my_collection().stripped_manifest().encode()
1014         local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
1015         if pdh != local_pdh:
1016             self.logger.warning("\n".join([
1017                 "arv-put: API server provided PDH differs from local manifest.",
1018                 "         This should not happen; showing API server version."]))
1019         return pdh
1020
1021     def manifest_text(self, stream_name=".", strip=False, normalize=False):
1022         return self._my_collection().manifest_text(stream_name, strip, normalize)
1023
1024     def _datablocks_on_item(self, item):
1025         """
1026         Return a list of datablock locators, recursively navigating
1027         through subcollections
1028         """
1029         if isinstance(item, arvados.arvfile.ArvadosFile):
1030             if item.size() == 0:
1031                 # Empty file locator
1032                 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
1033             else:
1034                 locators = []
1035                 for segment in item.segments():
1036                     loc = segment.locator
1037                     locators.append(loc)
1038                 return locators
1039         elif isinstance(item, arvados.collection.Collection):
1040             l = [self._datablocks_on_item(x) for x in listvalues(item)]
1041             # Fast list flattener method taken from:
1042             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
1043             return [loc for sublist in l for loc in sublist]
1044         else:
1045             return None
1046
1047     def data_locators(self):
1048         with self._collection_lock:
1049             # Make sure all datablocks are flushed before getting the locators
1050             self._my_collection().manifest_text()
1051             datablocks = self._datablocks_on_item(self._my_collection())
1052         return datablocks
1053
1054 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
1055                                                             os.getpid())
1056
1057 # Simulate glob.glob() matching behavior without the need to scan the filesystem
1058 # Note: fnmatch() doesn't work correctly when used with pathnames. For example the
1059 # pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
1060 # so instead we're using it on every path component.
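# For example, pathname_match('tests/run_test.py', 'tests/*.py') is True, while
# pathname_match('tests/subdir/run_test.py', 'tests/*.py') is False because the
# path and the pattern have a different number of components.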
1061 def pathname_match(pathname, pattern):
1062     name = pathname.split(os.sep)
1063     # Fix patterns like 'some/subdir/' or 'some//subdir'
1064     pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
1065     if len(name) != len(pat):
1066         return False
1067     for i in range(len(name)):
1068         if not fnmatch.fnmatch(name[i], pat[i]):
1069             return False
1070     return True
1071
1072 def machine_progress(bytes_written, bytes_expected):
1073     return _machine_format.format(
1074         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
1075
1076 def human_progress(bytes_written, bytes_expected):
1077     if bytes_expected:
1078         return "\r{}M / {}M {:.1%} ".format(
1079             bytes_written >> 20, bytes_expected >> 20,
1080             float(bytes_written) / bytes_expected)
1081     else:
1082         return "\r{} ".format(bytes_written)
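# For instance, human_progress(524288000, 1073741824) returns
# "\r500M / 1024M 48.8% ".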
1083
1084 def progress_writer(progress_func, outfile=sys.stderr):
1085     def write_progress(bytes_written, bytes_expected):
1086         outfile.write(progress_func(bytes_written, bytes_expected))
1087     return write_progress
1088
1089 def desired_project_uuid(api_client, project_uuid, num_retries):
1090     if not project_uuid:
1091         query = api_client.users().current()
1092     elif arvados.util.user_uuid_pattern.match(project_uuid):
1093         query = api_client.users().get(uuid=project_uuid)
1094     elif arvados.util.group_uuid_pattern.match(project_uuid):
1095         query = api_client.groups().get(uuid=project_uuid)
1096     else:
1097         raise ValueError("Not a valid project UUID: {}".format(project_uuid))
1098     return query.execute(num_retries=num_retries)['uuid']
1099
1100 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
1101          install_sig_handlers=True):
1102     global api_client
1103
1104     args = parse_arguments(arguments)
1105     logger = logging.getLogger('arvados.arv_put')
1106     if args.silent:
1107         logger.setLevel(logging.WARNING)
1108     else:
1109         logger.setLevel(logging.INFO)
1110     status = 0
1111
1112     request_id = arvados.util.new_request_id()
1113
1114     formatter = ArvPutLogFormatter(request_id)
1115     logging.getLogger('arvados').handlers[0].setFormatter(formatter)
1116
1117     if api_client is None:
1118         api_client = arvados.api('v1', request_id=request_id)
1119
1120     if install_sig_handlers:
1121         arv_cmd.install_signal_handlers()
1122
1123     # Trash arguments validation
1124     trash_at = None
1125     if args.trash_at is not None:
1126         # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we
1127         # make sure the user provides a complete YYYY-MM-DD date.
1128         if not re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}', args.trash_at):
1129             logger.error("--trash-at argument format invalid, use --help to see examples.")
1130             sys.exit(1)
1131         # Check if no time information was provided. In that case, assume end-of-day.
1132         if re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}$', args.trash_at):
1133             args.trash_at += 'T23:59:59'
1134         try:
1135             trash_at = ciso8601.parse_datetime(args.trash_at)
1136         except:
1137             logger.error("--trash-at argument format invalid, use --help to see examples.")
1138             sys.exit(1)
1139         else:
1140             if trash_at.tzinfo is not None:
1141                 # Timezone aware datetime provided.
1142                 utcoffset = -trash_at.utcoffset()
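                # e.g. for 2021-01-01T12:00:00-03:00, utcoffset() is -3 hours,
                # so adding +3 hours below yields 2021-01-01T15:00:00 (UTC).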
1143             else:
1144                 # Timezone-naive datetime provided. Assume it is local.
1145                 if time.daylight:
1146                     utcoffset = datetime.timedelta(seconds=time.altzone)
1147                 else:
1148                     utcoffset = datetime.timedelta(seconds=time.timezone)
1149             # Convert to UTC timezone naive datetime.
1150             trash_at = trash_at.replace(tzinfo=None) + utcoffset
1151
1152         if trash_at <= datetime.datetime.utcnow():
1153             logger.error("--trash-at argument must be set in the future")
1154             sys.exit(1)
1155     if args.trash_after is not None:
1156         if args.trash_after < 1:
1157             logger.error("--trash-after argument must be >= 1")
1158             sys.exit(1)
1159         trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60))
1160
1161     # Determine the name to use
1162     if args.name:
1163         if args.stream or args.raw:
1164             logger.error("Cannot use --name with --stream or --raw")
1165             sys.exit(1)
1166         elif args.update_collection:
1167             logger.error("Cannot use --name with --update-collection")
1168             sys.exit(1)
1169         collection_name = args.name
1170     else:
1171         collection_name = "Saved at {} by {}@{}".format(
1172             datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
1173             pwd.getpwuid(os.getuid()).pw_name,
1174             socket.gethostname())
1175
1176     if args.project_uuid and (args.stream or args.raw):
1177         logger.error("Cannot use --project-uuid with --stream or --raw")
1178         sys.exit(1)
1179
1180     # Determine the parent project
1181     try:
1182         project_uuid = desired_project_uuid(api_client, args.project_uuid,
1183                                             args.retries)
1184     except (apiclient_errors.Error, ValueError) as error:
1185         logger.error(error)
1186         sys.exit(1)
1187
1188     if args.progress:
1189         reporter = progress_writer(human_progress)
1190     elif args.batch_progress:
1191         reporter = progress_writer(machine_progress)
1192     else:
1193         reporter = None
1194
1195     #  Split storage-classes argument
1196     storage_classes = None
1197     if args.storage_classes:
1198         storage_classes = args.storage_classes.strip().replace(' ', '').split(',')
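        # e.g. "default, archive" becomes ['default', 'archive']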
1199
1200     # Set up the exclude regex from all the --exclude arguments provided
1201     name_patterns = []
1202     exclude_paths = []
1203     exclude_names = None
1204     if len(args.exclude) > 0:
1205         # We're supporting 2 kinds of exclusion patterns:
1206         # 1)   --exclude '*.jpg'    (file/dir name patterns, will only match
1207         #                            the name, wherever the file is on the tree)
1208         # 2.1) --exclude 'foo/bar'  (file/dir path patterns, will match the
1209         #                            entire path, and should be relative to
1210         #                            any input dir argument)
1211         # 2.2) --exclude './*.jpg'  (Special case for excluding files/dirs
1212         #                            placed directly underneath the input dir)
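        # For instance, --exclude 'tmp' skips any file or directory named 'tmp'
        # anywhere in the tree, while --exclude './tmp' only skips a top-level
        # entry named 'tmp'.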
1213         for p in args.exclude:
1214             # Only relative path patterns are allowed
1215             if p.startswith(os.sep):
1216                 logger.error("Cannot use absolute paths with --exclude")
1217                 sys.exit(1)
1218             if os.path.dirname(p):
1219                 # We don't support path patterns with '..'
1220                 p_parts = p.split(os.sep)
1221                 if '..' in p_parts:
1222                     logger.error(
1223                         "Cannot use path patterns that include '..'")
1224                     sys.exit(1)
1225                 # Path search pattern
1226                 exclude_paths.append(p)
1227             else:
1228                 # Name-only search pattern
1229                 name_patterns.append(p)
1230         # For name only matching, we can combine all patterns into a single
1231         # regexp, for better performance.
1232         exclude_names = re.compile('|'.join(
1233             [fnmatch.translate(p) for p in name_patterns]
1234         )) if len(name_patterns) > 0 else None
1235         # Show the user the patterns to be used, just in case they weren't
1236         # specified inside quotes and got changed by the shell expansion.
1237         logger.info("Exclude patterns: {}".format(args.exclude))
1238
1239     # If this is used by a human, and there's at least one directory to be
1240     # uploaded, the expected bytes calculation can take a moment.
1241     if args.progress and any([os.path.isdir(f) for f in args.paths]):
1242         logger.info("Calculating upload size, this could take some time...")
1243     try:
1244         writer = ArvPutUploadJob(paths = args.paths,
1245                                  resume = args.resume,
1246                                  use_cache = args.use_cache,
1247                                  filename = args.filename,
1248                                  reporter = reporter,
1249                                  api_client = api_client,
1250                                  num_retries = args.retries,
1251                                  replication_desired = args.replication,
1252                                  put_threads = args.threads,
1253                                  name = collection_name,
1254                                  owner_uuid = project_uuid,
1255                                  ensure_unique_name = True,
1256                                  update_collection = args.update_collection,
1257                                  storage_classes=storage_classes,
1258                                  logger=logger,
1259                                  dry_run=args.dry_run,
1260                                  follow_links=args.follow_links,
1261                                  exclude_paths=exclude_paths,
1262                                  exclude_names=exclude_names,
1263                                  trash_at=trash_at)
1264     except ResumeCacheConflict:
1265         logger.error("\n".join([
1266             "arv-put: Another process is already uploading this data.",
1267             "         Use --no-cache if this is really what you want."]))
1268         sys.exit(1)
1269     except ResumeCacheInvalidError:
1270         logger.error("\n".join([
1271             "arv-put: Resume cache contains invalid signature: it may have expired",
1272             "         or been created with another Arvados user's credentials.",
1273             "         Switch user or use one of the following options to restart upload:",
1274             "         --no-resume to start a new resume cache.",
1275             "         --no-cache to disable resume cache."]))
1276         sys.exit(1)
1277     except (CollectionUpdateError, PathDoesNotExistError) as error:
1278         logger.error("\n".join([
1279             "arv-put: %s" % str(error)]))
1280         sys.exit(1)
1281     except ArvPutUploadIsPending:
1282         # Dry run check successful, return proper exit code.
1283         sys.exit(2)
1284     except ArvPutUploadNotPending:
1285         # No files pending for upload
1286         sys.exit(0)
1287
1288     if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
1289         logger.warning("\n".join([
1290             "arv-put: Resuming previous upload from last checkpoint.",
1291             "         Use the --no-resume option to start over."]))
1292
1293     if not args.dry_run:
1294         writer.report_progress()
1295     output = None
1296     try:
1297         writer.start(save_collection=not(args.stream or args.raw))
1298     except arvados.errors.ApiError as error:
1299         logger.error("\n".join([
1300             "arv-put: %s" % str(error)]))
1301         sys.exit(1)
1302
1303     if args.progress:  # Print newline to split stderr from stdout for humans.
1304         logger.info("\n")
1305
1306     if args.stream:
1307         if args.normalize:
1308             output = writer.manifest_text(normalize=True)
1309         else:
1310             output = writer.manifest_text()
1311     elif args.raw:
1312         output = ','.join(writer.data_locators())
1313     else:
1314         try:
1315             expiration_notice = ""
1316             if writer.collection_trash_at() is not None:
1317                 # Get the local timezone-naive version, and log it with timezone information.
1318                 if time.daylight:
1319                     local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.altzone)
1320                 else:
1321                     local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.timezone)
1322                 expiration_notice = ". It will expire on {} {}.".format(
1323                     local_trash_at.strftime("%Y-%m-%d %H:%M:%S"), time.strftime("%z"))
1324             if args.update_collection:
1325                 logger.info(u"Collection updated: '{}'{}".format(
1326                     writer.collection_name(), expiration_notice))
1327             else:
1328                 logger.info(u"Collection saved as '{}'{}".format(
1329                     writer.collection_name(), expiration_notice))
1330             if args.portable_data_hash:
1331                 output = writer.portable_data_hash()
1332             else:
1333                 output = writer.manifest_locator()
1334         except apiclient_errors.Error as error:
1335             logger.error(
1336                 "arv-put: Error creating Collection on project: {}.".format(
1337                     error))
1338             status = 1
1339
1340     # Print the locator (uuid) of the new collection.
1341     if output is None:
1342         status = status or 1
1343     elif not args.silent:
1344         stdout.write(output)
1345         if not output.endswith('\n'):
1346             stdout.write('\n')
1347
1348     if install_sig_handlers:
1349         arv_cmd.restore_signal_handlers()
1350
1351     if status != 0:
1352         sys.exit(status)
1353
1354     # Success!
1355     return output
1356
1357
1358 if __name__ == '__main__':
1359     main()