10467: Merge branch 'master' into 10467-client-disconnect
author Tom Clegg <tom@curoverse.com>
Tue, 8 Nov 2016 20:37:26 +0000 (15:37 -0500)
committer Tom Clegg <tom@curoverse.com>
Tue, 8 Nov 2016 20:37:26 +0000 (15:37 -0500)
build/run-build-packages.sh
build/run-tests.sh
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/setup.py
sdk/cwl/tests/test_make_output.py
services/keepstore/s3_volume.go
tools/keep-exercise/keep-exercise.go

index 12c92607de51e3fd4aa7b6ec129e438b9427b1ec..320f9d445c3a052a62bf5b8560b2080c98b06904 100755 (executable)
@@ -431,6 +431,8 @@ package_go_binary tools/keep-block-check keep-block-check \
     "Verify that all data from one set of Keep servers to another was copied"
 package_go_binary tools/keep-rsync keep-rsync \
     "Copy all data from one set of Keep servers to another"
+package_go_binary tools/keep-exercise keep-exercise \
+    "Performance testing tool for Arvados Keep"
 
 # The Python SDK
 # Please resist the temptation to add --no-python-fix-name to the fpm call here
@@ -476,7 +478,7 @@ fpm_build ruamel.yaml "" "" python 0.12.4 --python-setup-py-arguments "--single-
 fpm_build cwltest "" "" python 1.0.20160907111242
 
 # And for cwltool we have the same problem as for schema_salad. Ward, 2016-03-17
-fpm_build cwltool "" "" python 1.0.20161007181528
+fpm_build cwltool "" "" python 1.0.20161107145355
 
 # FPM eats the trailing .0 in the python-rdflib-jsonld package when built with 'rdflib-jsonld>=0.3.0'. Force the version. Ward, 2016-03-25
 fpm_build rdflib-jsonld "" "" python 0.3.0
index e326c2287f05f3b10a178384b7fdf2b49f688647..8959cfbe09c3ea7ac6ded2142b626259787d2121 100755 (executable)
@@ -93,6 +93,7 @@ sdk/go/streamer
 sdk/go/crunchrunner
 sdk/cwl
 tools/crunchstat-summary
+tools/keep-exercise
 tools/keep-rsync
 tools/keep-block-check
 
@@ -764,8 +765,9 @@ gostuff=(
     services/crunch-dispatch-local
     services/crunch-dispatch-slurm
     services/crunch-run
-    tools/keep-rsync
     tools/keep-block-check
+    tools/keep-exercise
+    tools/keep-rsync
     )
 for g in "${gostuff[@]}"
 do
index 3144592fc98f47083581e06fabcf900517d7ab01..5ae2de31521816a888618c3e3aaf7adb0487f356 100644 (file)
@@ -201,14 +201,28 @@ class ArvCwlRunner(object):
 
         srccollections = {}
         for k,v in generatemapper.items():
+            if k.startswith("_:"):
+                if v.type == "Directory":
+                    continue
+                if v.type == "CreateFile":
+                    with final.open(v.target, "wb") as f:
+                        f.write(v.resolved.encode("utf-8"))
+                    continue
+
+            if not k.startswith("keep:"):
+                raise Exception("Output source is not in keep or a literal")
             sp = k.split("/")
             srccollection = sp[0][5:]
             if srccollection not in srccollections:
-                srccollections[srccollection] = arvados.collection.CollectionReader(
-                    srccollection,
-                    api_client=self.api,
-                    keep_client=self.keep_client,
-                    num_retries=self.num_retries)
+                try:
+                    srccollections[srccollection] = arvados.collection.CollectionReader(
+                        srccollection,
+                        api_client=self.api,
+                        keep_client=self.keep_client,
+                        num_retries=self.num_retries)
+                except arvados.errors.ArgumentError as e:
+                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
+                    raise
             reader = srccollections[srccollection]
             try:
                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
@@ -218,7 +232,7 @@ class ArvCwlRunner(object):
 
         def rewrite(fileobj):
             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
-            for k in ("basename", "size", "listing"):
+            for k in ("basename", "size", "listing", "contents"):
                 if k in fileobj:
                     del fileobj[k]
 
@@ -234,7 +248,13 @@ class ArvCwlRunner(object):
                     final.api_response()["name"],
                     final.manifest_locator())
 
-        self.final_output_collection = final
+        def finalcollection(fileobj):
+            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
+
+        adjustDirObjs(outputObj, finalcollection)
+        adjustFileObjs(outputObj, finalcollection)
+
+        return (outputObj, final)
 
     def set_crunch_output(self):
         if self.work_api == "containers":
@@ -390,7 +410,7 @@ class ArvCwlRunner(object):
         else:
             if self.output_name is None:
                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
-            self.make_output_collection(self.output_name, self.final_output)
+            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.final_output)
             self.set_crunch_output()
 
         if self.final_status != "success":
index 73c81ceb0fcdb033203c1b7e5425b3875ea121d6..c15b289037210d8b3c98be08664d4c5713315b64 100644 (file)
@@ -187,14 +187,19 @@ class FinalOutputPathMapper(PathMapper):
     def visit(self, obj, stagedir, basedir, copy=False):
         # type: (Dict[unicode, Any], unicode, unicode, bool) -> None
         loc = obj["location"]
+        tgt = os.path.join(stagedir, obj["basename"])
         if obj["class"] == "Directory":
-            self._pathmap[loc] = MapperEnt(loc, stagedir, "Directory")
+            self._pathmap[loc] = MapperEnt(tgt, tgt, "Directory")
+            if loc.startswith("_:"):
+                self.visitlisting(obj.get("listing", []), tgt, basedir)
         elif obj["class"] == "File":
             if loc in self._pathmap:
                 return
-            tgt = os.path.join(stagedir, obj["basename"])
-            self._pathmap[loc] = MapperEnt(loc, tgt, "File")
-            self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
+            if "contents" in obj and loc.startswith("_:"):
+                self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile")
+            else:
+                self._pathmap[loc] = MapperEnt(loc, tgt, "File")
+                self.visitlisting(obj.get("secondaryFiles", []), stagedir, basedir)
 
     def setup(self, referenced_files, basedir):
         # type: (List[Any], unicode) -> None
index d1c8f9b567839bb6aaf1e78db2d6855b9a6038c2..9d9a1e1a7acf99f46d61d96de384681da114925a 100644 (file)
@@ -48,7 +48,7 @@ setup(name='arvados-cwl-runner',
       # Make sure to update arvados/build/run-build-packages.sh as well
       # when updating the cwltool version pin.
       install_requires=[
-          'cwltool==1.0.20161007181528',
+          'cwltool==1.0.20161107145355',
           'arvados-python-client>=0.1.20160826210445'
       ],
       data_files=[
index 0b08b2e6ec10b36cfe7dabf9be0263050893d878..776f07c4644f046c9cd9432e7365754112180b6a 100644 (file)
@@ -32,7 +32,7 @@ class TestMakeOutput(unittest.TestCase):
         final.open.return_value = openmock
         openmock.__enter__.return_value = cwlout
 
-        runner.make_output_collection("Test output", {
+        _, runner.final_output_collection = runner.make_output_collection("Test output", {
             "foo": {
                 "class": "File",
                 "location": "keep:99999999999999999999999999999991+99/foo.txt",
index 33919a37e1371fa322503b3aed3add9f15bf2cf5..17923f807dc8a8f11bc77ce8dc0732001a4a8ba8 100644 (file)
@@ -22,6 +22,11 @@ import (
        "github.com/AdRoll/goamz/s3"
 )
 
+const (
+       s3DefaultReadTimeout    = arvados.Duration(10 * time.Minute)
+       s3DefaultConnectTimeout = arvados.Duration(time.Minute)
+)
+
 var (
        // ErrS3TrashDisabled is returned by Trash if that operation
        // is impossible with the current config.
@@ -216,10 +221,10 @@ func (v *S3Volume) Start() error {
        // Zero timeouts mean "wait forever", which is a bad
        // default. Default to long timeouts instead.
        if v.ConnectTimeout == 0 {
-               v.ConnectTimeout = arvados.Duration(time.Minute)
+               v.ConnectTimeout = s3DefaultConnectTimeout
        }
        if v.ReadTimeout == 0 {
-               v.ReadTimeout = arvados.Duration(10 * time.Minute)
+               v.ReadTimeout = s3DefaultReadTimeout
        }
 
        client := s3.New(auth, region)
index 4131d752b85144d6ed95c1f5989ade92086f1c92..6d791bf9876a5b84a2b1b642025b730771f76da2 100644 (file)
@@ -56,11 +56,11 @@ func main() {
 
        overrideServices(kc)
 
-       nextBuf := make(chan []byte, *WriteThreads)
        nextLocator := make(chan string, *ReadThreads+*WriteThreads)
 
        go countBeans(nextLocator)
        for i := 0; i < *WriteThreads; i++ {
+               nextBuf := make(chan []byte, 1)
                go makeBufs(nextBuf, i)
                go doWrites(kc, nextBuf, nextLocator)
        }
@@ -106,23 +106,28 @@ func countBeans(nextLocator chan string) {
        }
 }
 
-func makeBufs(nextBuf chan []byte, threadID int) {
+func makeBufs(nextBuf chan<- []byte, threadID int) {
        buf := make([]byte, *BlockSize)
        if *VaryThread {
                binary.PutVarint(buf, int64(threadID))
        }
+       randSize := 524288
+       if randSize > *BlockSize {
+               randSize = *BlockSize
+       }
        for {
                if *VaryRequest {
-                       buf = make([]byte, *BlockSize)
-                       if _, err := io.ReadFull(rand.Reader, buf); err != nil {
+                       rnd := make([]byte, randSize)
+                       if _, err := io.ReadFull(rand.Reader, rnd); err != nil {
                                log.Fatal(err)
                        }
+                       buf = append(rnd, buf[randSize:]...)
                }
                nextBuf <- buf
        }
 }
 
-func doWrites(kc *keepclient.KeepClient, nextBuf chan []byte, nextLocator chan string) {
+func doWrites(kc *keepclient.KeepClient, nextBuf <-chan []byte, nextLocator chan<- string) {
        for buf := range nextBuf {
                locator, _, err := kc.PutB(buf)
                if err != nil {
@@ -139,7 +144,7 @@ func doWrites(kc *keepclient.KeepClient, nextBuf chan []byte, nextLocator chan s
        }
 }
 
-func doReads(kc *keepclient.KeepClient, nextLocator chan string) {
+func doReads(kc *keepclient.KeepClient, nextLocator <-chan string) {
        for locator := range nextLocator {
                rdr, size, url, err := kc.Get(locator)
                if err != nil {