+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+from __future__ import absolute_import
+from future import standard_library
+standard_library.install_aliases()
import arvados
import arvados_fuse.command
+import json
import mock
import os
-import run_test_server
+import pycurl
+import queue
+from . import run_test_server
import tempfile
import unittest
+from .integration_test import IntegrationTest
+
+
class KeepClientRetry(unittest.TestCase):
origKeepClient = arvados.keep.KeepClient
def test_no_retry(self):
self._test_retry(0, ['--retries=0'])
+
+class RetryPUT(IntegrationTest):
+ @mock.patch('time.sleep')
+ @IntegrationTest.mount(argv=['--read-write', '--mount-tmp=zzz'])
+ def test_retry_write(self, sleep):
+ mockedCurl = mock.Mock(spec=pycurl.Curl(), wraps=pycurl.Curl())
+ mockedCurl.perform.side_effect = Exception('mock error (ok)')
+ q = queue.Queue()
+ q.put(mockedCurl)
+ q.put(pycurl.Curl())
+ q.put(pycurl.Curl())
+ with mock.patch('arvados.keep.KeepClient.KeepService._get_user_agent', side_effect=q.get_nowait):
+ self.pool_test(os.path.join(self.mnt, 'zzz'))
+ self.assertTrue(mockedCurl.perform.called)
+ @staticmethod
+ def _test_retry_write(self, tmp):
+ with open(os.path.join(tmp, 'foo'), 'w') as f:
+ f.write('foo')
+ json.load(open(os.path.join(tmp, '.arvados#collection')))