# -*- coding: utf-8 -*-

# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import apiclient
import ciso8601
import datetime
import json
import logging
import multiprocessing
import os
import pwd
import random
import re
import select
import shutil
import signal
import subprocess
import sys
import tempfile
import time
import unittest
import uuid

from functools import partial
from unittest import mock

import arvados
import arvados.commands.put as arv_put
from . import arvados_testutil as tutil

from .arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
from . import run_test_server

class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
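    """Tests for arv_put.ResumeCache: cache path naming, locking, and persistence."""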
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            pass

    def cache_path_from_arglist(self, arglist):
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                              self.cache_path_from_arglist(argset),
                              "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
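        # The NamedTemporaryFile is deleted when the 'with' block exits, but
        # ResumeCache keeps its own open file object, so the cache stays
        # usable on the unlinked file (standard POSIX behavior). Several
        # tests below rely on this pattern.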
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            _ = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)


class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):
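    """Tests for ArvPutUploadJob: symlink handling, resume behavior, and progress reporting."""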

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Create temp files
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KiB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test
        _, self.large_file_name = tempfile.mkstemp()
        fileobj = open(self.large_file_name, 'w')
        # Make sure to write just a little more than one block
        for _ in range((arvados.config.KEEP_BLOCK_SIZE>>20)+1):
            data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MiB
            fileobj.write(data)
        fileobj.close()
        # Temp dir containing small files to be repacked
        self.small_files_dir = tempfile.mkdtemp()
        data = 'y' * 1024 * 1024 # 1 MiB
        for i in range(1, 70):
            with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
                f.write(data + str(i))
        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
        # Temp dir to hold a symlink to other temp dir
        self.tempdir_with_symlink = tempfile.mkdtemp()
        os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir'))
        os.symlink(os.path.join(self.tempdir, '1'),
                   os.path.join(self.tempdir_with_symlink, 'linkedfile'))

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)
        shutil.rmtree(self.small_files_dir)
        shutil.rmtree(self.tempdir_with_symlink)

    def test_non_regular_files_are_ignored_except_symlinks_to_dirs(self):
        def pfunc(x):
            with open(x, 'w') as f:
                f.write('test')
        fifo_filename = 'fifo-file'
        fifo_path = os.path.join(self.tempdir_with_symlink, fifo_filename)
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        os.mkfifo(fifo_path)
        producer = multiprocessing.Process(target=pfunc, args=(fifo_path,))
        producer.start()
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
        cwriter.start(save_collection=False)
        if producer.exitcode is None:
            # If the producer is still running, kill it. This should always be
            # done before any assertion that may fail.
            producer.terminate()
            producer.join(1)
        self.assertIn('linkeddir', cwriter.manifest_text())
        self.assertNotIn(fifo_filename, cwriter.manifest_text())

    def test_symlinks_are_followed_by_default(self):
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkedfile')))
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
        cwriter.start(save_collection=False)
        self.assertIn('linkeddir', cwriter.manifest_text())
        self.assertIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_symlinks_are_not_followed_when_requested(self):
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkedfile')))
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
                                          follow_links=False)
        cwriter.start(save_collection=False)
        self.assertNotIn('linkeddir', cwriter.manifest_text())
        self.assertNotIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()
        # Check for bug #17800: passed symlinks should also be ignored.
        linked_dir = os.path.join(self.tempdir_with_symlink, 'linkeddir')
        cwriter = arv_put.ArvPutUploadJob([linked_dir], follow_links=False)
        cwriter.start(save_collection=False)
        self.assertNotIn('linkeddir', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_no_empty_collection_saved(self):
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        linked_dir = os.path.join(self.tempdir_with_symlink, 'linkeddir')
        cwriter = arv_put.ArvPutUploadJob([linked_dir], follow_links=False)
        cwriter.start(save_collection=True)
        self.assertIsNone(cwriter.manifest_locator())
        self.assertEqual('', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_passing_nonexistent_path_raises_exception(self):
        uuid_str = str(uuid.uuid4())
        with self.assertRaises(arv_put.PathDoesNotExistError):
            arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(0, cwriter.bytes_skipped)
            self.assertEqual(3, cwriter.bytes_written)
            # Don't destroy the cache, and start another upload
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            self.assertEqual(3, cwriter_new.bytes_skipped)
            self.assertEqual(3, cwriter_new.bytes_written)

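    # ArvPutUploadJob invokes its reporter callback as
    # reporter(bytes_written, bytes_expected); record those calls so the
    # progress test below can assert on them.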
    def make_progress_tester(self):
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                                                  reporter=reporter)
                cwriter.bytes_expected = expect_count
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting. Ensure block commit.
                self.writer._update(final=True)
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    # Test for bug #11002
    def test_graceful_exit_while_repacking_small_blocks(self):
        def wrapped_commit(*args, **kwargs):
            raise SystemExit("Simulated error")

        with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
                        autospec=True) as mocked_commit:
            mocked_commit.side_effect = wrapped_commit
            # Upload a little more than 1 block; wrapped_commit makes the
            # first block commit fail.
            # arv-put should not exit with an exception from trying to commit
            # the collection while it's in an inconsistent state.
            writer = arv_put.ArvPutUploadJob([self.small_files_dir],
                                             replication_desired=1)
            try:
                with self.assertRaises(SystemExit):
                    writer.start(save_collection=False)
            except arvados.arvfile.UnownedBlockError:
                self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
        writer.destroy_cache()

    def test_no_resume_when_asked(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without resume
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_no_resume_when_no_cache(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without cache usage
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False,
                                          use_cache=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_dry_run_feature(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            # Retry the upload using dry_run to check if there is a pending upload
            writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1,
                                              dry_run=True)
        # Complete the pending upload
        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer3.start(save_collection=False)
        with self.assertRaises(arv_put.ArvPutUploadNotPending):
            # Confirm there's no pending upload with dry_run=True
            writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1,
                                              dry_run=True)
        # Test obvious cases
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False,
                                    use_cache=False)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False)
        del self.writer

class CachedManifestValidationTest(ArvadosBaseTestCase):
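    """Tests for ArvPutUploadJob._cached_manifest_valid() signature checking."""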
    class MockedPut(arv_put.ArvPutUploadJob):
        def __init__(self, cached_manifest=None):
            self._state = arv_put.ArvPutUploadJob.EMPTY_STATE
            self._state['manifest'] = cached_manifest
            self._api_client = mock.MagicMock()
            self.logger = mock.MagicMock()
            self.num_retries = 1

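    # Keep block signatures carry their expiry as a hex Unix timestamp, the
    # value after the '@' in locators like '...+Asignature@<hex>'.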
    def datetime_to_hex(self, dt):
        return hex(int(time.mktime(dt.timetuple())))[2:]

    def setUp(self):
        super(CachedManifestValidationTest, self).setUp()
        self.block1 = "fdba98970961edb29f88241b9d99d890" # foo
        self.block2 = "37b51d194a7513e45b56f6524f2d51f2" # bar
        self.template = ". "+self.block1+"+3+Asignature@%s "+self.block2+"+3+Anothersignature@%s 0:3:foofile.txt 3:6:barfile.txt\n"

    def test_empty_cached_manifest_is_valid(self):
        put_mock = self.MockedPut()
        self.assertEqual(None, put_mock._state.get('manifest'))
        self.assertTrue(put_mock._cached_manifest_valid())
        put_mock._state['manifest'] = ''
        self.assertTrue(put_mock._cached_manifest_valid())

    def test_signature_cases(self):
        now = datetime.datetime.utcnow()
        yesterday = now - datetime.timedelta(days=1)
        lastweek = now - datetime.timedelta(days=7)
        tomorrow = now + datetime.timedelta(days=1)
        nextweek = now + datetime.timedelta(days=7)

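        # Stand-in for KeepClient.head(): succeed for blocks flagged valid
        # in 'blocks', and raise like a failed HEAD request otherwise.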
        def mocked_head(blocks={}, loc=None):
            blk = loc.split('+', 1)[0]
            if blocks.get(blk):
                return True
            raise arvados.errors.KeepRequestError("mocked error - block invalid")

        # Block1_expiration, Block2_expiration, Block1_HEAD, Block2_HEAD, Expectation
        cases = [
            # All expired, reset cache - OK
            (yesterday, lastweek, False, False, True),
            (lastweek, yesterday, False, False, True),
            # All non-expired valid blocks - OK
            (tomorrow, nextweek, True, True, True),
            (nextweek, tomorrow, True, True, True),
            # All non-expired invalid blocks - Not OK
            (tomorrow, nextweek, False, False, False),
            (nextweek, tomorrow, False, False, False),
            # One non-expired valid block - OK
            (tomorrow, yesterday, True, False, True),
            (yesterday, tomorrow, False, True, True),
            # One non-expired invalid block - Not OK
            (tomorrow, yesterday, False, False, False),
            (yesterday, tomorrow, False, False, False),
        ]
        for case in cases:
            b1_expiration, b2_expiration, b1_valid, b2_valid, outcome = case
            head_responses = {
                self.block1: b1_valid,
                self.block2: b2_valid,
            }
            cached_manifest = self.template % (
                self.datetime_to_hex(b1_expiration),
                self.datetime_to_hex(b2_expiration),
            )
            arvput = self.MockedPut(cached_manifest)
            with mock.patch('arvados.collection.KeepClient.head') as head_mock:
                head_mock.side_effect = partial(mocked_head, head_responses)
                self.assertEqual(outcome, arvput._cached_manifest_valid(),
                    "Case '%s' should have produced outcome '%s'" % (case, outcome)
                )
                if b1_expiration > now or b2_expiration > now:
                    # A HEAD request should have been done
                    head_mock.assert_called_once()
                else:
                    head_mock.assert_not_called()


class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
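    """Tests for ArvPutUploadJob.bytes_expected size estimates."""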
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        writer = arv_put.ArvPutUploadJob([__file__])
        self.assertEqual(self.TEST_SIZE,
                         writer.bytes_expected)

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        shutil.copyfile(__file__, os.path.join(tree, 'one'))
        shutil.copyfile(__file__, os.path.join(tree, 'two'))

        writer = arv_put.ArvPutUploadJob([tree])
        self.assertEqual(self.TEST_SIZE * 2,
                         writer.bytes_expected)
        writer = arv_put.ArvPutUploadJob([tree, __file__])
        self.assertEqual(self.TEST_SIZE * 3,
                         writer.bytes_expected)

    def test_expected_bytes_for_device(self):
        writer = arv_put.ArvPutUploadJob(['/dev/null'], use_cache=False, resume=False)
        self.assertIsNone(writer.bytes_expected)
        writer = arv_put.ArvPutUploadJob([__file__, '/dev/null'])
        self.assertIsNone(writer.bytes_expected)


class ArvadosPutReportTest(ArvadosBaseTestCase):
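    """Tests for the machine- and human-readable progress report formatters."""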
    def test_machine_progress(self):
        for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
            expect = ": {} written {} total\n".format(
                count, -1 if (total is None) else total)
            self.assertTrue(
                arv_put.machine_progress(count, total).endswith(expect))

    def test_known_human_progress(self):
        for count, total in [(0, 1), (2, 4), (45, 60)]:
            expect = '{:.1%}'.format(1.0*count/total)
            actual = arv_put.human_progress(count, total)
            self.assertTrue(actual.startswith('\r'))
            self.assertIn(expect, actual)

    def test_unknown_human_progress(self):
        for count in [1, 20, 300, 4000, 50000]:
            self.assertTrue(re.search(r'\b{}\b'.format(count),
                                      arv_put.human_progress(count, None)))


class ArvPutLogFormatterTest(ArvadosBaseTestCase):
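    """Tests that ArvPutLogFormatter logs the X-Request-Id exactly once."""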
    matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)'

    def setUp(self):
        super(ArvPutLogFormatterTest, self).setUp()
        self.stderr = tutil.StringIO()
        self.loggingHandler = logging.StreamHandler(self.stderr)
        self.loggingHandler.setFormatter(
            arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
        self.logger = logging.getLogger()
        self.logger.addHandler(self.loggingHandler)
        self.logger.setLevel(logging.DEBUG)

    def tearDown(self):
        self.logger.removeHandler(self.loggingHandler)
        self.stderr.close()
        self.stderr = None
        super(ArvPutLogFormatterTest, self).tearDown()

    def test_request_id_logged_only_once_on_error(self):
        self.logger.error('Ooops, something bad happened.')
        self.logger.error('Another bad thing just happened.')
        log_lines = self.stderr.getvalue().split('\n')[:-1]
        self.assertEqual(2, len(log_lines))
        self.assertRegex(log_lines[0], self.matcher)
        self.assertNotRegex(log_lines[1], self.matcher)

    def test_request_id_logged_only_once_on_debug(self):
        self.logger.debug('This is just a debug message.')
        self.logger.debug('Another message, move along.')
        log_lines = self.stderr.getvalue().split('\n')[:-1]
        self.assertEqual(2, len(log_lines))
        self.assertRegex(log_lines[0], self.matcher)
        self.assertNotRegex(log_lines[1], self.matcher)

    def test_request_id_not_logged_on_info(self):
        self.logger.info('This should be a useful message')
        log_lines = self.stderr.getvalue().split('\n')[:-1]
        self.assertEqual(1, len(log_lines))
        self.assertNotRegex(log_lines[0], self.matcher)

class ArvadosPutTest(run_test_server.TestCaseWithServers,
                     ArvadosBaseTestCase,
                     tutil.VersionChecker):
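    """Command-line tests for arv_put.main() run against a test API server."""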
    MAIN_SERVER = {}
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        self.main_stdout.seek(0, 0)
        self.main_stdout.truncate(0)
        self.main_stderr.seek(0, 0)
        self.main_stderr.truncate(0)
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=[]):
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        arv_put.api_client = None
        self.main_stdout = tutil.StringIO()
        self.main_stderr = tutil.StringIO()
        self.loggingHandler = logging.StreamHandler(self.main_stderr)
        self.loggingHandler.setFormatter(
            arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
        logging.getLogger().addHandler(self.loggingHandler)

    def tearDown(self):
        logging.getLogger().removeHandler(self.loggingHandler)
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_version_argument(self):
        with tutil.redirected_streams(
                stdout=tutil.StringIO, stderr=tutil.StringIO) as (out, err):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['--version'])
        self.assertVersionOutput(out, err)

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwritable_cache_dir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        self.call_main_on_test_file()
        arv_put.api_client = None
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_error_when_excluding_absolute_path(self):
        tmpdir = self.make_tmpdir()
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--exclude', '/some/absolute/path/*',
                           tmpdir])

    def test_api_error_handling(self):
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())

    def test_request_id_logging_on_error(self):
        matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)\n'
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['/dev/null'])
            self.assertRegex(
                self.main_stderr.getvalue(), matcher)


class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
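    """End-to-end tests that run arv-put as a subprocess against test servers."""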
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing': True}
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        cls.ENVIRON = os.environ.copy()
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

    def datetime_to_hex(self, dt):
        return hex(int(time.mktime(dt.timetuple())))[2:]

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        arv_put.api_client = None

    def authorize_with(self, token_name):
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            self.assertIn(BAD_UUID, str(error))
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write(b'stdin test\xa6\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            sys.stdout.write(pipe.stdout.read())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('1cb671b355a0c23d5d1c61d59cdb1b2b+12',
                      pipe.stdout.read().decode())

    def test_sigint_logs_request_id(self):
        # Start arv-put, give it a chance to start up, send SIGINT,
        # and check that its output includes the X-Request-Id.
        input_stream = subprocess.Popen(
            ['sleep', '10'],
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=input_stream.stdout, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        # Wait for arv-put child process to print something (i.e., a
        # log message) so we know its signal handler is installed.
        select.select([pipe.stdout], [], [], 10)
        pipe.send_signal(signal.SIGINT)
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        input_stream.terminate()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not exit within 5 seconds")
        self.assertRegex(pipe.stdout.read().decode(), r'\(X-Request-Id: req-[a-z0-9]{20}\)')

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__,
                              os.path.join(datadir, 'foo')],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Collection saved as ')
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegex(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

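    # Helper: run arv-put with 'text' on stdin, then look up the resulting
    # collection record by UUID (or portable data hash, if requested).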
    def run_and_find_collection(self, text, extra_args=[]):
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text.encode())
        self.assertRegex(stderr.decode(), r'INFO: Collection (updated:|saved as)')
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.decode().strip()]]
        ).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

    def test_all_expired_signatures_invalidates_cache(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
            f.write('foo')
        # Upload a directory and get the cache file name
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
        self.assertEqual(p.returncode, 0)
        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                   err.decode()).groups()[0]
        self.assertTrue(os.path.isfile(cache_filepath))
        # Load the cache file contents and modify the manifest to simulate
        # an expired access token
        with open(cache_filepath, 'r') as c:
            cache = json.load(c)
        self.assertRegex(cache['manifest'], r'\+A\S+\@')
        a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
        cache['manifest'] = re.sub(
            r'\@.*? ',
            "@{} ".format(self.datetime_to_hex(a_month_ago)),
            cache['manifest'])
        with open(cache_filepath, 'w') as c:
            c.write(json.dumps(cache))
        # Re-run the upload and expect an expired-cache notice
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(
            err.decode(),
            r'INFO: Cache expired, starting from scratch.*')
        self.assertEqual(p.returncode, 0)

    def test_invalid_signature_in_cache(self):
        for batch_mode in [False, True]:
            self.authorize_with('active')
            tmpdir = self.make_tmpdir()
            with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
                f.write('foo')
            # Upload a directory and get the cache file name
            arv_put_args = [tmpdir]
            if batch_mode:
                arv_put_args = ['--batch'] + arv_put_args
            p = subprocess.Popen([sys.executable, arv_put.__file__] + arv_put_args,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE,
                                 env=self.ENVIRON)
            (_, err) = p.communicate()
            self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
            self.assertEqual(p.returncode, 0)
            cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                       err.decode()).groups()[0]
            self.assertTrue(os.path.isfile(cache_filepath))
            # Load the cache file contents and modify the manifest to simulate
            # an invalid access token
            with open(cache_filepath, 'r') as c:
                cache = json.load(c)
            self.assertRegex(cache['manifest'], r'\+A\S+\@')
            cache['manifest'] = re.sub(
                r'\+A.*\@',
                "+Aabcdef0123456789abcdef0123456789abcdef01@",
                cache['manifest'])
            with open(cache_filepath, 'w') as c:
                c.write(json.dumps(cache))
            # Re-run the upload and expect to get an invalid cache message
            p = subprocess.Popen([sys.executable, arv_put.__file__] + arv_put_args,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE,
                                 env=self.ENVIRON)
            (_, err) = p.communicate()
            if not batch_mode:
                self.assertRegex(
                    err.decode(),
                    r'ERROR: arv-put: Resume cache contains invalid signature.*')
                self.assertEqual(p.returncode, 1)
            else:
                self.assertRegex(
                    err.decode(),
                    r'Invalid signatures on cache file \'.*\' while being run in \'batch mode\' -- continuing anyways.*')
                self.assertEqual(p.returncode, 0)

    def test_single_expired_signature_reuploads_file(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'foofile.txt'), 'w') as f:
            f.write('foo')
        # Write a second file on its own subdir to force a new stream
        os.mkdir(os.path.join(tmpdir, 'bar'))
        with open(os.path.join(tmpdir, 'bar', 'barfile.txt'), 'w') as f:
            f.write('bar')
        # Upload a directory and get the cache file name
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (_, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
        self.assertEqual(p.returncode, 0)
        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                   err.decode()).groups()[0]
        self.assertTrue(os.path.isfile(cache_filepath))
        # Load the cache file contents and modify the manifest to simulate
        # an expired access token
        with open(cache_filepath, 'r') as c:
            cache = json.load(c)
        self.assertRegex(cache['manifest'], r'\+A\S+\@')
        a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
        # Make one of the signatures appear to have expired
        cache['manifest'] = re.sub(
            r'\@.*? 3:3:barfile.txt',
            "@{} 3:3:barfile.txt".format(self.datetime_to_hex(a_month_ago)),
            cache['manifest'])
        with open(cache_filepath, 'w') as c:
            c.write(json.dumps(cache))
1154         # Re-run the upload and expect to get an invalid cache message
1155         p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
1156                              stdout=subprocess.PIPE,
1157                              stderr=subprocess.PIPE,
1158                              env=self.ENVIRON)
1159         (_, err) = p.communicate()
1160         self.assertRegex(
1161             err.decode(),
1162             r'WARNING: Uploaded file \'.*barfile.txt\' access token expired, will re-upload it from scratch')
1163         self.assertEqual(p.returncode, 0)
1164         # Confirm that the resulting cache is different from the last run.
1165         with open(cache_filepath, 'r') as c2:
1166             new_cache = json.load(c2)
1167         self.assertNotEqual(cache['manifest'], new_cache['manifest'])
1168
1169     def test_put_collection_with_later_update(self):
1170         tmpdir = self.make_tmpdir()
1171         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1172             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1173         col = self.run_and_find_collection("", ['--no-progress', tmpdir])
1174         self.assertNotEqual(None, col['uuid'])
1175         # Add a new file to the directory
1176         with open(os.path.join(tmpdir, 'file2'), 'w') as f:
1177             f.write('The quick brown fox jumped over the lazy dog')
1178         updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
1179         self.assertEqual(col['uuid'], updated_col['uuid'])
1180         # Get the manifest and check that the new file is being included
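        # Manifest lines have the form '<stream> <locators> <pos:size:name> ...',
        # so matching ':44:file2' confirms the new 44-byte file was included.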
        c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
        self.assertRegex(c['manifest_text'], r'^\..* .*:44:file2\n')

    def test_put_collection_with_utc_expiring_datetime(self):
        tmpdir = self.make_tmpdir()
        trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%MZ')
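        # The trailing 'Z' marks the timestamp as UTC, so the trash_at value
        # stored by the API server should parse back as the identical moment.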
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection(
            "",
            ['--no-progress', '--trash-at', trash_at, tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        self.assertEqual(ciso8601.parse_datetime(trash_at),
                         ciso8601.parse_datetime(c['trash_at']))

    def test_put_collection_with_timezone_aware_expiring_datetime(self):
        tmpdir = self.make_tmpdir()
        trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M-0300')
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection(
            "",
            ['--no-progress', '--trash-at', trash_at, tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
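        # The server normalizes trash_at to UTC: a wall-clock time at
        # UTC-03:00 corresponds to a UTC wall-clock reading 3 hours later,
        # hence the +3h adjustment in the comparison below.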
        self.assertEqual(
            ciso8601.parse_datetime(trash_at).replace(tzinfo=None) + datetime.timedelta(hours=3),
            ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))

    def test_put_collection_with_timezone_naive_expiring_datetime(self):
        tmpdir = self.make_tmpdir()
        trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M')
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection(
            "",
            ['--no-progress', '--trash-at', trash_at, tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
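        # A timezone-naive --trash-at is interpreted in the local timezone,
        # so convert local time to UTC by adding the local UTC offset
        # (time.altzone if the local zone defines DST, else time.timezone).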
        if time.daylight:
            offset = datetime.timedelta(seconds=time.altzone)
        else:
            offset = datetime.timedelta(seconds=time.timezone)
        self.assertEqual(
            ciso8601.parse_datetime(trash_at) + offset,
            ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))

    def test_put_collection_with_expiring_date_only(self):
        tmpdir = self.make_tmpdir()
        trash_at = '2140-01-01'
        end_of_day = datetime.timedelta(hours=23, minutes=59, seconds=59)
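        # A date-only --trash-at is taken to mean the end of that day
        # (hence the 23:59:59 delta) in the local timezone.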
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection(
            "",
            ['--no-progress', '--trash-at', trash_at, tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        if time.daylight:
            offset = datetime.timedelta(seconds=time.altzone)
        else:
            offset = datetime.timedelta(seconds=time.timezone)
        self.assertEqual(
            ciso8601.parse_datetime(trash_at) + end_of_day + offset,
            ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))

    def test_put_collection_with_invalid_absolute_expiring_datetimes(self):
        cases = ['2100', '210010', '2100-10', '2100-Oct']
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        for test_datetime in cases:
            with self.assertRaises(AssertionError):
                self.run_and_find_collection(
                    "",
                    ['--no-progress', '--trash-at', test_datetime, tmpdir])

    def test_put_collection_with_relative_expiring_datetime(self):
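        # Bracket the upload between two utcnow() samples so the
        # server-assigned trash_at (now + expire_after days) must fall
        # strictly between them.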
        expire_after = 7
        dt_before = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection(
            "",
            ['--no-progress', '--trash-after', str(expire_after), tmpdir])
        self.assertNotEqual(None, col['uuid'])
        dt_after = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        trash_at = ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None)
        self.assertLess(dt_before, trash_at)
        self.assertGreater(dt_after, trash_at)

    def test_put_collection_with_invalid_relative_expiring_datetime(self):
        expire_after = 0  # Must be >= 1
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        with self.assertRaises(AssertionError):
            self.run_and_find_collection(
                "",
                ['--no-progress', '--trash-after', str(expire_after), tmpdir])

    def test_upload_directory_reference_without_trailing_slash(self):
        tmpdir1 = self.make_tmpdir()
        tmpdir2 = self.make_tmpdir()
        with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
            f.write('This is foo')
        with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
            f.write('This is not foo')
        # Upload one directory and one file
        col = self.run_and_find_collection("", ['--no-progress',
                                                tmpdir1,
                                                os.path.join(tmpdir2, 'bar')])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Check that 'foo' was written inside a subcollection, while 'bar'
        # was uploaded directly into the root collection
        self.assertRegex(c['manifest_text'], r'^\. .*:15:bar\n\./.+ .*:11:foo\n')

    def test_upload_directory_reference_with_trailing_slash(self):
        tmpdir1 = self.make_tmpdir()
        tmpdir2 = self.make_tmpdir()
        with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
            f.write('This is foo')
        with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
            f.write('This is not foo')
        # Upload one directory (with trailing slash) and one file
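        # With the trailing slash, the directory's contents should land in
        # the root of the collection instead of in a subcollection.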
        col = self.run_and_find_collection("", ['--no-progress',
                                                tmpdir1 + os.sep,
                                                os.path.join(tmpdir2, 'bar')])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Check that 'foo' and 'bar' were written at the same level
        self.assertRegex(c['manifest_text'], r'^\. .*:15:bar .*:11:foo\n')

    def test_put_collection_with_high_redundancy(self):
        # Write empty data: we're not testing CollectionWriter, just
        # making sure collections.create tells the API server what our
        # desired replication level is.
        collection = self.run_and_find_collection("", ['--replication', '4'])
        self.assertEqual(4, collection['replication_desired'])

    def test_put_collection_with_default_redundancy(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(None, collection['replication_desired'])

    def test_put_collection_with_unnamed_project_link(self):
        link = self.run_and_find_collection(
            "Test unnamed collection",
            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
        username = pwd.getpwuid(os.getuid()).pw_name
        self.assertRegex(
            link['name'],
            r'^Saved at .* by {}@'.format(re.escape(username)))

    def test_put_collection_with_name_and_no_project(self):
        link_name = 'Test Collection Link in home project'
        collection = self.run_and_find_collection(
            "Test named collection in home project",
            ['--portable-data-hash', '--name', link_name])
        self.assertEqual(link_name, collection['name'])
        my_user_uuid = self.current_user()['uuid']
        self.assertEqual(my_user_uuid, collection['owner_uuid'])

    def test_put_collection_with_named_project_link(self):
        link_name = 'Test auto Collection Link'
        collection = self.run_and_find_collection("Test named collection",
                                                  ['--portable-data-hash',
                                                   '--name', link_name,
                                                   '--project-uuid', self.PROJECT_UUID])
        self.assertEqual(link_name, collection['name'])

    def test_put_collection_with_storage_classes_specified(self):
        collection = self.run_and_find_collection("", ['--storage-classes', 'hot'])
        self.assertEqual(len(collection['storage_classes_desired']), 1)
        self.assertEqual(collection['storage_classes_desired'][0], 'hot')

    def test_put_collection_with_multiple_storage_classes_specified(self):
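        # The comma-separated class list should be split and stripped of
        # surrounding whitespace before being sent to the API server.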
        collection = self.run_and_find_collection("", ['--storage-classes', ' foo, bar  ,baz'])
        self.assertEqual(len(collection['storage_classes_desired']), 3)
        self.assertEqual(collection['storage_classes_desired'], ['foo', 'bar', 'baz'])

    def test_put_collection_without_storage_classes_specified(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(len(collection['storage_classes_desired']), 1)
        self.assertEqual(collection['storage_classes_desired'][0], 'default')

    def test_exclude_filename_pattern(self):
        tmpdir = self.make_tmpdir()
        tmpsubdir = os.path.join(tmpdir, 'subdir')
        os.mkdir(tmpsubdir)
        for fname in ['file1', 'file2', 'file3']:
            with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
            with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
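        # Bare filename patterns given to --exclude are matched at every
        # directory level, so these two patterns should drop file2.txt and
        # file3.txt from both tmpdir and tmpdir/subdir.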
        col = self.run_and_find_collection("", ['--no-progress',
                                                '--exclude', '*2.txt',
                                                '--exclude', 'file3.*',
                                                tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Neither file2.txt nor file3.txt should have been uploaded
        self.assertRegex(c['manifest_text'], r'^.*:file1.txt')
        self.assertNotRegex(c['manifest_text'], r'^.*:file2.txt')
        self.assertNotRegex(c['manifest_text'], r'^.*:file3.txt')

    def test_exclude_filepath_pattern(self):
        tmpdir = self.make_tmpdir()
        tmpsubdir = os.path.join(tmpdir, 'subdir')
        os.mkdir(tmpsubdir)
        for fname in ['file1', 'file2', 'file3']:
            with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
            with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
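        # Patterns that include a path component (like 'subdir/*2.txt') or an
        # explicit leading './' are anchored to the root of the upload, so
        # they should exclude only the specific paths they name.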
        col = self.run_and_find_collection("", ['--no-progress',
                                                '--exclude', 'subdir/*2.txt',
                                                '--exclude', './file1.*',
                                                tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Only tmpdir/file1.txt and tmpdir/subdir/file2.txt should have been excluded
        self.assertNotRegex(c['manifest_text'],
                            r'^\./%s.*:file1.txt' % os.path.basename(tmpdir))
        self.assertNotRegex(c['manifest_text'],
                            r'^\./%s/subdir.*:file2.txt' % os.path.basename(tmpdir))
        self.assertRegex(c['manifest_text'],
                         r'^\./%s.*:file2.txt' % os.path.basename(tmpdir))
        self.assertRegex(c['manifest_text'], r'^.*:file3.txt')

    def test_unicode_on_filename(self):
        tmpdir = self.make_tmpdir()
        fname = u"i❤arvados.txt"
        with open(os.path.join(tmpdir, fname), 'w') as f:
            f.write("This is a unicode named file")
        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        self.assertIn(fname, c['manifest_text'],
                      u"{} does not include {}".format(c['manifest_text'], fname))

    def test_silent_mode_no_errors(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'test.txt'), 'w') as f:
            f.write('hello world')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + ['--silent', tmpdir],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate()
        # No console output should occur during normal operation
        self.assertNotRegex(stderr.decode(), r'.+')
        self.assertNotRegex(stdout.decode(), r'.+')

    def test_silent_mode_does_not_avoid_error_messages(self):
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + ['--silent',
                                                  '/path/not/existent'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate()
        # Error messages should still be displayed when errors occur,
        # even in silent mode
        self.assertRegex(stderr.decode(), r'.*ERROR:.*')
        self.assertNotRegex(stdout.decode(), r'.+')


if __name__ == '__main__':
    unittest.main()