From: Tom Clegg Date: Mon, 7 Nov 2016 22:43:17 +0000 (-0500) Subject: Merge branch '10468-blob-storage-timeouts' closes #10468 X-Git-Tag: 1.1.0~611 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/c3cc1d58b64940a2bd79f27a9d0fdc50318dbb99?hp=cd391613ae717c90ede24f220695c277ecd095ce Merge branch '10468-blob-storage-timeouts' closes #10468 --- diff --git a/build/run-build-packages.sh b/build/run-build-packages.sh index 12c92607de..320f9d445c 100755 --- a/build/run-build-packages.sh +++ b/build/run-build-packages.sh @@ -431,6 +431,8 @@ package_go_binary tools/keep-block-check keep-block-check \ "Verify that all data from one set of Keep servers to another was copied" package_go_binary tools/keep-rsync keep-rsync \ "Copy all data from one set of Keep servers to another" +package_go_binary tools/keep-exercise keep-exercise \ + "Performance testing tool for Arvados Keep" # The Python SDK # Please resist the temptation to add --no-python-fix-name to the fpm call here @@ -476,7 +478,7 @@ fpm_build ruamel.yaml "" "" python 0.12.4 --python-setup-py-arguments "--single- fpm_build cwltest "" "" python 1.0.20160907111242 # And for cwltool we have the same problem as for schema_salad. Ward, 2016-03-17 -fpm_build cwltool "" "" python 1.0.20161007181528 +fpm_build cwltool "" "" python 1.0.20161107145355 # FPM eats the trailing .0 in the python-rdflib-jsonld package when built with 'rdflib-jsonld>=0.3.0'. Force the version. Ward, 2016-03-25 fpm_build rdflib-jsonld "" "" python 0.3.0 diff --git a/build/run-tests.sh b/build/run-tests.sh index 2797ec3109..fa608738c7 100755 --- a/build/run-tests.sh +++ b/build/run-tests.sh @@ -93,6 +93,7 @@ sdk/go/streamer sdk/go/crunchrunner sdk/cwl tools/crunchstat-summary +tools/keep-exercise tools/keep-rsync tools/keep-block-check @@ -764,8 +765,9 @@ gostuff=( services/crunch-dispatch-local services/crunch-dispatch-slurm services/crunch-run - tools/keep-rsync tools/keep-block-check + tools/keep-exercise + tools/keep-rsync ) for g in "${gostuff[@]}" do diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py index 3144592fc9..5ae2de3152 100644 --- a/sdk/cwl/arvados_cwl/__init__.py +++ b/sdk/cwl/arvados_cwl/__init__.py @@ -201,14 +201,28 @@ class ArvCwlRunner(object): srccollections = {} for k,v in generatemapper.items(): + if k.startswith("_:"): + if v.type == "Directory": + continue + if v.type == "CreateFile": + with final.open(v.target, "wb") as f: + f.write(v.resolved.encode("utf-8")) + continue + + if not k.startswith("keep:"): + raise Exception("Output source is not in keep or a literal") sp = k.split("/") srccollection = sp[0][5:] if srccollection not in srccollections: - srccollections[srccollection] = arvados.collection.CollectionReader( - srccollection, - api_client=self.api, - keep_client=self.keep_client, - num_retries=self.num_retries) + try: + srccollections[srccollection] = arvados.collection.CollectionReader( + srccollection, + api_client=self.api, + keep_client=self.keep_client, + num_retries=self.num_retries) + except arvados.errors.ArgumentError as e: + logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e) + raise reader = srccollections[srccollection] try: srcpath = "/".join(sp[1:]) if len(sp) > 1 else "." @@ -218,7 +232,7 @@ class ArvCwlRunner(object): def rewrite(fileobj): fileobj["location"] = generatemapper.mapper(fileobj["location"]).target - for k in ("basename", "size", "listing"): + for k in ("basename", "size", "listing", "contents"): if k in fileobj: del fileobj[k] @@ -234,7 +248,13 @@ class ArvCwlRunner(object): final.api_response()["name"], final.manifest_locator()) - self.final_output_collection = final + def finalcollection(fileobj): + fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"]) + + adjustDirObjs(outputObj, finalcollection) + adjustFileObjs(outputObj, finalcollection) + + return (outputObj, final) def set_crunch_output(self): if self.work_api == "containers": @@ -390,7 +410,7 @@ class ArvCwlRunner(object): else: if self.output_name is None: self.output_name = "Output of %s" % (shortname(tool.tool["id"])) - self.make_output_collection(self.output_name, self.final_output) + self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.final_output) self.set_crunch_output() if self.final_status != "success": diff --git a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py index 73c81ceb0f..c15b289037 100644 --- a/sdk/cwl/arvados_cwl/pathmapper.py +++ b/sdk/cwl/arvados_cwl/pathmapper.py @@ -187,14 +187,19 @@ class FinalOutputPathMapper(PathMapper): def visit(self, obj, stagedir, basedir, copy=False): # type: (Dict[unicode, Any], unicode, unicode, bool) -> None loc = obj["location"] + tgt = os.path.join(stagedir, obj["basename"]) if obj["class"] == "Directory": - self._pathmap[loc] = MapperEnt(loc, stagedir, "Directory") + self._pathmap[loc] = MapperEnt(tgt, tgt, "Directory") + if loc.startswith("_:"): + self.visitlisting(obj.get("listing", []), tgt, basedir) elif obj["class"] == "File": if loc in self._pathmap: return - tgt = os.path.join(stagedir, obj["basename"]) - self._pathmap[loc] = MapperEnt(loc, tgt, "File") - self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir) + if "contents" in obj and loc.startswith("_:"): + self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile") + else: + self._pathmap[loc] = MapperEnt(loc, tgt, "File") + self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir) def setup(self, referenced_files, basedir): # type: (List[Any], unicode) -> None diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py index d1c8f9b567..9d9a1e1a7a 100644 --- a/sdk/cwl/setup.py +++ b/sdk/cwl/setup.py @@ -48,7 +48,7 @@ setup(name='arvados-cwl-runner', # Make sure to update arvados/build/run-build-packages.sh as well # when updating the cwltool version pin. install_requires=[ - 'cwltool==1.0.20161007181528', + 'cwltool==1.0.20161107145355', 'arvados-python-client>=0.1.20160826210445' ], data_files=[ diff --git a/sdk/cwl/tests/test_make_output.py b/sdk/cwl/tests/test_make_output.py index 0b08b2e6ec..776f07c464 100644 --- a/sdk/cwl/tests/test_make_output.py +++ b/sdk/cwl/tests/test_make_output.py @@ -32,7 +32,7 @@ class TestMakeOutput(unittest.TestCase): final.open.return_value = openmock openmock.__enter__.return_value = cwlout - runner.make_output_collection("Test output", { + _, runner.final_output_collection = runner.make_output_collection("Test output", { "foo": { "class": "File", "location": "keep:99999999999999999999999999999991+99/foo.txt", diff --git a/tools/keep-exercise/keep-exercise.go b/tools/keep-exercise/keep-exercise.go index 9dc8f9425a..6d791bf987 100644 --- a/tools/keep-exercise/keep-exercise.go +++ b/tools/keep-exercise/keep-exercise.go @@ -47,7 +47,7 @@ func main() { if err != nil { log.Fatal(err) } - kc, err := keepclient.MakeKeepClient(&arv) + kc, err := keepclient.MakeKeepClient(arv) if err != nil { log.Fatal(err) } @@ -56,11 +56,11 @@ func main() { overrideServices(kc) - nextBuf := make(chan []byte, *WriteThreads) nextLocator := make(chan string, *ReadThreads+*WriteThreads) go countBeans(nextLocator) for i := 0; i < *WriteThreads; i++ { + nextBuf := make(chan []byte, 1) go makeBufs(nextBuf, i) go doWrites(kc, nextBuf, nextLocator) } @@ -106,23 +106,28 @@ func countBeans(nextLocator chan string) { } } -func makeBufs(nextBuf chan []byte, threadID int) { +func makeBufs(nextBuf chan<- []byte, threadID int) { buf := make([]byte, *BlockSize) if *VaryThread { binary.PutVarint(buf, int64(threadID)) } + randSize := 524288 + if randSize > *BlockSize { + randSize = *BlockSize + } for { if *VaryRequest { - buf = make([]byte, *BlockSize) - if _, err := io.ReadFull(rand.Reader, buf); err != nil { + rnd := make([]byte, randSize) + if _, err := io.ReadFull(rand.Reader, rnd); err != nil { log.Fatal(err) } + buf = append(rnd, buf[randSize:]...) } nextBuf <- buf } } -func doWrites(kc *keepclient.KeepClient, nextBuf chan []byte, nextLocator chan string) { +func doWrites(kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator chan<- string) { for buf := range nextBuf { locator, _, err := kc.PutB(buf) if err != nil { @@ -139,7 +144,7 @@ func doWrites(kc *keepclient.KeepClient, nextBuf chan []byte, nextLocator chan s } } -func doReads(kc *keepclient.KeepClient, nextLocator chan string) { +func doReads(kc *keepclient.KeepClient, nextLocator <-chan string) { for locator := range nextLocator { rdr, size, url, err := kc.Get(locator) if err != nil {