# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from future.utils import listitems, listvalues
from builtins import str
from builtins import object
import argparse
import arvados
import arvados.collection
import base64
import ciso8601
import copy
import datetime
import errno
import fcntl
import fnmatch
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading
import time
import traceback

from apiclient import errors as apiclient_errors
from arvados._version import __version__
from arvados.util import keep_locator_pattern

import arvados.commands._util as arv_cmd

api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. If path is a directory reference with a trailing
slash, then just upload the directory's contents; otherwise upload the
directory itself. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--storage-classes', help="""
Specify a comma-separated list of storage classes to be used when saving data to Keep.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Note that using many
threads will increase the RAM requirements. Default is to use 2
threads.
On high-latency installations, using a greater number may improve
overall throughput.
""")

upload_opts.add_argument('--exclude', metavar='PATTERN', default=[],
                         action='append', help="""
Exclude files and directories whose names match the given glob pattern. When
using a path-like pattern like 'subdir/*.txt', all text files inside the
'subdir' directory (relative to the provided input dirs) will be excluded.
When using a filename pattern like '*.txt', any text file will be excluded
no matter where it is placed.
For the special case of needing to exclude only files or dirs directly below
the given input directory, you can use a pattern like './exclude_this.gif'.
You can specify multiple patterns by using this argument more than once.
""")
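
# Illustrative examples (hypothetical paths) of how the two kinds of --exclude
# patterns behave, matching the help text above:
#
#   arv-put --exclude '*.tmp' --exclude 'logs/*.txt' mydir/
#
# would skip every '*.tmp' file anywhere under mydir/, but only the text files
# directly inside mydir/logs/. A './name' pattern restricts the match to
# entries directly below the given input directory.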

_group = upload_opts.add_mutually_exclusive_group()
_group.add_argument('--follow-links', action='store_true', default=True,
                    dest='follow_links', help="""
Follow file and directory symlinks (default).
""")
_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
                    help="""
Do not follow file and directory symlinks.
""")


run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

run_opts.add_argument('--silent', action='store_true',
                      help="""
Do not print any debug messages to console. (Any error messages will
still be displayed.)
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

_group = upload_opts.add_mutually_exclusive_group()
_group.add_argument('--trash-at', metavar='YYYY-MM-DDTHH:MM', default=None,
                    help="""
Set the trash date of the resulting collection to an absolute date in the future.
The accepted format is defined by the ISO 8601 standard. Examples: 20090103, 2009-01-03, 20090103T181505, 2009-01-03T18:15:05.\n
Timezone information can be added. If not provided, the date/time is assumed
to be in the local system's timezone.
""")
_group.add_argument('--trash-after', type=int, metavar='DAYS', default=None,
                    help="""
Set the trash date of the resulting collection to a number of days from the
date/time that the upload process finishes.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress or args.silent)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    # Remove possible duplicated patterns
    if len(args.exclude) > 0:
        args.exclude = list(set(args.exclude))

    return args


class PathDoesNotExistError(Exception):
    pass


class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass


class ResumeCacheInvalidError(Exception):
    pass


class ArvPutArgumentConflict(Exception):
    pass


class ArvPutUploadIsPending(Exception):
    pass


class ArvPutUploadNotPending(Exception):
    pass


class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)
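
    # Sketch of how this supports --dry-run (illustrative): building the upload
    # list with dry_run=True aborts on the first append, signalling that at
    # least one file is pending upload:
    #
    #   pending = FileUploadList(dry_run=True)
    #   try:
    #       pending.append(('file.txt', 0, 'file.txt'))
    #   except ArvPutUploadIsPending:
    #       pass  # something would be uploaded -> arv-put exits with code 2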


# Appends the X-Request-Id to the log message when log level is ERROR or DEBUG
class ArvPutLogFormatter(logging.Formatter):
    std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
    err_fmtr = None
    request_id_informed = False

    def __init__(self, request_id):
        self.err_fmtr = logging.Formatter(
            arvados.log_format+' (X-Request-Id: {})'.format(request_id),
            arvados.log_date_format)

    def format(self, record):
        if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
            self.request_id_informed = True
            return self.err_fmtr.format(record)
        return self.std_fmtr.format(record)
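
    # Roughly (illustrative output, exact layout depends on arvados.log_format):
    # only the first DEBUG/ERROR record gets the request id appended, and later
    # records revert to the standard format:
    #
    #   2021-01-03 18:15:05 arvados.arv_put[1234] ERROR: oops (X-Request-Id: req-...)
    #   2021-01-03 18:15:06 arvados.arv_put[1234] ERROR: oops again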


class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update(b'-1')
        elif args.filename:
            md5.update(args.filename.encode())
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())
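
    # Illustrative sketch of the resulting cache path (hash value hypothetical):
    # the MD5 digest mixes the API host, the NUL-joined real paths, and either
    # '-1' (when any path is a directory) or the --filename override, e.g.:
    #
    #   ~/.cache/arvados/arv-put/0123456789abcdef0123456789abcdef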

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict(u"{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception:
                self.restart()
        except ValueError:
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict):
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)

class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 name=None, owner_uuid=None, api_client=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None, filename=None,
                 update_time=60.0, update_collection=None, storage_classes=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False,
                 follow_links=True, exclude_paths=[], exclude_names=None,
                 trash_at=None):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        # This will be set to 0 before counting starts, unless there are
        # special (non-regular) files to be read.
        self.bytes_expected = None
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self.storage_classes = storage_classes
        self._api_client = api_client
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = set() # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # How many seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        self._checkpoint_before_quit = True
        self.follow_links = follow_links
        self.exclude_paths = exclude_paths
        self.exclude_names = exclude_names
        self._trash_at = trash_at

        if self._trash_at is not None:
            if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
                raise TypeError('trash_at should be None, a timezone-naive datetime or a timedelta')
            if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
                raise TypeError('provided trash_at datetime should be timezone-naive')

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

        # Build the upload file list, excluding requested files and counting the
        # bytes expected to be uploaded.
        self._build_upload_list()

    def _build_upload_list(self):
        """
        Scan the requested paths to count file sizes, excluding requested files
        and dirs and building the upload file list.
        """
        # If there are no special files to be read, reset the total bytes count
        # to zero to start counting.
        if not any([p for p in self.paths
                    if not (os.path.isfile(p) or os.path.isdir(p))]):
            self.bytes_expected = 0

        for path in self.paths:
            # Test for stdin first, in case some file named '-' exists
            if path == '-':
                if self.dry_run:
                    raise ArvPutUploadIsPending()
                self._write_stdin(self.filename or 'stdin')
            elif not os.path.exists(path):
                raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
            elif os.path.isdir(path):
                # Use absolute paths on cache index so CWD doesn't interfere
                # with the caching logic.
                orig_path = path
                path = os.path.abspath(path)
                if orig_path[-1:] == os.sep:
                    # When passing a directory reference with a trailing slash,
                    # its contents should be uploaded directly to the
                    # collection's root.
                    prefixdir = path
                else:
                    # When passing a directory reference with no trailing slash,
                    # upload the directory to the collection's root.
                    prefixdir = os.path.dirname(path)
                prefixdir += os.sep
                for root, dirs, files in os.walk(path,
                                                 followlinks=self.follow_links):
                    root_relpath = os.path.relpath(root, path)
                    if root_relpath == '.':
                        root_relpath = ''
                    # Exclude files/dirs by full path matching pattern
                    if self.exclude_paths:
                        dirs[:] = [d for d in dirs
                                   if not any(pathname_match(
                                           os.path.join(root_relpath, d), pat)
                                              for pat in self.exclude_paths)]
                        files = [f for f in files
                                 if not any(pathname_match(
                                         os.path.join(root_relpath, f), pat)
                                            for pat in self.exclude_paths)]
                    # Exclude files/dirs by name matching pattern
                    if self.exclude_names is not None:
                        dirs[:] = [d for d in dirs
                                   if not self.exclude_names.match(d)]
                        files = [f for f in files
                                 if not self.exclude_names.match(f)]
                    # Make os.walk()'s dir traversing order deterministic
                    dirs.sort()
                    files.sort()
                    for f in files:
                        filepath = os.path.join(root, f)
                        # Add its size to the total bytes count (if applicable)
                        if self.follow_links or (not os.path.islink(filepath)):
                            if self.bytes_expected is not None:
                                self.bytes_expected += os.path.getsize(filepath)
                        self._check_file(filepath,
                                         os.path.join(root[len(prefixdir):], f))
            else:
                filepath = os.path.abspath(path)
                # Add its size to the total bytes count (if applicable)
                if self.follow_links or (not os.path.islink(filepath)):
                    if self.bytes_expected is not None:
                        self.bytes_expected += os.path.getsize(filepath)
                self._check_file(filepath,
                                 self.filename or os.path.basename(path))
        # If dry-run mode is on and we got up to this point, notify that
        # there aren't any files to upload.
        if self.dry_run:
            raise ArvPutUploadNotPending()
        # Remove local_collection's files that don't exist locally anymore, so the
        # bytes_written count is correct.
        for f in self.collection_file_paths(self._local_collection,
                                            path_prefix=""):
            if f != 'stdin' and f != self.filename and f not in self._file_paths:
                self._local_collection.remove(f)

    def start(self, save_collection):
        """
        Start supporting thread & file uploading
        """
        self._checkpointer.start()
        try:
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True # Used by the update thread to start checkpointing
            self._upload_files()
        except (SystemExit, Exception) as e:
            self._checkpoint_before_quit = False
            # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
            # Note: We're expecting SystemExit instead of
            # KeyboardInterrupt because we have a custom signal
            # handler in place that raises SystemExit with the caught
            # signal's code.
            if isinstance(e, PathDoesNotExistError):
                # We aren't interested in the traceback for this case
                pass
            elif not isinstance(e, SystemExit) or e.code != -2:
                self.logger.warning("Abnormal termination:\n{}".format(
                    traceback.format_exc()))
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & do one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()

    def _collection_trash_at(self):
        """
        Returns the trash date that the collection should use at save time.
        Takes into account absolute/relative trash_at values requested
        by the user.
        """
        if type(self._trash_at) == datetime.timedelta:
            # Get an absolute datetime for trash_at
            return datetime.datetime.utcnow() + self._trash_at
        return self._trash_at
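
    # For example (illustrative): with --trash-after 7 the job holds
    # datetime.timedelta(days=7) and _collection_trash_at() resolves it against
    # utcnow() at save time, while an absolute --trash-at datetime is returned
    # unchanged.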

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on the remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on the remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on the remote collection, skip it.
                    pass
            self._remote_collection.save(storage_classes=self.storage_classes,
                                         num_retries=self.num_retries,
                                         trash_at=self._collection_trash_at())
        else:
            if self.storage_classes is None:
                self.storage_classes = ['default']
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                storage_classes=self.storage_classes,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries,
                trash_at=self._collection_trash_at())

    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in listvalues(collection):
            if isinstance(item, (arvados.collection.Collection, arvados.collection.Subcollection)):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                try:
                    self._save_state()
                except Exception as e:
                    self.logger.error("Unexpected error trying to save cache file: {}".format(e))
            # Keep the remote collection's trash_at attribute synced when using relative expire dates
            if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta:
                try:
                    self._api_client.collections().update(
                        uuid=self._remote_collection.manifest_locator(),
                        body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}
                    ).execute(num_retries=self.num_retries)
                except Exception as e:
                    self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e))
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'wb')
        self._write(sys.stdin, output)
        output.close()

    def _check_file(self, source, filename):
        """
        Check if this file needs to be uploaded
        """
        # Ignore symlinks when requested
        if (not self.follow_links) and os.path.islink(source):
            return
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If there's no previous cached data on this file, store it for an
            # eventual repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected since last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have an API for refreshing tokens.
                self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            try:
                self._files_to_upload.append((source, resume_offset, filename))
            except ArvPutUploadIsPending:
                # This could happen when running in dry-run mode; close the
                # cache file to avoid locking issues.
                self._cache_file.close()
                raise

    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'rb') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'ab')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'wb')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _get_cache_filepath(self):
        # Set up cache file name from input paths.
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
        realpaths = sorted(os.path.realpath(path) for path in self.paths)
        md5.update(b'\0'.join([p.encode() for p in realpaths]))
        if self.filename:
            md5.update(self.filename.encode())
        cache_filename = md5.hexdigest()
        cache_filepath = os.path.join(
            arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
            cache_filename)
        return cache_filepath

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(
                    update_collection, api_client=self._api_client)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            cache_filepath = self._get_cache_filepath()
            if self.resume and os.path.exists(cache_filepath):
                self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                self.logger.info("No cache usage requested for this run.")
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
            if not self._cached_manifest_valid():
                raise ResumeCacheInvalidError()
            # Load the previous manifest so we can check if files were modified remotely.
            self._local_collection = arvados.collection.Collection(
                self._state['manifest'],
                replication_desired=self.replication_desired,
                put_threads=self.put_threads,
                api_client=self._api_client)

    def _cached_manifest_valid(self):
        """
        Validate the oldest non-expired block signature to check whether the
        cached manifest is usable, i.e. that it was not created with a
        different Arvados account's credentials.
        """
        if self._state.get('manifest', None) is None:
            # No cached manifest yet, all good.
            return True
        now = datetime.datetime.utcnow()
        oldest_exp = None
        oldest_loc = None
        block_found = False
        for m in keep_locator_pattern.finditer(self._state['manifest']):
            loc = m.group(0)
            try:
                exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
            except IndexError:
                # Locator without signature
                continue
            block_found = True
            if exp > now and (oldest_exp is None or exp < oldest_exp):
                oldest_exp = exp
                oldest_loc = loc
        if not block_found:
            # No block signatures found => no invalid block signatures.
            return True
        if oldest_loc is None:
            # Locator signatures found, but all have expired.
            # Reset the cache and move on.
            self.logger.info('Cache expired, starting from scratch.')
            self._state['manifest'] = ''
            return True
        kc = arvados.KeepClient(api_client=self._api_client,
                                num_retries=self.num_retries)
        try:
            kc.head(oldest_loc)
        except arvados.errors.KeepRequestError:
            # Something is wrong, cached manifest is not valid.
            return False
        return True
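
    # For reference (format shown is illustrative): signed manifest locators
    # carry a permission hint with a hex expiry timestamp after the '@', e.g.:
    #
    #   acbd18db4cc2f85cedef654fccc4a4d8+3+A<signature>@5f87ab4e
    #
    # which is what the loc.split('@')[1] parsing above relies on.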

    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively going through the entire collection `col`"""
        file_paths = []
        for name, item in listitems(col):
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict(u"{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        with self._state_lock:
            # We're not using copy.deepcopy() here because it's a lot slower
            # than json.dumps(), and we already need the JSON format to be
            # saved on disk.
            state = json.dumps(self._state)
        try:
            new_cache = tempfile.NamedTemporaryFile(
                mode='w+',
                dir=os.path.dirname(self._cache_filename), delete=False)
            self._lock_file(new_cache)
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache.name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache.name)
            except NameError:  # NamedTemporaryFile failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache
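
    # The save above follows the usual atomic-replace recipe: write the full
    # JSON state to a temp file in the same directory, flock + flush + fsync
    # it, then os.rename() over the old cache so readers never see a partially
    # written state file.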

    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def collection_trash_at(self):
        return self._my_collection().get_trash_at()

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        pdh = self._my_collection().portable_data_hash()
        m = self._my_collection().stripped_manifest().encode()
        local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
        if pdh != local_pdh:
            self.logger.warning("\n".join([
                "arv-put: API server provided PDH differs from local manifest.",
                "         This should not happen; showing API server version."]))
        return pdh

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in listvalues(item)]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())

# Simulate glob.glob() matching behavior without the need to scan the filesystem
# Note: fnmatch() doesn't work correctly when used with pathnames. For example,
# the pattern 'tests/*.py' will match 'tests/run_test.py' and also
# 'tests/subdir/run_test.py', so instead we apply it to every path component.
def pathname_match(pathname, pattern):
    name = pathname.split(os.sep)
    # Fix patterns like 'some/subdir/' or 'some//subdir'
    pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
    if len(name) != len(pat):
        return False
    for i in range(len(name)):
        if not fnmatch.fnmatch(name[i], pat[i]):
            return False
    return True
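
# Illustrative behavior (hypothetical paths):
#
#   pathname_match('tests/run_test.py', 'tests/*.py')       # True
#   pathname_match('tests/sub/run_test.py', 'tests/*.py')   # False (depth differs)
#   pathname_match('tests/run_test.py', './tests/*.py')     # True ('.' is dropped)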

def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)
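
# Sample reporter output (illustrative, assuming argv[0] 'arv-put' and pid 1234):
#
#   machine_progress(1048576, 4194304) -> "arv-put 1234: 1048576 written 4194304 total\n"
#   human_progress(1048576, 4194304)   -> "\r1M / 4M 25.0% "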

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']
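
# For example (UUIDs hypothetical): a user UUID such as
# 'zzzzz-tpzed-0123456789abcde' resolves via users().get(), a group UUID such
# as 'zzzzz-j7d0g-0123456789abcde' via groups().get(), and an empty value
# falls back to the current user's Home project.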

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
         install_sig_handlers=True):
    global api_client

    args = parse_arguments(arguments)
    logger = logging.getLogger('arvados.arv_put')
    if args.silent:
        logger.setLevel(logging.WARNING)
    else:
        logger.setLevel(logging.INFO)
    status = 0

    request_id = arvados.util.new_request_id()

    formatter = ArvPutLogFormatter(request_id)
    logging.getLogger('arvados').handlers[0].setFormatter(formatter)

    if api_client is None:
        api_client = arvados.api('v1', request_id=request_id)

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

    # Trash arguments validation
    trash_at = None
    if args.trash_at is not None:
        # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we
        # make sure the user provides a complete YYYY-MM-DD date.
        if not re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}', args.trash_at):
            logger.error("--trash-at argument format invalid, use --help to see examples.")
            sys.exit(1)
        # Check if no time information was provided. In that case, assume end-of-day.
        if re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}$', args.trash_at):
            args.trash_at += 'T23:59:59'
        try:
            trash_at = ciso8601.parse_datetime(args.trash_at)
        except ValueError:
            logger.error("--trash-at argument format invalid, use --help to see examples.")
            sys.exit(1)
        else:
            if trash_at.tzinfo is not None:
                # Timezone-aware datetime provided.
                utcoffset = -trash_at.utcoffset()
            else:
                # Timezone-naive datetime provided. Assume it is local.
                if time.daylight:
                    utcoffset = datetime.timedelta(seconds=time.altzone)
                else:
                    utcoffset = datetime.timedelta(seconds=time.timezone)
            # Convert to a UTC timezone-naive datetime.
            trash_at = trash_at.replace(tzinfo=None) + utcoffset

        if trash_at <= datetime.datetime.utcnow():
            logger.error("--trash-at argument must be set in the future")
            sys.exit(1)
    if args.trash_after is not None:
        if args.trash_after < 1:
            logger.error("--trash-after argument must be >= 1")
            sys.exit(1)
        trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60))
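
    # At this point (sketch): trash_at is either None, a timezone-naive UTC
    # datetime (from --trash-at, e.g. '2021-01-03' becomes 2021-01-03T23:59:59
    # local time converted to UTC), or a timedelta (from --trash-after, e.g.
    # --trash-after 7 yields timedelta(seconds=604800)); ArvPutUploadJob
    # resolves relative values when the collection is saved.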

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # Split storage-classes argument
    storage_classes = None
    if args.storage_classes:
        storage_classes = args.storage_classes.strip().split(',')
        if len(storage_classes) > 1:
            logger.error("Multiple storage classes are not supported currently.")
            sys.exit(1)

    # Set up exclude regex from all the --exclude arguments provided
    name_patterns = []
    exclude_paths = []
    exclude_names = None
    if len(args.exclude) > 0:
        # We're supporting 2 kinds of exclusion patterns:
        # 1)   --exclude '*.jpg'    (file/dir name patterns, will only match
        #                            the name, wherever the file is on the tree)
        # 2.1) --exclude 'foo/bar'  (file/dir path patterns, will match the
        #                            entire path, and should be relative to
        #                            any input dir argument)
        # 2.2) --exclude './*.jpg'  (Special case for excluding files/dirs
        #                            placed directly underneath the input dir)
        for p in args.exclude:
            # Only relative path patterns are allowed
            if p.startswith(os.sep):
                logger.error("Cannot use absolute paths with --exclude")
                sys.exit(1)
            if os.path.dirname(p):
                # We don't support path patterns with '..'
                p_parts = p.split(os.sep)
                if '..' in p_parts:
                    logger.error(
                        "Cannot use path patterns that include '..'")
                    sys.exit(1)
                # Path search pattern
                exclude_paths.append(p)
            else:
                # Name-only search pattern
                name_patterns.append(p)
        # For name-only matching, we can combine all patterns into a single
        # regexp, for better performance.
        exclude_names = re.compile('|'.join(
            [fnmatch.translate(p) for p in name_patterns]
        )) if len(name_patterns) > 0 else None
        # Show the user the patterns to be used, just in case they weren't
        # specified inside quotes and got changed by the shell expansion.
        logger.info("Exclude patterns: {}".format(args.exclude))

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 api_client = api_client,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 storage_classes=storage_classes,
                                 logger=logger,
                                 dry_run=args.dry_run,
                                 follow_links=args.follow_links,
                                 exclude_paths=exclude_paths,
                                 exclude_names=exclude_names,
                                 trash_at=trash_at)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except ResumeCacheInvalidError:
        logger.error("\n".join([
            "arv-put: Resume cache contains invalid signature: it may have expired",
            "         or been created with another Arvados user's credentials.",
            "         Switch user or use one of the following options to restart upload:",
            "         --no-resume to start a new resume cache.",
            "         --no-cache to disable resume cache."]))
        sys.exit(1)
    except (CollectionUpdateError, PathDoesNotExistError) as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not (args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            expiration_notice = ""
            if writer.collection_trash_at() is not None:
                # Get the local timezone-naive version, and log it with timezone information.
                if time.daylight:
                    local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.altzone)
                else:
                    local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.timezone)
                expiration_notice = ". It will expire on {} {}.".format(
                    local_trash_at.strftime("%Y-%m-%d %H:%M:%S"), time.strftime("%z"))
            if args.update_collection:
                logger.info(u"Collection updated: '{}'{}".format(
                    writer.collection_name(), expiration_notice))
            else:
                logger.info(u"Collection saved as '{}'{}".format(
                    writer.collection_name(), expiration_notice))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    elif not args.silent:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    if install_sig_handlers:
        arv_cmd.restore_signal_handlers()

    if status != 0:
        sys.exit(status)

    # Success!
    return output


if __name__ == '__main__':
    main()