parser.add_argument("--varying-url-params", type=str, default="",
help="A comma separated list of URL query parameters that should be ignored when storing HTTP URLs in Keep.")
+ parser.add_argument("--prefer-cached-downloads", action="store_true", default=False,
+ help="If a HTTP URL is found in Keep, skip upstream URL freshness check (will not notice if the upstream has changed, but also not error if upstream is unavailable).")
+
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--enable-preemptible", dest="enable_preemptible", default=None, action="store_true", help="Use preemptible instances. Control individual steps with arv:UsePreemptible hint.")
exgroup.add_argument("--disable-preemptible", dest="enable_preemptible", default=None, action="store_false", help="Don't use preemptible instances.")
if runtimeContext.varying_url_params:
command.append("--varying-url-params="+runtimeContext.varying_url_params)
+ if runtimeContext.prefer_cached_downloads:
+ command.append("--prefer-cached-downloads")
+
command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
container_req["command"] = command
self.copy_deps = None
self.defer_downloads = False
self.varying_url_params = ""
+ self.prefer_cached_downloads = False
super(ArvRuntimeContext, self).__init__(kwargs)
arvargs.thread_count = 1
arvargs.collection_cache_size = None
arvargs.git_info = True
+ arvargs.submit = False
+ arvargs.defer_downloads = False
self.api = api_client
self.processes = {}
collection_cache=self.collection_cache)
self.defer_downloads = arvargs.submit and arvargs.defer_downloads
- self.varying_url_params = arvargs.varying_url_params
validate_cluster_target(self, self.toplevel_runtimeContext)
page = keys[:pageSize]
try:
proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
- except Exception:
- logger.exception("Error checking states on API server: %s")
+ except Exception as e:
+ logger.exception("Error checking states on API server: %s", e)
remain_wait = self.poll_interval
continue
def changed(url, clean_url, properties, now):
    """Report whether the resource at ``url`` appears to differ from the cached copy.

    Sends a HEAD request to ``url`` (following redirects) and compares the
    response's ``ETag`` header with the ``ETag`` stored in ``properties[url]``.

    Args:
        url: URL to probe; also the key under which the previously cached
            headers are looked up in ``properties``.
        clean_url: Key under which the headers from this response are
            recorded (through ``remember_headers``).
        properties: Dict mapping URL keys to dicts of cached headers.
            Mutated in place: the entry for ``url`` is removed and the
            response headers are recorded under ``clean_url``.
        now: Timestamp handed through to ``remember_headers``.

    Returns:
        ``False`` only when the HEAD request returned 200 and its ``ETag``
        equals the cached one. ``True`` in every other case, including a
        non-200 HEAD response, in which case ``properties`` is untouched.
    """
    req = requests.head(url, allow_redirects=True)

    if req.status_code != 200:
        # Sometimes endpoints are misconfigured and will deny HEAD but
        # allow GET so instead of failing here, we'll try GET If-None-Match
        return True

    # Fetch the cached ETag and drop the old entry in a single step.  Doing
    # the lookup with a default keeps a missing entry from raising KeyError;
    # it is simply treated as "no cached ETag", so the result is "changed".
    etag = properties.pop(url, {}).get("ETag")

    remember_headers(clean_url, properties, req.headers, now)

    if "ETag" in req.headers and etag == req.headers["ETag"]:
        # Didn't change
        return False

    return True
return '"' + etag + '"'
-def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow, varying_url_params=""):
+def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow, varying_url_params="", prefer_cached_downloads=False):
varying_params = [s.strip() for s in varying_url_params.split(",")]
parsed = urllib.parse.urlparse(url)
query = [q for q in urllib.parse.parse_qsl(parsed.query)
if q[0] not in varying_params]
+
clean_url = urllib.parse.urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params,
- urllib.parse.urlencode(query), parsed.fragment))
+ urllib.parse.urlencode(query, safe="/"), parsed.fragment))
- r = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
+ r1 = api.collections().list(filters=[["properties", "exists", url]]).execute()
+
+ if clean_url == url:
+ items = r1["items"]
+ else:
+ r2 = api.collections().list(filters=[["properties", "exists", clean_url]]).execute()
+ items = r1["items"] + r2["items"]
now = utcnow()
etags = {}
- for item in r["items"]:
+ for item in items:
properties = item["properties"]
- if fresh_cache(clean_url, properties, now):
- # Do nothing
+
+ if clean_url in properties:
+ cache_url = clean_url
+ elif url in properties:
+ cache_url = url
+ else:
+ return False
+
+ if prefer_cached_downloads or fresh_cache(cache_url, properties, now):
+ # HTTP caching rules say we should use the cache
cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
- if not changed(url, clean_url, properties, now):
+ if not changed(cache_url, clean_url, properties, now):
# ETag didn't change, same content, just update headers
api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
return "keep:%s/%s" % (item["portable_data_hash"], list(cr.keys())[0])
- if "ETag" in properties and len(properties["ETag"]) > 2:
- etags[properties["ETag"]] = item
+ if "ETag" in properties[cache_url] and len(properties[cache_url]["ETag"]) > 2:
+ etags[properties[cache_url]["ETag"]] = item
logger.debug("Found ETags %s", etags)
# passthrough, we'll download it later.
self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
else:
- keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src, varying_url_params=self.arvrunner.varying_url_params)
+ keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src,
+ varying_url_params=self.arvrunner.toplevel_runtimeContext.varying_url_params,
+ prefer_cached_downloads=self.arvrunner.toplevel_runtimeContext.prefer_cached_downloads)
logger.info("%s is %s", src, keepref)
self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
except Exception as e:
r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
- getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True)
+ getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True, headers={})
cm.open.assert_called_with("file1.txt", "wb")
cm.save_new.assert_called_with(name="Downloaded from http%3A%2F%2Fexample.com%2Ffile1.txt",
r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
self.assertEqual(r, "keep:99999999999999999999999999999997+99/file1.txt")
- getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True)
+ getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True, headers={})
cm.open.assert_called_with("file1.txt", "wb")
cm.save_new.assert_called_with(name="Downloaded from http%3A%2F%2Fexample.com%2Ffile1.txt",
'http://example.com/file1.txt': {
'Date': 'Tue, 15 May 2018 00:00:00 GMT',
'Expires': 'Tue, 16 May 2018 00:00:00 GMT',
- 'ETag': '123456'
+ 'ETag': '"123456"'
}
}
}]
req.headers = {
'Date': 'Tue, 17 May 2018 00:00:00 GMT',
'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
- 'ETag': '123456'
+ 'ETag': '"123456"'
}
headmock.return_value = req
body={"collection":{"properties": {'http://example.com/file1.txt': {
'Date': 'Tue, 17 May 2018 00:00:00 GMT',
'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
- 'ETag': '123456'
+ 'ETag': '"123456"'
}}}})
])
r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/download?fn=/file1.txt", utcnow=utcnow)
self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
- getmock.assert_called_with("http://example.com/download?fn=/file1.txt", stream=True, allow_redirects=True)
+ getmock.assert_called_with("http://example.com/download?fn=/file1.txt", stream=True, allow_redirects=True, headers={})
cm.open.assert_called_with("file1.txt", "wb")
cm.save_new.assert_called_with(name="Downloaded from http%3A%2F%2Fexample.com%2Fdownload%3Ffn%3D%2Ffile1.txt",
mock.call(uuid=cm.manifest_locator(),
body={"collection":{"properties": {"http://example.com/download?fn=/file1.txt": {'Date': 'Tue, 15 May 2018 00:00:00 GMT'}}}})
])
+
@mock.patch("requests.get")
@mock.patch("requests.head")
@mock.patch("arvados.collection.CollectionReader")
def test_http_etag_if_none_match(self, collectionmock, headmock, getmock):
    """A refused HEAD falls back to a conditional GET, and a 304 reuses the cache.

    The cached collection carries an ETag; HEAD answers 403, GET answers 304.
    The test expects the cached keep reference to be returned, the GET to
    carry ``If-None-Match`` with the cached ETag, nothing to be written, and
    the collection properties to be updated with the headers from the GET.
    """
    file_url = 'http://example.com/file1.txt'
    collection_uuid = "zzzzz-4zz18-zzzzzzzzzzzzzz3"
    pdh = "99999999999999999999999999999998+99"
    cached_headers = {
        'Date': 'Tue, 15 May 2018 00:00:00 GMT',
        'Expires': 'Tue, 16 May 2018 00:00:00 GMT',
        'ETag': '"123456"',
    }
    refreshed_headers = {
        'Date': 'Tue, 17 May 2018 00:00:00 GMT',
        'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
        'ETag': '"123456"',
    }

    api = mock.MagicMock()
    api.collections().list().execute.return_value = {
        "items": [{
            "uuid": collection_uuid,
            "portable_data_hash": pdh,
            "properties": {file_url: cached_headers},
        }]
    }

    reader = mock.MagicMock()
    reader.manifest_locator.return_value = collection_uuid
    reader.portable_data_hash.return_value = pdh
    reader.keys.return_value = ["file1.txt"]
    collectionmock.return_value = reader

    # Head request fails, will try a conditional GET instead
    head_response = mock.MagicMock()
    head_response.status_code = 403
    head_response.headers = {}
    headmock.return_value = head_response

    utcnow = mock.MagicMock()
    utcnow.return_value = datetime.datetime(2018, 5, 17)

    # The conditional GET answers "not modified" and supplies newer headers.
    # A copy is handed out so the expectation below has its own dict.
    get_response = mock.MagicMock()
    get_response.status_code = 304
    get_response.headers = dict(refreshed_headers)
    getmock.return_value = get_response

    keepref = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
    self.assertEqual(keepref, "keep:99999999999999999999999999999998+99/file1.txt")

    getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True,
                               headers={"If-None-Match": '"123456"'})
    reader.open.assert_not_called()

    expected_update = mock.call(
        uuid=reader.manifest_locator(),
        body={"collection": {"properties": {file_url: dict(refreshed_headers)}}})
    api.collections().update.assert_has_calls([expected_update])
+
+
@mock.patch("requests.get")
@mock.patch("requests.head")
@mock.patch("arvados.collection.CollectionReader")
def test_http_prefer_cached_downloads(self, collectionmock, headmock, getmock):
    """``prefer_cached_downloads=True`` returns the cached copy with no HTTP traffic.

    The cached entry's ``Expires`` header is earlier than the mocked current
    time. The test expects the cached keep reference to be returned with no
    HEAD or GET request, no file opened, and no collection update.
    """
    collection_uuid = "zzzzz-4zz18-zzzzzzzzzzzzzz3"
    pdh = "99999999999999999999999999999998+99"
    cached_entry = {
        "uuid": collection_uuid,
        "portable_data_hash": pdh,
        "properties": {
            'http://example.com/file1.txt': {
                'Date': 'Tue, 15 May 2018 00:00:00 GMT',
                'Expires': 'Tue, 16 May 2018 00:00:00 GMT',
                'ETag': '"123456"',
            },
        },
    }

    api = mock.MagicMock()
    api.collections().list().execute.return_value = {"items": [cached_entry]}

    reader = mock.MagicMock()
    reader.manifest_locator.return_value = collection_uuid
    reader.portable_data_hash.return_value = pdh
    reader.keys.return_value = ["file1.txt"]
    collectionmock.return_value = reader

    utcnow = mock.MagicMock()
    utcnow.return_value = datetime.datetime(2018, 5, 17)

    keepref = arvados_cwl.http.http_to_keep(
        api, None, "http://example.com/file1.txt",
        utcnow=utcnow, prefer_cached_downloads=True)
    self.assertEqual(keepref, "keep:99999999999999999999999999999998+99/file1.txt")

    # Nothing may touch the network, the collection contents or its metadata.
    headmock.assert_not_called()
    getmock.assert_not_called()
    reader.open.assert_not_called()
    api.collections().update.assert_not_called()
+
@mock.patch("requests.get")
@mock.patch("requests.head")
@mock.patch("arvados.collection.CollectionReader")
def test_http_varying_url_params(self, collectionmock, headmock, getmock):
    """Cached entries are found whether keyed by the stripped or the full URL.

    Runs the same scenario twice: once with the cached headers stored under
    the URL without its varying query parameters, once under the full URL.
    In both cases HEAD answers 200 with a matching ETag. The test expects
    the cached keep reference to be returned, no GET, nothing written, and
    the refreshed headers recorded under the stripped URL.
    """
    stripped_url = "http://example.com/file1.txt"
    signed_url = "http://example.com/file1.txt?KeyId=123&Signature=456&Expires=789"
    collection_uuid = "zzzzz-4zz18-zzzzzzzzzzzzzz3"
    pdh = "99999999999999999999999999999998+99"

    for cached_key in (stripped_url, signed_url):
        api = mock.MagicMock()
        api.collections().list().execute.return_value = {
            "items": [{
                "uuid": collection_uuid,
                "portable_data_hash": pdh,
                "properties": {
                    cached_key: {
                        'Date': 'Tue, 15 May 2018 00:00:00 GMT',
                        'Expires': 'Tue, 16 May 2018 00:00:00 GMT',
                        'ETag': '"123456"',
                    },
                },
            }]
        }

        reader = mock.MagicMock()
        reader.manifest_locator.return_value = collection_uuid
        reader.portable_data_hash.return_value = pdh
        reader.keys.return_value = ["file1.txt"]
        collectionmock.return_value = reader

        # HEAD succeeds and reports the same ETag as the cached entry.
        head_response = mock.MagicMock()
        head_response.status_code = 200
        head_response.headers = {
            'Date': 'Tue, 17 May 2018 00:00:00 GMT',
            'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
            'ETag': '"123456"',
        }
        headmock.return_value = head_response

        utcnow = mock.MagicMock()
        utcnow.return_value = datetime.datetime(2018, 5, 17)

        keepref = arvados_cwl.http.http_to_keep(
            api, None, signed_url,
            utcnow=utcnow, varying_url_params="KeyId,Signature,Expires")
        self.assertEqual(keepref, "keep:99999999999999999999999999999998+99/file1.txt")

        getmock.assert_not_called()
        reader.open.assert_not_called()

        expected_properties = {
            'http://example.com/file1.txt': {
                'Date': 'Tue, 17 May 2018 00:00:00 GMT',
                'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
                'ETag': '"123456"',
            },
        }
        api.collections().update.assert_has_calls([
            mock.call(uuid=reader.manifest_locator(),
                      body={"collection": {"properties": expected_properties}}),
        ])
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
@stubs()
def test_submit_container_prefer_cached_downloads(self, stubs):
    """``--prefer-cached-downloads`` at submit time appears in the runner container command.

    Submits ``tests/wf/submit_wf.cwl`` with the flag and expects the created
    container request's command to include ``--prefer-cached-downloads``,
    the request UUID to be printed, and the exit code to be 0.
    """
    submit_args = [
        "--submit", "--no-wait", "--api=containers", "--debug", "--prefer-cached-downloads",
        "tests/wf/submit_wf.cwl", "tests/submit_test_job.json",
    ]
    exited = arvados_cwl.main(
        submit_args,
        stubs.capture_stdout, sys.stderr,
        api_client=stubs.api, keep_client=stubs.keep_client)

    output_name_arg = '--output-name=Output from workflow submit_wf.cwl (%s)' % stubs.git_props["arv:gitDescribe"]
    runner_command = [
        'arvados-cwl-runner', '--local', '--api=containers',
        '--no-log-timestamps', '--disable-validate', '--disable-color',
        '--eval-timeout=20', '--thread-count=0',
        '--enable-reuse', "--collection-cache-size=256",
        output_name_arg,
        '--debug', "--on-error=continue", '--prefer-cached-downloads',
        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json',
    ]

    # Start from the stock expected request and override only the command.
    expect_container = copy.deepcopy(stubs.expect_container_spec)
    expect_container["command"] = runner_command

    stubs.api.container_requests().create.assert_called_with(
        body=JsonDiffMatcher(expect_container))
    self.assertEqual(stubs.capture_stdout.getvalue(),
                     stubs.expect_container_request_uuid + '\n')
    self.assertEqual(exited, 0)
+
@stubs()
def test_submit_container_varying_url_params(self, stubs):
    """``--varying-url-params`` at submit time appears in the runner container command.

    Submits ``tests/wf/submit_wf.cwl`` with ``--varying-url-params
    KeyId,Signature`` and expects the created container request's command to
    include ``--varying-url-params=KeyId,Signature``, the request UUID to be
    printed, and the exit code to be 0.
    """
    submit_args = [
        "--submit", "--no-wait", "--api=containers", "--debug", "--varying-url-params", "KeyId,Signature",
        "tests/wf/submit_wf.cwl", "tests/submit_test_job.json",
    ]
    exited = arvados_cwl.main(
        submit_args,
        stubs.capture_stdout, sys.stderr,
        api_client=stubs.api, keep_client=stubs.keep_client)

    output_name_arg = '--output-name=Output from workflow submit_wf.cwl (%s)' % stubs.git_props["arv:gitDescribe"]
    runner_command = [
        'arvados-cwl-runner', '--local', '--api=containers',
        '--no-log-timestamps', '--disable-validate', '--disable-color',
        '--eval-timeout=20', '--thread-count=0',
        '--enable-reuse', "--collection-cache-size=256",
        output_name_arg,
        '--debug', "--on-error=continue", "--varying-url-params=KeyId,Signature",
        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json',
    ]

    # Start from the stock expected request and override only the command.
    expect_container = copy.deepcopy(stubs.expect_container_spec)
    expect_container["command"] = runner_command

    stubs.api.container_requests().create.assert_called_with(
        body=JsonDiffMatcher(expect_container))
    self.assertEqual(stubs.capture_stdout.getvalue(),
                     stubs.expect_container_request_uuid + '\n')
    self.assertEqual(exited, 0)
+
class TestCreateWorkflow(unittest.TestCase):
existing_workflow_uuid = "zzzzz-7fd4e-validworkfloyml"