# -*- coding: utf-8 -*-

# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

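"""Tests for arv-put (arvados.commands.put).

Covers the resume cache, upload jobs, progress reporting, log formatting,
and CLI/integration behavior.
"""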
from __future__ import absolute_import
from __future__ import division
from future import standard_library
standard_library.install_aliases()
from builtins import str
from builtins import range
from functools import partial
import apiclient
import datetime
import hashlib
import json
import logging
import mock
import os
import pwd
import random
import re
import select
import shutil
import signal
import subprocess
import sys
import tempfile
import time
import unittest
import uuid
import yaml

import arvados
import arvados.commands.put as arv_put
from . import arvados_testutil as tutil

from .arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
from . import run_test_server

class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
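    """Exercise arv_put.ResumeCache: cache path naming, locking, persistence,
    and destruction."""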
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            pass

    def cache_path_from_arglist(self, arglist):
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                              self.cache_path_from_arglist(argset),
                              "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
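        # Keep locators are '<md5 hexdigest>+<size in bytes>'.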
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)


class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):
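    """End-to-end ArvPutUploadJob tests against the test server: symlink
    handling, cache/resume behavior, progress reporting, and dry runs."""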

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Create temp files
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KiB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test
        _, self.large_file_name = tempfile.mkstemp()
        fileobj = open(self.large_file_name, 'w')
        # Make sure to write just a little more than one block
        for _ in range((arvados.config.KEEP_BLOCK_SIZE>>20)+1):
            data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MiB
            fileobj.write(data)
        fileobj.close()
        # Temp dir containing small files to be repacked
        self.small_files_dir = tempfile.mkdtemp()
        data = 'y' * 1024 * 1024 # 1 MiB
        for i in range(1, 70):
            with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
                f.write(data + str(i))
        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
        # Temp dir to hold a symlink to other temp dir
        self.tempdir_with_symlink = tempfile.mkdtemp()
        os.symlink(self.tempdir, os.path.join(self.tempdir_with_symlink, 'linkeddir'))
        os.symlink(os.path.join(self.tempdir, '1'),
                   os.path.join(self.tempdir_with_symlink, 'linkedfile'))

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)
        shutil.rmtree(self.small_files_dir)
        shutil.rmtree(self.tempdir_with_symlink)

    def test_symlinks_are_followed_by_default(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink])
        cwriter.start(save_collection=False)
        self.assertIn('linkeddir', cwriter.manifest_text())
        self.assertIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_symlinks_are_not_followed_when_requested(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir_with_symlink],
                                          follow_links=False)
        cwriter.start(save_collection=False)
        self.assertNotIn('linkeddir', cwriter.manifest_text())
        self.assertNotIn('linkedfile', cwriter.manifest_text())
        cwriter.destroy_cache()

    def test_passing_nonexistant_path_raise_exception(self):
        uuid_str = str(uuid.uuid4())
        with self.assertRaises(arv_put.PathDoesNotExistError):
            cwriter = arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(0, cwriter.bytes_skipped)
            self.assertEqual(3, cwriter.bytes_written)
            # Don't destroy the cache, and start another upload
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            self.assertEqual(3, cwriter_new.bytes_skipped)
            self.assertEqual(3, cwriter_new.bytes_written)

    def make_progress_tester(self):
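        """Return (progression, reporter): reporter appends (written, expected) tuples to progression."""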
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write(b'foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                                                  reporter=reporter)
                cwriter.bytes_expected = expect_count
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting. Ensure block commit.
                self.writer._update(final=True)
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    # Test for bug #11002
    def test_graceful_exit_while_repacking_small_blocks(self):
        def wrapped_commit(*args, **kwargs):
            raise SystemExit("Simulated error")

        with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
                        autospec=True) as mocked_commit:
            mocked_commit.side_effect = wrapped_commit
            # Upload a little more than 1 block; wrapped_commit makes the
            # first block commit fail.
            # arv-put should not crash with an exception while trying to
            # commit the collection, which is left in an inconsistent state.
            writer = arv_put.ArvPutUploadJob([self.small_files_dir],
                                             replication_desired=1)
            try:
                with self.assertRaises(SystemExit):
                    writer.start(save_collection=False)
            except arvados.arvfile.UnownedBlockError:
                self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
        writer.destroy_cache()

    def test_no_resume_when_asked(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without resume
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_no_resume_when_no_cache(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without cache usage
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False,
                                          use_cache=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()
        del self.writer

    def test_dry_run_feature(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Exit only on last block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                # Simulate a checkpoint before quitting.
                self.writer._update()
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            # We'll be accessing from inside the wrapper
            self.writer = writer
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            # Retry the upload using dry_run to check if there is a pending upload
            writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1,
                                              dry_run=True)
        # Complete the pending upload
        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer3.start(save_collection=False)
        with self.assertRaises(arv_put.ArvPutUploadNotPending):
            # Confirm there's no pending upload with dry_run=True
            writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
                                              replication_desired=1,
                                              dry_run=True)
        # Test obvious cases
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False,
                                    use_cache=False)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False)
        del self.writer

class CachedManifestValidationTest(ArvadosBaseTestCase):
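    """Check ArvPutUploadJob._cached_manifest_valid() across combinations of
    signature expiry and block existence."""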
    class MockedPut(arv_put.ArvPutUploadJob):
        def __init__(self, cached_manifest=None):
            self._state = arv_put.ArvPutUploadJob.EMPTY_STATE
            self._state['manifest'] = cached_manifest
            self._api_client = mock.MagicMock()
            self.logger = mock.MagicMock()
            self.num_retries = 1

    def datetime_to_hex(self, dt):
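        """Return dt as a hex Unix timestamp (local time), the format used in signed locators."""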
        return hex(int(time.mktime(dt.timetuple())))[2:]

    def setUp(self):
        super(CachedManifestValidationTest, self).setUp()
        self.block1 = "fdba98970961edb29f88241b9d99d890" # foo
        self.block2 = "37b51d194a7513e45b56f6524f2d51f2" # bar
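        # Signed locator format: <hash>+<size>+A<signature>@<expiry, hex Unix timestamp>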
        self.template = ". "+self.block1+"+3+Asignature@%s "+self.block2+"+3+Anothersignature@%s 0:3:foofile.txt 3:6:barfile.txt\n"

    def test_empty_cached_manifest_is_valid(self):
        put_mock = self.MockedPut()
        self.assertEqual(None, put_mock._state.get('manifest'))
        self.assertTrue(put_mock._cached_manifest_valid())
        put_mock._state['manifest'] = ''
        self.assertTrue(put_mock._cached_manifest_valid())

    def test_signature_cases(self):
        now = datetime.datetime.utcnow()
        yesterday = now - datetime.timedelta(days=1)
        lastweek = now - datetime.timedelta(days=7)
        tomorrow = now + datetime.timedelta(days=1)
        nextweek = now + datetime.timedelta(days=7)

        def mocked_head(blocks={}, loc=None):
            blk = loc.split('+', 1)[0]
            if blocks.get(blk):
                return True
            raise arvados.errors.KeepRequestError("mocked error - block invalid")
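        # mocked_head is bound with functools.partial below, so the patched
        # KeepClient.head(loc) consults the canned per-block responses.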

        # Block1_expiration, Block2_expiration, Block1_HEAD, Block2_HEAD, Expectation
        cases = [
            # All expired, reset cache - OK
            (yesterday, lastweek, False, False, True),
            (lastweek, yesterday, False, False, True),
            # All non-expired valid blocks - OK
            (tomorrow, nextweek, True, True, True),
            (nextweek, tomorrow, True, True, True),
            # All non-expired invalid blocks - Not OK
            (tomorrow, nextweek, False, False, False),
            (nextweek, tomorrow, False, False, False),
            # One non-expired valid block - OK
            (tomorrow, yesterday, True, False, True),
            (yesterday, tomorrow, False, True, True),
            # One non-expired invalid block - Not OK
            (tomorrow, yesterday, False, False, False),
            (yesterday, tomorrow, False, False, False),
        ]
        for case in cases:
            b1_expiration, b2_expiration, b1_valid, b2_valid, outcome = case
            head_responses = {
                self.block1: b1_valid,
                self.block2: b2_valid,
            }
            cached_manifest = self.template % (
                self.datetime_to_hex(b1_expiration),
                self.datetime_to_hex(b2_expiration),
            )
            arvput = self.MockedPut(cached_manifest)
            with mock.patch('arvados.collection.KeepClient.head') as head_mock:
                head_mock.side_effect = partial(mocked_head, head_responses)
                self.assertEqual(outcome, arvput._cached_manifest_valid(),
                    "Case '%s' should have produced outcome '%s'" % (case, outcome)
                )
                if b1_expiration > now or b2_expiration > now:
                    # A HEAD request should have been done
                    head_mock.assert_called_once()
                else:
                    head_mock.assert_not_called()


class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
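    """Check how ArvPutUploadJob computes bytes_expected for files, trees, and devices."""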
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        writer = arv_put.ArvPutUploadJob([__file__])
        self.assertEqual(self.TEST_SIZE,
                         writer.bytes_expected)

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        shutil.copyfile(__file__, os.path.join(tree, 'one'))
        shutil.copyfile(__file__, os.path.join(tree, 'two'))

        writer = arv_put.ArvPutUploadJob([tree])
        self.assertEqual(self.TEST_SIZE * 2,
                         writer.bytes_expected)
        writer = arv_put.ArvPutUploadJob([tree, __file__])
        self.assertEqual(self.TEST_SIZE * 3,
                         writer.bytes_expected)

    def test_expected_bytes_for_device(self):
        writer = arv_put.ArvPutUploadJob(['/dev/null'], use_cache=False, resume=False)
        self.assertIsNone(writer.bytes_expected)
        writer = arv_put.ArvPutUploadJob([__file__, '/dev/null'])
        self.assertIsNone(writer.bytes_expected)


class ArvadosPutReportTest(ArvadosBaseTestCase):
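    """Check machine_progress() and human_progress() report formatting."""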
    def test_machine_progress(self):
        for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
            expect = ": {} written {} total\n".format(
                count, -1 if (total is None) else total)
            self.assertTrue(
                arv_put.machine_progress(count, total).endswith(expect))

    def test_known_human_progress(self):
        for count, total in [(0, 1), (2, 4), (45, 60)]:
            expect = '{:.1%}'.format(1.0*count/total)
            actual = arv_put.human_progress(count, total)
            self.assertTrue(actual.startswith('\r'))
            self.assertIn(expect, actual)

    def test_unknown_human_progress(self):
        for count in [1, 20, 300, 4000, 50000]:
            self.assertTrue(re.search(r'\b{}\b'.format(count),
                                      arv_put.human_progress(count, None)))


class ArvPutLogFormatterTest(ArvadosBaseTestCase):
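    """Check X-Request-Id logging: it appears once on the first ERROR/DEBUG
    record and never on INFO records."""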
    matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)'

    def setUp(self):
        super(ArvPutLogFormatterTest, self).setUp()
        self.stderr = tutil.StringIO()
        self.loggingHandler = logging.StreamHandler(self.stderr)
        self.loggingHandler.setFormatter(
            arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
        self.logger = logging.getLogger()
        self.logger.addHandler(self.loggingHandler)
        self.logger.setLevel(logging.DEBUG)

    def tearDown(self):
        self.logger.removeHandler(self.loggingHandler)
        self.stderr.close()
        self.stderr = None
        super(ArvPutLogFormatterTest, self).tearDown()

    def test_request_id_logged_only_once_on_error(self):
        self.logger.error('Ooops, something bad happened.')
        self.logger.error('Another bad thing just happened.')
        log_lines = self.stderr.getvalue().split('\n')[:-1]
        self.assertEqual(2, len(log_lines))
        self.assertRegex(log_lines[0], self.matcher)
        self.assertNotRegex(log_lines[1], self.matcher)

    def test_request_id_logged_only_once_on_debug(self):
        self.logger.debug('This is just a debug message.')
        self.logger.debug('Another message, move along.')
        log_lines = self.stderr.getvalue().split('\n')[:-1]
        self.assertEqual(2, len(log_lines))
        self.assertRegex(log_lines[0], self.matcher)
        self.assertNotRegex(log_lines[1], self.matcher)

    def test_request_id_not_logged_on_info(self):
        self.logger.info('This should be a useful message')
        log_lines = self.stderr.getvalue().split('\n')[:-1]
        self.assertEqual(1, len(log_lines))
        self.assertNotRegex(log_lines[0], self.matcher)

class ArvadosPutTest(run_test_server.TestCaseWithServers,
                     ArvadosBaseTestCase,
                     tutil.VersionChecker):
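    """Drive arv_put.main() in-process and check its CLI-level behavior."""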
    MAIN_SERVER = {}
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        self.main_stdout.seek(0, 0)
        self.main_stdout.truncate(0)
        self.main_stderr.seek(0, 0)
        self.main_stderr.truncate(0)
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=[]):
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        arv_put.api_client = None
        self.main_stdout = tutil.StringIO()
        self.main_stderr = tutil.StringIO()
        self.loggingHandler = logging.StreamHandler(self.main_stderr)
        self.loggingHandler.setFormatter(
            arv_put.ArvPutLogFormatter(arvados.util.new_request_id()))
        logging.getLogger().addHandler(self.loggingHandler)

    def tearDown(self):
        logging.getLogger().removeHandler(self.loggingHandler)
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_version_argument(self):
        with tutil.redirected_streams(
                stdout=tutil.StringIO, stderr=tutil.StringIO) as (out, err):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['--version'])
        self.assertVersionOutput(out, err)

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        self.call_main_on_test_file()
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_error_when_multiple_storage_classes_specified(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--storage-classes', 'hot,cold'])

    def test_error_when_excluding_absolute_path(self):
        tmpdir = self.make_tmpdir()
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--exclude', '/some/absolute/path/*',
                           tmpdir])

    def test_api_error_handling(self):
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())

    def test_request_id_logging_on_error(self):
        matcher = r'\(X-Request-Id: req-[a-z0-9]{20}\)\n'
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), b'{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            self.assertRegex(
                self.main_stderr.getvalue(), matcher)


class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
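    """Run arv-put as a subprocess against a live test server.

    Note: _getKeepServerConfig() below intentionally takes no self; it runs
    as a plain function at class-definition time to build KEEP_SERVER.
    """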
    def _getKeepServerConfig():
        for config_file, mandatory in [
                ['application.yml', False], ['application.default.yml', True]]:
            path = os.path.join(run_test_server.SERVICES_SRC_DIR,
                                "api", "config", config_file)
            if not mandatory and not os.path.exists(path):
                continue
            with open(path) as f:
                rails_config = yaml.safe_load(f.read())
                for config_section in ['test', 'common']:
                    try:
                        key = rails_config[config_section]["blob_signing_key"]
                    except (KeyError, TypeError):
                        pass
                    else:
                        return {'blob_signing_key': key,
                                'enforce_permissions': True}
        return {'blob_signing_key': None, 'enforce_permissions': False}

    MAIN_SERVER = {}
    KEEP_SERVER = _getKeepServerConfig()
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        cls.ENVIRON = os.environ.copy()
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

    def datetime_to_hex(self, dt):
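        """Return dt as a hex Unix timestamp, used to forge expired signatures in tests below."""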
        return hex(int(time.mktime(dt.timetuple())))[2:]

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        arv_put.api_client = None

    def authorize_with(self, token_name):
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            self.assertIn(BAD_UUID, str(error))
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write(b'stdin test\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            sys.stdout.write(pipe.stdout.read())
            self.fail("arv-put returned exit code {}".format(returncode))
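        # The expected locator's '+11' size suffix matches the 11-byte
        # 'stdin test\n' payload written above.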
        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11',
                      pipe.stdout.read().decode())

    def test_sigint_logs_request_id(self):
        # Start arv-put, give it a chance to start up, send SIGINT,
        # and check that its output includes the X-Request-Id.
        input_stream = subprocess.Popen(
            ['sleep', '10'],
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=input_stream.stdout, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        # Wait for arv-put child process to print something (i.e., a
        # log message) so we know its signal handler is installed.
        select.select([pipe.stdout], [], [], 10)
        pipe.send_signal(signal.SIGINT)
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        input_stream.terminate()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not exit within 5 seconds")
        self.assertRegex(pipe.stdout.read().decode(), r'\(X-Request-Id: req-[a-z0-9]{20}\)')

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            notfound = arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__,
                              os.path.join(datadir, 'foo')],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (out, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Collection saved as ')
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegex(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

    def run_and_find_collection(self, text, extra_args=[]):
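        """Pipe text into arv-put with extra_args and return the matching collection record."""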
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text.encode())
        self.assertRegex(stderr.decode(), r'INFO: Collection (updated:|saved as)')
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.decode().strip()]]
        ).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

    def test_all_expired_signatures_invalidates_cache(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
            f.write('foo')
        # Upload a directory and get the cache file name
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (out, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
        self.assertEqual(p.returncode, 0)
        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                   err.decode()).groups()[0]
        self.assertTrue(os.path.isfile(cache_filepath))
        # Load the cache file contents and modify the manifest to simulate
        # an expired access token
        with open(cache_filepath, 'r') as c:
            cache = json.load(c)
        self.assertRegex(cache['manifest'], r'\+A\S+\@')
        a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
        cache['manifest'] = re.sub(
            r'\@.*? ',
            "@{} ".format(self.datetime_to_hex(a_month_ago)),
            cache['manifest'])
        with open(cache_filepath, 'w') as c:
            c.write(json.dumps(cache))
        # Re-run the upload and expect the cache to be discarded as expired
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (out, err) = p.communicate()
        self.assertRegex(
            err.decode(),
            r'INFO: Cache expired, starting from scratch.*')
        self.assertEqual(p.returncode, 0)

    def test_invalid_signature_invalidates_cache(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
            f.write('foo')
        # Upload a directory and get the cache file name
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (out, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
        self.assertEqual(p.returncode, 0)
        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                   err.decode()).groups()[0]
        self.assertTrue(os.path.isfile(cache_filepath))
        # Load the cache file contents and modify the manifest to simulate
        # an invalid access token
        with open(cache_filepath, 'r') as c:
            cache = json.load(c)
        self.assertRegex(cache['manifest'], r'\+A\S+\@')
        cache['manifest'] = re.sub(
            r'\+A.*\@',
            "+Aabcdef0123456789abcdef0123456789abcdef01@",
            cache['manifest'])
        with open(cache_filepath, 'w') as c:
            c.write(json.dumps(cache))
        # Re-run the upload and expect to get an invalid cache message
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (out, err) = p.communicate()
        self.assertRegex(
            err.decode(),
            r'ERROR: arv-put: Resume cache contains invalid signature.*')
        self.assertEqual(p.returncode, 1)

    def test_single_expired_signature_reuploads_file(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'foofile.txt'), 'w') as f:
            f.write('foo')
        # Write a second file on its own subdir to force a new stream
        os.mkdir(os.path.join(tmpdir, 'bar'))
        with open(os.path.join(tmpdir, 'bar', 'barfile.txt'), 'w') as f:
            f.write('bar')
        # Upload a directory and get the cache file name
        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             env=self.ENVIRON)
        (out, err) = p.communicate()
        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
        self.assertEqual(p.returncode, 0)
        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
                                   err.decode()).groups()[0]
        self.assertTrue(os.path.isfile(cache_filepath))
        # Load the cache file contents and modify the manifest to simulate
        # an expired access token
        with open(cache_filepath, 'r') as c:
            cache = json.load(c)
        self.assertRegex(cache['manifest'], r'\+A\S+\@')
        a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
        # Make one of the signatures appear to have expired
        cache['manifest'] = re.sub(
            r'\@.*? 3:3:barfile.txt',
            "@{} 3:3:barfile.txt".format(self.datetime_to_hex(a_month_ago)),
            cache['manifest'])
        with open(cache_filepath, 'w') as c:
            c.write(json.dumps(cache))
        # Re-run the upload and expect barfile.txt to be re-uploaded
1134         p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
1135                              stdout=subprocess.PIPE,
1136                              stderr=subprocess.PIPE,
1137                              env=self.ENVIRON)
1138         (out, err) = p.communicate()
1139         self.assertRegex(
1140             err.decode(),
1141             r'WARNING: Uploaded file \'.*barfile.txt\' access token expired, will re-upload it from scratch')
1142         self.assertEqual(p.returncode, 0)
1143         # Confirm that the resulting cache is different from the last run.
1144         with open(cache_filepath, 'r') as c2:
1145             new_cache = json.load(c2)
1146         self.assertNotEqual(cache['manifest'], new_cache['manifest'])
1147
    def test_put_collection_with_later_update(self):
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
        self.assertNotEqual(None, col['uuid'])
        # Add a new file to the directory
        with open(os.path.join(tmpdir, 'file2'), 'w') as f:
            f.write('The quick brown fox jumped over the lazy dog')
        updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
        self.assertEqual(col['uuid'], updated_col['uuid'])
        # Get the manifest and check that the new file is included
        c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
        self.assertRegex(c['manifest_text'], r'^\..* .*:44:file2\n')

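    # The test above mirrors this command-line flow (sketch only; the
    # collection UUID is a placeholder):
    #
    #   arv-put --no-progress /path/to/dir
    #   # ...add files to /path/to/dir...
    #   arv-put --no-progress --update-collection zzzzz-4zz18-xxxxxxxxxxxxxxx \
    #       /path/to/dir
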
    def test_upload_directory_reference_without_trailing_slash(self):
        tmpdir1 = self.make_tmpdir()
        tmpdir2 = self.make_tmpdir()
        with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
            f.write('This is foo')
        with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
            f.write('This is not foo')
        # Upload one directory and one file
        col = self.run_and_find_collection("", ['--no-progress',
                                                tmpdir1,
                                                os.path.join(tmpdir2, 'bar')])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Check that 'foo' was written inside a subcollection, while 'bar'
        # was uploaded directly into the root collection
        self.assertRegex(c['manifest_text'], r'^\. .*:15:bar\n\./.+ .*:11:foo\n')

    def test_upload_directory_reference_with_trailing_slash(self):
        tmpdir1 = self.make_tmpdir()
        tmpdir2 = self.make_tmpdir()
        with open(os.path.join(tmpdir1, 'foo'), 'w') as f:
            f.write('This is foo')
        with open(os.path.join(tmpdir2, 'bar'), 'w') as f:
            f.write('This is not foo')
        # Upload one directory (with trailing slash) and one file
        col = self.run_and_find_collection("", ['--no-progress',
                                                tmpdir1 + os.sep,
                                                os.path.join(tmpdir2, 'bar')])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Check that 'foo' and 'bar' were written at the same level
        self.assertRegex(c['manifest_text'], r'^\. .*:15:bar .*:11:foo\n')

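    # Taken together, the two tests above pin down the manifest shapes
    # (sketch; block locators elided with '...'):
    #
    #   without trailing slash:  . ...:15:bar
    #                            ./<tmpdir1 basename> ...:11:foo
    #   with trailing slash:     . ...:15:bar ...:11:foo
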
    def test_put_collection_with_high_redundancy(self):
        # Write empty data: we're not testing CollectionWriter, just
        # making sure collections.create tells the API server what our
        # desired replication level is.
        collection = self.run_and_find_collection("", ['--replication', '4'])
        self.assertEqual(4, collection['replication_desired'])

    def test_put_collection_with_default_redundancy(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(None, collection['replication_desired'])

    def test_put_collection_with_unnamed_project_link(self):
        link = self.run_and_find_collection(
            "Test unnamed collection",
            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
        username = pwd.getpwuid(os.getuid()).pw_name
        self.assertRegex(
            link['name'],
            r'^Saved at .* by {}@'.format(re.escape(username)))

    def test_put_collection_with_name_and_no_project(self):
        link_name = 'Test Collection Link in home project'
        collection = self.run_and_find_collection(
            "Test named collection in home project",
            ['--portable-data-hash', '--name', link_name])
        self.assertEqual(link_name, collection['name'])
        my_user_uuid = self.current_user()['uuid']
        self.assertEqual(my_user_uuid, collection['owner_uuid'])

    def test_put_collection_with_named_project_link(self):
        link_name = 'Test auto Collection Link'
        collection = self.run_and_find_collection("Test named collection",
                                      ['--portable-data-hash',
                                       '--name', link_name,
                                       '--project-uuid', self.PROJECT_UUID])
        self.assertEqual(link_name, collection['name'])

    def test_put_collection_with_storage_classes_specified(self):
        collection = self.run_and_find_collection("", ['--storage-classes', 'hot'])

        self.assertEqual(len(collection['storage_classes_desired']), 1)
        self.assertEqual(collection['storage_classes_desired'][0], 'hot')

    def test_put_collection_without_storage_classes_specified(self):
        collection = self.run_and_find_collection("")

        self.assertEqual(len(collection['storage_classes_desired']), 1)
        self.assertEqual(collection['storage_classes_desired'][0], 'default')

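    # Equivalent CLI usage for the two storage-class tests above (sketch):
    #
    #   arv-put --storage-classes hot /path/to/data  # desired: ['hot']
    #   arv-put /path/to/data                        # desired: ['default']
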
    def test_exclude_filename_pattern(self):
        tmpdir = self.make_tmpdir()
        tmpsubdir = os.path.join(tmpdir, 'subdir')
        os.mkdir(tmpsubdir)
        for fname in ['file1', 'file2', 'file3']:
            with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
            with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
        col = self.run_and_find_collection("", ['--no-progress',
                                                '--exclude', '*2.txt',
                                                '--exclude', 'file3.*',
                                                tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Neither file2.txt nor file3.txt should have been uploaded,
        # in any directory
        self.assertRegex(c['manifest_text'], r'^.*:file1.txt')
        self.assertNotRegex(c['manifest_text'], r'^.*:file2.txt')
        self.assertNotRegex(c['manifest_text'], r'^.*:file3.txt')

    def test_exclude_filepath_pattern(self):
        tmpdir = self.make_tmpdir()
        tmpsubdir = os.path.join(tmpdir, 'subdir')
        os.mkdir(tmpsubdir)
        for fname in ['file1', 'file2', 'file3']:
            with open(os.path.join(tmpdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
            with open(os.path.join(tmpsubdir, "%s.txt" % fname), 'w') as f:
                f.write("This is %s" % fname)
        col = self.run_and_find_collection("", ['--no-progress',
                                                '--exclude', 'subdir/*2.txt',
                                                '--exclude', './file1.*',
                                                tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        # Only tmpdir/file1.txt and tmpdir/subdir/file2.txt should have
        # been excluded
        self.assertNotRegex(c['manifest_text'],
                            r'^\./%s.*:file1.txt' % os.path.basename(tmpdir))
        self.assertNotRegex(c['manifest_text'],
                            r'^\./%s/subdir.*:file2.txt' % os.path.basename(tmpdir))
        self.assertRegex(c['manifest_text'],
                         r'^\./%s.*:file2.txt' % os.path.basename(tmpdir))
        self.assertRegex(c['manifest_text'], r'^.*:file3.txt')

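    # The two tests above distinguish bare filename patterns (matched
    # against every file's basename, at any depth) from path-anchored
    # patterns (containing '/' or a leading './', matched against the
    # path relative to the upload root).  A rough model of that matching,
    # assuming fnmatch-style globbing as the patterns suggest (this
    # helper is illustrative only and is not used by the suite):
    @staticmethod
    def _example_is_excluded(relpath, patterns):
        import fnmatch
        for pat in patterns:
            if '/' in pat:
                # Path-anchored pattern: match the whole relative path.
                if fnmatch.fnmatch(relpath, pat.lstrip('./')):
                    return True
            elif fnmatch.fnmatch(os.path.basename(relpath), pat):
                # Bare filename pattern: match the basename only.
                return True
        return False
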
    def test_unicode_on_filename(self):
        tmpdir = self.make_tmpdir()
        fname = u"i❤arvados.txt"
        with open(os.path.join(tmpdir, fname), 'w') as f:
            f.write("This is a unicode named file")
        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
        self.assertNotEqual(None, col['uuid'])
        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
        self.assertIn(fname, c['manifest_text'],
                      u"{} does not include {}".format(c['manifest_text'], fname))

    def test_silent_mode_no_errors(self):
        self.authorize_with('active')
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'test.txt'), 'w') as f:
            f.write('hello world')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + ['--silent', tmpdir],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate()
        # No console output should occur during normal operation
        self.assertNotRegex(stderr.decode(), r'.+')
        self.assertNotRegex(stdout.decode(), r'.+')

    def test_silent_mode_does_not_avoid_error_messages(self):
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + ['--silent',
                                                  '/path/not/existent'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate()
        # Error messages should still be displayed when errors happen
        self.assertRegex(stderr.decode(), r'.*ERROR:.*')
        self.assertNotRegex(stdout.decode(), r'.+')

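    # CLI behavior pinned down by the two silent-mode tests above (sketch):
    #
    #   arv-put --silent /path/to/dir        # exit 0, no output at all
    #   arv-put --silent /path/not/existent  # errors still reach stderr
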

if __name__ == '__main__':
    unittest.main()