18870: Need to declare NODES as array
[arvados.git] / sdk / python / tests / test_arv_put.py
1 # -*- coding: utf-8 -*-
2
3 # Copyright (C) The Arvados Authors. All rights reserved.
4 #
5 # SPDX-License-Identifier: Apache-2.0
6
7 from __future__ import absolute_import
8 from __future__ import division
9 from future import standard_library
10 standard_library.install_aliases()
11 from builtins import str
12 from builtins import range
13 from functools import partial
14 import apiclient
15 import ciso8601
16 import datetime
17 import json
18 import logging
19 import mock
20 import multiprocessing
21 import os
22 import pwd
23 import random
24 import re
25 import select
26 import shutil
27 import signal
28 import subprocess
29 import sys
30 import tempfile
31 import time
32 import unittest
33 import uuid
34
35 import arvados
36 import arvados.commands.put as arv_put
37 from . import arvados_testutil as tutil
38
39 from .arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
40 from . import run_test_server
41
class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
    """Tests for arv_put.ResumeCache: cache path naming, storage and locking.

    Tests that create a cache store it in self.last_cache so that tearDown
    can destroy it.
    """

    # Argument lists used by the cache naming tests.
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        # Not every test creates a cache, so last_cache may be unset.
        try:
            self.last_cache.destroy()
        except AttributeError:
            pass

    def cache_path_from_arglist(self, arglist):
        """Return the resume cache path computed for the given arv-put arguments."""
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def _new_temp_cache(self):
        """Create a ResumeCache at a fresh temporary path.

        The cache is registered as self.last_cache (destroyed in tearDown)
        and returned.  The NamedTemporaryFile only provides a unique path: it
        is closed, and therefore deleted, as soon as the cache has been
        created.
        """
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        return self.last_cache

    def _save_and_reopen(self, state):
        """Save state to a new cache, close it, and reopen the same file.

        Returns the second ResumeCache object.  It is closed through
        addCleanup so its file handle doesn't outlive the test.
        """
        cache = self._new_temp_cache()
        cache.save(state)
        cache.close()
        reopened = arv_put.ResumeCache(cache.filename)
        self.addCleanup(reopened.close)
        return reopened

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                             self.cache_path_from_arglist(argset),
                             "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        # A symlink to a directory must name the same cache as the directory.
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            # Put the settings back exactly as they were found.
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {
            '_current_stream_locators': ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6'],
        }
        resume_cache = self._save_and_reopen(thing)
        self.assertIsNotNone(resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {
            '_finished_streams': [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]],
        }
        resume_cache = self._save_and_reopen(thing)
        self.assertIsNotNone(resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {
            '_finished_streams': [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]],
        }
        resume_cache = self._save_and_reopen(thing)
        self.assertIsNotNone(resume_cache)
        # Must not raise even though every HEAD request fails.
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        cache = self._new_temp_cache()
        cache.save(thing)
        self.assertEqual(thing, cache.load())

    def test_empty_cache(self):
        cache = self._new_temp_cache()
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        cache = self._new_temp_cache()
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        cache.save(['long', 'long', 'list'])
        cache.save(thing)
        self.assertEqual(thing, cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            # Keep a reference so the lock is held during the assertion and
            # the cache gets destroyed in tearDown.
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        # Only the path is needed; close the handle right away so it isn't
        # leaked.  delete=False keeps the file on disk.
        with tempfile.NamedTemporaryFile(delete=False) as cachefile:
            path = cachefile.name
        try:
            cache = arv_put.ResumeCache(path)
            cache.save('test')
            cache.destroy()
            try:
                # Registered as last_cache so tearDown releases it.
                self.last_cache = arv_put.ResumeCache(path)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(path):
                os.unlink(path)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        # Registered so the lock taken here is released in tearDown.
        self.last_cache = cache
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)
250
251
class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):
    """Tests for arv_put.ArvPutUploadJob, run through the run_test_server harness.

    setUp builds several temporary fixtures: a small directory tree, one file
    slightly larger than a Keep block, a directory of small files, and a
    directory containing symlinks to the first tree.
    """

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Temp files creation
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test.  mkstemp() returns an already-open
        # descriptor; write through it so it gets closed instead of leaked.
        large_fd, self.large_file_name = tempfile.mkstemp()
        with os.fdopen(large_fd, 'w') as fileobj:
            # Make sure to write just a little more than one block
            for _ in range((arvados.config.KEEP_BLOCK_SIZE>>20)+1):
                data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MiB
                fileobj.write(data)
        # Temp dir containing small files to be repacked
        self.small_files_dir = tempfile.mkdtemp()
        data = 'y' * 1024 * 1024 # 1 MB
        for i in range(1, 70):
            with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
                f.write(data + str(i))
        # Keep the real write() so the patched version can delegate to it.
        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
        # Temp dir to hold a symlink to other temp dir
        self.tempdir_with_symlink = tempfile.mkdtemp()
        os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir'))
        os.symlink(os.path.join(self.tempdir, '1'),
                   os.path.join(self.tempdir_with_symlink, 'linkedfile'))

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)
        shutil.rmtree(self.small_files_dir)
        shutil.rmtree(self.tempdir_with_symlink)

    def _start_interrupted_upload(self, **update_kwargs):
        """Upload the large file and abort the job on its last block.

        ArvadosFileWriter.write is patched so that the first write shorter
        than KEEP_BLOCK_SIZE calls the job's _update(**update_kwargs) to
        simulate a checkpoint and then raises SystemExit.  The helper asserts
        that the job stopped with SystemExit after writing some, but not all,
        of the file, and returns the interrupted job.
        """
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                writer._update(**update_kwargs)
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            # wrapped_write reaches this job through its closure.
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        return writer

    def test_non_regular_files_are_ignored_except_symlinks_to_dirs(self):
        def pfunc(x):
            with open(x, 'w') as f:
                f.write('test')
        fifo_filename = 'fifo-file'
        fifo_path = os.path.join(self.tempdir_with_symlink, fifo_filename)
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        os.mkfifo(fifo_path)
        producer = multiprocessing.Process(target=pfunc, args=(fifo_path,))
        producer.start()
        try:
            cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
            cwriter.start(save_collection=False)
        finally:
            # If the producer is still running, kill it.  Doing it in a
            # finally clause guarantees it happens before any assertion that
            # may fail, and also when the upload itself raises.
            if producer.exitcode is None:
                producer.terminate()
                producer.join(1)
        self.assertIn('linkeddir', cwriter.manifest_text())
        self.assertNotIn(fifo_filename, cwriter.manifest_text())

    def test_symlinks_are_followed_by_default(self):
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkedfile')))
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
        cwriter.start(save_collection=False)
        self.assertIn('linkeddir', cwriter.manifest_text())
        self.assertIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_symlinks_are_not_followed_when_requested(self):
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkedfile')))
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
                                          follow_links=False)
        cwriter.start(save_collection=False)
        self.assertNotIn('linkeddir', cwriter.manifest_text())
        self.assertNotIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()
        # Check for bug #17800: passed symlinks should also be ignored.
        linked_dir = os.path.join(self.tempdir_with_symlink, 'linkeddir')
        cwriter = arv_put.ArvPutUploadJob([linked_dir], follow_links=False)
        cwriter.start(save_collection=False)
        self.assertNotIn('linkeddir', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_no_empty_collection_saved(self):
        self.assertTrue(os.path.islink(os.path.join(self.tempdir_with_symlink, 'linkeddir')))
        linked_dir = os.path.join(self.tempdir_with_symlink, 'linkeddir')
        cwriter = arv_put.ArvPutUploadJob([linked_dir], follow_links=False)
        cwriter.start(save_collection=True)
        self.assertIsNone(cwriter.manifest_locator())
        self.assertEqual('', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_passing_nonexistant_path_raise_exception(self):
        # A random UUID makes sure the path really doesn't exist.
        uuid_str = str(uuid.uuid4())
        with self.assertRaises(arv_put.PathDoesNotExistError):
            arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(0, cwriter.bytes_skipped)
            self.assertEqual(3, cwriter.bytes_written)
            # Don't destroy the cache, and start another upload
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            self.assertEqual(3, cwriter_new.bytes_skipped)
            self.assertEqual(3, cwriter_new.bytes_written)

    def make_progress_tester(self):
        """Return (progression, record_func).

        record_func(written, expected) appends the pair to the progression
        list, so it can be passed as a reporter and inspected afterwards.
        """
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                                                  reporter=reporter)
                cwriter.bytes_expected = expect_count
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
        # Files 1-4 hold 1-4 KB each, subdir/otherfile holds 5 KB.
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        # Checkpoint with final=True before quitting. Ensure block commit.
        writer = self._start_interrupted_upload(final=True)
        # Retry the upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()

    # Test for bug #11002
    def test_graceful_exit_while_repacking_small_blocks(self):
        def wrapped_commit(*args, **kwargs):
            raise SystemExit("Simulated error")

        with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
                        autospec=True) as mocked_commit:
            mocked_commit.side_effect = wrapped_commit
            # Upload a little more than 1 block, wrapped_commit will make the first block
            # commit to fail.
            # arv-put should not exit with an exception by trying to commit the collection
            # as it's in an inconsistent state.
            writer = arv_put.ArvPutUploadJob([self.small_files_dir],
                                             replication_desired=1)
            try:
                with self.assertRaises(SystemExit):
                    writer.start(save_collection=False)
            except arvados.arvfile.UnownedBlockError:
                self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
        writer.destroy_cache()

    def test_no_resume_when_asked(self):
        self._start_interrupted_upload()
        # Retry the upload, this time without resume
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()

    def test_no_resume_when_no_cache(self):
        self._start_interrupted_upload()
        # Retry the upload, this time without cache usage
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False,
                                          use_cache=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()

    def test_dry_run_feature(self):
        self._start_interrupted_upload()
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            # Retry the upload using dry_run to check if there is a pending upload
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True)
        # Complete the pending upload
        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer3.start(save_collection=False)
        with self.assertRaises(arv_put.ArvPutUploadNotPending):
            # Confirm there's no pending upload with dry_run=True
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True)
        # Test obvious cases
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False,
                                    use_cache=False)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False)
572
class CachedManifestValidationTest(ArvadosBaseTestCase):
    """Tests for ArvPutUploadJob._cached_manifest_valid() using mocked state."""

    class MockedPut(arv_put.ArvPutUploadJob):
        """ArvPutUploadJob whose constructor only sets the attributes these tests need."""

        def __init__(self, cached_manifest=None):
            # Copy the class-level template before changing it; assigning
            # into EMPTY_STATE itself would leak the manifest into every
            # other user of that dict.
            self._state = dict(arv_put.ArvPutUploadJob.EMPTY_STATE)
            self._state['manifest'] = cached_manifest
            self._api_client = mock.MagicMock()
            self.logger = mock.MagicMock()
            self.num_retries = 1

    def datetime_to_hex(self, dt):
        """Return the Unix timestamp of a naive UTC datetime as lowercase hex digits.

        The tests build dt from datetime.utcnow(), so the epoch offset is
        computed in UTC.  time.mktime() would treat the value as local time
        and skew the result by the machine's UTC offset.
        """
        epoch = datetime.datetime(1970, 1, 1)
        return '{:x}'.format(int((dt - epoch).total_seconds()))

    def setUp(self):
        super(CachedManifestValidationTest, self).setUp()
        self.block1 = "fdba98970961edb29f88241b9d99d890" # foo
        self.block2 = "37b51d194a7513e45b56f6524f2d51f2" # bar
        # Each %s placeholder takes the hex expiration of one block signature.
        self.template = ". "+self.block1+"+3+Asignature@%s "+self.block2+"+3+Anothersignature@%s 0:3:foofile.txt 3:6:barfile.txt\n"

    def test_empty_cached_manifest_is_valid(self):
        put_mock = self.MockedPut()
        self.assertEqual(None, put_mock._state.get('manifest'))
        self.assertTrue(put_mock._cached_manifest_valid())
        put_mock._state['manifest'] = ''
        self.assertTrue(put_mock._cached_manifest_valid())

    def test_signature_cases(self):
        now = datetime.datetime.utcnow()
        yesterday = now - datetime.timedelta(days=1)
        lastweek = now - datetime.timedelta(days=7)
        tomorrow = now + datetime.timedelta(days=1)
        nextweek = now + datetime.timedelta(days=7)

        def mocked_head(blocks, loc):
            # blocks maps a block hash to whether its HEAD request succeeds.
            blk = loc.split('+', 1)[0]
            if blocks.get(blk):
                return True
            raise arvados.errors.KeepRequestError("mocked error - block invalid")

        # Block1_expiration, Block2_expiration, Block1_HEAD, Block2_HEAD, Expectation
        cases = [
            # All expired, reset cache - OK
            (yesterday, lastweek, False, False, True),
            (lastweek, yesterday, False, False, True),
            # All non-expired valid blocks - OK
            (tomorrow, nextweek, True, True, True),
            (nextweek, tomorrow, True, True, True),
            # All non-expired invalid blocks - Not OK
            (tomorrow, nextweek, False, False, False),
            (nextweek, tomorrow, False, False, False),
            # One non-expired valid block - OK
            (tomorrow, yesterday, True, False, True),
            (yesterday, tomorrow, False, True, True),
            # One non-expired invalid block - Not OK
            (tomorrow, yesterday, False, False, False),
            (yesterday, tomorrow, False, False, False),
        ]
        for case in cases:
            b1_expiration, b2_expiration, b1_valid, b2_valid, outcome = case
            head_responses = {
                self.block1: b1_valid,
                self.block2: b2_valid,
            }
            cached_manifest = self.template % (
                self.datetime_to_hex(b1_expiration),
                self.datetime_to_hex(b2_expiration),
            )
            arvput = self.MockedPut(cached_manifest)
            with mock.patch('arvados.collection.KeepClient.head') as head_mock:
                head_mock.side_effect = partial(mocked_head, head_responses)
                self.assertEqual(outcome, arvput._cached_manifest_valid(),
                    "Case '%s' should have produced outcome '%s'" % (case, outcome)
                )
                if b1_expiration > now or b2_expiration > now:
                    # A HEAD request should have been done
                    head_mock.assert_called_once()
                else:
                    head_mock.assert_not_called()
650
651
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
    """Checks ArvPutUploadJob.bytes_expected for files, directory trees and devices."""

    # Size of this test module on disk; used as the reference file size.
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        job = arv_put.ArvPutUploadJob([__file__])
        self.assertEqual(self.TEST_SIZE, job.bytes_expected)

    def test_expected_bytes_for_tree(self):
        # Build a directory holding two copies of this module.
        tree = self.make_tmpdir()
        for copy_name in ('one', 'two'):
            shutil.copyfile(__file__, os.path.join(tree, copy_name))

        # The tree alone counts two copies; adding the module itself makes three.
        for paths, copies in (([tree], 2), ([tree, __file__], 3)):
            job = arv_put.ArvPutUploadJob(paths)
            self.assertEqual(self.TEST_SIZE * copies, job.bytes_expected)

    def test_expected_bytes_for_device(self):
        # bytes_expected is None for a device, alone or next to a regular file.
        job = arv_put.ArvPutUploadJob(['/dev/null'], use_cache=False, resume=False)
        self.assertIsNone(job.bytes_expected)
        job = arv_put.ArvPutUploadJob([__file__, '/dev/null'])
        self.assertIsNone(job.bytes_expected)
677
678
class ArvadosPutReportTest(ArvadosBaseTestCase):
    """Check the text produced by arv-put's progress reporting helpers."""

    def test_machine_progress(self):
        """machine_progress() must end with the counts; -1 means unknown total."""
        samples = [(0, 1), (0, None), (1, None), (235, 9283)]
        for written, expected_total in samples:
            shown_total = expected_total if expected_total is not None else -1
            suffix = ": {} written {} total\n".format(written, shown_total)
            report = arv_put.machine_progress(written, expected_total)
            self.assertTrue(report.endswith(suffix))

    def test_known_human_progress(self):
        """With a known total the report starts with CR and shows a percentage."""
        for written, expected_total in [(0, 1), (2, 4), (45, 60)]:
            percentage = '{:.1%}'.format(1.0*written/expected_total)
            report = arv_put.human_progress(written, expected_total)
            self.assertTrue(report.startswith('\r'))
            self.assertIn(percentage, report)

    def test_unknown_human_progress(self):
        """With no total the report must still mention the byte count."""
        for written in [1, 20, 300, 4000, 50000]:
            report = arv_put.human_progress(written, None)
            pattern = r'\b{}\b'.format(written)
            self.assertTrue(re.search(pattern, report))
698
699
class ArvPutLogFormatterTest(ArvadosBaseTestCase):
    """Check when ArvPutLogFormatter appends the X-Request-Id to log lines."""

    # Pattern of the request-id suffix the formatter is expected to emit.
    matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)'

    def setUp(self):
        """Attach a handler using ArvPutLogFormatter to the root logger."""
        super(ArvPutLogFormatterTest, self).setUp()
        self.stderr = tutil.StringIO()
        formatter = arv_put.ArvPutLogFormatter(arvados.util.new_request_id())
        handler = logging.StreamHandler(self.stderr)
        handler.setFormatter(formatter)
        self.loggingHandler = handler
        root_logger = logging.getLogger()
        root_logger.addHandler(handler)
        root_logger.setLevel(logging.DEBUG)
        self.logger = root_logger

    def tearDown(self):
        """Detach the handler and release the capture buffer."""
        self.logger.removeHandler(self.loggingHandler)
        self.stderr.close()
        self.stderr = None
        super(ArvPutLogFormatterTest, self).tearDown()

    def _logged_lines(self):
        """Return the captured log lines, minus the piece after the last newline."""
        captured = self.stderr.getvalue()
        return captured.split('\n')[:-1]

    def test_request_id_logged_only_once_on_error(self):
        """Only the first of two error records carries the request id."""
        self.logger.error('Ooops, something bad happened.')
        self.logger.error('Another bad thing just happened.')
        lines = self._logged_lines()
        self.assertEqual(2, len(lines))
        first, second = lines
        self.assertRegex(first, self.matcher)
        self.assertNotRegex(second, self.matcher)

    def test_request_id_logged_only_once_on_debug(self):
        """Only the first of two debug records carries the request id."""
        self.logger.debug('This is just a debug message.')
        self.logger.debug('Another message, move along.')
        lines = self._logged_lines()
        self.assertEqual(2, len(lines))
        first, second = lines
        self.assertRegex(first, self.matcher)
        self.assertNotRegex(second, self.matcher)

    def test_request_id_not_logged_on_info(self):
        """An info record is emitted without the request id."""
        self.logger.info('This should be a useful message')
        lines = self._logged_lines()
        self.assertEqual(1, len(lines))
        self.assertNotRegex(lines[0], self.matcher)
740
class ArvadosPutTest(run_test_server.TestCaseWithServers,
                     ArvadosBaseTestCase,
                     tutil.VersionChecker):
    """Run arv_put.main() in-process and check its output and exit behavior."""

    MAIN_SERVER = {}
    # Well-formed UUID that the tests expect not to match any project.
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        """Run arv_put.main() with *args*, capturing into fresh buffers.

        The stdout/stderr capture buffers are rewound and emptied first, so
        each call only sees its own output. Returns whatever main() returns.
        """
        self.main_stdout.seek(0, 0)
        self.main_stdout.truncate(0)
        self.main_stderr.seek(0, 0)
        self.main_stderr.truncate(0)
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=None):
        """Upload a temporary test file and assert its block reached Keep.

        args: optional sequence of extra command line arguments, inserted
        between '--stream --no-progress' and the file path.
        """
        # A None default avoids sharing one mutable list across calls.
        extra_args = [] if args is None else list(args)
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(
                ['--stream', '--no-progress'] + extra_args + [path])
        # The block is looked up by hash in the local Keep store directory;
        # presumably this is the MD5 of make_test_file()'s default content.
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        """Authorize with the 'active' token and capture main()'s output."""
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        arv_put.api_client = None
        self.main_stdout = tutil.StringIO()
        self.main_stderr = tutil.StringIO()
        self.loggingHandler = logging.StreamHandler(self.main_stderr)
        self.loggingHandler.setFormatter(
            arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
        logging.getLogger().addHandler(self.loggingHandler)

    def tearDown(self):
        """Detach the logging handler and dispose of the capture buffers."""
        logging.getLogger().removeHandler(self.loggingHandler)
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_version_argument(self):
        """--version exits via SystemExit and prints the version."""
        with tutil.redirected_streams(
                stdout=tutil.StringIO, stderr=tutil.StringIO) as (out, err):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['--version'])
        self.assertVersionOutput(out, err)

    def test_simple_file_put(self):
        """A plain upload of the test file succeeds."""
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        """The upload still succeeds when the cache directory has mode 0."""
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            # Restore the global setting and make the directory usable again.
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        """The upload still succeeds when the cache dir's parent has mode 0."""
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            # Restore the global setting and make the directory usable again.
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        """--replication N is passed to the Keep put call as copies=N."""
        self.call_main_on_test_file()
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        """--normalize reorders files given in reverse-sorted order."""
        # Use the temp files as context managers (as call_main_on_test_file
        # does) so they are closed deterministically, not left to the GC.
        with self.make_test_file() as testfile1, \
                self.make_test_file() as testfile2:
            test_paths = [testfile1.name, testfile2.name]
            # Reverse-sort the paths, so normalization must change their order.
            test_paths.sort(reverse=True)
            self.call_main_with_args(
                ['--stream', '--no-progress', '--normalize'] + test_paths)
            manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        """--name combined with --stream makes main() exit."""
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        """An unknown --project-uuid makes main() exit."""
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        """--project-uuid combined with --stream makes main() exit."""
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_error_when_excluding_absolute_path(self):
        """An absolute --exclude pattern makes main() exit."""
        tmpdir = self.make_tmpdir()
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--exclude', '/some/absolute/path/*',
                           tmpdir])

    def test_api_error_handling(self):
        """An ApiError from save_new() gives a positive exit code, no stdout."""
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())

    def test_request_id_logging_on_error(self):
        """A failed save logs the X-Request-Id on stderr."""
        matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)\n'
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['/dev/null'])
            self.assertRegex(
                self.main_stderr.getvalue(), matcher)
884
885
886 class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
887                             ArvadosBaseTestCase):
888     MAIN_SERVER = {}
889     KEEP_SERVER = {'blob_signing': True}
890     PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']
891
892     @classmethod
893     def setUpClass(cls):
894         super(ArvPutIntegrationTest, cls).setUpClass()
895         cls.ENVIRON = os.environ.copy()
896         cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)
897
898     def datetime_to_hex(self, dt):
899         return hex(int(time.mktime(dt.timetuple())))[2:]
900
901     def setUp(self):
902         super(ArvPutIntegrationTest, self).setUp()
903         arv_put.api_client = None
904
905     def authorize_with(self, token_name):
906         run_test_server.authorize_with(token_name)
907         for v in ["ARVADOS_API_HOST",
908                   "ARVADOS_API_HOST_INSECURE",
909                   "ARVADOS_API_TOKEN"]:
910             self.ENVIRON[v] = arvados.config.settings()[v]
911         arv_put.api_client = arvados.api('v1')
912
913     def current_user(self):
914         return arv_put.api_client.users().current().execute()
915
916     def test_check_real_project_found(self):
917         self.authorize_with('active')
918         self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
919                         "did not correctly find test fixture project")
920
921     def test_check_error_finding_nonexistent_uuid(self):
922         BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
923         self.authorize_with('active')
924         try:
925             result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
926                                                   0)
927         except ValueError as error:
928             self.assertIn(BAD_UUID, str(error))
929         else:
930             self.assertFalse(result, "incorrectly found nonexistent project")
931
932     def test_check_error_finding_nonexistent_project(self):
933         BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
934         self.authorize_with('active')
935         with self.assertRaises(apiclient.errors.HttpError):
936             arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
937                                                   0)
938
939     def test_short_put_from_stdin(self):
940         # Have to run this as an integration test since arv-put can't
941         # read from the tests' stdin.
942         # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
943         # case, because the /proc entry is already gone by the time it tries.
944         pipe = subprocess.Popen(
945             [sys.executable, arv_put.__file__, '--stream'],
946             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
947             stderr=subprocess.STDOUT, env=self.ENVIRON)
948         pipe.stdin.write(b'stdin test\xa6\n')
949         pipe.stdin.close()
950         deadline = time.time() + 5
951         while (pipe.poll() is None) and (time.time() < deadline):
952             time.sleep(.1)
953         returncode = pipe.poll()
954         if returncode is None:
955             pipe.terminate()
956             self.fail("arv-put did not PUT from stdin within 5 seconds")
957         elif returncode != 0:
958             sys.stdout.write(pipe.stdout.read())
959             self.fail("arv-put returned exit code {}".format(returncode))
960         self.assertIn('1cb671b355a0c23d5d1c61d59cdb1b2b+12',
961                       pipe.stdout.read().decode())
962
963     def test_sigint_logs_request_id(self):
964         # Start arv-put, give it a chance to start up, send SIGINT,
965         # and check that its output includes the X-Request-Id.
966         input_stream = subprocess.Popen(
967             ['sleep', '10'],
968             stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
969         pipe = subprocess.Popen(
970             [sys.executable, arv_put.__file__, '--stream'],
971             stdin=input_stream.stdout, stdout=subprocess.PIPE,
972             stderr=subprocess.STDOUT, env=self.ENVIRON)
973         # Wait for arv-put child process to print something (i.e., a
974         # log message) so we know its signal handler is installed.
975         select.select([pipe.stdout], [], [], 10)
976         pipe.send_signal(signal.SIGINT)
977         deadline = time.time() + 5
978         while (pipe.poll() is None) and (time.time() < deadline):
979             time.sleep(.1)
980         returncode = pipe.poll()
981         input_stream.terminate()
982         if returncode is None:
983             pipe.terminate()
984             self.fail("arv-put did not exit within 5 seconds")
985         self.assertRegex(pipe.stdout.read().decode(), r'\(X-Request-Id: req-[a-z0-9]{20}\)')
986
987     def test_ArvPutSignedManifest(self):
988         # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
989         # the newly created manifest from the API server, testing to confirm
990         # that the block locators in the returned manifest are signed.
991         self.authorize_with('active')
992
993         # Before doing anything, demonstrate that the collection
994         # we're about to create is not present in our test fixture.
995         manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
996         with self.assertRaises(apiclient.errors.HttpError):
997             arv_put.api_client.collections().get(
998                 uuid=manifest_uuid).execute()
999
1000         datadir = self.make_tmpdir()
1001         with open(os.path.join(datadir, "foo"), "w") as f:
1002             f.write("The quick brown fox jumped over the lazy dog")
1003         p = subprocess.Popen([sys.executable, arv_put.__file__,
1004                               os.path.join(datadir, 'foo')],
1005                              stdout=subprocess.PIPE,
1006                              stderr=subprocess.PIPE,
1007                              env=self.ENVIRON)
1008         (_, err) = p.communicate()
1009         self.assertRegex(err.decode(), r'INFO: Collection saved as ')
1010         self.assertEqual(p.returncode, 0)
1011
1012         # The manifest text stored in the API server under the same
1013         # manifest UUID must use signed locators.
1014         c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
1015         self.assertRegex(
1016             c['manifest_text'],
1017             r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')
1018
1019         os.remove(os.path.join(datadir, "foo"))
1020         os.rmdir(datadir)
1021
1022     def run_and_find_collection(self, text, extra_args=[]):
1023         self.authorize_with('active')
1024         pipe = subprocess.Popen(
1025             [sys.executable, arv_put.__file__] + extra_args,
1026             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1027             stderr=subprocess.PIPE, env=self.ENVIRON)
1028         stdout, stderr = pipe.communicate(text.encode())
1029         self.assertRegex(stderr.decode(), r'INFO: Collection (updated:|saved as)')
1030         search_key = ('portable_data_hash'
1031                       if '--portable-data-hash' in extra_args else 'uuid')
1032         collection_list = arvados.api('v1').collections().list(
1033             filters=[[search_key, '=', stdout.decode().strip()]]
1034         ).execute().get('items', [])
1035         self.assertEqual(1, len(collection_list))
1036         return collection_list[0]
1037
1038     def test_all_expired_signatures_invalidates_cache(self):
1039         self.authorize_with('active')
1040         tmpdir = self.make_tmpdir()
1041         with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
1042             f.write('foo')
1043         # Upload a directory and get the cache file name
1044         p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
1045                              stdout=subprocess.PIPE,
1046                              stderr=subprocess.PIPE,
1047                              env=self.ENVIRON)
1048         (_, err) = p.communicate()
1049         self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
1050         self.assertEqual(p.returncode, 0)
1051         cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
1052                                    err.decode()).groups()[0]
1053         self.assertTrue(os.path.isfile(cache_filepath))
1054         # Load the cache file contents and modify the manifest to simulate
1055         # an expired access token
1056         with open(cache_filepath, 'r') as c:
1057             cache = json.load(c)
1058         self.assertRegex(cache['manifest'], r'\+A\S+\@')
1059         a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
1060         cache['manifest'] = re.sub(
1061             r'\@.*? ',
1062             "@{} ".format(self.datetime_to_hex(a_month_ago)),
1063             cache['manifest'])
1064         with open(cache_filepath, 'w') as c:
1065             c.write(json.dumps(cache))
1066         # Re-run the upload and expect to get an invalid cache message
1067         p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
1068                              stdout=subprocess.PIPE,
1069                              stderr=subprocess.PIPE,
1070                              env=self.ENVIRON)
1071         (_, err) = p.communicate()
1072         self.assertRegex(
1073             err.decode(),
1074             r'INFO: Cache expired, starting from scratch.*')
1075         self.assertEqual(p.returncode, 0)
1076
1077     def test_invalid_signature_in_cache(self):
1078         for batch_mode in [False, True]:
1079             self.authorize_with('active')
1080             tmpdir = self.make_tmpdir()
1081             with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
1082                 f.write('foo')
1083             # Upload a directory and get the cache file name
1084             arv_put_args = [tmpdir]
1085             if batch_mode:
1086                 arv_put_args = ['--batch'] + arv_put_args
1087             p = subprocess.Popen([sys.executable, arv_put.__file__] + arv_put_args,
1088                                 stdout=subprocess.PIPE,
1089                                 stderr=subprocess.PIPE,
1090                                 env=self.ENVIRON)
1091             (_, err) = p.communicate()
1092             self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
1093             self.assertEqual(p.returncode, 0)
1094             cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
1095                                     err.decode()).groups()[0]
1096             self.assertTrue(os.path.isfile(cache_filepath))
1097             # Load the cache file contents and modify the manifest to simulate
1098             # an invalid access token
1099             with open(cache_filepath, 'r') as c:
1100                 cache = json.load(c)
1101             self.assertRegex(cache['manifest'], r'\+A\S+\@')
1102             cache['manifest'] = re.sub(
1103                 r'\+A.*\@',
1104                 "+Aabcdef0123456789abcdef0123456789abcdef01@",
1105                 cache['manifest'])
1106             with open(cache_filepath, 'w') as c:
1107                 c.write(json.dumps(cache))
1108             # Re-run the upload and expect to get an invalid cache message
1109             p = subprocess.Popen([sys.executable, arv_put.__file__] + arv_put_args,
1110                                 stdout=subprocess.PIPE,
1111                                 stderr=subprocess.PIPE,
1112                                 env=self.ENVIRON)
1113             (_, err) = p.communicate()
1114             if not batch_mode:
1115                 self.assertRegex(
1116                     err.decode(),
1117                     r'ERROR: arv-put: Resume cache contains invalid signature.*')
1118                 self.assertEqual(p.returncode, 1)
1119             else:
1120                 self.assertRegex(
1121                     err.decode(),
1122                     r'Invalid signatures on cache file \'.*\' while being run in \'batch mode\' -- continuing anyways.*')
1123                 self.assertEqual(p.returncode, 0)
1124
1125     def test_single_expired_signature_reuploads_file(self):
1126         self.authorize_with('active')
1127         tmpdir = self.make_tmpdir()
1128         with open(os.path.join(tmpdir, 'foofile.txt'), 'w') as f:
1129             f.write('foo')
1130         # Write a second file on its own subdir to force a new stream
1131         os.mkdir(os.path.join(tmpdir, 'bar'))
1132         with open(os.path.join(tmpdir, 'bar', 'barfile.txt'), 'w') as f:
1133             f.write('bar')
1134         # Upload a directory and get the cache file name
1135         p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
1136                              stdout=subprocess.PIPE,
1137                              stderr=subprocess.PIPE,
1138                              env=self.ENVIRON)
1139         (_, err) = p.communicate()
1140         self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
1141         self.assertEqual(p.returncode, 0)
1142         cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
1143                                    err.decode()).groups()[0]
1144         self.assertTrue(os.path.isfile(cache_filepath))
1145         # Load the cache file contents and modify the manifest to simulate
1146         # an expired access token
1147         with open(cache_filepath, 'r') as c:
1148             cache = json.load(c)
1149         self.assertRegex(cache['manifest'], r'\+A\S+\@')
1150         a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
1151         # Make one of the signatures appear to have expired
1152         cache['manifest'] = re.sub(
1153             r'\@.*? 3:3:barfile.txt',
1154             "@{} 3:3:barfile.txt".format(self.datetime_to_hex(a_month_ago)),
1155             cache['manifest'])
1156         with open(cache_filepath, 'w') as c:
1157             c.write(json.dumps(cache))
1158         # Re-run the upload and expect to get an invalid cache message
1159         p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
1160                              stdout=subprocess.PIPE,
1161                              stderr=subprocess.PIPE,
1162                              env=self.ENVIRON)
1163         (_, err) = p.communicate()
1164         self.assertRegex(
1165             err.decode(),
1166             r'WARNING: Uploaded file \'.*barfile.txt\' access token expired, will re-upload it from scratch')
1167         self.assertEqual(p.returncode, 0)
1168         # Confirm that the resulting cache is different from the last run.
1169         with open(cache_filepath, 'r') as c2:
1170             new_cache = json.load(c2)
1171         self.assertNotEqual(cache['manifest'], new_cache['manifest'])
1172
1173     def test_put_collection_with_later_update(self):
1174         tmpdir = self.make_tmpdir()
1175         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1176             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1177         col = self.run_and_find_collection("", ['--no-progress', tmpdir])
1178         self.assertNotEqual(None, col['uuid'])
1179         # Add a new file to the directory
1180         with open(os.path.join(tmpdir, 'file2'), 'w') as f:
1181             f.write('The quick brown fox jumped over the lazy dog')
1182         updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
1183         self.assertEqual(col['uuid'], updated_col['uuid'])
1184         # Get the manifest and check that the new file is being included
1185         c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
1186         self.assertRegex(c['manifest_text'], r'^\..* .*:44:file2\n')
1187
1188     def test_put_collection_with_utc_expiring_datetime(self):
1189         tmpdir = self.make_tmpdir()
1190         trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%MZ')
1191         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1192             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1193         col = self.run_and_find_collection(
1194             "",
1195             ['--no-progress', '--trash-at', trash_at, tmpdir])
1196         self.assertNotEqual(None, col['uuid'])
1197         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1198         self.assertEqual(ciso8601.parse_datetime(trash_at),
1199             ciso8601.parse_datetime(c['trash_at']))
1200
1201     def test_put_collection_with_timezone_aware_expiring_datetime(self):
1202         tmpdir = self.make_tmpdir()
1203         trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M-0300')
1204         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1205             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1206         col = self.run_and_find_collection(
1207             "",
1208             ['--no-progress', '--trash-at', trash_at, tmpdir])
1209         self.assertNotEqual(None, col['uuid'])
1210         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1211         self.assertEqual(
1212             ciso8601.parse_datetime(trash_at).replace(tzinfo=None) + datetime.timedelta(hours=3),
1213             ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
1214
1215     def test_put_collection_with_timezone_naive_expiring_datetime(self):
1216         tmpdir = self.make_tmpdir()
1217         trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M')
1218         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1219             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1220         col = self.run_and_find_collection(
1221             "",
1222             ['--no-progress', '--trash-at', trash_at, tmpdir])
1223         self.assertNotEqual(None, col['uuid'])
1224         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1225         if time.daylight:
1226             offset = datetime.timedelta(seconds=time.altzone)
1227         else:
1228             offset = datetime.timedelta(seconds=time.timezone)
1229         self.assertEqual(
1230             ciso8601.parse_datetime(trash_at) + offset,
1231             ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
1232
1233     def test_put_collection_with_expiring_date_only(self):
1234         tmpdir = self.make_tmpdir()
1235         trash_at = '2140-01-01'
1236         end_of_day = datetime.timedelta(hours=23, minutes=59, seconds=59)
1237         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1238             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1239         col = self.run_and_find_collection(
1240             "",
1241             ['--no-progress', '--trash-at', trash_at, tmpdir])
1242         self.assertNotEqual(None, col['uuid'])
1243         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1244         if time.daylight:
1245             offset = datetime.timedelta(seconds=time.altzone)
1246         else:
1247             offset = datetime.timedelta(seconds=time.timezone)
1248         self.assertEqual(
1249             ciso8601.parse_datetime(trash_at) + end_of_day + offset,
1250             ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
1251
1252     def test_put_collection_with_invalid_absolute_expiring_datetimes(self):
1253         cases = ['2100', '210010','2100-10', '2100-Oct']
1254         tmpdir = self.make_tmpdir()
1255         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1256             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1257         for test_datetime in cases:
1258             with self.assertRaises(AssertionError):
1259                 self.run_and_find_collection(
1260                     "",
1261                     ['--no-progress', '--trash-at', test_datetime, tmpdir])
1262
1263     def test_put_collection_with_relative_expiring_datetime(self):
1264         expire_after = 7
1265         dt_before = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
1266         tmpdir = self.make_tmpdir()
1267         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1268             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1269         col = self.run_and_find_collection(
1270             "",
1271             ['--no-progress', '--trash-after', str(expire_after), tmpdir])
1272         self.assertNotEqual(None, col['uuid'])
1273         dt_after = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
1274         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1275         trash_at = ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None)
1276         self.assertTrue(dt_before < trash_at)
1277         self.assertTrue(dt_after > trash_at)
1278
1279     def test_put_collection_with_invalid_relative_expiring_datetime(self):
1280         expire_after = 0 # Must be >= 1
1281         tmpdir = self.make_tmpdir()
1282         with open(os.path.join(tmpdir, 'file1'), 'w') as f:
1283             f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
1284         with self.assertRaises(AssertionError):
1285             self.run_and_find_collection(
1286                 "",
1287                 ['--no-progress', '--trash-after', str(expire_after), tmpdir])
1288
1289     def test_upload_directory_reference_without_trailing_slash(self):
1290         tmpdir1 = self.make_tmpdir()
1291         tmpdir2 = self.make_tmpdir()
1292         with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
1293             f.write('This is foo')
1294         with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
1295             f.write('This is not foo')
1296         # Upload one directory and one file
1297         col = self.run_and_find_collection("", ['--no-progress',
1298                                                 tmpdir1,
1299                                                 os.path.join(tmpdir2, 'bar')])
1300         self.assertNotEqual(None, col['uuid'])
1301         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1302         # Check that 'foo' was written inside a subcollection
1303         # OTOH, 'bar' should have been directly uploaded on the root collection
1304         self.assertRegex(c['manifest_text'], r'^\. .*:15:bar\n\./.+ .*:11:foo\n')
1305
1306     def test_upload_directory_reference_with_trailing_slash(self):
1307         tmpdir1 = self.make_tmpdir()
1308         tmpdir2 = self.make_tmpdir()
1309         with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
1310             f.write('This is foo')
1311         with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
1312             f.write('This is not foo')
1313         # Upload one directory (with trailing slash) and one file
1314         col = self.run_and_find_collection("", ['--no-progress',
1315                                                 tmpdir1 + os.sep,
1316                                                 os.path.join(tmpdir2, 'bar')])
1317         self.assertNotEqual(None, col['uuid'])
1318         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1319         # Check that 'foo' and 'bar' were written at the same level
1320         self.assertRegex(c['manifest_text'], r'^\. .*:15:bar .*:11:foo\n')
1321
1322     def test_put_collection_with_high_redundancy(self):
1323         # Write empty data: we're not testing CollectionWriter, just
1324         # making sure collections.create tells the API server what our
1325         # desired replication level is.
1326         collection = self.run_and_find_collection("", ['--replication', '4'])
1327         self.assertEqual(4, collection['replication_desired'])
1328
1329     def test_put_collection_with_default_redundancy(self):
1330         collection = self.run_and_find_collection("")
1331         self.assertEqual(None, collection['replication_desired'])
1332
1333     def test_put_collection_with_unnamed_project_link(self):
1334         link = self.run_and_find_collection(
1335             "Test unnamed collection",
1336             ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
1337         username = pwd.getpwuid(os.getuid()).pw_name
1338         self.assertRegex(
1339             link['name'],
1340             r'^Saved at .* by {}@'.format(re.escape(username)))
1341
1342     def test_put_collection_with_name_and_no_project(self):
1343         link_name = 'Test Collection Link in home project'
1344         collection = self.run_and_find_collection(
1345             "Test named collection in home project",
1346             ['--portable-data-hash', '--name', link_name])
1347         self.assertEqual(link_name, collection['name'])
1348         my_user_uuid = self.current_user()['uuid']
1349         self.assertEqual(my_user_uuid, collection['owner_uuid'])
1350
1351     def test_put_collection_with_named_project_link(self):
1352         link_name = 'Test auto Collection Link'
1353         collection = self.run_and_find_collection("Test named collection",
1354                                       ['--portable-data-hash',
1355                                        '--name', link_name,
1356                                        '--project-uuid', self.PROJECT_UUID])
1357         self.assertEqual(link_name, collection['name'])
1358
1359     def test_put_collection_with_storage_classes_specified(self):
1360         collection = self.run_and_find_collection("", ['--storage-classes', 'hot'])
1361         self.assertEqual(len(collection['storage_classes_desired']), 1)
1362         self.assertEqual(collection['storage_classes_desired'][0], 'hot')
1363
1364     def test_put_collection_with_multiple_storage_classes_specified(self):
1365         collection = self.run_and_find_collection("", ['--storage-classes', ' foo, bar  ,baz'])
1366         self.assertEqual(len(collection['storage_classes_desired']), 3)
1367         self.assertEqual(collection['storage_classes_desired'], ['foo', 'bar', 'baz'])
1368
1369     def test_put_collection_without_storage_classes_specified(self):
1370         collection = self.run_and_find_collection("")
1371         self.assertEqual(len(collection['storage_classes_desired']), 1)
1372         self.assertEqual(collection['storage_classes_desired'][0], 'default')
1373
1374     def test_exclude_filename_pattern(self):
1375         tmpdir = self.make_tmpdir()
1376         tmpsubdir = os.path.join(tmpdir, 'subdir')
1377         os.mkdir(tmpsubdir)
1378         for fname in ['file1', 'file2', 'file3']:
1379             with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
1380                 f.write("This is %s" % fname)
1381             with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
1382                 f.write("This is %s" % fname)
1383         col = self.run_and_find_collection("", ['--no-progress',
1384                                                 '--exclude', '*2.txt',
1385                                                 '--exclude', 'file3.*',
1386                                                  tmpdir])
1387         self.assertNotEqual(None, col['uuid'])
1388         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1389         # None of the file2.txt & file3.txt should have been uploaded
1390         self.assertRegex(c['manifest_text'], r'^.*:file1.txt')
1391         self.assertNotRegex(c['manifest_text'], r'^.*:file2.txt')
1392         self.assertNotRegex(c['manifest_text'], r'^.*:file3.txt')
1393
1394     def test_exclude_filepath_pattern(self):
1395         tmpdir = self.make_tmpdir()
1396         tmpsubdir = os.path.join(tmpdir, 'subdir')
1397         os.mkdir(tmpsubdir)
1398         for fname in ['file1', 'file2', 'file3']:
1399             with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
1400                 f.write("This is %s" % fname)
1401             with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
1402                 f.write("This is %s" % fname)
1403         col = self.run_and_find_collection("", ['--no-progress',
1404                                                 '--exclude', 'subdir/*2.txt',
1405                                                 '--exclude', './file1.*',
1406                                                  tmpdir])
1407         self.assertNotEqual(None, col['uuid'])
1408         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1409         # Only tmpdir/file1.txt & tmpdir/subdir/file2.txt should have been excluded
1410         self.assertNotRegex(c['manifest_text'],
1411                             r'^\./%s.*:file1.txt' % os.path.basename(tmpdir))
1412         self.assertNotRegex(c['manifest_text'],
1413                             r'^\./%s/subdir.*:file2.txt' % os.path.basename(tmpdir))
1414         self.assertRegex(c['manifest_text'],
1415                          r'^\./%s.*:file2.txt' % os.path.basename(tmpdir))
1416         self.assertRegex(c['manifest_text'], r'^.*:file3.txt')
1417
1418     def test_unicode_on_filename(self):
1419         tmpdir = self.make_tmpdir()
1420         fname = u"iā¤arvados.txt"
1421         with open(os.path.join(tmpdir, fname), 'w') as f:
1422             f.write("This is a unicode named file")
1423         col = self.run_and_find_collection("", ['--no-progress', tmpdir])
1424         self.assertNotEqual(None, col['uuid'])
1425         c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
1426         self.assertTrue(fname in c['manifest_text'], u"{} does not include {}".format(c['manifest_text'], fname))
1427
1428     def test_silent_mode_no_errors(self):
1429         self.authorize_with('active')
1430         tmpdir = self.make_tmpdir()
1431         with open(os.path.join(tmpdir, 'test.txt'), 'w') as f:
1432             f.write('hello world')
1433         pipe = subprocess.Popen(
1434             [sys.executable, arv_put.__file__] + ['--silent', tmpdir],
1435             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1436             stderr=subprocess.PIPE, env=self.ENVIRON)
1437         stdout, stderr = pipe.communicate()
1438         # No console output should occur on normal operations
1439         self.assertNotRegex(stderr.decode(), r'.+')
1440         self.assertNotRegex(stdout.decode(), r'.+')
1441
1442     def test_silent_mode_does_not_avoid_error_messages(self):
1443         self.authorize_with('active')
1444         pipe = subprocess.Popen(
1445             [sys.executable, arv_put.__file__] + ['--silent',
1446                                                   '/path/not/existant'],
1447             stdin=subprocess.PIPE, stdout=subprocess.PIPE,
1448             stderr=subprocess.PIPE, env=self.ENVIRON)
1449         stdout, stderr = pipe.communicate()
1450         # Error message should be displayed when errors happen
1451         self.assertRegex(stderr.decode(), r'.*ERROR:.*')
1452         self.assertNotRegex(stdout.decode(), r'.+')
1453
1454
# Run the tests in this module when the file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()