21700: Install Bundler system-wide in Rails postinst
[arvados.git] / sdk / python / tests / test_arv_put.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) The Arvados Authors. All rights reserved.
4 #
5 # SPDX-License-Identifier: Apache-2.0
6
7 from __future__ import absolute_import
8 from __future__ import division
9 from future import standard_library
10 standard_library.install_aliases()
11 from builtins import str
12 from builtins import range
13 from functools import partial
14 import apiclient
15 import ciso8601
16 import datetime
17 import json
18 import logging
19 import multiprocessing
20 import os
21 import pwd
22 import random
23 import re
24 import select
25 import shutil
26 import signal
27 import subprocess
28 import sys
29 import tempfile
30 import time
31 import unittest
32 import uuid
33
34 from unittest import mock
35
36 import arvados
37 import arvados.commands.put as arv_put
38 from . import arvados_testutil as tutil
39
40 from .arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
41 from . import run_test_server
42
43 class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
44     CACHE_ARGSET = [
45         [],
46         ['/dev/null'],
47         ['/dev/null', '--filename', 'empty'],
48         ['/tmp']
49         ]
50
51     def tearDown(self):
52         super(ArvadosPutResumeCacheTest, self).tearDown()
53         try:
54             self.last_cache.destroy()
55         except AttributeError:
56             pass
57
58     def cache_path_from_arglist(self, arglist):
59         return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))
60
61     def test_cache_names_stable(self):
62         for argset in self.CACHE_ARGSET:
63             self.assertEqual(self.cache_path_from_arglist(argset),
64                               self.cache_path_from_arglist(argset),
65                               "cache name changed for {}".format(argset))
66
67     def test_cache_names_unique(self):
68         results = []
69         for argset in self.CACHE_ARGSET:
70             path = self.cache_path_from_arglist(argset)
71             self.assertNotIn(path, results)
72             results.append(path)
73
74     def test_cache_names_simple(self):
75         # The goal here is to make sure the filename doesn't use characters
76         # reserved by the filesystem.  Feel free to adjust this regexp as
77         # long as it still does that.
78         bad_chars = re.compile(r'[^-\.\w]')
79         for argset in self.CACHE_ARGSET:
80             path = self.cache_path_from_arglist(argset)
81             self.assertFalse(bad_chars.search(os.path.basename(path)),
82                              "path too exotic: {}".format(path))
83
84     def test_cache_names_ignore_argument_order(self):
85         self.assertEqual(
86             self.cache_path_from_arglist(['a', 'b', 'c']),
87             self.cache_path_from_arglist(['c', 'a', 'b']))
88         self.assertEqual(
89             self.cache_path_from_arglist(['-', '--filename', 'stdin']),
90             self.cache_path_from_arglist(['--filename', 'stdin', '-']))
91
92     def test_cache_names_differ_for_similar_paths(self):
93         # This test needs names at / that don't exist on the real filesystem.
94         self.assertNotEqual(
95             self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
96             self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))
97
98     def test_cache_names_ignore_irrelevant_arguments(self):
99         # Workaround: parse_arguments bails on --filename with a directory.
100         path1 = self.cache_path_from_arglist(['/tmp'])
101         args = arv_put.parse_arguments(['/tmp'])
102         args.filename = 'tmp'
103         path2 = arv_put.ResumeCache.make_path(args)
104         self.assertEqual(path1, path2,
105                          "cache path considered --filename for directory")
106         self.assertEqual(
107             self.cache_path_from_arglist(['-']),
108             self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
109             "cache path considered --max-manifest-depth for file")
110
111     def test_cache_names_treat_negative_manifest_depths_identically(self):
112         base_args = ['/tmp', '--max-manifest-depth']
113         self.assertEqual(
114             self.cache_path_from_arglist(base_args + ['-1']),
115             self.cache_path_from_arglist(base_args + ['-2']))
116
117     def test_cache_names_treat_stdin_consistently(self):
118         self.assertEqual(
119             self.cache_path_from_arglist(['-', '--filename', 'test']),
120             self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))
121
122     def test_cache_names_identical_for_synonymous_names(self):
123         self.assertEqual(
124             self.cache_path_from_arglist(['.']),
125             self.cache_path_from_arglist([os.path.realpath('.')]))
126         testdir = self.make_tmpdir()
127         looplink = os.path.join(testdir, 'loop')
128         os.symlink(testdir, looplink)
129         self.assertEqual(
130             self.cache_path_from_arglist([testdir]),
131             self.cache_path_from_arglist([looplink]))
132
133     def test_cache_names_different_by_api_host(self):
134         config = arvados.config.settings()
135         orig_host = config.get('ARVADOS_API_HOST')
136         try:
137             name1 = self.cache_path_from_arglist(['.'])
138             config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
139             self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
140         finally:
141             if orig_host is None:
142                 del config['ARVADOS_API_HOST']
143             else:
144                 config['ARVADOS_API_HOST'] = orig_host
145
146     @mock.patch('arvados.keep.KeepClient.head')
147     def test_resume_cache_with_current_stream_locators(self, keep_client_head):
148         keep_client_head.side_effect = [True]
149         thing = {}
150         thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
151         with tempfile.NamedTemporaryFile() as cachefile:
152             self.last_cache = arv_put.ResumeCache(cachefile.name)
153         self.last_cache.save(thing)
154         self.last_cache.close()
155         resume_cache = arv_put.ResumeCache(self.last_cache.filename)
156         self.assertNotEqual(None, resume_cache)
157
158     @mock.patch('arvados.keep.KeepClient.head')
159     def test_resume_cache_with_finished_streams(self, keep_client_head):
160         keep_client_head.side_effect = [True]
161         thing = {}
162         thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
163         with tempfile.NamedTemporaryFile() as cachefile:
164             self.last_cache = arv_put.ResumeCache(cachefile.name)
165         self.last_cache.save(thing)
166         self.last_cache.close()
167         resume_cache = arv_put.ResumeCache(self.last_cache.filename)
168         self.assertNotEqual(None, resume_cache)
169
170     @mock.patch('arvados.keep.KeepClient.head')
171     def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
172         keep_client_head.side_effect = Exception('Locator not found')
173         thing = {}
174         thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
175         with tempfile.NamedTemporaryFile() as cachefile:
176             self.last_cache = arv_put.ResumeCache(cachefile.name)
177         self.last_cache.save(thing)
178         self.last_cache.close()
179         resume_cache = arv_put.ResumeCache(self.last_cache.filename)
180         self.assertNotEqual(None, resume_cache)
181         resume_cache.check_cache()
182
183     def test_basic_cache_storage(self):
184         thing = ['test', 'list']
185         with tempfile.NamedTemporaryFile() as cachefile:
186             self.last_cache = arv_put.ResumeCache(cachefile.name)
187         self.last_cache.save(thing)
188         self.assertEqual(thing, self.last_cache.load())
189
190     def test_empty_cache(self):
191         with tempfile.NamedTemporaryFile() as cachefile:
192             cache = arv_put.ResumeCache(cachefile.name)
193         self.assertRaises(ValueError, cache.load)
194
195     def test_cache_persistent(self):
196         thing = ['test', 'list']
197         path = os.path.join(self.make_tmpdir(), 'cache')
198         cache = arv_put.ResumeCache(path)
199         cache.save(thing)
200         cache.close()
201         self.last_cache = arv_put.ResumeCache(path)
202         self.assertEqual(thing, self.last_cache.load())
203
204     def test_multiple_cache_writes(self):
205         thing = ['short', 'list']
206         with tempfile.NamedTemporaryFile() as cachefile:
207             self.last_cache = arv_put.ResumeCache(cachefile.name)
208         # Start writing an object longer than the one we test, to make
209         # sure the cache file gets truncated.
210         self.last_cache.save(['long', 'long', 'list'])
211         self.last_cache.save(thing)
212         self.assertEqual(thing, self.last_cache.load())
213
214     def test_cache_is_locked(self):
215         with tempfile.NamedTemporaryFile() as cachefile:
216             _ = arv_put.ResumeCache(cachefile.name)
217             self.assertRaises(arv_put.ResumeCacheConflict,
218                               arv_put.ResumeCache, cachefile.name)
219
220     def test_cache_stays_locked(self):
221         with tempfile.NamedTemporaryFile() as cachefile:
222             self.last_cache = arv_put.ResumeCache(cachefile.name)
223             path = cachefile.name
224         self.last_cache.save('test')
225         self.assertRaises(arv_put.ResumeCacheConflict,
226                           arv_put.ResumeCache, path)
227
228     def test_destroy_cache(self):
229         cachefile = tempfile.NamedTemporaryFile(delete=False)
230         try:
231             cache = arv_put.ResumeCache(cachefile.name)
232             cache.save('test')
233             cache.destroy()
234             try:
235                 arv_put.ResumeCache(cachefile.name)
236             except arv_put.ResumeCacheConflict:
237                 self.fail("could not load cache after destroying it")
238             self.assertRaises(ValueError, cache.load)
239         finally:
240             if os.path.exists(cachefile.name):
241                 os.unlink(cachefile.name)
242
243     def test_restart_cache(self):
244         path = os.path.join(self.make_tmpdir(), 'cache')
245         cache = arv_put.ResumeCache(path)
246         cache.save('test')
247         cache.restart()
248         self.assertRaises(ValueError, cache.load)
249         self.assertRaises(arv_put.ResumeCacheConflict,
250                           arv_put.ResumeCache, path)
251
252
253 class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
254                           ArvadosBaseTestCase):
255
256     def setUp(self):
257         super(ArvPutUploadJobTest, self).setUp()
258         run_test_server.authorize_with('active')
259         # Temp files creation
260         self.tempdir = tempfile.mkdtemp()
261         subdir = os.path.join(self.tempdir, 'subdir')
262         os.mkdir(subdir)
263         data = "x" * 1024 # 1 KB
264         for i in range(1, 5):
265             with open(os.path.join(self.tempdir, str(i)), 'w') as f:
266                 f.write(data * i)
267         with open(os.path.join(subdir, 'otherfile'), 'w') as f:
268             f.write(data * 5)
269         # Large temp file for resume test
270         _, self.large_file_name = tempfile.mkstemp()
271         fileobj = open(self.large_file_name, 'w')
272         # Make sure to write just a little more than one block
273         for _ in range((arvados.config.KEEP_BLOCK_SIZE>>20)+1):
274             data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MiB
275             fileobj.write(data)
276         fileobj.close()
277         # Temp dir containing small files to be repacked
278         self.small_files_dir = tempfile.mkdtemp()
279         data = 'y' * 1024 * 1024 # 1 MB
280         for i in range(1, 70):
281             with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
282                 f.write(data + str(i))
283         self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
284         # Temp dir to hold a symlink to other temp dir
285         self.tempdir_with_symlink = tempfile.mkdtemp()
286         os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir'))
287         os.symlink(os.path.join(self.tempdir, '1'),
288                    os.path.join(self.tempdir_with_symlink, 'linkedfile'))
289
290     def tearDown(self):
291         super(ArvPutUploadJobTest, self).tearDown()
292         shutil.rmtree(self.tempdir)
293         os.unlink(self.large_file_name)
294         shutil.rmtree(self.small_files_dir)
295         shutil.rmtree(self.tempdir_with_symlink)
296
297     def test_non_regular_files_are_ignored_except_symlinks_to_dirs(self):
298         def pfunc(x):
299             with open(x, 'w') as f:
300                 f.write('test')
301         fifo_filename = 'fifo-file'
302         fifo_path = os.path.join(self.tempdir_with_symlink, fifo_filename)
303         self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
304         os.mkfifo(fifo_path)
305         producer = multiprocessing.Process(target=pfunc, args=(fifo_path,))
306         producer.start()
307         cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
308         cwriter.start(save_collection=False)
309         if producer.exitcode is None:
310             # If the producer is still running, kill it. This should always be
311             # before any assertion that may fail.
312             producer.terminate()
313             producer.join(1)
314         self.assertIn('linkeddir', cwriter.manifest_text())
315         self.assertNotIn(fifo_filename, cwriter.manifest_text())
316
317     def test_symlinks_are_followed_by_default(self):
318         self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
319         self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkedfile')))
320         cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
321         cwriter.start(save_collection=False)
322         self.assertIn('linkeddir', cwriter.manifest_text())
323         self.assertIn('linkedfile', cwriter.manifest_text())
324         cwriter.destroy_cache()
325
326     def test_symlinks_are_not_followed_when_requested(self):
327         self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
328         self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkedfile')))
329         cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
330                                           follow_links=False)
331         cwriter.start(save_collection=False)
332         self.assertNotIn('linkeddir', cwriter.manifest_text())
333         self.assertNotIn('linkedfile', cwriter.manifest_text())
334         cwriter.destroy_cache()
335         # Check for bug #17800: passed symlinks should also be ignored.
336         linked_dir = os.path.join(self.tempdir_with_symlink, 'linkeddir')
337         cwriter = arv_put.ArvPutUploadJob([linked_dir], follow_links=False)
338         cwriter.start(save_collection=False)
339         self.assertNotIn('linkeddir', cwriter.manifest_text())
340         cwriter.destroy_cache()
341
342     def test_no_empty_collection_saved(self):
343         self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
344         linked_dir = os.path.join(self.tempdir_with_symlink, 'linkeddir')
345         cwriter = arv_put.ArvPutUploadJob([linked_dir], follow_links=False)
346         cwriter.start(save_collection=True)
347         self.assertIsNone(cwriter.manifest_locator())
348         self.assertEqual('', cwriter.manifest_text())
349         cwriter.destroy_cache()
350
351     def test_passing_nonexistant_path_raise_exception(self):
352         uuid_str = str(uuid.uuid4())
353         with self.assertRaises(arv_put.PathDoesNotExistError):
354             arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])
355
356     def test_writer_works_without_cache(self):
357         cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
358         cwriter.start(save_collection=False)
359         self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())
360
361     def test_writer_works_with_cache(self):
362         with tempfile.NamedTemporaryFile() as f:
363             f.write(b'foo')
364             f.flush()
365             cwriter = arv_put.ArvPutUploadJob([f.name])
366             cwriter.start(save_collection=False)
367             self.assertEqual(0, cwriter.bytes_skipped)
368             self.assertEqual(3, cwriter.bytes_written)
369             # Don't destroy the cache, and start another upload
370             cwriter_new = arv_put.ArvPutUploadJob([f.name])
371             cwriter_new.start(save_collection=False)
372             cwriter_new.destroy_cache()
373             self.assertEqual(3, cwriter_new.bytes_skipped)
374             self.assertEqual(3, cwriter_new.bytes_written)
375
376     def make_progress_tester(self):
377         progression = []
378         def record_func(written, expected):
379             progression.append((written, expected))
380         return progression, record_func
381
382     def test_progress_reporting(self):
383         with tempfile.NamedTemporaryFile() as f:
384             f.write(b'foo')
385             f.flush()
386             for expect_count in (None, 8):
387                 progression, reporter = self.make_progress_tester()
388                 cwriter = arv_put.ArvPutUploadJob([f.name],
389                                                   reporter=reporter)
390                 cwriter.bytes_expected = expect_count
391                 cwriter.start(save_collection=False)
392                 cwriter.destroy_cache()
393                 self.assertIn((3, expect_count), progression)
394
395     def test_writer_upload_directory(self):
396         cwriter = arv_put.ArvPutUploadJob([self.tempdir])
397         cwriter.start(save_collection=False)
398         cwriter.destroy_cache()
399         self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)
400
401     def test_resume_large_file_upload(self):
402         def wrapped_write(*args, **kwargs):
403             data = args[1]
404             # Exit only on last block
405             if len(data) < arvados.config.KEEP_BLOCK_SIZE:
406                 # Simulate a checkpoint before quitting. Ensure block commit.
407                 self.writer._update(final=True)
408                 raise SystemExit("Simulated error")
409             return self.arvfile_write(*args, **kwargs)
410
411         with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
412                         autospec=True) as mocked_write:
413             mocked_write.side_effect = wrapped_write
414             writer = arv_put.ArvPutUploadJob([self.large_file_name],
415                                              replication_desired=1)
416             # We'll be accessing from inside the wrapper
417             self.writer = writer
418             with self.assertRaises(SystemExit):
419                 writer.start(save_collection=False)
420             # Confirm that the file was partially uploaded
421             self.assertGreater(writer.bytes_written, 0)
422             self.assertLess(writer.bytes_written,
423                             os.path.getsize(self.large_file_name))
424         # Retry the upload
425         writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
426                                           replication_desired=1)
427         writer2.start(save_collection=False)
428         self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
429                          os.path.getsize(self.large_file_name))
430         writer2.destroy_cache()
431         del(self.writer)
432
433     # Test for bug #11002
434     def test_graceful_exit_while_repacking_small_blocks(self):
435         def wrapped_commit(*args, **kwargs):
436             raise SystemExit("Simulated error")
437
438         with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
439                         autospec=True) as mocked_commit:
440             mocked_commit.side_effect = wrapped_commit
441             # Upload a little more than 1 block, wrapped_commit will make the first block
442             # commit to fail.
443             # arv-put should not exit with an exception by trying to commit the collection
444             # as it's in an inconsistent state.
445             writer = arv_put.ArvPutUploadJob([self.small_files_dir],
446                                              replication_desired=1)
447             try:
448                 with self.assertRaises(SystemExit):
449                     writer.start(save_collection=False)
450             except arvados.arvfile.UnownedBlockError:
451                 self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
452         writer.destroy_cache()
453
454     def test_no_resume_when_asked(self):
455         def wrapped_write(*args, **kwargs):
456             data = args[1]
457             # Exit only on last block
458             if len(data) < arvados.config.KEEP_BLOCK_SIZE:
459                 # Simulate a checkpoint before quitting.
460                 self.writer._update()
461                 raise SystemExit("Simulated error")
462             return self.arvfile_write(*args, **kwargs)
463
464         with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
465                         autospec=True) as mocked_write:
466             mocked_write.side_effect = wrapped_write
467             writer = arv_put.ArvPutUploadJob([self.large_file_name],
468                                              replication_desired=1)
469             # We'll be accessing from inside the wrapper
470             self.writer = writer
471             with self.assertRaises(SystemExit):
472                 writer.start(save_collection=False)
473             # Confirm that the file was partially uploaded
474             self.assertGreater(writer.bytes_written, 0)
475             self.assertLess(writer.bytes_written,
476                             os.path.getsize(self.large_file_name))
477         # Retry the upload, this time without resume
478         writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
479                                           replication_desired=1,
480                                           resume=False)
481         writer2.start(save_collection=False)
482         self.assertEqual(writer2.bytes_skipped, 0)
483         self.assertEqual(writer2.bytes_written,
484                          os.path.getsize(self.large_file_name))
485         writer2.destroy_cache()
486         del(self.writer)
487
488     def test_no_resume_when_no_cache(self):
489         def wrapped_write(*args, **kwargs):
490             data = args[1]
491             # Exit only on last block
492             if len(data) < arvados.config.KEEP_BLOCK_SIZE:
493                 # Simulate a checkpoint before quitting.
494                 self.writer._update()
495                 raise SystemExit("Simulated error")
496             return self.arvfile_write(*args, **kwargs)
497
498         with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
499                         autospec=True) as mocked_write:
500             mocked_write.side_effect = wrapped_write
501             writer = arv_put.ArvPutUploadJob([self.large_file_name],
502                                              replication_desired=1)
503             # We'll be accessing from inside the wrapper
504             self.writer = writer
505             with self.assertRaises(SystemExit):
506                 writer.start(save_collection=False)
507             # Confirm that the file was partially uploaded
508             self.assertGreater(writer.bytes_written, 0)
509             self.assertLess(writer.bytes_written,
510                             os.path.getsize(self.large_file_name))
511         # Retry the upload, this time without cache usage
512         writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
513                                           replication_desired=1,
514                                           resume=False,
515                                           use_cache=False)
516         writer2.start(save_collection=False)
517         self.assertEqual(writer2.bytes_skipped, 0)
518         self.assertEqual(writer2.bytes_written,
519                          os.path.getsize(self.large_file_name))
520         writer2.destroy_cache()
521         del(self.writer)
522
523     def test_dry_run_feature(self):
524         def wrapped_write(*args, **kwargs):
525             data = args[1]
526             # Exit only on last block
527             if len(data) < arvados.config.KEEP_BLOCK_SIZE:
528                 # Simulate a checkpoint before quitting.
529                 self.writer._update()
530                 raise SystemExit("Simulated error")
531             return self.arvfile_write(*args, **kwargs)
532
533         with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
534                         autospec=True) as mocked_write:
535             mocked_write.side_effect = wrapped_write
536             writer = arv_put.ArvPutUploadJob([self.large_file_name],
537                                              replication_desired=1)
538             # We'll be accessing from inside the wrapper
539             self.writer = writer
540             with self.assertRaises(SystemExit):
541                 writer.start(save_collection=False)
542             # Confirm that the file was partially uploaded
543             self.assertGreater(writer.bytes_written, 0)
544             self.assertLess(writer.bytes_written,
545                             os.path.getsize(self.large_file_name))
546         with self.assertRaises(arv_put.ArvPutUploadIsPending):
547             # Retry the upload using dry_run to check if there is a pending upload
548             writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
549                                               replication_desired=1,
550                                               dry_run=True)
551         # Complete the pending upload
552         writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
553                                           replication_desired=1)
554         writer3.start(save_collection=False)
555         with self.assertRaises(arv_put.ArvPutUploadNotPending):
556             # Confirm there's no pending upload with dry_run=True
557             writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
558                                               replication_desired=1,
559                                               dry_run=True)
560         # Test obvious cases
561         with self.assertRaises(arv_put.ArvPutUploadIsPending):
562             arv_put.ArvPutUploadJob([self.large_file_name],
563                                     replication_desired=1,
564                                     dry_run=True,
565                                     resume=False,
566                                     use_cache=False)
567         with self.assertRaises(arv_put.ArvPutUploadIsPending):
568             arv_put.ArvPutUploadJob([self.large_file_name],
569                                     replication_desired=1,
570                                     dry_run=True,
571                                     resume=False)
572         del(self.writer)
573
574 class CachedManifestValidationTest(ArvadosBaseTestCase):
575     class MockedPut(arv_put.ArvPutUploadJob):
576         def __init__(self, cached_manifest=None):
577             self._state = arv_put.ArvPutUploadJob.EMPTY_STATE
578             self._state['manifest'] = cached_manifest
579             self._api_client = mock.MagicMock()
580             self.logger = mock.MagicMock()
581             self.num_retries = 1
582
583     def datetime_to_hex(self, dt):
584         return hex(int(time.mktime(dt.timetuple())))[2:]
585
586     def setUp(self):
587         super(CachedManifestValidationTest, self).setUp()
588         self.block1 = "fdba98970961edb29f88241b9d99d890" # foo
589         self.block2 = "37b51d194a7513e45b56f6524f2d51f2" # bar
590         self.template = ". "+self.block1+"+3+Asignature@%s "+self.block2+"+3+Anothersignature@%s 0:3:foofile.txt 3:6:barfile.txt\n"
591
592     def test_empty_cached_manifest_is_valid(self):
593         put_mock = self.MockedPut()
594         self.assertEqual(None, put_mock._state.get('manifest'))
595         self.assertTrue(put_mock._cached_manifest_valid())
596         put_mock._state['manifest'] = ''
597         self.assertTrue(put_mock._cached_manifest_valid())
598
599     def test_signature_cases(self):
600         now = datetime.datetime.utcnow()
601         yesterday = now - datetime.timedelta(days=1)
602         lastweek = now - datetime.timedelta(days=7)
603         tomorrow = now + datetime.timedelta(days=1)
604         nextweek = now + datetime.timedelta(days=7)
605
606         def mocked_head(blocks={}, loc=None):
607             blk = loc.split('+', 1)[0]
608             if blocks.get(blk):
609                 return True
610             raise arvados.errors.KeepRequestError("mocked error - block invalid")
611
612         # Block1_expiration, Block2_expiration, Block1_HEAD, Block2_HEAD, Expectation
613         cases = [
614             # All expired, reset cache - OK
615             (yesterday, lastweek, False, False, True),
616             (lastweek, yesterday, False, False, True),
617             # All non-expired valid blocks - OK
618             (tomorrow, nextweek, True, True, True),
619             (nextweek, tomorrow, True, True, True),
620             # All non-expired invalid blocks - Not OK
621             (tomorrow, nextweek, False, False, False),
622             (nextweek, tomorrow, False, False, False),
623             # One non-expired valid block - OK
624             (tomorrow, yesterday, True, False, True),
625             (yesterday, tomorrow, False, True, True),
626             # One non-expired invalid block - Not OK
627             (tomorrow, yesterday, False, False, False),
628             (yesterday, tomorrow, False, False, False),
629         ]
630         for case in cases:
631             b1_expiration, b2_expiration, b1_valid, b2_valid, outcome = case
632             head_responses = {
633                 self.block1: b1_valid,
634                 self.block2: b2_valid,
635             }
636             cached_manifest = self.template % (
637                 self.datetime_to_hex(b1_expiration),
638                 self.datetime_to_hex(b2_expiration),
639             )
640             arvput = self.MockedPut(cached_manifest)
641             with mock.patch('arvados.collection.KeepClient.head') as head_mock:
642                 head_mock.side_effect = partial(mocked_head, head_responses)
643                 self.assertEqual(outcome, arvput._cached_manifest_valid(),
644                     "Case '%s' should have produced outcome '%s'" % (case, outcome)
645                 )
646                 if b1_expiration > now or b2_expiration > now:
647                     # A HEAD request should have been done
648                     head_mock.assert_called_once()
649                 else:
650                     head_mock.assert_not_called()
651
652
653 class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
654     TEST_SIZE = os.path.getsize(__file__)
655
656     def test_expected_bytes_for_file(self):
657         writer = arv_put.ArvPutUploadJob([__file__])
658         self.assertEqual(self.TEST_SIZE,
659                          writer.bytes_expected)
660
661     def test_expected_bytes_for_tree(self):
662         tree = self.make_tmpdir()
663         shutil.copyfile(__file__, os.path.join(tree, 'one'))
664         shutil.copyfile(__file__, os.path.join(tree, 'two'))
665
666         writer = arv_put.ArvPutUploadJob([tree])
667         self.assertEqual(self.TEST_SIZE * 2,
668                          writer.bytes_expected)
669         writer = arv_put.ArvPutUploadJob([tree, __file__])
670         self.assertEqual(self.TEST_SIZE * 3,
671                          writer.bytes_expected)
672
673     def test_expected_bytes_for_device(self):
674         writer = arv_put.ArvPutUploadJob(['/dev/null'], use_cache=False, resume=False)
675         self.assertIsNone(writer.bytes_expected)
676         writer = arv_put.ArvPutUploadJob([__file__, '/dev/null'])
677         self.assertIsNone(writer.bytes_expected)
678
679
680 class ArvadosPutReportTest(ArvadosBaseTestCase):
681     def test_machine_progress(self):
682         for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
683             expect = ": {} written {} total\n".format(
684                 count, -1 if (total is None) else total)
685             self.assertTrue(
686                 arv_put.machine_progress(count, total).endswith(expect))
687
688     def test_known_human_progress(self):
689         for count, total in [(0, 1), (2, 4), (45, 60)]:
690             expect = '{:.1%}'.format(1.0*count/total)
691             actual = arv_put.human_progress(count, total)
692             self.assertTrue(actual.startswith('\r'))
693             self.assertIn(expect, actual)
694
695     def test_unknown_human_progress(self):
696         for count in [1, 20, 300, 4000, 50000]:
697             self.assertTrue(re.search(r'\b{}\b'.format(count),
698                                       arv_put.human_progress(count, None)))
699
700
701 class ArvPutLogFormatterTest(ArvadosBaseTestCase):
702     matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)'
703
704     def setUp(self):
705         super(ArvPutLogFormatterTest, self).setUp()
706         self.stderr = tutil.StringIO()
707         self.loggingHandler = logging.StreamHandler(self.stderr)
708         self.loggingHandler.setFormatter(
709             arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
710         self.logger = logging.getLogger()
711         self.logger.addHandler(self.loggingHandler)
712         self.logger.setLevel(logging.DEBUG)
713
714     def tearDown(self):
715         self.logger.removeHandler(self.loggingHandler)
716         self.stderr.close()
717         self.stderr = None
718         super(ArvPutLogFormatterTest, self).tearDown()
719
720     def test_request_id_logged_only_once_on_error(self):
721         self.logger.error('Ooops, something bad happened.')
722         self.logger.error('Another bad thing just happened.')
723         log_lines = self.stderr.getvalue().split('\n')[:-1]
724         self.assertEqual(2, len(log_lines))
725         self.assertRegex(log_lines[0], self.matcher)
726         self.assertNotRegex(log_lines[1], self.matcher)
727
728     def test_request_id_logged_only_once_on_debug(self):
729         self.logger.debug('This is just a debug message.')
730         self.logger.debug('Another message, move along.')
731         log_lines = self.stderr.getvalue().split('\n')[:-1]
732         self.assertEqual(2, len(log_lines))
733         self.assertRegex(log_lines[0], self.matcher)
734         self.assertNotRegex(log_lines[1], self.matcher)
735
736     def test_request_id_not_logged_on_info(self):
737         self.logger.info('This should be a useful message')
738         log_lines = self.stderr.getvalue().split('\n')[:-1]
739         self.assertEqual(1, len(log_lines))
740         self.assertNotRegex(log_lines[0], self.matcher)
741
742 class ArvadosPutTest(run_test_server.TestCaseWithServers,
743                      ArvadosBaseTestCase,
744                      tutil.VersionChecker):
745     MAIN_SERVER = {}
746     Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
747
748     def call_main_with_args(self, args):
749         self.main_stdout.seek(0, 0)
750         self.main_stdout.truncate(0)
751         self.main_stderr.seek(0, 0)
752         self.main_stderr.truncate(0)
753         return arv_put.main(args, self.main_stdout, self.main_stderr)
754
755     def call_main_on_test_file(self, args=[]):
756         with self.make_test_file() as testfile:
757             path = testfile.name
758             self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
759         self.assertTrue(
760             os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
761                                         '098f6bcd4621d373cade4e832627b4f6')),
762             "did not find file stream in Keep store")
763
764     def setUp(self):
765         super(ArvadosPutTest, self).setUp()
766         run_test_server.authorize_with('active')
767         arv_put.api_client = None
768         self.main_stdout = tutil.StringIO()
769         self.main_stderr = tutil.StringIO()
770         self.loggingHandler = logging.StreamHandler(self.main_stderr)
771         self.loggingHandler.setFormatter(
772             arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
773         logging.getLogger().addHandler(self.loggingHandler)
774
775     def tearDown(self):
776         logging.getLogger().removeHandler(self.loggingHandler)
777         for outbuf in ['main_stdout', 'main_stderr']:
778             if hasattr(self, outbuf):
779                 getattr(self, outbuf).close()
780                 delattr(self, outbuf)
781         super(ArvadosPutTest, self).tearDown()
782
783     def test_version_argument(self):
784         with tutil.redirected_streams(
785                 stdout=tutil.StringIO, stderr=tutil.StringIO) as (out, err):
786             with self.assertRaises(SystemExit):
787                 self.call_main_with_args(['--version'])
788         self.assertVersionOutput(out, err)
789
790     def test_simple_file_put(self):
791         self.call_main_on_test_file()
792
793     def test_put_with_unwriteable_cache_dir(self):
794         orig_cachedir = arv_put.ResumeCache.CACHE_DIR
795         cachedir = self.make_tmpdir()
796         os.chmod(cachedir, 0o0)
797         arv_put.ResumeCache.CACHE_DIR = cachedir
798         try:
799             self.call_main_on_test_file()
800         finally:
801             arv_put.ResumeCache.CACHE_DIR = orig_cachedir
802             os.chmod(cachedir, 0o700)
803
804     def test_put_with_unwritable_cache_subdir(self):
805         orig_cachedir = arv_put.ResumeCache.CACHE_DIR
806         cachedir = self.make_tmpdir()
807         os.chmod(cachedir, 0o0)
808         arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
809         try:
810             self.call_main_on_test_file()
811         finally:
812             arv_put.ResumeCache.CACHE_DIR = orig_cachedir
813             os.chmod(cachedir, 0o700)
814
815     def test_put_block_replication(self):
816         self.call_main_on_test_file()
817         arv_put.api_client = None
818         with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
819             put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
820             self.call_main_on_test_file(['--replication', '1'])
821             self.call_main_on_test_file(['--replication', '4'])
822             self.call_main_on_test_file(['--replication', '5'])
823             self.assertEqual(
824                 [x[-1].get('copies') for x in put_mock.call_args_list],
825                 [1, 4, 5])
826
827     def test_normalize(self):
828         testfile1 = self.make_test_file()
829         testfile2 = self.make_test_file()
830         test_paths = [testfile1.name, testfile2.name]
831         # Reverse-sort the paths, so normalization must change their order.
832         test_paths.sort(reverse=True)
833         self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
834                                  test_paths)
835         manifest = self.main_stdout.getvalue()
836         # Assert the second file we specified appears first in the manifest.
837         file_indices = [manifest.find(':' + os.path.basename(path))
838                         for path in test_paths]
839         self.assertGreater(*file_indices)
840
841     def test_error_name_without_collection(self):
842         self.assertRaises(SystemExit, self.call_main_with_args,
843                           ['--name', 'test without Collection',
844                            '--stream', '/dev/null'])
845
846     def test_error_when_project_not_found(self):
847         self.assertRaises(SystemExit,
848                           self.call_main_with_args,
849                           ['--project-uuid', self.Z_UUID])
850
851     def test_error_bad_project_uuid(self):
852         self.assertRaises(SystemExit,
853                           self.call_main_with_args,
854                           ['--project-uuid', self.Z_UUID, '--stream'])
855
856     def test_error_when_excluding_absolute_path(self):
857         tmpdir = self.make_tmpdir()
858         self.assertRaises(SystemExit,
859                           self.call_main_with_args,
860                           ['--exclude', '/some/absolute/path/*',
861                            tmpdir])
862
863     def test_api_error_handling(self):
864         coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
865         coll_save_mock.side_effect = arvados.errors.ApiError(
866             fake_httplib2_response(403), b'{}')
867         with mock.patch('arvados.collection.Collection.save_new',
868                         new=coll_save_mock):
869             with self.assertRaises(SystemExit) as exc_test:
870                 self.call_main_with_args(['/dev/null'])
871             self.assertLess(0, exc_test.exception.args[0])
872             self.assertLess(0, coll_save_mock.call_count)
873             self.assertEqual("", self.main_stdout.getvalue())
874
875     def test_request_id_logging_on_error(self):
876         matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)\n'
877         coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
878         coll_save_mock.side_effect = arvados.errors.ApiError(
879             fake_httplib2_response(403), b'{}')
880         with mock.patch('arvados.collection.Collection.save_new',
881                         new=coll_save_mock):
882             with self.assertRaises(SystemExit):
883                 self.call_main_with_args(['/dev/null'])
884             self.assertRegex(
885                 self.main_stderr.getvalue(), matcher)
886
887
888 class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
889                             ArvadosBaseTestCase):
890     MAIN_SERVER = {}
891     KEEP_SERVER = {'blob_signing': True}
892     PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']
893
894     @classmethod
895     def setUpClass(cls):
896         super(ArvPutIntegrationTest, cls).setUpClass()
897         cls.ENVIRON = os.environ.copy()
898         cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)
899
900     def datetime_to_hex(self, dt):
901         return hex(int(time.mktime(dt.timetuple())))[2:]
902
903     def setUp(self):
904         super(ArvPutIntegrationTest, self).setUp()
905         arv_put.api_client = None
906
907     def authorize_with(self, token_name):
908         run_test_server.authorize_with(token_name)
909         for v in ["ARVADOS_API_HOST",
910                   "ARVADOS_API_HOST_INSECURE",
911                   "ARVADOS_API_TOKEN"]:
912             self.ENVIRON[v] = arvados.config.settings()[v]
913         arv_put.api_client = arvados.api('v1')
914
915     def current_user(self):
916         return arv_put.api_client.users().current().execute()
917
918     def test_check_real_project_found(self):
919         self.authorize_with('active')
920         self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
921                         "did not correctly find test fixture project")
922
923     def test_check_error_finding_nonexistent_uuid(self):
924         BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
925         self.authorize_with('active')
926         try:
927             result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
928                                                   0)
929         except ValueError as error:
930             self.assertIn(BAD_UUID, str(error))
931         else:
932             self.assertFalse(result, "incorrectly found nonexistent project")
933
934     def test_check_error_finding_nonexistent_project(self):
935         BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
936         self.authorize_with('active')
937         with self.assertRaises(apiclient.errors.HttpError):
938             arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
939                                                   0)
940
941     def test_short_put_from_stdin(self):
942         # Have to run this as an integration test since arv-put can't
943         # read from the tests' stdin.
944         # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
945         # case, because the /proc entry is already gone by the time it tries.
946         pipe = subprocess.Popen(
947             [sys.executable, arv_put.__file__, '--stream'],
948             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
949             stderr=subprocess.STDOUT, env=self.ENVIRON)
950         pipe.stdin.write(b'stdin test\xa6\n')
951         pipe.stdin.close()
952         deadline = time.time() + 5
953         while (pipe.poll() is None) and (time.time() < deadline):
954             time.sleep(.1)
955         returncode = pipe.poll()
956         if returncode is None:
957             pipe.terminate()
958             self.fail("arv-put did not PUT from stdin within 5 seconds")
959         elif returncode != 0:
960             sys.stdout.write(pipe.stdout.read())
961             self.fail("arv-put returned exit code {}".format(returncode))
962         self.assertIn('1cb671b355a0c23d5d1c61d59cdb1b2b+12',
963                       pipe.stdout.read().decode())
964
965     def test_sigint_logs_request_id(self):
966         # Start arv-put, give it a chance to start up, send SIGINT,
967         # and check that its output includes the X-Request-Id.
968         input_stream = subprocess.Popen(
969             ['sleep', '10'],
970             stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
971         pipe = subprocess.Popen(
972             [sys.executable, arv_put.__file__, '--stream'],
973             stdin=input_stream.stdout, stdout=subprocess.PIPE,
974             stderr=subprocess.STDOUT, env=self.ENVIRON)
975         # Wait for arv-put child process to print something (i.e., a
976         # log message) so we know its signal handler is installed.
977         select.select([pipe.stdout], [], [], 10)
978         pipe.send_signal(signal.SIGINT)
979         deadline = time.time() + 5
980         while (pipe.poll() is None) and (time.time() < deadline):
981             time.sleep(.1)
982         returncode = pipe.poll()
983         input_stream.terminate()
984         if returncode is None:
985             pipe.terminate()
986             self.fail("arv-put did not exit within 5 seconds")
987         self.assertRegex(pipe.stdout.read().decode(), r'\(X-Request-Id: req-[a-z0-9]{20}\)')
988
989     def test_ArvPutSignedManifest(self):
990         # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
991         # the newly created manifest from the API server, testing to confirm
992         # that the block locators in the returned manifest are signed.
993         self.authorize_with('active')
994
995         # Before doing anything, demonstrate that the collection
996         # we're about to create is not present in our test fixture.
997         manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
998         with self.assertRaises(apiclient.errors.HttpError):
999             arv_put.api_client.collections().get(
1000                 uuid=manifest_uuid).execute()
1001
1002         datadir = self.make_tmpdir()
1003         with open(os.path.join(datadir, "foo"), "w") as f:
1004             f.write("The quick brown fox jumped over the lazy dog")
1005         p = subprocess.Popen([sys.executable, arv_put.__file__,
1006                               os.path.join(datadir, 'foo')],
1007                              stdout=subprocess.PIPE,
1008                              stderr=subprocess.PIPE,
1009                              env=self.ENVIRON)
1010         (_, err) = p.communicate()
1011         self.assertRegex(err.decode(), r'INFO: Collection saved as ')
1012         self.assertEqual(p.returncode, 0)
1013
1014         # The manifest text stored in the API server under the same
1015         # manifest UUID must use signed locators.
1016         c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
1017         self.assertRegex(
1018             c['manifest_text'],
1019             r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')
1020
1021         os.remove(os.path.join(datadir, "foo"))
1022         os.rmdir(datadir)
1023
1024     def run_and_find_collection(self, text, extra_args=[]):
1025         self.authorize_with('active')
1026         pipe = subprocess.Popen(
1027             [sys.executable, arv_put.__file__] + extra_args,
1028             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1029             stderr=subprocess.PIPE, env=self.ENVIRON)
1030         stdout, stderr = pipe.communicate(text.encode())
1031         self.assertRegex(stderr.decode(), r'INFO: Collection (updated:|saved as)')
1032         search_key = ('portable_data_hash'
1033                       if '--portable-data-hash' in extra_args else 'uuid')
1034         collection_list = arvados.api('v1').collections().list(
1035             filters=[[search_key, '=', stdout.decode().strip()]]
1036         ).execute().get('items', [])
1037         self.assertEqual(1, len(collection_list))
1038         return collection_list[0]
1039
1040     def test_all_expired_signatures_invalidates_cache(self):
1041         self.authorize_with('active')
1042         tmpdir = self.make_tmpdir()
1043         with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
1044             f.write('foo')
1045         # Upload a directory and get the cache file name
1046         p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
1047                              stdout=subprocess.PIPE,
1048                              stderr=subprocess.PIPE,
1049                              env=self.ENVIRON)
1050         (_, err) = p.communicate()
1051         self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
1052         self.assertEqual(p.returncode, 0)
1053         cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
1054                                    err.decode()).groups()[0]
1055         self.assertTrue(os.path.isfile(cache_filepath))
1056         # Load the cache file contents and modify the manifest to simulate
1057         # an expired access token
1058         with open(cache_filepath, 'r') as c:
1059             cache = json.load(c)
1060         self.assertRegex(cache['manifest'], r'\+A\S+\@')
1061         a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
1062         cache['manifest'] = re.sub(
1063             r'\@.*? ',
1064             "@{} ".format(self.datetime_to_hex(a_month_ago)),
1065             cache['manifest'])
1066         with open(cache_filepath, 'w') as c:
1067             c.write(json.dumps(cache))
1068         # Re-run the upload and expect to get an invalid cache message
1069         p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
1070                              stdout=subprocess.PIPE,
1071                              stderr=subprocess.PIPE,
1072                              env=self.ENVIRON)
1073         (_, err) = p.communicate()
1074         self.assertRegex(
1075             err.decode(),
1076             r'INFO: Cache expired, starting from scratch.*')
1077         self.assertEqual(p.returncode, 0)
1078
1079     def test_invalid_signature_in_cache(self):
1080         for batch_mode in [False, True]:
1081             self.authorize_with('active')
1082             tmpdir = self.make_tmpdir()
1083             with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
1084                 f.write('foo')
1085             # Upload a directory and get the cache file name
1086             arv_put_args = [tmpdir]
1087             if batch_mode:
1088                 arv_put_args = ['--batch'] + arv_put_args
1089             p = subprocess.Popen([sys.executable, arv_put.__file__] + arv_put_args,
1090                                 stdout=subprocess.PIPE,
1091                                 stderr=subprocess.PIPE,
1092                                 env=self.ENVIRON)
1093             (_, err) = p.communicate()
1094             self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
1095             self.assertEqual(p.returncode, 0)
1096             cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
1097                                     err.decode()).groups()[0]
1098             self.assertTrue(os.path.isfile(cache_filepath))
1099             # Load the cache file contents and modify the manifest to simulate
1100             # an invalid access token
1101             with open(cache_filepath, 'r') as c:
1102                 cache = json.load(c)
1103             self.assertRegex(cache['manifest'], r'\+A\S+\@')
1104             cache['manifest'] = re.sub(
1105                 r'\+A.*\@',
1106                 "+Aabcdef0123456789abcdef0123456789abcdef01@",
1107                 cache['manifest'])
1108             with open(cache_filepath, 'w') as c:
1109                 c.write(json.dumps(cache))
1110             # Re-run the upload and expect to get an invalid cache message
1111             p = subprocess.Popen([sys.executable, arv_put.__file__] + arv_put_args,
1112                                 stdout=subprocess.PIPE,
1113                                 stderr=subprocess.PIPE,
1114                                 env=self.ENVIRON)
1115             (_, err) = p.communicate()
1116             if not batch_mode:
1117                 self.assertRegex(
1118                     err.decode(),
1119                     r'ERROR: arv-put: Resume cache contains invalid signature.*')
1120                 self.assertEqual(p.returncode, 1)
1121             else:
1122                 self.assertRegex(
1123                     err.decode(),
1124                     r'Invalid signatures on cache file \'.*\' while being run in \'batch mode\' -- continuing anyways.*')
1125                 self.assertEqual(p.returncode, 0)
1126
1127     def test_single_expired_signature_reuploads_file(self):
1128         self.authorize_with('active')
1129         tmpdir = self.make_tmpdir()
1130         with open(os.path.join(tmpdir, 'foofile.txt'), 'w') as f:
1131             f.write('foo')
1132         # Write a second file on its own subdir to force a new stream
1133         os.mkdir(os.path.join(tmpdir, 'bar'))
1134         with open(os.path.join(tmpdir, 'bar', 'barfile.txt'), 'w') as f:
1135             f.write('bar')
1136         # Upload a directory and get the cache file name
1137         p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
1138                              stdout=subprocess.PIPE,
1139                              stderr=subprocess.PIPE,
1140                              env=self.ENVIRON)
1141         (_, err) = p.communicate()
1142         self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
1143         self.assertEqual(p.returncode, 0)
1144         cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
1145                                    err.decode()).groups()[0]
1146         self.assertTrue(os.path.isfile(cache_filepath))
1147         # Load the cache file contents and modify the manifest to simulate
1148         # an expired access token
1149         with open(cache_filepath, 'r') as c:
1150             cache = json.load(c)
1151         self.assertRegex(cache['manifest'], r'\+A\S+\@')
1152         a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
1153         # Make one of the signatures appear to have expired
1154         cache['manifest'] = re.sub(
1155             r'\@.*? 3:3:barfile.txt',
1156             "@{} 3:3:barfile.txt".format(self.datetime_to_hex(a_month_ago)),
1157             cache['manifest'])
1158         with open(cache_filepath, 'w') as c:
1159             c.write(json.dumps(cache))
1160         # Re-run the upload and expect to get an invalid cache message
1161         p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
1162                              stdout=subprocess.PIPE,
1163                              stderr=subprocess.PIPE,
1164                              env=self.ENVIRON)
1165         (_, err) = p.communicate()
1166         self.assertRegex(
1167             err.decode(),
1168             r'WARNING: Uploaded file \'.*barfile.txt\' access token expired, will re-upload it from scratch')
1169         self.assertEqual(p.returncode, 0)
1170         # Confirm that the resulting cache is different from the last run.
1171         with open(cache_filepath, 'r') as c2:
1172             new_cache = json.load(c2)
1173         self.assertNotEqual(cache['manifest'], new_cache['manifest'])
1174
1175     def test_put_collection_with_later_update(self):
1176         tmpdir = self.make_tmpdir()
1177         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1178             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1179         col = self.run_and_find_collection("", ['--no-progress', tmpdir])
1180         self.assertNotEqual(None, col['uuid'])
1181         # Add a new file to the directory
1182         with open(os.path.join(tmpdir, 'file2'), 'w') as f:
1183             f.write('The quick brown fox jumped over the lazy dog')
1184         updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
1185         self.assertEqual(col['uuid'], updated_col['uuid'])
1186         # Get the manifest and check that the new file is being included
1187         c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
1188         self.assertRegex(c['manifest_text'], r'^\..* .*:44:file2\n')
1189
1190     def test_put_collection_with_utc_expiring_datetime(self):
1191         tmpdir = self.make_tmpdir()
1192         trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%MZ')
1193         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1194             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1195         col = self.run_and_find_collection(
1196             "",
1197             ['--no-progress', '--trash-at', trash_at, tmpdir])
1198         self.assertNotEqual(None, col['uuid'])
1199         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1200         self.assertEqual(ciso8601.parse_datetime(trash_at),
1201             ciso8601.parse_datetime(c['trash_at']))
1202
1203     def test_put_collection_with_timezone_aware_expiring_datetime(self):
1204         tmpdir = self.make_tmpdir()
1205         trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M-0300')
1206         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1207             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1208         col = self.run_and_find_collection(
1209             "",
1210             ['--no-progress', '--trash-at', trash_at, tmpdir])
1211         self.assertNotEqual(None, col['uuid'])
1212         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1213         self.assertEqual(
1214             ciso8601.parse_datetime(trash_at).replace(tzinfo=None) + datetime.timedelta(hours=3),
1215             ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
1216
1217     def test_put_collection_with_timezone_naive_expiring_datetime(self):
1218         tmpdir = self.make_tmpdir()
1219         trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M')
1220         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1221             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1222         col = self.run_and_find_collection(
1223             "",
1224             ['--no-progress', '--trash-at', trash_at, tmpdir])
1225         self.assertNotEqual(None, col['uuid'])
1226         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1227         if time.daylight:
1228             offset = datetime.timedelta(seconds=time.altzone)
1229         else:
1230             offset = datetime.timedelta(seconds=time.timezone)
1231         self.assertEqual(
1232             ciso8601.parse_datetime(trash_at) + offset,
1233             ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
1234
1235     def test_put_collection_with_expiring_date_only(self):
1236         tmpdir = self.make_tmpdir()
1237         trash_at = '2140-01-01'
1238         end_of_day = datetime.timedelta(hours=23, minutes=59, seconds=59)
1239         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1240             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1241         col = self.run_and_find_collection(
1242             "",
1243             ['--no-progress', '--trash-at', trash_at, tmpdir])
1244         self.assertNotEqual(None, col['uuid'])
1245         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1246         if time.daylight:
1247             offset = datetime.timedelta(seconds=time.altzone)
1248         else:
1249             offset = datetime.timedelta(seconds=time.timezone)
1250         self.assertEqual(
1251             ciso8601.parse_datetime(trash_at) + end_of_day + offset,
1252             ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
1253
1254     def test_put_collection_with_invalid_absolute_expiring_datetimes(self):
1255         cases = ['2100', '210010','2100-10', '2100-Oct']
1256         tmpdir = self.make_tmpdir()
1257         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1258             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1259         for test_datetime in cases:
1260             with self.assertRaises(AssertionError):
1261                 self.run_and_find_collection(
1262                     "",
1263                     ['--no-progress', '--trash-at', test_datetime, tmpdir])
1264
1265     def test_put_collection_with_relative_expiring_datetime(self):
1266         expire_after = 7
1267         dt_before = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
1268         tmpdir = self.make_tmpdir()
1269         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1270             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1271         col = self.run_and_find_collection(
1272             "",
1273             ['--no-progress', '--trash-after', str(expire_after), tmpdir])
1274         self.assertNotEqual(None, col['uuid'])
1275         dt_after = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
1276         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1277         trash_at = ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None)
1278         self.assertTrue(dt_before < trash_at)
1279         self.assertTrue(dt_after > trash_at)
1280
1281     def test_put_collection_with_invalid_relative_expiring_datetime(self):
1282         expire_after = 0 # Must be >= 1
1283         tmpdir = self.make_tmpdir()
1284         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1285             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1286         with self.assertRaises(AssertionError):
1287             self.run_and_find_collection(
1288                 "",
1289                 ['--no-progress', '--trash-after', str(expire_after), tmpdir])
1290
1291     def test_upload_directory_reference_without_trailing_slash(self):
1292         tmpdir1 = self.make_tmpdir()
1293         tmpdir2 = self.make_tmpdir()
1294         with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
1295             f.write('This is foo')
1296         with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
1297             f.write('This is not foo')
1298         # Upload one directory and one file
1299         col = self.run_and_find_collection("", ['--no-progress',
1300                                                 tmpdir1,
1301                                                 os.path.join(tmpdir2, 'bar')])
1302         self.assertNotEqual(None, col['uuid'])
1303         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1304         # Check that 'foo' was written inside a subcollection
1305         # OTOH, 'bar' should have been directly uploaded on the root collection
1306         self.assertRegex(c['manifest_text'], r'^\. .*:15:bar\n\./.+ .*:11:foo\n')
1307
1308     def test_upload_directory_reference_with_trailing_slash(self):
1309         tmpdir1 = self.make_tmpdir()
1310         tmpdir2 = self.make_tmpdir()
1311         with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
1312             f.write('This is foo')
1313         with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
1314             f.write('This is not foo')
1315         # Upload one directory (with trailing slash) and one file
1316         col = self.run_and_find_collection("", ['--no-progress',
1317                                                 tmpdir1 + os.sep,
1318                                                 os.path.join(tmpdir2, 'bar')])
1319         self.assertNotEqual(None, col['uuid'])
1320         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1321         # Check that 'foo' and 'bar' were written at the same level
1322         self.assertRegex(c['manifest_text'], r'^\. .*:15:bar .*:11:foo\n')
1323
1324     def test_put_collection_with_high_redundancy(self):
1325         # Write empty data: we're not testing CollectionWriter, just
1326         # making sure collections.create tells the API server what our
1327         # desired replication level is.
1328         collection = self.run_and_find_collection("", ['--replication', '4'])
1329         self.assertEqual(4, collection['replication_desired'])
1330
1331     def test_put_collection_with_default_redundancy(self):
1332         collection = self.run_and_find_collection("")
1333         self.assertEqual(None, collection['replication_desired'])
1334
1335     def test_put_collection_with_unnamed_project_link(self):
1336         link = self.run_and_find_collection(
1337             "Test unnamed collection",
1338             ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
1339         username = pwd.getpwuid(os.getuid()).pw_name
1340         self.assertRegex(
1341             link['name'],
1342             r'^Saved at .* by {}@'.format(re.escape(username)))
1343
1344     def test_put_collection_with_name_and_no_project(self):
1345         link_name = 'Test Collection Link in home project'
1346         collection = self.run_and_find_collection(
1347             "Test named collection in home project",
1348             ['--portable-data-hash', '--name', link_name])
1349         self.assertEqual(link_name, collection['name'])
1350         my_user_uuid = self.current_user()['uuid']
1351         self.assertEqual(my_user_uuid, collection['owner_uuid'])
1352
1353     def test_put_collection_with_named_project_link(self):
1354         link_name = 'Test auto Collection Link'
1355         collection = self.run_and_find_collection("Test named collection",
1356                                       ['--portable-data-hash',
1357                                        '--name', link_name,
1358                                        '--project-uuid', self.PROJECT_UUID])
1359         self.assertEqual(link_name, collection['name'])
1360
1361     def test_put_collection_with_storage_classes_specified(self):
1362         collection = self.run_and_find_collection("", ['--storage-classes', 'hot'])
1363         self.assertEqual(len(collection['storage_classes_desired']), 1)
1364         self.assertEqual(collection['storage_classes_desired'][0], 'hot')
1365
1366     def test_put_collection_with_multiple_storage_classes_specified(self):
1367         collection = self.run_and_find_collection("", ['--storage-classes', ' foo, bar  ,baz'])
1368         self.assertEqual(len(collection['storage_classes_desired']), 3)
1369         self.assertEqual(collection['storage_classes_desired'], ['foo', 'bar', 'baz'])
1370
1371     def test_put_collection_without_storage_classes_specified(self):
1372         collection = self.run_and_find_collection("")
1373         self.assertEqual(len(collection['storage_classes_desired']), 1)
1374         self.assertEqual(collection['storage_classes_desired'][0], 'default')
1375
1376     def test_exclude_filename_pattern(self):
1377         tmpdir = self.make_tmpdir()
1378         tmpsubdir = os.path.join(tmpdir, 'subdir')
1379         os.mkdir(tmpsubdir)
1380         for fname in ['file1', 'file2', 'file3']:
1381             with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
1382                 f.write("This is %s" % fname)
1383             with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
1384                 f.write("This is %s" % fname)
1385         col = self.run_and_find_collection("", ['--no-progress',
1386                                                 '--exclude', '*2.txt',
1387                                                 '--exclude', 'file3.*',
1388                                                  tmpdir])
1389         self.assertNotEqual(None, col['uuid'])
1390         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1391         # None of the file2.txt & file3.txt should have been uploaded
1392         self.assertRegex(c['manifest_text'], r'^.*:file1.txt')
1393         self.assertNotRegex(c['manifest_text'], r'^.*:file2.txt')
1394         self.assertNotRegex(c['manifest_text'], r'^.*:file3.txt')
1395
1396     def test_exclude_filepath_pattern(self):
1397         tmpdir = self.make_tmpdir()
1398         tmpsubdir = os.path.join(tmpdir, 'subdir')
1399         os.mkdir(tmpsubdir)
1400         for fname in ['file1', 'file2', 'file3']:
1401             with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
1402                 f.write("This is %s" % fname)
1403             with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
1404                 f.write("This is %s" % fname)
1405         col = self.run_and_find_collection("", ['--no-progress',
1406                                                 '--exclude', 'subdir/*2.txt',
1407                                                 '--exclude', './file1.*',
1408                                                  tmpdir])
1409         self.assertNotEqual(None, col['uuid'])
1410         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1411         # Only tmpdir/file1.txt & tmpdir/subdir/file2.txt should have been excluded
1412         self.assertNotRegex(c['manifest_text'],
1413                             r'^\./%s.*:file1.txt' % os.path.basename(tmpdir))
1414         self.assertNotRegex(c['manifest_text'],
1415                             r'^\./%s/subdir.*:file2.txt' % os.path.basename(tmpdir))
1416         self.assertRegex(c['manifest_text'],
1417                          r'^\./%s.*:file2.txt' % os.path.basename(tmpdir))
1418         self.assertRegex(c['manifest_text'], r'^.*:file3.txt')
1419
1420     def test_unicode_on_filename(self):
1421         tmpdir = self.make_tmpdir()
1422         fname = u"iā¤arvados.txt"
1423         with open(os.path.join(tmpdir, fname), 'w') as f:
1424             f.write("This is a unicode named file")
1425         col = self.run_and_find_collection("", ['--no-progress', tmpdir])
1426         self.assertNotEqual(None, col['uuid'])
1427         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1428         self.assertTrue(fname in c['manifest_text'], u"{} does not include {}".format(c['manifest_text'], fname))
1429
1430     def test_silent_mode_no_errors(self):
1431         self.authorize_with('active')
1432         tmpdir = self.make_tmpdir()
1433         with open(os.path.join(tmpdir, 'test.txt'), 'w') as f:
1434             f.write('hello world')
1435         pipe = subprocess.Popen(
1436             [sys.executable, arv_put.__file__] + ['--silent', tmpdir],
1437             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1438             stderr=subprocess.PIPE, env=self.ENVIRON)
1439         stdout, stderr = pipe.communicate()
1440         # No console output should occur on normal operations
1441         self.assertNotRegex(stderr.decode(), r'.+')
1442         self.assertNotRegex(stdout.decode(), r'.+')
1443
1444     def test_silent_mode_does_not_avoid_error_messages(self):
1445         self.authorize_with('active')
1446         pipe = subprocess.Popen(
1447             [sys.executable, arv_put.__file__] + ['--silent',
1448                                                   '/path/not/existant'],
1449             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1450             stderr=subprocess.PIPE, env=self.ENVIRON)
1451         stdout, stderr = pipe.communicate()
1452         # Error message should be displayed when errors happen
1453         self.assertRegex(stderr.decode(), r'.*ERROR:.*')
1454         self.assertNotRegex(stdout.decode(), r'.+')
1455
1456
1457 if __name__ == '__main__':
1458     unittest.main()