17800: Avoids saving a new empty collection.
[arvados.git] / sdk / python / arvados / commands / put.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from future.utils import listitems, listvalues
7 from builtins import str
8 from builtins import object
9 import argparse
10 import arvados
11 import arvados.collection
12 import base64
13 import ciso8601
14 import copy
15 import datetime
16 import errno
17 import fcntl
18 import fnmatch
19 import hashlib
20 import json
21 import logging
22 import os
23 import pwd
24 import re
25 import signal
26 import socket
27 import sys
28 import tempfile
29 import threading
30 import time
31 import traceback
32
33 from apiclient import errors as apiclient_errors
34 from arvados._version import __version__
35 from arvados.util import keep_locator_pattern
36
37 import arvados.commands._util as arv_cmd
38
39 api_client = None
40
41 upload_opts = argparse.ArgumentParser(add_help=False)
42
43 upload_opts.add_argument('--version', action='version',
44                          version="%s %s" % (sys.argv[0], __version__),
45                          help='Print version and exit.')
46 upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
47                          help="""
48 Local file or directory. If path is a directory reference with a trailing
49 slash, then just upload the directory's contents; otherwise upload the
50 directory itself. Default: read from standard input.
51 """)
52
53 _group = upload_opts.add_mutually_exclusive_group()
54
55 _group.add_argument('--max-manifest-depth', type=int, metavar='N',
56                     default=-1, help=argparse.SUPPRESS)
57
58 _group.add_argument('--normalize', action='store_true',
59                     help="""
60 Normalize the manifest by re-ordering files and streams after writing
61 data.
62 """)
63
64 _group.add_argument('--dry-run', action='store_true', default=False,
65                     help="""
66 Don't actually upload files, but only check if any file should be
67 uploaded. Exit with code=2 when files are pending for upload.
68 """)
69
70 _group = upload_opts.add_mutually_exclusive_group()
71
72 _group.add_argument('--as-stream', action='store_true', dest='stream',
73                     help="""
74 Synonym for --stream.
75 """)
76
77 _group.add_argument('--stream', action='store_true',
78                     help="""
79 Store the file content and display the resulting manifest on
80 stdout. Do not save a Collection object in Arvados.
81 """)
82
83 _group.add_argument('--as-manifest', action='store_true', dest='manifest',
84                     help="""
85 Synonym for --manifest.
86 """)
87
88 _group.add_argument('--in-manifest', action='store_true', dest='manifest',
89                     help="""
90 Synonym for --manifest.
91 """)
92
93 _group.add_argument('--manifest', action='store_true',
94                     help="""
95 Store the file data and resulting manifest in Keep, save a Collection
96 object in Arvados, and display the manifest locator (Collection uuid)
97 on stdout. This is the default behavior.
98 """)
99
100 _group.add_argument('--as-raw', action='store_true', dest='raw',
101                     help="""
102 Synonym for --raw.
103 """)
104
105 _group.add_argument('--raw', action='store_true',
106                     help="""
107 Store the file content and display the data block locators on stdout,
108 separated by commas, with a trailing newline. Do not store a
109 manifest.
110 """)
111
112 upload_opts.add_argument('--update-collection', type=str, default=None,
113                          dest='update_collection', metavar="UUID", help="""
114 Update an existing collection identified by the given Arvados collection
115 UUID. All new local files will be uploaded.
116 """)
117
118 upload_opts.add_argument('--use-filename', type=str, default=None,
119                          dest='filename', help="""
120 Synonym for --filename.
121 """)
122
123 upload_opts.add_argument('--filename', type=str, default=None,
124                          help="""
125 Use the given filename in the manifest, instead of the name of the
126 local file. This is useful when "-" or "/dev/stdin" is given as an
127 input file. It can be used only if there is exactly one path given and
128 it is not a directory. Implies --manifest.
129 """)
130
131 upload_opts.add_argument('--portable-data-hash', action='store_true',
132                          help="""
133 Print the portable data hash instead of the Arvados UUID for the collection
134 created by the upload.
135 """)
136
137 upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
138                          help="""
139 Set the replication level for the new collection: how many different
140 physical storage devices (e.g., disks) should have a copy of each data
141 block. Default is to use the server-provided default (if any) or 2.
142 """)
143
144 upload_opts.add_argument('--storage-classes', help="""
145 Specify comma separated list of storage classes to be used when saving data to Keep.
146 """)
147
148 upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
149                          help="""
150 Set the number of upload threads to be used. Take into account that
151 using lots of threads will increase the RAM requirements. Default is
152 to use 2 threads.
153 On high latency installations, using a greater number will improve
154 overall throughput.
155 """)
156
157 upload_opts.add_argument('--exclude', metavar='PATTERN', default=[],
158                       action='append', help="""
159 Exclude files and directories whose names match the given glob pattern. When
160 using a path-like pattern like 'subdir/*.txt', all text files inside 'subdir'
161 directory, relative to the provided input dirs will be excluded.
162 When using a filename pattern like '*.txt', any text file will be excluded
163 no matter where it is placed.
164 For the special case of needing to exclude only files or dirs directly below
165 the given input directory, you can use a pattern like './exclude_this.gif'.
166 You can specify multiple patterns by using this argument more than once.
167 """)
168
169 _group = upload_opts.add_mutually_exclusive_group()
170 _group.add_argument('--follow-links', action='store_true', default=True,
171                     dest='follow_links', help="""
172 Follow file and directory symlinks (default).
173 """)
174 _group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
175                     help="""
176 Ignore file and directory symlinks. Even paths given explicitly on the
177 command line will be skipped if they are symlinks.
178 """)
179
180
181 run_opts = argparse.ArgumentParser(add_help=False)
182
183 run_opts.add_argument('--project-uuid', metavar='UUID', help="""
184 Store the collection in the specified project, instead of your Home
185 project.
186 """)
187
188 run_opts.add_argument('--name', help="""
189 Save the collection with the specified name.
190 """)
191
192 _group = run_opts.add_mutually_exclusive_group()
193 _group.add_argument('--progress', action='store_true',
194                     help="""
195 Display human-readable progress on stderr (bytes and, if possible,
196 percentage of total data size). This is the default behavior when
197 stderr is a tty.
198 """)
199
200 _group.add_argument('--no-progress', action='store_true',
201                     help="""
202 Do not display human-readable progress on stderr, even if stderr is a
203 tty.
204 """)
205
206 _group.add_argument('--batch-progress', action='store_true',
207                     help="""
208 Display machine-readable progress on stderr (bytes and, if known,
209 total data size).
210 """)
211
212 run_opts.add_argument('--silent', action='store_true',
213                       help="""
214 Do not print any debug messages to console. (Any error messages will
215 still be displayed.)
216 """)
217
218 _group = run_opts.add_mutually_exclusive_group()
219 _group.add_argument('--resume', action='store_true', default=True,
220                     help="""
221 Continue interrupted uploads from cached state (default).
222 """)
223 _group.add_argument('--no-resume', action='store_false', dest='resume',
224                     help="""
225 Do not continue interrupted uploads from cached state.
226 """)
227
228 _group = run_opts.add_mutually_exclusive_group()
229 _group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
230                     help="""
231 Save upload state in a cache file for resuming (default).
232 """)
233 _group.add_argument('--no-cache', action='store_false', dest='use_cache',
234                     help="""
235 Do not save upload state in a cache file for resuming.
236 """)
237
238 _group = upload_opts.add_mutually_exclusive_group()
239 _group.add_argument('--trash-at', metavar='YYYY-MM-DDTHH:MM', default=None,
240                     help="""
241 Set the trash date of the resulting collection to an absolute date in the future.
242 The accepted format is defined by the ISO 8601 standard. Examples: 20090103, 2009-01-03, 20090103T181505, 2009-01-03T18:15:05.\n
243 Timezone information can be added. If not, the provided date/time is assumed as being in the local system's timezone.
244 """)
245 _group.add_argument('--trash-after', type=int, metavar='DAYS', default=None,
246                     help="""
247 Set the trash date of the resulting collection to an amount of days from the
248 date/time that the upload process finishes.
249 """)
250
251 arg_parser = argparse.ArgumentParser(
252     description='Copy data from the local filesystem to Keep.',
253     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
254
255 def parse_arguments(arguments):
256     args = arg_parser.parse_args(arguments)
257
258     if len(args.paths) == 0:
259         args.paths = ['-']
260
261     args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
262
263     if args.filename and (len(args.paths) != 1 or os.path.isdir(args.paths[0])):
264         arg_parser.error("""
265     --filename argument cannot be used when storing a directory or
266     multiple files.
267     """)
268
269     # Turn on --progress by default if stderr is a tty.
270     if (not (args.batch_progress or args.no_progress or args.silent)
271         and os.isatty(sys.stderr.fileno())):
272         args.progress = True
273
274     # Turn off --resume (default) if --no-cache is used.
275     if not args.use_cache:
276         args.resume = False
277
278     if args.paths == ['-']:
279         if args.update_collection:
280             arg_parser.error("""
281     --update-collection cannot be used when reading from stdin.
282     """)
283         args.resume = False
284         args.use_cache = False
285         if not args.filename:
286             args.filename = 'stdin'
287
288     # Remove possible duplicated patterns
289     if len(args.exclude) > 0:
290         args.exclude = list(set(args.exclude))
291
292     return args
293
294
295 class PathDoesNotExistError(Exception):
296     pass
297
298
299 class CollectionUpdateError(Exception):
300     pass
301
302
303 class ResumeCacheConflict(Exception):
304     pass
305
306
307 class ResumeCacheInvalidError(Exception):
308     pass
309
310 class ArvPutArgumentConflict(Exception):
311     pass
312
313
314 class ArvPutUploadIsPending(Exception):
315     pass
316
317
318 class ArvPutUploadNotPending(Exception):
319     pass
320
321
322 class FileUploadList(list):
323     def __init__(self, dry_run=False):
324         list.__init__(self)
325         self.dry_run = dry_run
326
327     def append(self, other):
328         if self.dry_run:
329             raise ArvPutUploadIsPending()
330         super(FileUploadList, self).append(other)
331
332
333 # Appends the X-Request-Id to the log message when log level is ERROR or DEBUG
334 class ArvPutLogFormatter(logging.Formatter):
335     std_fmtr = logging.Formatter(arvados.log_format, arvados.log_date_format)
336     err_fmtr = None
337     request_id_informed = False
338
339     def __init__(self, request_id):
340         self.err_fmtr = logging.Formatter(
341             arvados.log_format+' (X-Request-Id: {})'.format(request_id),
342             arvados.log_date_format)
343
344     def format(self, record):
345         if (not self.request_id_informed) and (record.levelno in (logging.DEBUG, logging.ERROR)):
346             self.request_id_informed = True
347             return self.err_fmtr.format(record)
348         return self.std_fmtr.format(record)
349
350
351 class ResumeCache(object):
352     CACHE_DIR = '.cache/arvados/arv-put'
353
354     def __init__(self, file_spec):
355         self.cache_file = open(file_spec, 'a+')
356         self._lock_file(self.cache_file)
357         self.filename = self.cache_file.name
358
359     @classmethod
360     def make_path(cls, args):
361         md5 = hashlib.md5()
362         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
363         realpaths = sorted(os.path.realpath(path) for path in args.paths)
364         md5.update(b'\0'.join([p.encode() for p in realpaths]))
365         if any(os.path.isdir(path) for path in realpaths):
366             md5.update(b'-1')
367         elif args.filename:
368             md5.update(args.filename.encode())
369         return os.path.join(
370             arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
371             md5.hexdigest())
372
373     def _lock_file(self, fileobj):
374         try:
375             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
376         except IOError:
377             raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
378
379     def load(self):
380         self.cache_file.seek(0)
381         return json.load(self.cache_file)
382
383     def check_cache(self, api_client=None, num_retries=0):
384         try:
385             state = self.load()
386             locator = None
387             try:
388                 if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
389                     locator = state["_finished_streams"][0][1][0]
390                 elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
391                     locator = state["_current_stream_locators"][0]
392                 if locator is not None:
393                     kc = arvados.keep.KeepClient(api_client=api_client)
394                     kc.head(locator, num_retries=num_retries)
395             except Exception as e:
396                 self.restart()
397         except (ValueError):
398             pass
399
400     def save(self, data):
401         try:
402             new_cache_fd, new_cache_name = tempfile.mkstemp(
403                 dir=os.path.dirname(self.filename))
404             self._lock_file(new_cache_fd)
405             new_cache = os.fdopen(new_cache_fd, 'r+')
406             json.dump(data, new_cache)
407             os.rename(new_cache_name, self.filename)
408         except (IOError, OSError, ResumeCacheConflict):
409             try:
410                 os.unlink(new_cache_name)
411             except NameError:  # mkstemp failed.
412                 pass
413         else:
414             self.cache_file.close()
415             self.cache_file = new_cache
416
417     def close(self):
418         self.cache_file.close()
419
420     def destroy(self):
421         try:
422             os.unlink(self.filename)
423         except OSError as error:
424             if error.errno != errno.ENOENT:  # That's what we wanted anyway.
425                 raise
426         self.close()
427
428     def restart(self):
429         self.destroy()
430         self.__init__(self.filename)
431
432
433 class ArvPutUploadJob(object):
434     CACHE_DIR = '.cache/arvados/arv-put'
435     EMPTY_STATE = {
436         'manifest' : None, # Last saved manifest checkpoint
437         'files' : {} # Previous run file list: {path : {size, mtime}}
438     }
439
440     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
441                  name=None, owner_uuid=None, api_client=None,
442                  ensure_unique_name=False, num_retries=None,
443                  put_threads=None, replication_desired=None, filename=None,
444                  update_time=60.0, update_collection=None, storage_classes=None,
445                  logger=logging.getLogger('arvados.arv_put'), dry_run=False,
446                  follow_links=True, exclude_paths=[], exclude_names=None,
447                  trash_at=None):
448         self.paths = paths
449         self.resume = resume
450         self.use_cache = use_cache
451         self.update = False
452         self.reporter = reporter
453         # This will set to 0 before start counting, if no special files are going
454         # to be read.
455         self.bytes_expected = None
456         self.bytes_written = 0
457         self.bytes_skipped = 0
458         self.name = name
459         self.owner_uuid = owner_uuid
460         self.ensure_unique_name = ensure_unique_name
461         self.num_retries = num_retries
462         self.replication_desired = replication_desired
463         self.put_threads = put_threads
464         self.filename = filename
465         self.storage_classes = storage_classes
466         self._api_client = api_client
467         self._state_lock = threading.Lock()
468         self._state = None # Previous run state (file list & manifest)
469         self._current_files = [] # Current run file list
470         self._cache_file = None
471         self._collection_lock = threading.Lock()
472         self._remote_collection = None # Collection being updated (if asked)
473         self._local_collection = None # Collection from previous run manifest
474         self._file_paths = set() # Files to be updated in remote collection
475         self._stop_checkpointer = threading.Event()
476         self._checkpointer = threading.Thread(target=self._update_task)
477         self._checkpointer.daemon = True
478         self._update_task_time = update_time  # How many seconds wait between update runs
479         self._files_to_upload = FileUploadList(dry_run=dry_run)
480         self._upload_started = False
481         self.logger = logger
482         self.dry_run = dry_run
483         self._checkpoint_before_quit = True
484         self.follow_links = follow_links
485         self.exclude_paths = exclude_paths
486         self.exclude_names = exclude_names
487         self._trash_at = trash_at
488
489         if self._trash_at is not None:
490             if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
491                 raise TypeError('trash_at should be None, timezone-naive datetime or timedelta')
492             if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
493                 raise TypeError('provided trash_at datetime should be timezone-naive')
494
495         if not self.use_cache and self.resume:
496             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
497
498         # Check for obvious dry-run responses
499         if self.dry_run and (not self.use_cache or not self.resume):
500             raise ArvPutUploadIsPending()
501
502         # Load cached data if any and if needed
503         self._setup_state(update_collection)
504
505         # Build the upload file list, excluding requested files and counting the
506         # bytes expected to be uploaded.
507         self._build_upload_list()
508
509     def _build_upload_list(self):
510         """
511         Scan the requested paths to count file sizes, excluding requested files
512         and dirs and building the upload file list.
513         """
514         # If there aren't special files to be read, reset total bytes count to zero
515         # to start counting.
516         if not any([p for p in self.paths
517                     if not (os.path.isfile(p) or os.path.isdir(p))]):
518             self.bytes_expected = 0
519
520         for path in self.paths:
521             # Test for stdin first, in case some file named '-' exist
522             if path == '-':
523                 if self.dry_run:
524                     raise ArvPutUploadIsPending()
525                 self._write_stdin(self.filename or 'stdin')
526             elif not os.path.exists(path):
527                  raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
528             elif (not self.follow_links) and os.path.islink(path):
529                 self.logger.warning("Skipping symlink '{}'".format(path))
530                 continue
531             elif os.path.isdir(path):
532                 # Use absolute paths on cache index so CWD doesn't interfere
533                 # with the caching logic.
534                 orig_path = path
535                 path = os.path.abspath(path)
536                 if orig_path[-1:] == os.sep:
537                     # When passing a directory reference with a trailing slash,
538                     # its contents should be uploaded directly to the
539                     # collection's root.
540                     prefixdir = path
541                 else:
542                     # When passing a directory reference with no trailing slash,
543                     # upload the directory to the collection's root.
544                     prefixdir = os.path.dirname(path)
545                 prefixdir += os.sep
546                 for root, dirs, files in os.walk(path,
547                                                  followlinks=self.follow_links):
548                     root_relpath = os.path.relpath(root, path)
549                     if root_relpath == '.':
550                         root_relpath = ''
551                     # Exclude files/dirs by full path matching pattern
552                     if self.exclude_paths:
553                         dirs[:] = [d for d in dirs
554                                    if not any(pathname_match(
555                                            os.path.join(root_relpath, d), pat)
556                                               for pat in self.exclude_paths)]
557                         files = [f for f in files
558                                  if not any(pathname_match(
559                                          os.path.join(root_relpath, f), pat)
560                                             for pat in self.exclude_paths)]
561                     # Exclude files/dirs by name matching pattern
562                     if self.exclude_names is not None:
563                         dirs[:] = [d for d in dirs
564                                    if not self.exclude_names.match(d)]
565                         files = [f for f in files
566                                  if not self.exclude_names.match(f)]
567                     # Make os.walk()'s dir traversing order deterministic
568                     dirs.sort()
569                     files.sort()
570                     for f in files:
571                         filepath = os.path.join(root, f)
572                         # Add its size to the total bytes count (if applicable)
573                         if self.follow_links or (not os.path.islink(filepath)):
574                             if self.bytes_expected is not None:
575                                 self.bytes_expected += os.path.getsize(filepath)
576                         self._check_file(filepath,
577                                          os.path.join(root[len(prefixdir):], f))
578             else:
579                 filepath = os.path.abspath(path)
580                 # Add its size to the total bytes count (if applicable)
581                 if self.follow_links or (not os.path.islink(filepath)):
582                     if self.bytes_expected is not None:
583                         self.bytes_expected += os.path.getsize(filepath)
584                 self._check_file(filepath,
585                                  self.filename or os.path.basename(path))
586         # If dry-mode is on, and got up to this point, then we should notify that
587         # there aren't any file to upload.
588         if self.dry_run:
589             raise ArvPutUploadNotPending()
590         # Remove local_collection's files that don't exist locally anymore, so the
591         # bytes_written count is correct.
592         for f in self.collection_file_paths(self._local_collection,
593                                             path_prefix=""):
594             if f != 'stdin' and f != self.filename and not f in self._file_paths:
595                 self._local_collection.remove(f)
596
597     def start(self, save_collection):
598         """
599         Start supporting thread & file uploading
600         """
601         self._checkpointer.start()
602         try:
603             # Update bytes_written from current local collection and
604             # report initial progress.
605             self._update()
606             # Actual file upload
607             self._upload_started = True # Used by the update thread to start checkpointing
608             self._upload_files()
609         except (SystemExit, Exception) as e:
610             self._checkpoint_before_quit = False
611             # Log stack trace only when Ctrl-C isn't pressed (SIGINT)
612             # Note: We're expecting SystemExit instead of
613             # KeyboardInterrupt because we have a custom signal
614             # handler in place that raises SystemExit with the catched
615             # signal's code.
616             if isinstance(e, PathDoesNotExistError):
617                 # We aren't interested in the traceback for this case
618                 pass
619             elif not isinstance(e, SystemExit) or e.code != -2:
620                 self.logger.warning("Abnormal termination:\n{}".format(
621                     traceback.format_exc()))
622             raise
623         finally:
624             if not self.dry_run:
625                 # Stop the thread before doing anything else
626                 self._stop_checkpointer.set()
627                 self._checkpointer.join()
628                 if self._checkpoint_before_quit:
629                     # Commit all pending blocks & one last _update()
630                     self._local_collection.manifest_text()
631                     self._update(final=True)
632                     if save_collection:
633                         self.save_collection()
634             if self.use_cache:
635                 self._cache_file.close()
636
637     def _collection_trash_at(self):
638         """
639         Returns the trash date that the collection should use at save time.
640         Takes into account absolute/relative trash_at values requested
641         by the user.
642         """
643         if type(self._trash_at) == datetime.timedelta:
644             # Get an absolute datetime for trash_at
645             return datetime.datetime.utcnow() + self._trash_at
646         return self._trash_at
647
648     def save_collection(self):
649         if self.update:
650             # Check if files should be updated on the remote collection.
651             for fp in self._file_paths:
652                 remote_file = self._remote_collection.find(fp)
653                 if not remote_file:
654                     # File don't exist on remote collection, copy it.
655                     self._remote_collection.copy(fp, fp, self._local_collection)
656                 elif remote_file != self._local_collection.find(fp):
657                     # A different file exist on remote collection, overwrite it.
658                     self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
659                 else:
660                     # The file already exist on remote collection, skip it.
661                     pass
662             self._remote_collection.save(storage_classes=self.storage_classes,
663                                          num_retries=self.num_retries,
664                                          trash_at=self._collection_trash_at())
665         else:
666             if len(self._local_collection) == 0:
667                 self.logger.warning("No files were uploaded, skipping collection creation.")
668                 return
669             self._local_collection.save_new(
670                 name=self.name, owner_uuid=self.owner_uuid,
671                 storage_classes=self.storage_classes,
672                 ensure_unique_name=self.ensure_unique_name,
673                 num_retries=self.num_retries,
674                 trash_at=self._collection_trash_at())
675
676     def destroy_cache(self):
677         if self.use_cache:
678             try:
679                 os.unlink(self._cache_filename)
680             except OSError as error:
681                 # That's what we wanted anyway.
682                 if error.errno != errno.ENOENT:
683                     raise
684             self._cache_file.close()
685
686     def _collection_size(self, collection):
687         """
688         Recursively get the total size of the collection
689         """
690         size = 0
691         for item in listvalues(collection):
692             if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
693                 size += self._collection_size(item)
694             else:
695                 size += item.size()
696         return size
697
698     def _update_task(self):
699         """
700         Periodically called support task. File uploading is
701         asynchronous so we poll status from the collection.
702         """
703         while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
704             self._update()
705
706     def _update(self, final=False):
707         """
708         Update cached manifest text and report progress.
709         """
710         if self._upload_started:
711             with self._collection_lock:
712                 self.bytes_written = self._collection_size(self._local_collection)
713                 if self.use_cache:
714                     if final:
715                         manifest = self._local_collection.manifest_text()
716                     else:
717                         # Get the manifest text without comitting pending blocks
718                         manifest = self._local_collection.manifest_text(strip=False,
719                                                                         normalize=False,
720                                                                         only_committed=True)
721                     # Update cache
722                     with self._state_lock:
723                         self._state['manifest'] = manifest
724             if self.use_cache:
725                 try:
726                     self._save_state()
727                 except Exception as e:
728                     self.logger.error("Unexpected error trying to save cache file: {}".format(e))
729             # Keep remote collection's trash_at attribute synced when using relative expire dates
730             if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta:
731                 try:
732                     self._api_client.collections().update(
733                         uuid=self._remote_collection.manifest_locator(),
734                         body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}
735                     ).execute(num_retries=self.num_retries)
736                 except Exception as e:
737                     self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e))
738         else:
739             self.bytes_written = self.bytes_skipped
740         # Call the reporter, if any
741         self.report_progress()
742
743     def report_progress(self):
744         if self.reporter is not None:
745             self.reporter(self.bytes_written, self.bytes_expected)
746
747     def _write_stdin(self, filename):
748         output = self._local_collection.open(filename, 'wb')
749         self._write(sys.stdin.buffer, output)
750         output.close()
751
752     def _check_file(self, source, filename):
753         """
754         Check if this file needs to be uploaded
755         """
756         # Ignore symlinks when requested
757         if (not self.follow_links) and os.path.islink(source):
758             return
759         resume_offset = 0
760         should_upload = False
761         new_file_in_cache = False
762         # Record file path for updating the remote collection before exiting
763         self._file_paths.add(filename)
764
765         with self._state_lock:
766             # If no previous cached data on this file, store it for an eventual
767             # repeated run.
768             if source not in self._state['files']:
769                 self._state['files'][source] = {
770                     'mtime': os.path.getmtime(source),
771                     'size' : os.path.getsize(source)
772                 }
773                 new_file_in_cache = True
774             cached_file_data = self._state['files'][source]
775
776         # Check if file was already uploaded (at least partially)
777         file_in_local_collection = self._local_collection.find(filename)
778
779         # If not resuming, upload the full file.
780         if not self.resume:
781             should_upload = True
782         # New file detected from last run, upload it.
783         elif new_file_in_cache:
784             should_upload = True
785         # Local file didn't change from last run.
786         elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
787             if not file_in_local_collection:
788                 # File not uploaded yet, upload it completely
789                 should_upload = True
790             elif file_in_local_collection.permission_expired():
791                 # Permission token expired, re-upload file. This will change whenever
792                 # we have a API for refreshing tokens.
793                 self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
794                 should_upload = True
795                 self._local_collection.remove(filename)
796             elif cached_file_data['size'] == file_in_local_collection.size():
797                 # File already there, skip it.
798                 self.bytes_skipped += cached_file_data['size']
799             elif cached_file_data['size'] > file_in_local_collection.size():
800                 # File partially uploaded, resume!
801                 resume_offset = file_in_local_collection.size()
802                 self.bytes_skipped += resume_offset
803                 should_upload = True
804             else:
805                 # Inconsistent cache, re-upload the file
806                 should_upload = True
807                 self._local_collection.remove(filename)
808                 self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
809         # Local file differs from cached data, re-upload it.
810         else:
811             if file_in_local_collection:
812                 self._local_collection.remove(filename)
813             should_upload = True
814
815         if should_upload:
816             try:
817                 self._files_to_upload.append((source, resume_offset, filename))
818             except ArvPutUploadIsPending:
819                 # This could happen when running on dry-mode, close cache file to
820                 # avoid locking issues.
821                 self._cache_file.close()
822                 raise
823
824     def _upload_files(self):
825         for source, resume_offset, filename in self._files_to_upload:
826             with open(source, 'rb') as source_fd:
827                 with self._state_lock:
828                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
829                     self._state['files'][source]['size'] = os.path.getsize(source)
830                 if resume_offset > 0:
831                     # Start upload where we left off
832                     output = self._local_collection.open(filename, 'ab')
833                     source_fd.seek(resume_offset)
834                 else:
835                     # Start from scratch
836                     output = self._local_collection.open(filename, 'wb')
837                 self._write(source_fd, output)
838                 output.close(flush=False)
839
840     def _write(self, source_fd, output):
841         while True:
842             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
843             if not data:
844                 break
845             output.write(data)
846
847     def _my_collection(self):
848         return self._remote_collection if self.update else self._local_collection
849
850     def _get_cache_filepath(self):
851         # Set up cache file name from input paths.
852         md5 = hashlib.md5()
853         md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
854         realpaths = sorted(os.path.realpath(path) for path in self.paths)
855         md5.update(b'\0'.join([p.encode() for p in realpaths]))
856         if self.filename:
857             md5.update(self.filename.encode())
858         cache_filename = md5.hexdigest()
859         cache_filepath = os.path.join(
860             arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
861             cache_filename)
862         return cache_filepath
863
864     def _setup_state(self, update_collection):
865         """
866         Create a new cache file or load a previously existing one.
867         """
868         # Load an already existing collection for update
869         if update_collection and re.match(arvados.util.collection_uuid_pattern,
870                                           update_collection):
871             try:
872                 self._remote_collection = arvados.collection.Collection(
873                     update_collection,
874                     api_client=self._api_client,
875                     num_retries=self.num_retries)
876             except arvados.errors.ApiError as error:
877                 raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
878             else:
879                 self.update = True
880         elif update_collection:
881             # Collection locator provided, but unknown format
882             raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
883
884         if self.use_cache:
885             cache_filepath = self._get_cache_filepath()
886             if self.resume and os.path.exists(cache_filepath):
887                 self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
888                 self._cache_file = open(cache_filepath, 'a+')
889             else:
890                 # --no-resume means start with a empty cache file.
891                 self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
892                 self._cache_file = open(cache_filepath, 'w+')
893             self._cache_filename = self._cache_file.name
894             self._lock_file(self._cache_file)
895             self._cache_file.seek(0)
896
897         with self._state_lock:
898             if self.use_cache:
899                 try:
900                     self._state = json.load(self._cache_file)
901                     if not set(['manifest', 'files']).issubset(set(self._state.keys())):
902                         # Cache at least partially incomplete, set up new cache
903                         self._state = copy.deepcopy(self.EMPTY_STATE)
904                 except ValueError:
905                     # Cache file empty, set up new cache
906                     self._state = copy.deepcopy(self.EMPTY_STATE)
907             else:
908                 self.logger.info("No cache usage requested for this run.")
909                 # No cache file, set empty state
910                 self._state = copy.deepcopy(self.EMPTY_STATE)
911             if not self._cached_manifest_valid():
912                 raise ResumeCacheInvalidError()
913             # Load the previous manifest so we can check if files were modified remotely.
914             self._local_collection = arvados.collection.Collection(
915                 self._state['manifest'],
916                 replication_desired=self.replication_desired,
917                 put_threads=self.put_threads,
918                 api_client=self._api_client,
919                 num_retries=self.num_retries)
920
921     def _cached_manifest_valid(self):
922         """
923         Validate the oldest non-expired block signature to check if cached manifest
924         is usable: checking if the cached manifest was not created with a different
925         arvados account.
926         """
927         if self._state.get('manifest', None) is None:
928             # No cached manifest yet, all good.
929             return True
930         now = datetime.datetime.utcnow()
931         oldest_exp = None
932         oldest_loc = None
933         block_found = False
934         for m in keep_locator_pattern.finditer(self._state['manifest']):
935             loc = m.group(0)
936             try:
937                 exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
938             except IndexError:
939                 # Locator without signature
940                 continue
941             block_found = True
942             if exp > now and (oldest_exp is None or exp < oldest_exp):
943                 oldest_exp = exp
944                 oldest_loc = loc
945         if not block_found:
946             # No block signatures found => no invalid block signatures.
947             return True
948         if oldest_loc is None:
949             # Locator signatures found, but all have expired.
950             # Reset the cache and move on.
951             self.logger.info('Cache expired, starting from scratch.')
952             self._state['manifest'] = ''
953             return True
954         kc = arvados.KeepClient(api_client=self._api_client,
955                                 num_retries=self.num_retries)
956         try:
957             kc.head(oldest_loc)
958         except arvados.errors.KeepRequestError:
959             # Something is wrong, cached manifest is not valid.
960             return False
961         return True
962
963     def collection_file_paths(self, col, path_prefix='.'):
964         """Return a list of file paths by recursively go through the entire collection `col`"""
965         file_paths = []
966         for name, item in listitems(col):
967             if isinstance(item, arvados.arvfile.ArvadosFile):
968                 file_paths.append(os.path.join(path_prefix, name))
969             elif isinstance(item, arvados.collection.Subcollection):
970                 new_prefix = os.path.join(path_prefix, name)
971                 file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
972         return file_paths
973
974     def _lock_file(self, fileobj):
975         try:
976             fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
977         except IOError:
978             raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
979
980     def _save_state(self):
981         """
982         Atomically save current state into cache.
983         """
984         with self._state_lock:
985             # We're not using copy.deepcopy() here because it's a lot slower
986             # than json.dumps(), and we're already needing JSON format to be
987             # saved on disk.
988             state = json.dumps(self._state)
989         try:
990             new_cache = tempfile.NamedTemporaryFile(
991                 mode='w+',
992                 dir=os.path.dirname(self._cache_filename), delete=False)
993             self._lock_file(new_cache)
994             new_cache.write(state)
995             new_cache.flush()
996             os.fsync(new_cache)
997             os.rename(new_cache.name, self._cache_filename)
998         except (IOError, OSError, ResumeCacheConflict) as error:
999             self.logger.error("There was a problem while saving the cache file: {}".format(error))
1000             try:
1001                 os.unlink(new_cache_name)
1002             except NameError:  # mkstemp failed.
1003                 pass
1004         else:
1005             self._cache_file.close()
1006             self._cache_file = new_cache
1007
1008     def collection_name(self):
1009         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
1010
1011     def collection_trash_at(self):
1012         return self._my_collection().get_trash_at()
1013
1014     def manifest_locator(self):
1015         return self._my_collection().manifest_locator()
1016
1017     def portable_data_hash(self):
1018         pdh = self._my_collection().portable_data_hash()
1019         m = self._my_collection().stripped_manifest().encode()
1020         local_pdh = '{}+{}'.format(hashlib.md5(m).hexdigest(), len(m))
1021         if pdh != local_pdh:
1022             self.logger.warning("\n".join([
1023                 "arv-put: API server provided PDH differs from local manifest.",
1024                 "         This should not happen; showing API server version."]))
1025         return pdh
1026
1027     def manifest_text(self, stream_name=".", strip=False, normalize=False):
1028         return self._my_collection().manifest_text(stream_name, strip, normalize)
1029
1030     def _datablocks_on_item(self, item):
1031         """
1032         Return a list of datablock locators, recursively navigating
1033         through subcollections
1034         """
1035         if isinstance(item, arvados.arvfile.ArvadosFile):
1036             if item.size() == 0:
1037                 # Empty file locator
1038                 return ["d41d8cd98f00b204e9800998ecf8427e+0"]
1039             else:
1040                 locators = []
1041                 for segment in item.segments():
1042                     loc = segment.locator
1043                     locators.append(loc)
1044                 return locators
1045         elif isinstance(item, arvados.collection.Collection):
1046             l = [self._datablocks_on_item(x) for x in listvalues(item)]
1047             # Fast list flattener method taken from:
1048             # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
1049             return [loc for sublist in l for loc in sublist]
1050         else:
1051             return None
1052
1053     def data_locators(self):
1054         with self._collection_lock:
1055             # Make sure all datablocks are flushed before getting the locators
1056             self._my_collection().manifest_text()
1057             datablocks = self._datablocks_on_item(self._my_collection())
1058         return datablocks
1059
1060 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
1061                                                             os.getpid())
1062
1063 # Simulate glob.glob() matching behavior without the need to scan the filesystem
1064 # Note: fnmatch() doesn't work correctly when used with pathnames. For example the
1065 # pattern 'tests/*.py' will match 'tests/run_test.py' and also 'tests/subdir/run_test.py',
1066 # so instead we're using it on every path component.
1067 def pathname_match(pathname, pattern):
1068     name = pathname.split(os.sep)
1069     # Fix patterns like 'some/subdir/' or 'some//subdir'
1070     pat = [x for x in pattern.split(os.sep) if x != '' and x != '.']
1071     if len(name) != len(pat):
1072         return False
1073     for i in range(len(name)):
1074         if not fnmatch.fnmatch(name[i], pat[i]):
1075             return False
1076     return True
1077
1078 def machine_progress(bytes_written, bytes_expected):
1079     return _machine_format.format(
1080         bytes_written, -1 if (bytes_expected is None) else bytes_expected)
1081
1082 def human_progress(bytes_written, bytes_expected):
1083     if bytes_expected:
1084         return "\r{}M / {}M {:.1%} ".format(
1085             bytes_written >> 20, bytes_expected >> 20,
1086             float(bytes_written) / bytes_expected)
1087     else:
1088         return "\r{} ".format(bytes_written)
1089
1090 def progress_writer(progress_func, outfile=sys.stderr):
1091     def write_progress(bytes_written, bytes_expected):
1092         outfile.write(progress_func(bytes_written, bytes_expected))
1093     return write_progress
1094
1095 def desired_project_uuid(api_client, project_uuid, num_retries):
1096     if not project_uuid:
1097         query = api_client.users().current()
1098     elif arvados.util.user_uuid_pattern.match(project_uuid):
1099         query = api_client.users().get(uuid=project_uuid)
1100     elif arvados.util.group_uuid_pattern.match(project_uuid):
1101         query = api_client.groups().get(uuid=project_uuid)
1102     else:
1103         raise ValueError("Not a valid project UUID: {}".format(project_uuid))
1104     return query.execute(num_retries=num_retries)['uuid']
1105
1106 def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
1107          install_sig_handlers=True):
1108     global api_client
1109
1110     args = parse_arguments(arguments)
1111     logger = logging.getLogger('arvados.arv_put')
1112     if args.silent:
1113         logger.setLevel(logging.WARNING)
1114     else:
1115         logger.setLevel(logging.INFO)
1116     status = 0
1117
1118     request_id = arvados.util.new_request_id()
1119
1120     formatter = ArvPutLogFormatter(request_id)
1121     logging.getLogger('arvados').handlers[0].setFormatter(formatter)
1122
1123     if api_client is None:
1124         api_client = arvados.api('v1', request_id=request_id)
1125
1126     if install_sig_handlers:
1127         arv_cmd.install_signal_handlers()
1128
1129     # Trash arguments validation
1130     trash_at = None
1131     if args.trash_at is not None:
1132         # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we
1133         # make sure the user provides a complete YYYY-MM-DD date.
1134         if not re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}', args.trash_at):
1135             logger.error("--trash-at argument format invalid, use --help to see examples.")
1136             sys.exit(1)
1137         # Check if no time information was provided. In that case, assume end-of-day.
1138         if re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}$', args.trash_at):
1139             args.trash_at += 'T23:59:59'
1140         try:
1141             trash_at = ciso8601.parse_datetime(args.trash_at)
1142         except:
1143             logger.error("--trash-at argument format invalid, use --help to see examples.")
1144             sys.exit(1)
1145         else:
1146             if trash_at.tzinfo is not None:
1147                 # Timezone aware datetime provided.
1148                 utcoffset = -trash_at.utcoffset()
1149             else:
1150                 # Timezone naive datetime provided. Assume is local.
1151                 if time.daylight:
1152                     utcoffset = datetime.timedelta(seconds=time.altzone)
1153                 else:
1154                     utcoffset = datetime.timedelta(seconds=time.timezone)
1155             # Convert to UTC timezone naive datetime.
1156             trash_at = trash_at.replace(tzinfo=None) + utcoffset
1157
1158         if trash_at <= datetime.datetime.utcnow():
1159             logger.error("--trash-at argument must be set in the future")
1160             sys.exit(1)
1161     if args.trash_after is not None:
1162         if args.trash_after < 1:
1163             logger.error("--trash-after argument must be >= 1")
1164             sys.exit(1)
1165         trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60))
1166
1167     # Determine the name to use
1168     if args.name:
1169         if args.stream or args.raw:
1170             logger.error("Cannot use --name with --stream or --raw")
1171             sys.exit(1)
1172         elif args.update_collection:
1173             logger.error("Cannot use --name with --update-collection")
1174             sys.exit(1)
1175         collection_name = args.name
1176     else:
1177         collection_name = "Saved at {} by {}@{}".format(
1178             datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
1179             pwd.getpwuid(os.getuid()).pw_name,
1180             socket.gethostname())
1181
1182     if args.project_uuid and (args.stream or args.raw):
1183         logger.error("Cannot use --project-uuid with --stream or --raw")
1184         sys.exit(1)
1185
1186     # Determine the parent project
1187     try:
1188         project_uuid = desired_project_uuid(api_client, args.project_uuid,
1189                                             args.retries)
1190     except (apiclient_errors.Error, ValueError) as error:
1191         logger.error(error)
1192         sys.exit(1)
1193
1194     if args.progress:
1195         reporter = progress_writer(human_progress)
1196     elif args.batch_progress:
1197         reporter = progress_writer(machine_progress)
1198     else:
1199         reporter = None
1200
1201     #  Split storage-classes argument
1202     storage_classes = None
1203     if args.storage_classes:
1204         storage_classes = args.storage_classes.strip().split(',')
1205         if len(storage_classes) > 1:
1206             logger.error("Multiple storage classes are not supported currently.")
1207             sys.exit(1)
1208
1209
1210     # Setup exclude regex from all the --exclude arguments provided
1211     name_patterns = []
1212     exclude_paths = []
1213     exclude_names = None
1214     if len(args.exclude) > 0:
1215         # We're supporting 2 kinds of exclusion patterns:
1216         # 1)   --exclude '*.jpg'    (file/dir name patterns, will only match
1217         #                            the name, wherever the file is on the tree)
1218         # 2.1) --exclude 'foo/bar'  (file/dir path patterns, will match the
1219         #                            entire path, and should be relative to
1220         #                            any input dir argument)
1221         # 2.2) --exclude './*.jpg'  (Special case for excluding files/dirs
1222         #                            placed directly underneath the input dir)
1223         for p in args.exclude:
1224             # Only relative paths patterns allowed
1225             if p.startswith(os.sep):
1226                 logger.error("Cannot use absolute paths with --exclude")
1227                 sys.exit(1)
1228             if os.path.dirname(p):
1229                 # We don't support of path patterns with '..'
1230                 p_parts = p.split(os.sep)
1231                 if '..' in p_parts:
1232                     logger.error(
1233                         "Cannot use path patterns that include or '..'")
1234                     sys.exit(1)
1235                 # Path search pattern
1236                 exclude_paths.append(p)
1237             else:
1238                 # Name-only search pattern
1239                 name_patterns.append(p)
1240         # For name only matching, we can combine all patterns into a single
1241         # regexp, for better performance.
1242         exclude_names = re.compile('|'.join(
1243             [fnmatch.translate(p) for p in name_patterns]
1244         )) if len(name_patterns) > 0 else None
1245         # Show the user the patterns to be used, just in case they weren't
1246         # specified inside quotes and got changed by the shell expansion.
1247         logger.info("Exclude patterns: {}".format(args.exclude))
1248
1249     # If this is used by a human, and there's at least one directory to be
1250     # uploaded, the expected bytes calculation can take a moment.
1251     if args.progress and any([os.path.isdir(f) for f in args.paths]):
1252         logger.info("Calculating upload size, this could take some time...")
1253     try:
1254         writer = ArvPutUploadJob(paths = args.paths,
1255                                  resume = args.resume,
1256                                  use_cache = args.use_cache,
1257                                  filename = args.filename,
1258                                  reporter = reporter,
1259                                  api_client = api_client,
1260                                  num_retries = args.retries,
1261                                  replication_desired = args.replication,
1262                                  put_threads = args.threads,
1263                                  name = collection_name,
1264                                  owner_uuid = project_uuid,
1265                                  ensure_unique_name = True,
1266                                  update_collection = args.update_collection,
1267                                  storage_classes=storage_classes,
1268                                  logger=logger,
1269                                  dry_run=args.dry_run,
1270                                  follow_links=args.follow_links,
1271                                  exclude_paths=exclude_paths,
1272                                  exclude_names=exclude_names,
1273                                  trash_at=trash_at)
1274     except ResumeCacheConflict:
1275         logger.error("\n".join([
1276             "arv-put: Another process is already uploading this data.",
1277             "         Use --no-cache if this is really what you want."]))
1278         sys.exit(1)
1279     except ResumeCacheInvalidError:
1280         logger.error("\n".join([
1281             "arv-put: Resume cache contains invalid signature: it may have expired",
1282             "         or been created with another Arvados user's credentials.",
1283             "         Switch user or use one of the following options to restart upload:",
1284             "         --no-resume to start a new resume cache.",
1285             "         --no-cache to disable resume cache."]))
1286         sys.exit(1)
1287     except (CollectionUpdateError, PathDoesNotExistError) as error:
1288         logger.error("\n".join([
1289             "arv-put: %s" % str(error)]))
1290         sys.exit(1)
1291     except ArvPutUploadIsPending:
1292         # Dry run check successful, return proper exit code.
1293         sys.exit(2)
1294     except ArvPutUploadNotPending:
1295         # No files pending for upload
1296         sys.exit(0)
1297
1298     if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
1299         logger.warning("\n".join([
1300             "arv-put: Resuming previous upload from last checkpoint.",
1301             "         Use the --no-resume option to start over."]))
1302
1303     if not args.dry_run:
1304         writer.report_progress()
1305     output = None
1306     try:
1307         writer.start(save_collection=not(args.stream or args.raw))
1308     except arvados.errors.ApiError as error:
1309         logger.error("\n".join([
1310             "arv-put: %s" % str(error)]))
1311         sys.exit(1)
1312
1313     if args.progress:  # Print newline to split stderr from stdout for humans.
1314         logger.info("\n")
1315
1316     if args.stream:
1317         if args.normalize:
1318             output = writer.manifest_text(normalize=True)
1319         else:
1320             output = writer.manifest_text()
1321     elif args.raw:
1322         output = ','.join(writer.data_locators())
1323     elif writer.manifest_locator() is not None:
1324         try:
1325             expiration_notice = ""
1326             if writer.collection_trash_at() is not None:
1327                 # Get the local timezone-naive version, and log it with timezone information.
1328                 if time.daylight:
1329                     local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.altzone)
1330                 else:
1331                     local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.timezone)
1332                 expiration_notice = ". It will expire on {} {}.".format(
1333                     local_trash_at.strftime("%Y-%m-%d %H:%M:%S"), time.strftime("%z"))
1334             if args.update_collection:
1335                 logger.info(u"Collection updated: '{}'{}".format(
1336                     writer.collection_name(), expiration_notice))
1337             else:
1338                 logger.info(u"Collection saved as '{}'{}".format(
1339                     writer.collection_name(), expiration_notice))
1340             if args.portable_data_hash:
1341                 output = writer.portable_data_hash()
1342             else:
1343                 output = writer.manifest_locator()
1344         except apiclient_errors.Error as error:
1345             logger.error(
1346                 "arv-put: Error creating Collection on project: {}.".format(
1347                     error))
1348             status = 1
1349     else:
1350         status = 1
1351
1352     # Print the locator (uuid) of the new collection.
1353     if output is None:
1354         status = status or 1
1355     elif not args.silent:
1356         stdout.write(output)
1357         if not output.endswith('\n'):
1358             stdout.write('\n')
1359
1360     if install_sig_handlers:
1361         arv_cmd.restore_signal_handlers()
1362
1363     if status != 0:
1364         sys.exit(status)
1365
1366     # Success!
1367     return output
1368
1369
1370 if __name__ == '__main__':
1371     main()