1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import argparse
6 import arvados
7 import arvados.collection
8 import base64
9 import ciso8601
10 import copy
11 import datetime
12 import errno
13 import fcntl
14 import fnmatch
15 import hashlib
16 import json
17 import logging
18 import os
19 import pwd
20 import re
21 import signal
22 import socket
23 import sys
24 import tempfile
25 import threading
26 import time
27 import traceback
28
29 from apiclient import errors as apiclient_errors
30 from arvados._version import __version__
31 from arvados.util import keep_locator_pattern
32
33 import arvados.commands._util as arv_cmd
34
35 api_client = None
36
37 upload_opts = argparse.ArgumentParser(add_help=False)
38
39 upload_opts.add_argument('--version', action='version',
40                          version="%s %s" % (sys.argv[0], __version__),
41                          help='Print version and exit.')
42 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
43                          help="""
44 Local file or directory. If path is a directory reference with a trailing
45 slash, then just upload the directory's contents; otherwise upload the
46 directory itself. Default: read from standard input.
47 """)
48
49 _group = upload_opts.add_mutually_exclusive_group()
50
51 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
52                     default=-1, help=argparse.SUPPRESS)
53
54 _group.add_argument('--normalize', action='store_true',
55                     help="""
56 Normalize the manifest by re-ordering files and streams after writing
57 data.
58 """)
59
60 _group.add_argument('--dry-run', action='store_true', default=False,
61                     help="""
62 Don't actually upload files, but only check if any file should be
63 uploaded. Exit with code=2 when files are pending for upload.
64 """)
65
66 _group = upload_opts.add_mutually_exclusive_group()
67
68 _group.add_argument('--as-stream', action='store_true', dest='stream',
69                     help="""
70 Synonym for --stream.
71 """)
72
73 _group.add_argument('--stream', action='store_true',
74                     help="""
75 Store the file content and display the resulting manifest on
76 stdout. Do not save a Collection object in Arvados.
77 """)
78
79 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
80                     help="""
81 Synonym for --manifest.
82 """)
83
84 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
85                     help="""
86 Synonym for --manifest.
87 """)
88
89 _group.add_argument('--manifest', action='store_true',
90                     help="""
91 Store the file data and resulting manifest in Keep, save a Collection
92 object in Arvados, and display the manifest locator (Collection uuid)
93 on stdout. This is the default behavior.
94 """)
95
96 _group.add_argument('--as-raw', action='store_true', dest='raw',
97                     help="""
98 Synonym for --raw.
99 """)
100
101 _group.add_argument('--raw', action='store_true',
102                     help="""
103 Store the file content and display the data block locators on stdout,
104 separated by commas, with a trailing newline. Do not store a
105 manifest.
106 """)
107
108 upload_opts.add_argument('--update-collection', type=str, default=None,
109                          dest='update_collection', metavar="UUID", help="""
110 Update an existing collection identified by the given Arvados collection
111 UUID. All new local files will be uploaded.
112 """)
113
114 upload_opts.add_argument('--use-filename', type=str, default=None,
115                          dest='filename', help="""
116 Synonym for --filename.
117 """)
118
119 upload_opts.add_argument('--filename', type=str, default=None,
120                          help="""
121 Use the given filename in the manifest, instead of the name of the
122 local file. This is useful when "-" or "/dev/stdin" is given as an
123 input file. It can be used only if there is exactly one path given and
124 it is not a directory. Implies --manifest.
125 """)
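# Illustrative example (hypothetical command) of the --filename behavior described
# above, naming data read from stdin instead of the default 'stdin':
#
#   tar -C mydir -cf - . | arv-put --filename mydir.tar -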
126
127 upload_opts.add_argument('--portable-data-hash', action='store_true',
128                          help="""
129 Print the portable data hash instead of the Arvados UUID for the collection
130 created by the upload.
131 """)
132
133 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
134                          help="""
135 Set the replication level for the new collection: how many different
136 physical storage devices (e.g., disks) should have a copy of each data
137 block. Default is to use the server-provided default (if any) or 2.
138 """)
139
140 upload_opts.add_argument('--storage-classes', help="""
141 Specify a comma-separated list of storage classes to be used when saving data to Keep.
142 """)
143
144 upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
145                          help="""
146 Set the number of upload threads to be used. Keep in mind that using
147 many threads will increase the RAM requirements. Default is to use
148 2 threads.
149 On high-latency installations, using more threads will improve
150 overall throughput.
151 """)
152
153 upload_opts.add_argument('--exclude', metavar='PATTERN', default=[],
154                       action='append', help="""
155 Exclude files and directories whose names match the given glob pattern. When
156 using a path-like pattern like 'subdir/*.txt', all text files inside the 'subdir'
157 directory, relative to the provided input dirs, will be excluded.
158 When using a filename pattern like '*.txt', any text file will be excluded
159 no matter where it is placed.
160 For the special case of needing to exclude only files or dirs directly below
161 the given input directory, you can use a pattern like './exclude_this.gif'.
162 You can specify multiple patterns by using this argument more than once.
163 """)
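# Illustrative (hypothetical) invocations of the --exclude semantics described above:
#
#   arv-put --exclude '*.tmp' mydir/           # skip any *.tmp file, wherever it is
#   arv-put --exclude 'subdir/*.txt' mydir/    # skip *.txt files under mydir/subdir
#   arv-put --exclude './notes.gif' mydir/     # skip only notes.gif directly under mydir
#   arv-put --exclude '*.tmp' --exclude '*.bak' mydir/   # multiple patterns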
164
165 _group = upload_opts.add_mutually_exclusive_group()
166 _group.add_argument('--follow-links', action='store_true', default=True,
167                     dest='follow_links', help="""
168 Follow file and directory symlinks (default).
169 """)
170 _group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
171                     help="""
172 Ignore file and directory symlinks. Even paths given explicitly on the
173 command line will be skipped if they are symlinks.
174 """)
175
176
177 run_opts = argparse.ArgumentParser(add_help=False)
178
179 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
180 Store the collection in the specified project, instead of your Home
181 project.
182 """)
183
184 run_opts.add_argument('--name', help="""
185 Save the collection with the specified name.
186 """)
187
188 _group = run_opts.add_mutually_exclusive_group()
189 _group.add_argument('--progress', action='store_true',
190                     help="""
191 Display human-readable progress on stderr (bytes and, if possible,
192 percentage of total data size). This is the default behavior when
193 stderr is a tty.
194 """)
195
196 _group.add_argument('--no-progress', action='store_true',
197                     help="""
198 Do not display human-readable progress on stderr, even if stderr is a
199 tty.
200 """)
201
202 _group.add_argument('--batch-progress', action='store_true',
203                     help="""
204 Display machine-readable progress on stderr (bytes and, if known,
205 total data size).
206 """)
207
208 run_opts.add_argument('--silent', action='store_true',
209                       help="""
210 Do not print any debug messages to console. (Any error messages will
211 still be displayed.)
212 """)
213
214 run_opts.add_argument('--batch', action='store_true', default=False,
215                       help="""
216 Retries with '--no-resume --no-cache' if cached state contains invalid/expired
217 block signatures.
218 """)
219
220 _group = run_opts.add_mutually_exclusive_group()
221 _group.add_argument('--resume', action='store_true', default=True,
222                     help="""
223 Continue interrupted uploads from cached state (default).
224 """)
225 _group.add_argument('--no-resume', action='store_false', dest='resume',
226                     help="""
227 Do not continue interrupted uploads from cached state.
228 """)
229
230 _group = run_opts.add_mutually_exclusive_group()
231 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
232                     help="""
233 Save upload state in a cache file for resuming (default).
234 """)
235 _group.add_argument('--no-cache', action='store_false', dest='use_cache',
236                     help="""
237 Do not save upload state in a cache file for resuming.
238 """)
239
240 _group = upload_opts.add_mutually_exclusive_group()
241 _group.add_argument('--trash-at', metavar='YYYY-MM-DDTHH:MM', default=None,
242                     help="""
243 Set the trash date of the resulting collection to an absolute date in the future.
244 The accepted format is defined by the ISO 8601 standard. Examples: 20090103, 2009-01-03, 20090103T181505, 2009-01-03T18:15:05.\n
245 Timezone information can be added. If not, the provided date/time is assumed to be in the local system's timezone.
246 """)
247 _group.add_argument('--trash-after', type=int, metavar='DAYS', default=None,
248                     help="""
249 Set the trash date of the resulting collection to a number of days after the
250 date/time that the upload process finishes.
251 """)
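# Illustrative (hypothetical) invocations of the trash options described above:
#
#   arv-put --trash-at 2009-01-03 mydir/                  # local timezone assumed
#   arv-put --trash-at 2009-01-03T18:15:05+02:00 mydir/   # explicit timezone offset
#   arv-put --trash-after 30 mydir/                       # trash 30 days after the upload finishes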
252
253 arg_parser = argparse.ArgumentParser(
254     description='Copy data from the local filesystem to Keep.',
255     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
256
257 def parse_arguments(arguments):
258     args = arg_parser.parse_args(arguments)
259
260     if len(args.paths) == 0:
261         args.paths = ['-']
262
263     args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
264
265     if args.filename and (len(args.paths) != 1 or os.path.isdir(args.paths[0])):
266         arg_parser.error("""
267     --filename argument cannot be used when storing a directory or
268     multiple files.
269     """)
270
271     # Turn on --progress by default if stderr is a tty.
272     if (not (args.batch_progress or args.no_progress or args.silent)
273         and os.isatty(sys.stderr.fileno())):
274         args.progress = True
275
276     # Turn off --resume (default) if --no-cache is used.
277     if not args.use_cache:
278         args.resume = False
279
280     if args.paths == ['-']:
281         if args.update_collection:
282             arg_parser.error("""
283     --update-collection cannot be used when reading from stdin.
284     """)
285         args.resume = False
286         args.use_cache = False
287         if not args.filename:
288             args.filename = 'stdin'
289
290     # Remove possible duplicated patterns
291     if len(args.exclude) > 0:
292         args.exclude = list(set(args.exclude))
293
294     return args
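# Illustrative behavior of the defaults above: with no path arguments,
# parse_arguments([]) reads from stdin, so it ends up with
#   args.paths == ['-'], args.filename == 'stdin',
#   args.use_cache == False, args.resume == False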
295
296
297 class PathDoesNotExistError(Exception):
298     pass
299
300
301 class CollectionUpdateError(Exception):
302     pass
303
304
305 class ResumeCacheConflict(Exception):
306     pass
307
308
309 class ResumeCacheInvalidError(Exception):
310     pass
311
312 class ArvPutArgumentConflict(Exception):
313     pass
314
315
316 class ArvPutUploadIsPending(Exception):
317     pass
318
319
320 class ArvPutUploadNotPending(Exception):
321     pass
322
323
324 class FileUploadList(list):
325     def __init__(self, dry_run=False):
326         list.__init__(self)
327         self.dry_run = dry_run
328
329     def append(self, other):
330         if self.dry_run:
331             raise ArvPutUploadIsPending()
332         super(FileUploadList, self).append(other)
333
334
335 # Appends the X-Request-Id to the log message when log level is ERROR or DEBUG
336 class ArvPutLogFormatter(logging.Formatter):
337     std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
338     err_fmtr = None
339     request_id_informed = False
340
341     def __init__(self, request_id):
342         self.err_fmtr = logging.Formatter(
343             arvados.log_format+' (X-Request-Id: {})'.format(request_id),
344             arvados.log_date_format)
345
346     def format(self, record):
347         if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
348             self.request_id_informed = True
349             return self.err_fmtr.format(record)
350         return self.std_fmtr.format(record)
351
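# Minimal usage sketch for ArvPutLogFormatter above (request id value is hypothetical).
# Only the first DEBUG/ERROR record gets the X-Request-Id suffix appended; later
# records fall back to the standard format:
#
#   handler = logging.StreamHandler()
#   handler.setFormatter(ArvPutLogFormatter('req-1a2b3c4d5e6f7g8h9i0j'))
#   logging.getLogger('arvados.arv_put').addHandler(handler)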
352
353 class ResumeCache(object):
354     CACHE_DIR = '.cache/arvados/arv-put'
355
356     def __init__(self, file_spec):
357         self.cache_file = open(file_spec, 'a+')
358         self._lock_file(self.cache_file)
359         self.filename = self.cache_file.name
360
361     @classmethod
362     def make_path(cls, args):
363         md5 = hashlib.md5()
364         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
365         realpaths = sorted(os.path.realpath(path) for path in args.paths)
366         md5.update(b'\0'.join([p.encode() for p in realpaths]))
367         if any(os.path.isdir(path) for path in realpaths):
368             md5.update(b'-1')
369         elif args.filename:
370             md5.update(args.filename.encode())
371         return os.path.join(
372             arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
373             md5.hexdigest())
374
375     def _lock_file(self, fileobj):
376         try:
377             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
378         except IOError:
379             raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
380
381     def load(self):
382         self.cache_file.seek(0)
383         return json.load(self.cache_file)
384
385     def check_cache(self, api_client=None, num_retries=0):
386         try:
387             state = self.load()
388             locator = None
389             try:
390                 if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
391                     locator = state["_finished_streams"][0][1][0]
392                 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
393                     locator = state["_current_stream_locators"][0]
394                 if locator is not None:
395                     kc = arvados.keep.KeepClient(api_client=api_client)
396                     kc.head(locator, num_retries=num_retries)
397             except Exception:
398                 self.restart()
399         except ValueError:
400             pass
401
402     def save(self, data):
403         try:
404             new_cache_fd, new_cache_name = tempfile.mkstemp(
405                 dir=os.path.dirname(self.filename))
406             self._lock_file(new_cache_fd)
407             new_cache = os.fdopen(new_cache_fd, 'r+')
408             json.dump(data, new_cache)
409             os.rename(new_cache_name, self.filename)
410         except (IOError, OSError, ResumeCacheConflict):
411             try:
412                 os.unlink(new_cache_name)
413             except NameError:  # mkstemp failed.
414                 pass
415         else:
416             self.cache_file.close()
417             self.cache_file = new_cache
418
419     def close(self):
420         self.cache_file.close()
421
422     def destroy(self):
423         try:
424             os.unlink(self.filename)
425         except OSError as error:
426             if error.errno != errno.ENOENT:  # That's what we wanted anyway.
427                 raise
428         self.close()
429
430     def restart(self):
431         self.destroy()
432         self.__init__(self.filename)
433
434
435 class ArvPutUploadJob(object):
436     CACHE_DIR = '.cache/arvados/arv-put'
437     EMPTY_STATE = {
438         'manifest' : None, # Last saved manifest checkpoint
439         'files' : {} # Previous run file list: {path : {size, mtime}}
440     }
441
442     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
443                  name=None, owner_uuid=None, api_client=None, batch_mode=False,
444                  ensure_unique_name=False, num_retries=None,
445                  put_threads=None, replication_desired=None, filename=None,
446                  update_time=60.0, update_collection=None, storage_classes=None,
447                  logger=logging.getLogger('arvados.arv_put'), dry_run=False,
448                  follow_links=True, exclude_paths=[], exclude_names=None,
449                  trash_at=None):
450         self.paths = paths
451         self.resume = resume
452         self.use_cache = use_cache
453         self.batch_mode = batch_mode
454         self.update = False
455         self.reporter = reporter
456         # This will be set to 0 before we start counting, if no special files
457         # are going to be read.
458         self.bytes_expected = None
459         self.bytes_written = 0
460         self.bytes_skipped = 0
461         self.name = name
462         self.owner_uuid = owner_uuid
463         self.ensure_unique_name = ensure_unique_name
464         self.num_retries = num_retries
465         self.replication_desired = replication_desired
466         self.put_threads = put_threads
467         self.filename = filename
468         self.storage_classes = storage_classes
469         self._api_client = api_client
470         self._state_lock = threading.Lock()
471         self._state = None # Previous run state (file list & manifest)
472         self._current_files = [] # Current run file list
473         self._cache_file = None
474         self._collection_lock = threading.Lock()
475         self._remote_collection = None # Collection being updated (if asked)
476         self._local_collection = None # Collection from previous run manifest
477         self._file_paths = set() # Files to be updated in remote collection
478         self._stop_checkpointer = threading.Event()
479         self._checkpointer = threading.Thread(target=self._update_task)
480         self._checkpointer.daemon = True
481         self._update_task_time = update_time  # How many seconds to wait between update runs
482         self._files_to_upload = FileUploadList(dry_run=dry_run)
483         self._upload_started = False
484         self.logger = logger
485         self.dry_run = dry_run
486         self._checkpoint_before_quit = True
487         self.follow_links = follow_links
488         self.exclude_paths = exclude_paths
489         self.exclude_names = exclude_names
490         self._trash_at = trash_at
491
492         if self._trash_at is not None:
493             if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
494                 raise TypeError('trash_at should be None, timezone-naive datetime or timedelta')
495             if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
496                 raise TypeError('provided trash_at datetime should be timezone-naive')
497
498         if not self.use_cache and self.resume:
499             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
500
501         # Check for obvious dry-run responses
502         if self.dry_run and (not self.use_cache or not self.resume):
503             raise ArvPutUploadIsPending()
504
505         # Load cached data if any and if needed
506         self._setup_state(update_collection)
507
508         # Build the upload file list, excluding requested files and counting the
509         # bytes expected to be uploaded.
510         self._build_upload_list()
511
512     def _build_upload_list(self):
513         """
514         Scan the requested paths to count file sizes, excluding requested files
515         and dirs and building the upload file list.
516         """
517         # If there aren't special files to be read, reset total bytes count to zero
518         # to start counting.
519         if not any([p for p in self.paths
520                     if not (os.path.isfile(p) or os.path.isdir(p))]):
521             self.bytes_expected = 0
522
523         for path in self.paths:
524             # Test for stdin first, in case some file named '-' exists
525             if path == '-':
526                 if self.dry_run:
527                     raise ArvPutUploadIsPending()
528                 self._write_stdin(self.filename or 'stdin')
529             elif not os.path.exists(path):
530                 raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
531             elif (not self.follow_links) and os.path.islink(path):
532                 self.logger.warning("Skipping symlink '{}'".format(path))
533                 continue
534             elif os.path.isdir(path):
535                 # Use absolute paths on cache index so CWD doesn't interfere
536                 # with the caching logic.
537                 orig_path = path
538                 path = os.path.abspath(path)
539                 if orig_path[-1:] == os.sep:
540                     # When passing a directory reference with a trailing slash,
541                     # its contents should be uploaded directly to the
542                     # collection's root.
543                     prefixdir = path
544                 else:
545                     # When passing a directory reference with no trailing slash,
546                     # upload the directory to the collection's root.
547                     prefixdir = os.path.dirname(path)
548                 prefixdir += os.sep
549                 for root, dirs, files in os.walk(path,
550                                                  followlinks=self.follow_links):
551                     root_relpath = os.path.relpath(root, path)
552                     if root_relpath == '.':
553                         root_relpath = ''
554                     # Exclude files/dirs by full path matching pattern
555                     if self.exclude_paths:
556                         dirs[:] = [d for d in dirs
557                                    if not any(pathname_match(
558                                            os.path.join(root_relpath, d), pat)
559                                               for pat in self.exclude_paths)]
560                         files = [f for f in files
561                                  if not any(pathname_match(
562                                          os.path.join(root_relpath, f), pat)
563                                             for pat in self.exclude_paths)]
564                     # Exclude files/dirs by name matching pattern
565                     if self.exclude_names is not None:
566                         dirs[:] = [d for d in dirs
567                                    if not self.exclude_names.match(d)]
568                         files = [f for f in files
569                                  if not self.exclude_names.match(f)]
570                     # Make os.walk()'s dir traversing order deterministic
571                     dirs.sort()
572                     files.sort()
573                     for f in files:
574                         filepath = os.path.join(root, f)
575                         if not os.path.isfile(filepath):
576                             self.logger.warning("Skipping non-regular file '{}'".format(filepath))
577                             continue
578                         # Add its size to the total bytes count (if applicable)
579                         if self.follow_links or (not os.path.islink(filepath)):
580                             if self.bytes_expected is not None:
581                                 self.bytes_expected += os.path.getsize(filepath)
582                         self._check_file(filepath,
583                                          os.path.join(root[len(prefixdir):], f))
584             else:
585                 filepath = os.path.abspath(path)
586                 # Add its size to the total bytes count (if applicable)
587                 if self.follow_links or (not os.path.islink(filepath)):
588                     if self.bytes_expected is not None:
589                         self.bytes_expected += os.path.getsize(filepath)
590                 self._check_file(filepath,
591                                  self.filename or os.path.basename(path))
592         # If dry-run mode is on and we got up to this point, notify that there
593         # aren't any files to upload.
594         if self.dry_run:
595             raise ArvPutUploadNotPending()
596         # Remove local_collection's files that don't exist locally anymore, so the
597         # bytes_written count is correct.
598         for f in self.collection_file_paths(self._local_collection,
599                                             path_prefix=""):
600             if f != 'stdin' and f != self.filename and f not in self._file_paths:
601                 self._local_collection.remove(f)
602
603     def start(self, save_collection):
604         """
605         Start supporting thread & file uploading
606         """
607         self._checkpointer.start()
608         try:
609             # Update bytes_written from current local collection and
610             # report initial progress.
611             self._update()
612             # Actual file upload
613             self._upload_started = True # Used by the update thread to start checkpointing
614             self._upload_files()
615         except (SystemExit, Exception) as e:
616             self._checkpoint_before_quit = False
617             # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
618             # Note: We're expecting SystemExit instead of
619             # KeyboardInterrupt because we have a custom signal
620             # handler in place that raises SystemExit with the caught
621             # signal's code.
622             if isinstance(e, PathDoesNotExistError):
623                 # We aren't interested in the traceback for this case
624                 pass
625             elif not isinstance(e, SystemExit) or e.code != -2:
626                 self.logger.warning("Abnormal termination:\n{}".format(
627                     traceback.format_exc()))
628             raise
629         finally:
630             if not self.dry_run:
631                 # Stop the thread before doing anything else
632                 self._stop_checkpointer.set()
633                 self._checkpointer.join()
634                 if self._checkpoint_before_quit:
635                     # Commit all pending blocks & one last _update()
636                     self._local_collection.manifest_text()
637                     self._update(final=True)
638                     if save_collection:
639                         self.save_collection()
640             if self.use_cache:
641                 self._cache_file.close()
642
643     def _collection_trash_at(self):
644         """
645         Returns the trash date that the collection should use at save time.
646         Takes into account absolute/relative trash_at values requested
647         by the user.
648         """
649         if type(self._trash_at) == datetime.timedelta:
650             # Get an absolute datetime for trash_at
651             return datetime.datetime.utcnow() + self._trash_at
652         return self._trash_at
653
654     def save_collection(self):
655         if self.update:
656             # Check if files should be updated on the remote collection.
657             for fp in self._file_paths:
658                 remote_file = self._remote_collection.find(fp)
659                 if not remote_file:
660                     # File doesn't exist on the remote collection, copy it.
661                     self._remote_collection.copy(fp, fp, self._local_collection)
662                 elif remote_file != self._local_collection.find(fp):
663                     # A different file exists on the remote collection, overwrite it.
664                     self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
665                 else:
666                     # The file already exists on the remote collection, skip it.
667                     pass
668             self._remote_collection.save(num_retries=self.num_retries,
669                                          trash_at=self._collection_trash_at())
670         else:
671             if len(self._local_collection) == 0:
672                 self.logger.warning("No files were uploaded, skipping collection creation.")
673                 return
674             self._local_collection.save_new(
675                 name=self.name, owner_uuid=self.owner_uuid,
676                 ensure_unique_name=self.ensure_unique_name,
677                 num_retries=self.num_retries,
678                 trash_at=self._collection_trash_at())
679
680     def destroy_cache(self):
681         if self.use_cache:
682             try:
683                 os.unlink(self._cache_filename)
684             except OSError as error:
685                 # That's what we wanted anyway.
686                 if error.errno != errno.ENOENT:
687                     raise
688             self._cache_file.close()
689
690     def _collection_size(self, collection):
691         """
692         Recursively get the total size of the collection
693         """
694         size = 0
695         for item in collection.values():
696             if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
697                 size += self._collection_size(item)
698             else:
699                 size += item.size()
700         return size
701
702     def _update_task(self):
703         """
704         Periodically called support task. File uploading is
705         asynchronous so we poll status from the collection.
706         """
707         while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
708             self._update()
709
710     def _update(self, final=False):
711         """
712         Update cached manifest text and report progress.
713         """
714         if self._upload_started:
715             with self._collection_lock:
716                 self.bytes_written = self._collection_size(self._local_collection)
717                 if self.use_cache:
718                     if final:
719                         manifest = self._local_collection.manifest_text()
720                     else:
721                         # Get the manifest text without committing pending blocks
722                         manifest = self._local_collection.manifest_text(strip=False,
723                                                                         normalize=False,
724                                                                         only_committed=True)
725                     # Update cache
726                     with self._state_lock:
727                         self._state['manifest'] = manifest
728             if self.use_cache:
729                 try:
730                     self._save_state()
731                 except Exception as e:
732                     self.logger.error("Unexpected error trying to save cache file: {}".format(e))
733             # Keep remote collection's trash_at attribute synced when using relative expire dates
734             if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta:
735                 try:
736                     self._api_client.collections().update(
737                         uuid=self._remote_collection.manifest_locator(),
738                         body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}
739                     ).execute(num_retries=self.num_retries)
740                 except Exception as e:
741                     self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e))
742         else:
743             self.bytes_written = self.bytes_skipped
744         # Call the reporter, if any
745         self.report_progress()
746
747     def report_progress(self):
748         if self.reporter is not None:
749             self.reporter(self.bytes_written, self.bytes_expected)
750
751     def _write_stdin(self, filename):
752         output = self._local_collection.open(filename, 'wb')
753         self._write(sys.stdin.buffer, output)
754         output.close()
755
756     def _check_file(self, source, filename):
757         """
758         Check if this file needs to be uploaded
759         """
760         # Ignore symlinks when requested
761         if (not self.follow_links) and os.path.islink(source):
762             return
763         resume_offset = 0
764         should_upload = False
765         new_file_in_cache = False
766         # Record file path for updating the remote collection before exiting
767         self._file_paths.add(filename)
768
769         with self._state_lock:
770             # If no previous cached data on this file, store it for a possible
771             # future run.
772             if source not in self._state['files']:
773                 self._state['files'][source] = {
774                     'mtime': os.path.getmtime(source),
775                     'size' : os.path.getsize(source)
776                 }
777                 new_file_in_cache = True
778             cached_file_data = self._state['files'][source]
779
780         # Check if file was already uploaded (at least partially)
781         file_in_local_collection = self._local_collection.find(filename)
782
783         # If not resuming, upload the full file.
784         if not self.resume:
785             should_upload = True
786         # New file detected from last run, upload it.
787         elif new_file_in_cache:
788             should_upload = True
789         # Local file didn't change from last run.
790         elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
791             if not file_in_local_collection:
792                 # File not uploaded yet, upload it completely
793                 should_upload = True
794             elif file_in_local_collection.permission_expired():
795                 # Permission token expired, re-upload file. This will change whenever
796                 # we have an API for refreshing tokens.
797                 self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
798                 should_upload = True
799                 self._local_collection.remove(filename)
800             elif cached_file_data['size'] == file_in_local_collection.size():
801                 # File already there, skip it.
802                 self.bytes_skipped += cached_file_data['size']
803             elif cached_file_data['size'] > file_in_local_collection.size():
804                 # File partially uploaded, resume!
805                 resume_offset = file_in_local_collection.size()
806                 self.bytes_skipped += resume_offset
807                 should_upload = True
808             else:
809                 # Inconsistent cache, re-upload the file
810                 should_upload = True
811                 self._local_collection.remove(filename)
812                 self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
813         # Local file differs from cached data, re-upload it.
814         else:
815             if file_in_local_collection:
816                 self._local_collection.remove(filename)
817             should_upload = True
818
819         if should_upload:
820             try:
821                 self._files_to_upload.append((source, resume_offset, filename))
822             except ArvPutUploadIsPending:
823                 # This could happen when running in dry-run mode; close the cache file to
824                 # avoid locking issues.
825                 self._cache_file.close()
826                 raise
827
828     def _upload_files(self):
829         for source, resume_offset, filename in self._files_to_upload:
830             with open(source, 'rb') as source_fd:
831                 with self._state_lock:
832                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
833                     self._state['files'][source]['size'] = os.path.getsize(source)
834                 if resume_offset > 0:
835                     # Start upload where we left off
836                     output = self._local_collection.open(filename, 'ab')
837                     source_fd.seek(resume_offset)
838                 else:
839                     # Start from scratch
840                     output = self._local_collection.open(filename, 'wb')
841                 self._write(source_fd, output)
842                 output.close(flush=False)
843
844     def _write(self, source_fd, output):
845         while True:
846             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
847             if not data:
848                 break
849             output.write(data)
850
851     def _my_collection(self):
852         return self._remote_collection if self.update else self._local_collection
853
854     def _get_cache_filepath(self):
855         # Set up cache file name from input paths.
856         md5 = hashlib.md5()
857         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
858         realpaths = sorted(os.path.realpath(path) for path in self.paths)
859         md5.update(b'\0'.join([p.encode() for p in realpaths]))
860         if self.filename:
861             md5.update(self.filename.encode())
862         cache_filename = md5.hexdigest()
863         cache_filepath = os.path.join(
864             arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
865             cache_filename)
866         return cache_filepath
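    # Illustrative cache-path derivation (hypothetical host and paths): with
    # ARVADOS_API_HOST 'zzzzz.example.org' and paths ['/data/a', '/data/b'], the
    # cache file lives under ~/.cache/arvados/arv-put/ and is named after
    # md5('zzzzz.example.org' + '/data/a\0/data/b'), with the --filename value
    # also fed into the hash when it is set.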
867
868     def _setup_state(self, update_collection):
869         """
870         Create a new cache file or load a previously existing one.
871         """
872         # Load an already existing collection for update
873         if update_collection and re.match(arvados.util.collection_uuid_pattern,
874                                           update_collection):
875             try:
876                 self._remote_collection = arvados.collection.Collection(
877                     update_collection,
878                     api_client=self._api_client,
879                     storage_classes_desired=self.storage_classes,
880                     num_retries=self.num_retries)
881             except arvados.errors.ApiError as error:
882                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
883             else:
884                 self.update = True
885         elif update_collection:
886             # Collection locator provided, but unknown format
887             raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
888
889         if self.use_cache:
890             cache_filepath = self._get_cache_filepath()
891             if self.resume and os.path.exists(cache_filepath):
892                 self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
893                 self._cache_file = open(cache_filepath, 'a+')
894             else:
895                 # --no-resume means start with an empty cache file.
896                 self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
897                 self._cache_file = open(cache_filepath, 'w+')
898             self._cache_filename = self._cache_file.name
899             self._lock_file(self._cache_file)
900             self._cache_file.seek(0)
901
902         with self._state_lock:
903             if self.use_cache:
904                 try:
905                     self._state = json.load(self._cache_file)
906                     if not set(['manifest', 'files']).issubset(set(self._state.keys())):
907                         # Cache at least partially incomplete, set up new cache
908                         self._state = copy.deepcopy(self.EMPTY_STATE)
909                 except ValueError:
910                     # Cache file empty, set up new cache
911                     self._state = copy.deepcopy(self.EMPTY_STATE)
912             else:
913                 self.logger.info("No cache usage requested for this run.")
914                 # No cache file, set empty state
915                 self._state = copy.deepcopy(self.EMPTY_STATE)
916             if not self._cached_manifest_valid():
917                 if not self.batch_mode:
918                     raise ResumeCacheInvalidError()
919                 else:
920                     self.logger.info("Invalid signatures on cache file '{}' while being run in 'batch mode' -- continuing anyway.".format(self._cache_file.name))
921                     self.use_cache = False # Don't overwrite preexisting cache file.
922                     self._state = copy.deepcopy(self.EMPTY_STATE)
923             # Load the previous manifest so we can check if files were modified remotely.
924             self._local_collection = arvados.collection.Collection(
925                 self._state['manifest'],
926                 replication_desired=self.replication_desired,
927                 storage_classes_desired=self.storage_classes,
928                 put_threads=self.put_threads,
929                 api_client=self._api_client,
930                 num_retries=self.num_retries)
931
932     def _cached_manifest_valid(self):
933         """
934         Validate the oldest non-expired block signature to check whether the cached
935         manifest is usable, i.e. that it was not created with a different
936         Arvados account.
937         """
938         if self._state.get('manifest', None) is None:
939             # No cached manifest yet, all good.
940             return True
941         now = datetime.datetime.utcnow()
942         oldest_exp = None
943         oldest_loc = None
944         block_found = False
945         for m in keep_locator_pattern.finditer(self._state['manifest']):
946             loc = m.group(0)
947             try:
948                 exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
949             except IndexError:
950                 # Locator without signature
951                 continue
952             block_found = True
953             if exp > now and (oldest_exp is None or exp < oldest_exp):
954                 oldest_exp = exp
955                 oldest_loc = loc
956         if not block_found:
957             # No block signatures found => no invalid block signatures.
958             return True
959         if oldest_loc is None:
960             # Locator signatures found, but all have expired.
961             # Reset the cache and move on.
962             self.logger.info('Cache expired, starting from scratch.')
963             self._state['manifest'] = ''
964             return True
965         kc = arvados.KeepClient(api_client=self._api_client,
966                                 num_retries=self.num_retries)
967         try:
968             kc.head(oldest_loc)
969         except arvados.errors.KeepRequestError:
970             # Something is wrong, cached manifest is not valid.
971             return False
972         return True
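    # Illustrative locator forms handled above (hashes and signatures are hypothetical):
    #
    #   d41d8cd98f00b204e9800998ecf8427e+0
    #       unsigned locator, skipped (no '@' expiry to parse)
    #   acbd18db4cc2f85cedef654fccc4a4d8+3+A1f2e3d4c...@5f75c0f1
    #       signed locator; '5f75c0f1' is parsed as a hex Unix timestamp expiry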
973
974     def collection_file_paths(self, col, path_prefix='.'):
975         """Return a list of file paths by recursively going through the entire collection `col`"""
976         file_paths = []
977         for name, item in col.items():
978             if isinstance(item, arvados.arvfile.ArvadosFile):
979                 file_paths.append(os.path.join(path_prefix, name))
980             elif isinstance(item, arvados.collection.Subcollection):
981                 new_prefix = os.path.join(path_prefix, name)
982                 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
983         return file_paths
984
985     def _lock_file(self, fileobj):
986         try:
987             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
988         except IOError:
989             raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
990
991     def _save_state(self):
992         """
993         Atomically save current state into cache.
994         """
995         with self._state_lock:
996             # We're not using copy.deepcopy() here because it's a lot slower
997             # than json.dumps(), and we already need the JSON format to be
998             # saved on disk.
999             state = json.dumps(self._state)
1000         try:
1001             new_cache = tempfile.NamedTemporaryFile(
1002                 mode='w+',
1003                 dir=os.path.dirname(self._cache_filename), delete=False)
1004             self._lock_file(new_cache)
1005             new_cache.write(state)
1006             new_cache.flush()
1007             os.fsync(new_cache)
1008             os.rename(new_cache.name, self._cache_filename)
1009         except (IOError, OSError, ResumeCacheConflict) as error:
1010             self.logger.error("There was a problem while saving the cache file: {}".format(error))
1011             try:
1012                 os.unlink(new_cache.name)
1013             except NameError:  # NamedTemporaryFile creation failed.
1014                 pass
1015         else:
1016             self._cache_file.close()
1017             self._cache_file = new_cache
1018
1019     def collection_name(self):
1020         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
1021
1022     def collection_trash_at(self):
1023         return self._my_collection().get_trash_at()
1024
1025     def manifest_locator(self):
1026         return self._my_collection().manifest_locator()
1027
1028     def portable_data_hash(self):
1029         pdh = self._my_collection().portable_data_hash()
1030         m = self._my_collection().stripped_manifest().encode()
1031         local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
1032         if pdh != local_pdh:
1033             self.logger.warning("\n".join([
1034                 "arv-put: API server provided PDH differs from local manifest.",
1035                 "         This should not happen; showing API server version."]))
1036         return pdh
1037
1038     def manifest_text(self, stream_name=".", strip=False, normalize=False):
1039         return self._my_collection().manifest_text(stream_name, strip, normalize)
1040
1041     def _datablocks_on_item(self, item):
1042         """
1043         Return a list of datablock locators, recursively navigating
1044         through subcollections
1045         """
1046         if isinstance(item, arvados.arvfile.ArvadosFile):
1047             if item.size() == 0:
1048                 # Empty file locator
1049                 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
1050             else:
1051                 locators = []
1052                 for segment in item.segments():
1053                     loc = segment.locator
1054                     locators.append(loc)
1055                 return locators
1056         elif isinstance(item, arvados.collection.Collection):
1057             l = [self._datablocks_on_item(x) for x in item.values()]
1058             # Fast list flattener method taken from:
1059             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
1060             return [loc for sublist in l for loc in sublist]
1061         else:
1062             return None
1063
1064     def data_locators(self):
1065         with self._collection_lock:
1066             # Make sure all datablocks are flushed before getting the locators
1067             self._my_collection().manifest_text()
1068             datablocks = self._datablocks_on_item(self._my_collection())
1069         return datablocks
1070
1071 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
1072                                                             os.getpid())
1073
1074 # Simulate glob.glob() matching behavior without the need to scan the filesystem
1075 # Note: fnmatch() doesn't work correctly when used with pathnames. For example, the
1076 # pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
1077 # so instead we're using it on every path component.
1078 def pathname_match(pathname, pattern):
1079     name = pathname.split(os.sep)
1080     # Fix patterns like 'some/subdir/' or 'some//subdir'
1081     pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
1082     if len(name) != len(pat):
1083         return False
1084     for i in range(len(name)):
1085         if not fnmatch.fnmatch(name[i], pat[i]):
1086             return False
1087     return True
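# Illustrative pathname_match() results (assumed from the per-component logic above):
#
#   pathname_match('subdir/run_test.py', 'subdir/*.py')          -> True
#   pathname_match('subdir/nested/run_test.py', 'subdir/*.py')   -> False (different depth)
#   pathname_match('run_test.py', '*.py')                        -> True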
1088
1089 def machine_progress(bytes_written, bytes_expected):
1090     return _machine_format.format(
1091         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
1092
1093 def human_progress(bytes_written, bytes_expected):
1094     if bytes_expected:
1095         return "\r{}M / {}M {:.1%} ".format(
1096             bytes_written >> 20, bytes_expected >> 20,
1097             float(bytes_written) / bytes_expected)
1098     else:
1099         return "\r{} ".format(bytes_written)
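# Example rendered progress strings (illustrative values; the program name and pid in
# the machine format come from sys.argv[0] and os.getpid()):
#
#   human_progress(5242880, 10485760)  -> '\r5M / 10M 50.0% '
#   machine_progress(5242880, None)    -> '<prog> <pid>: 5242880 written -1 total\n'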
1100
1101 def progress_writer(progress_func, outfile=sys.stderr):
1102     def write_progress(bytes_written, bytes_expected):
1103         outfile.write(progress_func(bytes_written, bytes_expected))
1104     return write_progress
1105
1106 def desired_project_uuid(api_client, project_uuid, num_retries):
1107     if not project_uuid:
1108         query = api_client.users().current()
1109     elif arvados.util.user_uuid_pattern.match(project_uuid):
1110         query = api_client.users().get(uuid=project_uuid)
1111     elif arvados.util.group_uuid_pattern.match(project_uuid):
1112         query = api_client.groups().get(uuid=project_uuid)
1113     else:
1114         raise ValueError("Not a valid project UUID: {}".format(project_uuid))
1115     return query.execute(num_retries=num_retries)['uuid']
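# Illustrative (hypothetical UUIDs) behavior of the lookup above: a user UUID such as
# 'zzzzz-tpzed-0123456789abcde' queries users(), a group UUID such as
# 'zzzzz-j7d0g-0123456789abcde' queries groups(), and no UUID defaults to the
# current user's Home project.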
1116
1117 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
1118          install_sig_handlers=True):
1119     global api_client
1120
1121     args = parse_arguments(arguments)
1122     logger = logging.getLogger('arvados.arv_put')
1123     if args.silent:
1124         logger.setLevel(logging.WARNING)
1125     else:
1126         logger.setLevel(logging.INFO)
1127     status = 0
1128
1129     request_id = arvados.util.new_request_id()
1130
1131     formatter = ArvPutLogFormatter(request_id)
1132     logging.getLogger('arvados').handlers[0].setFormatter(formatter)
1133
1134     if api_client is None:
1135         api_client = arvados.api('v1', request_id=request_id, num_retries=args.retries)
1136
1137     if install_sig_handlers:
1138         arv_cmd.install_signal_handlers()
1139
1140     # Trash arguments validation
1141     trash_at = None
1142     if args.trash_at is not None:
1143         # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we
1144         # make sure the user provides a complete YYYY-MM-DD date.
1145         if not re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}', args.trash_at):
1146             logger.error("--trash-at argument format invalid, use --help to see examples.")
1147             sys.exit(1)
1148         # Check if no time information was provided. In that case, assume end-of-day.
1149         if re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}$', args.trash_at):
1150             args.trash_at += 'T23:59:59'
1151         try:
1152             trash_at = ciso8601.parse_datetime(args.trash_at)
1153         except:
1154             logger.error("--trash-at argument format invalid, use --help to see examples.")
1155             sys.exit(1)
1156         else:
1157             if trash_at.tzinfo is not None:
1158                 # Timezone aware datetime provided.
1159                 utcoffset = -trash_at.utcoffset()
1160             else:
1161                 # Timezone naive datetime provided. Assume it is local.
1162                 if time.daylight:
1163                     utcoffset = datetime.timedelta(seconds=time.altzone)
1164                 else:
1165                     utcoffset = datetime.timedelta(seconds=time.timezone)
1166             # Convert to UTC timezone naive datetime.
1167             trash_at = trash_at.replace(tzinfo=None) + utcoffset
1168
1169         if trash_at <= datetime.datetime.utcnow():
1170             logger.error("--trash-at argument must be set in the future")
1171             sys.exit(1)
1172     if args.trash_after is not None:
1173         if args.trash_after < 1:
1174             logger.error("--trash-after argument must be >= 1")
1175             sys.exit(1)
1176         trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60))
1177
1178     # Determine the name to use
1179     if args.name:
1180         if args.stream or args.raw:
1181             logger.error("Cannot use --name with --stream or --raw")
1182             sys.exit(1)
1183         elif args.update_collection:
1184             logger.error("Cannot use --name with --update-collection")
1185             sys.exit(1)
1186         collection_name = args.name
1187     else:
1188         collection_name = "Saved at {} by {}@{}".format(
1189             datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
1190             pwd.getpwuid(os.getuid()).pw_name,
1191             socket.gethostname())
1192
1193     if args.project_uuid and (args.stream or args.raw):
1194         logger.error("Cannot use --project-uuid with --stream or --raw")
1195         sys.exit(1)
1196
1197     # Determine the parent project
1198     try:
1199         project_uuid = desired_project_uuid(api_client, args.project_uuid,
1200                                             args.retries)
1201     except (apiclient_errors.Error, ValueError) as error:
1202         logger.error(error)
1203         sys.exit(1)
1204
1205     if args.progress:
1206         reporter = progress_writer(human_progress)
1207     elif args.batch_progress:
1208         reporter = progress_writer(machine_progress)
1209     else:
1210         reporter = None
1211
1212     #  Split storage-classes argument
1213     storage_classes = None
1214     if args.storage_classes:
1215         storage_classes = args.storage_classes.strip().replace(' ', '').split(',')
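    # Illustrative (hypothetical) value: '--storage-classes "hot, archive"' yields
    # storage_classes == ['hot', 'archive'] after the whitespace stripping above.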
1216
1217     # Setup exclude regex from all the --exclude arguments provided
1218     name_patterns = []
1219     exclude_paths = []
1220     exclude_names = None
1221     if len(args.exclude) > 0:
1222         # We're supporting 2 kinds of exclusion patterns:
1223         # 1)   --exclude '*.jpg'    (file/dir name patterns, will only match
1224         #                            the name, wherever the file is on the tree)
1225         # 2.1) --exclude 'foo/bar'  (file/dir path patterns, will match the
1226         #                            entire path, and should be relative to
1227         #                            any input dir argument)
1228         # 2.2) --exclude './*.jpg'  (Special case for excluding files/dirs
1229         #                            placed directly underneath the input dir)
1230         for p in args.exclude:
1231             # Only relative path patterns are allowed
1232             if p.startswith(os.sep):
1233                 logger.error("Cannot use absolute paths with --exclude")
1234                 sys.exit(1)
1235             if os.path.dirname(p):
1236                 # We don't support path patterns with '..'
1237                 p_parts = p.split(os.sep)
1238                 if '..' in p_parts:
1239                     logger.error(
1240                         "Cannot use path patterns that include '..'")
1241                     sys.exit(1)
1242                 # Path search pattern
1243                 exclude_paths.append(p)
1244             else:
1245                 # Name-only search pattern
1246                 name_patterns.append(p)
1247         # For name only matching, we can combine all patterns into a single
1248         # regexp, for better performance.
1249         exclude_names = re.compile('|'.join(
1250             [fnmatch.translate(p) for p in name_patterns]
1251         )) if len(name_patterns) > 0 else None
1252         # Show the user the patterns to be used, just in case they weren't
1253         # specified inside quotes and got changed by the shell expansion.
1254         logger.info("Exclude patterns: {}".format(args.exclude))
1255
1256     # If this is used by a human, and there's at least one directory to be
1257     # uploaded, the expected bytes calculation can take a moment.
1258     if args.progress and any([os.path.isdir(f) for f in args.paths]):
1259         logger.info("Calculating upload size, this could take some time...")
1260     try:
1261         writer = ArvPutUploadJob(paths = args.paths,
1262                                  resume = args.resume,
1263                                  use_cache = args.use_cache,
1264                                  batch_mode= args.batch,
1265                                  filename = args.filename,
1266                                  reporter = reporter,
1267                                  api_client = api_client,
1268                                  num_retries = args.retries,
1269                                  replication_desired = args.replication,
1270                                  put_threads = args.threads,
1271                                  name = collection_name,
1272                                  owner_uuid = project_uuid,
1273                                  ensure_unique_name = True,
1274                                  update_collection = args.update_collection,
1275                                  storage_classes=storage_classes,
1276                                  logger=logger,
1277                                  dry_run=args.dry_run,
1278                                  follow_links=args.follow_links,
1279                                  exclude_paths=exclude_paths,
1280                                  exclude_names=exclude_names,
1281                                  trash_at=trash_at)
1282     except ResumeCacheConflict:
1283         logger.error("\n".join([
1284             "arv-put: Another process is already uploading this data.",
1285             "         Use --no-cache if this is really what you want."]))
1286         sys.exit(1)
1287     except ResumeCacheInvalidError:
1288         logger.error("\n".join([
1289             "arv-put: Resume cache contains invalid signature: it may have expired",
1290             "         or been created with another Arvados user's credentials.",
1291             "         Switch user or use one of the following options to restart upload:",
1292             "         --no-resume to start a new resume cache.",
1293             "         --no-cache to disable resume cache.",
1294             "         --batch to ignore the resume cache if invalid."]))
1295         sys.exit(1)
1296     except (CollectionUpdateError, PathDoesNotExistError) as error:
1297         logger.error("\n".join([
1298             "arv-put: %s" % str(error)]))
1299         sys.exit(1)
1300     except ArvPutUploadIsPending:
1301         # Dry run check successful, return proper exit code.
1302         sys.exit(2)
1303     except ArvPutUploadNotPending:
1304         # No files pending for upload
1305         sys.exit(0)
1306
1307     if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
1308         logger.warning("\n".join([
1309             "arv-put: Resuming previous upload from last checkpoint.",
1310             "         Use the --no-resume option to start over."]))
1311
1312     if not args.dry_run:
1313         writer.report_progress()
1314     output = None
1315     try:
1316         writer.start(save_collection=not(args.stream or args.raw))
1317     except (arvados.errors.ApiError, arvados.errors.KeepWriteError) as error:
1318         logger.error("\n".join([
1319             "arv-put: %s" % str(error)]))
1320         sys.exit(1)
1321
1322     if args.progress:  # Print newline to split stderr from stdout for humans.
1323         logger.info("\n")
1324
1325     if args.stream:
1326         if args.normalize:
1327             output = writer.manifest_text(normalize=True)
1328         else:
1329             output = writer.manifest_text()
1330     elif args.raw:
1331         output = ','.join(writer.data_locators())
1332     elif writer.manifest_locator() is not None:
1333         try:
1334             expiration_notice = ""
1335             if writer.collection_trash_at() is not None:
1336                 # Get the local timezone-naive version, and log it with timezone information.
1337                 if time.daylight:
1338                     local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.altzone)
1339                 else:
1340                     local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.timezone)
1341                 expiration_notice = ". It will expire on {} {}.".format(
1342                     local_trash_at.strftime("%Y-%m-%d %H:%M:%S"), time.strftime("%z"))
1343             if args.update_collection:
1344                 logger.info(u"Collection updated: '{}'{}".format(
1345                     writer.collection_name(), expiration_notice))
1346             else:
1347                 logger.info(u"Collection saved as '{}'{}".format(
1348                     writer.collection_name(), expiration_notice))
1349             if args.portable_data_hash:
1350                 output = writer.portable_data_hash()
1351             else:
1352                 output = writer.manifest_locator()
1353         except apiclient_errors.Error as error:
1354             logger.error(
1355                 "arv-put: Error creating Collection on project: {}.".format(
1356                     error))
1357             status = 1
1358     else:
1359         status = 1
1360
1361     # Print the locator (uuid) of the new collection.
1362     if output is None:
1363         status = status or 1
1364     elif not args.silent:
1365         stdout.write(output)
1366         if not output.endswith('\n'):
1367             stdout.write('\n')
1368
1369     if install_sig_handlers:
1370         arv_cmd.restore_signal_handlers()
1371
1372     if status != 0:
1373         sys.exit(status)
1374
1375     # Success!
1376     return output
1377
1378
1379 if __name__ == '__main__':
1380     main()