Merge branch '1943-client-sdk-config-files'
author Tim Pierce <twp@curoverse.com>
Tue, 21 Jan 2014 22:14:40 +0000 (17:14 -0500)
committer Tim Pierce <twp@curoverse.com>
Tue, 21 Jan 2014 22:14:40 +0000 (17:14 -0500)
Conflicts:
sdk/python/arvados/__init__.py

34 files changed:
doc/_config.yml
doc/_includes/navbar_left.html
doc/_includes/navbar_top.html
doc/_includes/run-md5sum.py [new file with mode: 0644]
doc/_layouts/default.html
doc/css/bootstrap.css
doc/images/glyphicons-halflings-white.png [new file with mode: 0644]
doc/images/glyphicons-halflings.png [new file with mode: 0644]
doc/index.html
doc/install/create-standard-objects.textile
doc/mkpydoc.sh [new file with mode: 0755]
doc/sdk/index.textile [new file with mode: 0644]
doc/sdk/python/crunch-utility-libraries.textile [moved from doc/user/reference/crunch-utility-libraries.textile with 91% similarity]
doc/sdk/python/python.textile [new file with mode: 0644]
doc/sdk/python/sdk-python.textile [moved from doc/user/reference/sdk-python.textile with 95% similarity]
doc/user/reference/sdk-cli.textile
doc/user/tutorials/running-external-program.textile [new file with mode: 0644]
doc/user/tutorials/tutorial-firstscript.textile
doc/user/tutorials/tutorial-gatk-variantfiltration.textile
doc/user/tutorials/tutorial-job-debug.textile
doc/user/tutorials/tutorial-job1.textile
doc/user/tutorials/tutorial-keep.textile
doc/user/tutorials/tutorial-new-pipeline.textile
doc/user/tutorials/tutorial-parallel.textile
doc/user/tutorials/tutorial-trait-search.textile
sdk/python/arvados/__init__.py
sdk/python/arvados/collection.py [new file with mode: 0644]
sdk/python/arvados/keep.py [new file with mode: 0644]
sdk/python/arvados/stream.py [new file with mode: 0644]
services/api/app/controllers/arvados/v1/collections_controller.rb
services/api/app/controllers/arvados/v1/nodes_controller.rb
services/api/app/controllers/arvados/v1/schema_controller.rb
services/api/app/models/collection.rb
services/api/db/migrate/20140117231056_normalize_collection_uuid.rb [new file with mode: 0644]

index a4fed53b19ca5f79e499c5643e7a19092042e1b2..274ae9bcd7ad2ef846316a7e16772f6fcadf8b28 100644 (file)
@@ -10,3 +10,5 @@ navbar:
     - Concepts
     - API Methods
     - Schema
+  sdk:
+    - Python
\ No newline at end of file
index d593ff33e00faeb071972c8bf413e6639761c9af..f74606ecec007dcc8b55f2cf50a617e56d880e33 100644 (file)
@@ -1,7 +1,7 @@
         <div class="span3">
           <div class="affix-top">
             <div class="well sidebar-nav">
-             {% if page.navsection == 'userguide' or page.navsection == 'api' %}
+             {% if page.navsection == 'userguide' or page.navsection == 'api' or page.navsection == 'sdk' %}
              <ol class="nav nav-list">
                 {% for menu_item in site.navbar[page.navsection] %}
                <li>{{ menu_item }}
index fc2098d15bddd3b74ca6c744800faaf7aeb539a2..a5f629b97aeaba52f5f05d48eb839d12ea860c8d 100644 (file)
@@ -10,6 +10,7 @@
           <div class="nav-collapse collapse">
             <ul class="nav">
               <li {% if page.navsection == 'userguide' %} class="active" {% endif %}><a href="{{ site.baseurl }}/user/">User&nbsp;Guide</a></li>
+              <li {% if page.navsection == 'sdk' %} class="active" {% endif %}><a href="{{ site.baseurl }}/sdk/">SDK&nbsp;Reference</a></li>
               <li {% if page.navsection == 'api' %} class="active" {% endif %}><a href="{{ site.baseurl }}/api/">API&nbsp;Reference</a></li>
               <li {% if page.navsection == 'adminguide' %} class="active" {% endif %}><a href="{{ site.baseurl }}/admin/">Admin Guide</a></li>
               <li {% if page.navsection == 'installguide' %} class="active" {% endif %}><a href="{{ site.baseurl }}/install/">Install Guide</a></li>
diff --git a/doc/_includes/run-md5sum.py b/doc/_includes/run-md5sum.py
new file mode 100644 (file)
index 0000000..31b2ef0
--- /dev/null
@@ -0,0 +1,35 @@
+#!/usr/bin/env python
+
+import arvados
+
+arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True)
+this_task = arvados.current_task()
+
+# Get the input collection for this task
+this_task_input = this_task['parameters']['input']
+
+# Create a CollectionReader to access the collection
+input_collection = arvados.CollectionReader(this_task_input)
+
+# Get the name of the first file in the collection
+input_file = list(input_collection.all_files())[0].name()
+
+# Extract the file to a temporary directory
+# Returns the directory that the file was written to
+input_dir = arvados.util.collection_extract(this_task_input,
+        'tmp',
+        files=[input_file],
+        decompress=False)
+
+# Run the 'md5sum' command on the input file, with the current working
+# directory set to the location the input file was extracted to.
+stdoutdata, stderrdata = arvados.util.run_command(
+        ['md5sum', input_file],
+        cwd=input_dir)
+
+# Save the standard output (stdoutdata) to "md5sum.txt" in the output collection
+out = arvados.CollectionWriter()
+out.set_current_file_name("md5sum.txt")
+out.write(stdoutdata)
+
+this_task.set_output(out.finish())
index a7b561031cd215956cc432820f552e01112034b2..4e2f08a4518a5ba159103d3fa080e016835d620a 100644 (file)
     <link rel="shortcut icon" href="{{ site.baseurl }}/images/favicon.ico" type="image/x-icon">
     <link href="{{ site.baseurl }}/css/bootstrap.css" rel="stylesheet">
     <style>
+      html {
+      height:100%;
+      }
       body {
       padding-top: 41px;
+      height: 90%; /* If calc() is not supported */
+      height: calc(100% - 46px); /* Sets the body full height minus the padding for the menu bar */
       }
       div.frontpagehero {
       background: #fff;
@@ -73,7 +78,7 @@
   <body class="nopad">
     {% include navbar_top.html %}
 
-    {% if page.navsection == 'top' %}
+    {% if page.navsection == 'top' or page.no_nav_left %}
     {{ content }}
     {% else %}
 
index 6b2ba3098aa9739eee0509256aa49993a4c5f6e0..03f41e1fe75ca739f53056e332e3b6495144f04c 100644 (file)
@@ -2429,7 +2429,7 @@ input[type="submit"].btn.btn-mini {
   *margin-right: .3em;
   line-height: 14px;
   vertical-align: text-top;
-  background-image: url("../img/glyphicons-halflings.png");
+  background-image: url("../images/glyphicons-halflings.png");
   background-position: 14px 14px;
   background-repeat: no-repeat;
   margin-top: 1px;
@@ -2452,7 +2452,7 @@ input[type="submit"].btn.btn-mini {
 .dropdown-submenu:focus > a > [class^="icon-"],
 .dropdown-submenu:hover > a > [class*=" icon-"],
 .dropdown-submenu:focus > a > [class*=" icon-"] {
-  background-image: url("../img/glyphicons-halflings-white.png");
+  background-image: url("../images/glyphicons-halflings-white.png");
 }
 .icon-glass {
   background-position: 0      0;
diff --git a/doc/images/glyphicons-halflings-white.png b/doc/images/glyphicons-halflings-white.png
new file mode 100644 (file)
index 0000000..3bf6484
Binary files /dev/null and b/doc/images/glyphicons-halflings-white.png differ
diff --git a/doc/images/glyphicons-halflings.png b/doc/images/glyphicons-halflings.png
new file mode 100644 (file)
index 0000000..a996999
Binary files /dev/null and b/doc/images/glyphicons-halflings.png differ
index 3ff2956329c0f2cd8169988ba125c116d987639b..af280c0183ce91b1e0802ecd391d7081a17ab11b 100644 (file)
@@ -24,13 +24,16 @@ title: Arvados | Documentation
   <div style="width: 50px; display: table-cell; border-left:1px solid #bbb;">
     &nbsp;
   </div>
-  <div style="margin-left: 0px; width: 450px; display: table-cell;">
+  <div style="margin-left: 0px; width: auto; display: table-cell;">
     <div>
       <p>
         <a href="{{ site.baseurl }}/user/">User Guide</a> &mdash; How to manage data and do analysis with Arvados.
       </p>
       <p>
-        <a href="{{ site.baseurl }}/api/">API Reference</a> &mdash; Details about the Arvados APIs.
+        <a href="{{ site.baseurl }}/sdk/">SDK Reference</a> &mdash; Details about accessing Arvados from various programming languages.
+      </p>
+      <p>
+        <a href="{{ site.baseurl }}/api/">API Reference</a> &mdash; Details about the Arvados REST API.
       </p>
       <p>
         <a href="{{ site.baseurl }}/admin/">Admin Guide</a> &mdash; How to administer an Arvados system.
index 5e34e4e7bd520343da7f5256ddc108cedb3dc4a1..9a79aa147581a66c8b219a6906b835dec3962fc1 100644 (file)
@@ -42,3 +42,19 @@ read -rd $'\000' newlink <<EOF; arv link create --link "$newlink"
 }                                         
 EOF
 </pre>
+
+h3. Keep disks
+
+Currently, you need to tell Arvados about Keep disks manually.
+
+<pre>
+secret=`ruby -e 'print rand(2**512).to_s(36)[0..49]'`
+arv keep_disk create --keep-disk <<EOF
+{
+ "service_host":"keep0.xyzzy.arvadosapi.com",
+ "service_port":25107,
+ "service_ssl_flag":false,
+ "ping_secret":"$secret"
+}
+EOF
+</pre>
diff --git a/doc/mkpydoc.sh b/doc/mkpydoc.sh
new file mode 100755 (executable)
index 0000000..27be8c7
--- /dev/null
@@ -0,0 +1,5 @@
+#!/bin/sh
+
+mkdir -p _site/sdk/python/arvados
+cd _site/sdk/python/arvados
+epydoc --html -o . "arvados"
diff --git a/doc/sdk/index.textile b/doc/sdk/index.textile
new file mode 100644 (file)
index 0000000..4224b59
--- /dev/null
@@ -0,0 +1,12 @@
+---
+layout: default
+navsection: sdk
+title: "SDK Reference"
+navorder: 0
+---
+
+h1. Arvados SDK Reference
+
+This section documents how to access the Arvados API and Keep using various programming languages.
+
+* "Python SDK":python/sdk-python.html
similarity index 91%
rename from doc/user/reference/crunch-utility-libraries.textile
rename to doc/sdk/python/crunch-utility-libraries.textile
index 524040f58d3bc4aa7c3004997b07a5e74bb8d2fa..95a3780b7ab7db765933fb6ab968919816d1e62c 100644 (file)
@@ -1,20 +1,21 @@
 ---
 layout: default
-navsection: userguide
-navmenu: Reference
+navsection: sdk
+navmenu: Python
 title: "Crunch utility libraries"
-navorder: 31
+navorder: 20
 ---
 
 h1. Crunch utility libraries
 
 Several utility libraries are included with Arvados. They are intended to make it quicker and easier to write your own crunch scripts.
 
-h4. Python SDK extras
+* "Python SDK extras":#pythonsdk
+* "Toolkit wrappers":#toolkit_wrappers
 
-The Python SDK adds some convenience features that are particularly useful in crunch scripts, in addition to the standard set of API calls.
+h2(#pythonsdk). Python SDK extras
 
-<div class="offset1">
+The Python SDK adds some convenience features that are particularly useful in crunch scripts, in addition to the standard set of API calls.
 
 In a crunch job, the environment variables @ARVADOS_API_HOST@ and @ARVADOS_API_TOKEN@ will be set up so the job has the privileges of the user who submitted the job.
 
@@ -25,7 +26,7 @@ my_user = arvados.api().users().current().execute()
 my_uuid = my_user['uuid']
 </pre>
 
-h4. Get the current job and task parameters
+h3. Get the current job and task parameters
 
 @arvados.current_job()@ and @arvados.current_task()@ are convenient ways to retrieve the current Job and Task, using the @JOB_UUID@ and @TASK_UUID@ environment variables provided to each crunch task process.
 
@@ -36,7 +37,7 @@ this_job_input = this_job['script_parameters']['input']
 this_task_input = this_task['parameters']['input']
 </pre>
 
-h4(#one_task_per_input). Queue a task for each input file
+h3(#one_task_per_input). Queue a task for each input file
 
 A common pattern for a crunch job is to run one task to scan the input, and one task per input file to do the work.
 
@@ -61,7 +62,7 @@ arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True)
 my_input = this_task['parameters']['input']
 </pre>
 
-h4. Set the current task's output and success flag
+h3. Set the current task's output and success flag
 
 Each task in a crunch job must make an API call to record its output and set its @success@ attribute to True. The object returned by @current_task()@ has a @set_output()@ method to make the process more succinct.
 
@@ -69,13 +70,10 @@ Each task in a crunch job must make an API call to record its output and set its
 arvados.current_task().set_output(my_output_locator)
 </pre>
 
-</div>
-
-h4. arvados_ipc.py
+h3. arvados_ipc.py
 
 Manage child processes and FIFOs (pipes).
 
-<div class="offset1">
 
 This module makes it easier to check the exit status of every child process you start, and close the unused end of each FIFO at the appropriate time.
 
@@ -102,18 +100,14 @@ if not waitpid_and_check_children(children):
 
 The "crunch scripts" included with Arvados include some more examples of using the arvados_ipc module.
 
-</div>
-
-h3. Toolkit wrappers
+h2(#toolkit_wrappers). Toolkit wrappers
 
 The following *arvados-&lowast;.py* modules provide "extract, build, run" helpers to make it easy to incorporate common analysis tools in your crunch scripts.
 
-h4. arvados_bwa.py
+h3. arvados_bwa.py
 
 Build and run the "bwa":http://bio-bwa.sourceforge.net/bwa.shtml program.
 
-<div class="offset1">
-
 The module retrieves the bwa source code from Keep, using the job's @bwa_tbz@ parameter.
 
 <pre>
@@ -135,14 +129,10 @@ On qr1hi.arvadosapi.com, the source distribution @bwa-0.7.5a.tar.bz2@ is availab
 }
 </pre>
 
-</div>
-
-h4. arvados_gatk2.py
+h3. arvados_gatk2.py
 
 Extract and run the "Genome Analysis Toolkit":http://www.broadinstitute.org/gatk/ programs.
 
-<div class="offset1">
-
 The module retrieves the binary distribution tarball from Keep, using the job's @gatk_tbz@ parameter.
 
 <pre>
@@ -171,13 +161,10 @@ The GATK data bundle is available in the collection @d237a90bae3870b3b033aea1e99
 }
 </pre>
 
-</div>
-
-h4. arvados_samtools.py
+h3. arvados_samtools.py
 
 Build and run the "samtools":http://samtools.sourceforge.net/samtools.shtml program.
 
-<div class="offset1">
 
 The module retrieves the samtools source code from Keep, using the job's @samtools_tgz@ parameter.
 
@@ -200,13 +187,11 @@ On qr1hi.arvadosapi.com, the source distribution @samtools-0.1.19.tar.gz@ is ava
 }
 </pre>
 
-</div>
 
-h4. arvados_picard.py
+h3. arvados_picard.py
 
 Build and run the "picard":http://picard.sourceforge.net/command-line-overview.shtml program.
 
-<div class="offset1">
 
 The module retrieves the picard binary distribution from Keep, using the job's @picard_zip@ parameter.
 
@@ -237,5 +222,4 @@ On qr1hi.arvadosapi.com, the binary distribution @picard-tools-1.82.zip@ is avai
 }
 </pre>
 
-</div>
 
diff --git a/doc/sdk/python/python.textile b/doc/sdk/python/python.textile
new file mode 100644 (file)
index 0000000..4d6900b
--- /dev/null
@@ -0,0 +1,10 @@
+---
+layout: default
+navsection: sdk
+navmenu: Python
+title: "PyDoc Reference"
+navorder: 30
+no_nav_left: true
+---
+
+notextile. <iframe src="arvados/" style="width:100%; height:100%; border:none" />
similarity index 95%
rename from doc/user/reference/sdk-python.textile
rename to doc/sdk/python/sdk-python.textile
index 288a9d561271c6dd505cc477b4c88d5ebe970603..81a61f094ff6298972aae703c27e240f40523482 100644 (file)
@@ -1,12 +1,12 @@
 ---
 layout: default
-navsection: userguide
-navmenu: Reference
+navsection: sdk
+navmenu: Python
 title: "Python SDK"
-navorder: 23
+navorder: 10
 ---
 
-h1. Reference: Python SDK
+h1. Python SDK
 
 The Python SDK provides a generic set of wrappers so you can make API calls easily. It performs some validation before connecting to the API server: for example, it refuses to do an API call if a required parameter is missing.
 
@@ -26,7 +26,7 @@ $ <code class="userinput">sudo python setup.py install</code>
 </pre>
 </notextile>
 
-If the SDK is installed and your @ARVADOS_API_HOST@ and @ARVADOS_API_TOKEN@ environment variables are set up correctly (see "api-tokens":api-tokens.html for details), @import arvados@ should produce no errors:
+If the SDK is installed and your @ARVADOS_API_HOST@ and @ARVADOS_API_TOKEN@ environment variables are set up correctly (see "api-tokens":{{site.basedoc}}/user/reference/api-tokens.html for details), @import arvados@ should produce no errors:
 
 <notextile>
 <pre>$ <code class="userinput">python</code>
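Once import arvados succeeds, a quick way to confirm that the host and token are actually usable is the same first API call shown in the crunch utility library page. A minimal sketch, assuming ARVADOS_API_HOST and ARVADOS_API_TOKEN point at a reachable API server:

import arvados

# Fetch the user record associated with ARVADOS_API_TOKEN.
my_user = arvados.api().users().current().execute()
print my_user['uuid']
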
index 17d2a3c0c9b5ee27ab6ae32e02e3e97bfdc2766c..058f1b399d854ffe98e2f7310c3a9c92372ed1ec 100644 (file)
@@ -2,11 +2,11 @@
 layout: default
 navsection: userguide
 navmenu: Reference
-title: "Command line SDK"
+title: "Command line interface"
 navorder: 22
 ---
 
-h1. Reference: Command line SDK
+h1. Reference: Command Line Interface
 
 If you are logged in to an Arvados VM, the command line SDK should be installed. Try:
 
diff --git a/doc/user/tutorials/running-external-program.textile b/doc/user/tutorials/running-external-program.textile
new file mode 100644 (file)
index 0000000..f23fae8
--- /dev/null
@@ -0,0 +1,65 @@
+---
+layout: default
+navsection: userguide
+navmenu: Tutorials
+title: "Running external programs"
+navorder: 18
+---
+
+h1. Running external programs
+
+This tutorial demonstrates how to use Crunch to run an external program by writing a wrapper using the Python SDK.
+
+*This tutorial assumes that you are "logged into an Arvados VM instance":{{site.basedoc}}/user/getting_started/ssh-access.html#login, and have a "working environment.":{{site.basedoc}}/user/getting_started/check-environment.html*
+
+Start by entering the @crunch_scripts@ directory of your git repository:
+
+<notextile>
+<pre><code>$ <span class="userinput">cd you/crunch_scripts</span>
+</code></pre>
+</notextile>
+
+Next, using your favorite text editor, create a new file called @run-md5sum.py@ in the @crunch_scripts@ directory.  Add the following code to compute the md5 hash of each file in a collection:
+
+<pre><code class="userinput">{% include run-md5sum.py %}</code></pre>
+
+Make the file executable:
+
+notextile. <pre><code>$ <span class="userinput">chmod +x run-md5sum.py</span></code></pre>
+
+Next, add the file to @git@ staging, commit and push:
+
+<notextile>
+<pre><code>$ <span class="userinput">git add run-md5sum.py</span>
+$ <span class="userinput">git commit -m"run external md5sum program"</span>
+$ <span class="userinput">git push origin master</span>
+</code></pre>
+</notextile>
+
+You should now be able to run your new script using Crunch, with the "script" parameter referring to our new "run-md5sum.py" script.
+
+<notextile>
+<pre><code>$ <span class="userinput">cat &gt;the_job &lt;&lt;EOF
+{
+ "script": "run-md5sum.py",
+ "script_version": "you:master",
+ "script_parameters":
+ {
+  "input": "c1bad4b39ca5a924e481008009d94e32+210"
+ }
+}
+EOF</span>
+$ <span class="userinput">arv -h job create --job "$(cat the_job)"</span>
+{
+ ...
+ "uuid":"qr1hi-xxxxx-xxxxxxxxxxxxxxx"
+ ...
+}
+$ <span class="userinput">arv -h job get --uuid qr1hi-xxxxx-xxxxxxxxxxxxxxx</span>
+{
+ ...
+ "output":"4d164b1658c261b9afc6b479130016a3+54",
+ ...
+}
+</code></pre>
+</notextile>
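As a follow-up, the job's output collection can also be read back with the Python SDK rather than arv keep get. A minimal sketch, assuming the SDK and API credentials are configured as described in the Python SDK page (the locator is the "output" value reported above):

import arvados

# Print the name and contents of each file in the job's output collection.
for f in arvados.CollectionReader('4d164b1658c261b9afc6b479130016a3+54').all_files():
    print f.name()
    print f.read(2 ** 20)
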
index 4917cf9132ec20745cb5ec400e3d094a11b8f697..b582eefaf19b2069e075931c31621bcf2cffbadf 100644 (file)
@@ -6,7 +6,7 @@ title: "Writing a Crunch script"
 navorder: 13
 ---
 
-h1. Tutorial: Writing a Crunch script
+h1. Writing a Crunch script
 
 In this tutorial, we will write the "hash" script demonstrated in the first tutorial.
 
index 8163bd39f1d329d937264931f2988495c10a8e31..779761ab019286611d6c8b6147c82be7c058a5e3 100644 (file)
@@ -6,7 +6,7 @@ title: "Using GATK with Arvados"
 navorder: 16
 ---
 
-h1. Tutorial: Using GATK with Arvados
+h1. Using GATK with Arvados
 
 This tutorial demonstrates how to use the Genome Analysis Toolkit (GATK) with Arvados. In this example we will install GATK and then create a VariantFiltration job to assign pass/fail scores to variants in a VCF file.
 
@@ -204,8 +204,7 @@ Tue Dec 17 19:04:12 2013 qr1hi-8i9sb-n9k7qyp7bs5b9d4 4867  log manifest is 1e77a
 
 Once the job completes, the output can be found in hu34D5B9-exome-filtered.vcf:
 
-<notextile>
-$ <span class="userinput">arv keep ls bedd6ff56b3ae9f90d873b1fcb72f9a3+91</span>
+<notextile><pre><code>$ <span class="userinput">arv keep ls bedd6ff56b3ae9f90d873b1fcb72f9a3+91</span>
 hu34D5B9-exome-filtered.vcf
 </code></pre>
 </notextile>
index 24364f04b11071f31983387670745810a9942233..6be687c56f5d3f732995340a104726bec29c508d 100644 (file)
@@ -6,7 +6,7 @@ title: "Debugging a Crunch script"
 navorder: 14
 ---
 
-h1. Tutorial: Debugging a Crunch script
+h1. Debugging a Crunch script
 
 To test changes to a script by running a job, the change must be pushed into @git@, the job queued asynchronously, and the actual execution may be run on any compute server.  As a result, debugging a script can be difficult and time consuming.  This tutorial demonstrates using @arv-crunch-job@ to run your job in your local VM.  This avoids the job queue and allows you to execute the script from your uncommitted git tree.
 
index 982eb750215d5b1849013839d85b756e43d440ce..53d2342d661c2a52c8f39a7abd03a8ec5b5f5a1d 100644 (file)
@@ -6,7 +6,7 @@ title: "Running a Crunch job"
 navorder: 12
 ---
 
-h1. Tutorial: Running a crunch job
+h1. Running a crunch job
 
 This tutorial introduces the concepts and use of the Crunch job system using the @arv@ command line tool and Arvados Workbench.
 
index 9922b04dec9ace79232200227f7d286b8e08e2c6..6683498e86d78f6450b9b9b305d4180bb25974fb 100644 (file)
@@ -6,7 +6,7 @@ title: "Storing and Retrieving data using Arvados Keep"
 navorder: 11
 ---
 
-h1. Tutorial: Storing and Retrieving data using Arvados Keep
+h1. Storing and Retrieving data using Arvados Keep
 
 This tutorial introduces you to the Arvados file storage system.
 
index 7a9ecfbf363535400afa704c11b26372282a09b9..101d6ac2616be0f6cb65964b4816b50d91d8db1e 100644 (file)
@@ -6,7 +6,7 @@ title: "Constructing a Crunch pipeline"
 navorder: 15
 ---
 
-h1. Tutorial: Constructing a Crunch pipeline
+h1. Constructing a Crunch pipeline
 
 A pipeline in Arvados is a collection of crunch scripts, in which the output from one script may be used as the input to another script.
 
index b76d1a5238beb8b714c5030c6e1638c8b89204f0..8a6118492db8e4d519a6e7f3a0c2f720b230117f 100644 (file)
@@ -6,7 +6,7 @@ title: "Parallel Crunch tasks"
 navorder: 15
 ---
 
-h1. Tutorial: Parallel Crunch tasks
+h1. Parallel Crunch tasks
 
 In the tutorial "writing a crunch script,":tutorial-firstscript.html our script used a "for" loop to compute the md5 hashes for each file in sequence.  This approach, while simple, is not able to take advantage of the compute cluster with multiple nodes and cores to speed up computation by running tasks in parallel.  This tutorial will demonstrate how to create parallel Crunch tasks.
 
@@ -78,4 +78,4 @@ $ <span class="userinput">arv keep get e2ccd204bca37c77c0ba59fc470cd0f7+162/md5s
 
 h2. The one job per file pattern
 
-This example demonstrates how to schedule a new task per file.  Because this is a common pattern, the Crunch Python API contains a convenience function to "queue a task for each input file":{{site.basedoc}}/user/reference/crunch-utility-libraries.html#one_task_per_input which reduces the amount of boilerplate code required to handle parallel jobs.
+This example demonstrates how to schedule a new task per file.  Because this is a common pattern, the Crunch Python API contains a convenience function to "queue a task for each input file":{{site.basedoc}}/sdk/python/crunch-utility-libraries.html#one_task_per_input which reduces the amount of boilerplate code required to handle parallel jobs.
index 9ea4cb6eb4e231de08620daa7488f0bcdf0b1bf2..d7fe50b93b6667f4b9566f8c3cb5d5b244fc0b31 100644 (file)
@@ -6,13 +6,13 @@ title: "Querying the Metadata Database"
 navorder: 16
 ---
 
-h1. Tutorial: Querying the Metadata Database
+h1. Querying the Metadata Database
 
 This tutorial introduces the Arvados Metadata Database.  The Metadata Database stores information about files in Keep.  This example will use the Python SDK to find public WGS (Whole Genome Sequencing) data for people who have reported a certain medical condition.
 
 *This tutorial assumes that you are "logged into an Arvados VM instance":{{site.basedoc}}/user/getting_started/ssh-access.html#login, and have a "working environment.":{{site.basedoc}}/user/getting_started/check-environment.html*
 
-In tutorial example, three angle brackets (&gt;&gt;&gt;) will be used to denote code to enter at the Python prompt.
+In the tutorial examples, three angle brackets (&gt;&gt;&gt;) will be used to denote code to enter at the interactive Python prompt.
 
 Start by running Python.  
 
index 1adadd152fff8ff3870c95f20255d5de737c7a71..2d19259064093d0a898782e06a634c005f12474f 100644 (file)
@@ -21,6 +21,15 @@ import threading
 import apiclient
 import apiclient.discovery
 
+config = None
+EMPTY_BLOCK_LOCATOR = 'd41d8cd98f00b204e9800998ecf8427e+0'
+services = {}
+
+from stream import *
+from collection import *
+from keep import *
+
+
 # Arvados configuration settings are taken from $HOME/.config/arvados.
 # Environment variables override settings in the config file.
 #
@@ -36,16 +45,6 @@ class ArvadosConfig(dict):
             if var.startswith('ARVADOS_'):
                 self[var] = os.environ[var]
 
-
-config = ArvadosConfig(os.environ['HOME'] + '/.config/arvados/settings.conf')
-
-if 'ARVADOS_DEBUG' in config:
-    logging.basicConfig(level=logging.DEBUG)
-
-EMPTY_BLOCK_LOCATOR = 'd41d8cd98f00b204e9800998ecf8427e+0'
-
-services = {}
-
 class errors:
     class SyntaxError(Exception):
         pass
@@ -129,8 +128,22 @@ def _cast_objects_too(value, schema_type):
         return _cast_orig(value, schema_type)
 apiclient.discovery._cast = _cast_objects_too
 
+def http_cache(data_type):
+    path = os.environ['HOME'] + '/.cache/arvados/' + data_type
+    try:
+        util.mkdir_dash_p(path)
+    except OSError:
+        path = None
+    return path
+
 def api(version=None):
     global services, config
+
+    if not config:
+        config = ArvadosConfig(os.environ['HOME'] + '/.config/arvados')
+        if 'ARVADOS_DEBUG' in config:
+            logging.basicConfig(level=logging.DEBUG)
+
     if not services.get(version):
         apiVersion = version
         if not version:
@@ -149,7 +162,8 @@ def api(version=None):
         if not os.path.exists(ca_certs):
             ca_certs = None             # use httplib2 default
 
-        http = httplib2.Http(ca_certs=ca_certs)
+        http = httplib2.Http(ca_certs=ca_certs,
+                             cache=http_cache('discovery'))
         http = credentials.authorize(http)
         if re.match(r'(?i)^(true|1|yes)$',
                     config.get('ARVADOS_API_HOST_INSECURE', 'no')):
@@ -518,584 +532,3 @@ class util:
                 allfiles += [ent_base]
         return allfiles
 
-class StreamFileReader(object):
-    def __init__(self, stream, pos, size, name):
-        self._stream = stream
-        self._pos = pos
-        self._size = size
-        self._name = name
-        self._filepos = 0
-    def name(self):
-        return self._name
-    def decompressed_name(self):
-        return re.sub('\.(bz2|gz)$', '', self._name)
-    def size(self):
-        return self._size
-    def stream_name(self):
-        return self._stream.name()
-    def read(self, size, **kwargs):
-        self._stream.seek(self._pos + self._filepos)
-        data = self._stream.read(min(size, self._size - self._filepos))
-        self._filepos += len(data)
-        return data
-    def readall(self, size=2**20, **kwargs):
-        while True:
-            data = self.read(size, **kwargs)
-            if data == '':
-                break
-            yield data
-    def bunzip2(self, size):
-        decompressor = bz2.BZ2Decompressor()
-        for chunk in self.readall(size):
-            data = decompressor.decompress(chunk)
-            if data and data != '':
-                yield data
-    def gunzip(self, size):
-        decompressor = zlib.decompressobj(16+zlib.MAX_WBITS)
-        for chunk in self.readall(size):
-            data = decompressor.decompress(decompressor.unconsumed_tail + chunk)
-            if data and data != '':
-                yield data
-    def readall_decompressed(self, size=2**20):
-        self._stream.seek(self._pos + self._filepos)
-        if re.search('\.bz2$', self._name):
-            return self.bunzip2(size)
-        elif re.search('\.gz$', self._name):
-            return self.gunzip(size)
-        else:
-            return self.readall(size)
-    def readlines(self, decompress=True):
-        if decompress:
-            datasource = self.readall_decompressed()
-        else:
-            self._stream.seek(self._pos + self._filepos)
-            datasource = self.readall()
-        data = ''
-        for newdata in datasource:
-            data += newdata
-            sol = 0
-            while True:
-                eol = string.find(data, "\n", sol)
-                if eol < 0:
-                    break
-                yield data[sol:eol+1]
-                sol = eol+1
-            data = data[sol:]
-        if data != '':
-            yield data
-    def as_manifest(self):
-        if self.size() == 0:
-            return ("%s %s 0:0:%s\n"
-                    % (self._stream.name(), EMPTY_BLOCK_LOCATOR, self.name()))
-        return string.join(self._stream.tokens_for_range(self._pos, self._size),
-                           " ") + "\n"
-
-class StreamReader(object):
-    def __init__(self, tokens):
-        self._tokens = tokens
-        self._current_datablock_data = None
-        self._current_datablock_pos = 0
-        self._current_datablock_index = -1
-        self._pos = 0
-
-        self._stream_name = None
-        self.data_locators = []
-        self.files = []
-
-        for tok in self._tokens:
-            if self._stream_name == None:
-                self._stream_name = tok.replace('\\040', ' ')
-            elif re.search(r'^[0-9a-f]{32}(\+\S+)*$', tok):
-                self.data_locators += [tok]
-            elif re.search(r'^\d+:\d+:\S+', tok):
-                pos, size, name = tok.split(':',2)
-                self.files += [[int(pos), int(size), name.replace('\\040', ' ')]]
-            else:
-                raise errors.SyntaxError("Invalid manifest format")
-
-    def tokens(self):
-        return self._tokens
-    def tokens_for_range(self, range_start, range_size):
-        resp = [self._stream_name]
-        return_all_tokens = False
-        block_start = 0
-        token_bytes_skipped = 0
-        for locator in self.data_locators:
-            sizehint = re.search(r'\+(\d+)', locator)
-            if not sizehint:
-                return_all_tokens = True
-            if return_all_tokens:
-                resp += [locator]
-                next
-            blocksize = int(sizehint.group(0))
-            if range_start + range_size <= block_start:
-                break
-            if range_start < block_start + blocksize:
-                resp += [locator]
-            else:
-                token_bytes_skipped += blocksize
-            block_start += blocksize
-        for f in self.files:
-            if ((f[0] < range_start + range_size)
-                and
-                (f[0] + f[1] > range_start)
-                and
-                f[1] > 0):
-                resp += ["%d:%d:%s" % (f[0] - token_bytes_skipped, f[1], f[2])]
-        return resp
-    def name(self):
-        return self._stream_name
-    def all_files(self):
-        for f in self.files:
-            pos, size, name = f
-            yield StreamFileReader(self, pos, size, name)
-    def nextdatablock(self):
-        if self._current_datablock_index < 0:
-            self._current_datablock_pos = 0
-            self._current_datablock_index = 0
-        else:
-            self._current_datablock_pos += self.current_datablock_size()
-            self._current_datablock_index += 1
-        self._current_datablock_data = None
-    def current_datablock_data(self):
-        if self._current_datablock_data == None:
-            self._current_datablock_data = Keep.get(self.data_locators[self._current_datablock_index])
-        return self._current_datablock_data
-    def current_datablock_size(self):
-        if self._current_datablock_index < 0:
-            self.nextdatablock()
-        sizehint = re.search('\+(\d+)', self.data_locators[self._current_datablock_index])
-        if sizehint:
-            return int(sizehint.group(0))
-        return len(self.current_datablock_data())
-    def seek(self, pos):
-        """Set the position of the next read operation."""
-        self._pos = pos
-    def really_seek(self):
-        """Find and load the appropriate data block, so the byte at
-        _pos is in memory.
-        """
-        if self._pos == self._current_datablock_pos:
-            return True
-        if (self._current_datablock_pos != None and
-            self._pos >= self._current_datablock_pos and
-            self._pos <= self._current_datablock_pos + self.current_datablock_size()):
-            return True
-        if self._pos < self._current_datablock_pos:
-            self._current_datablock_index = -1
-            self.nextdatablock()
-        while (self._pos > self._current_datablock_pos and
-               self._pos > self._current_datablock_pos + self.current_datablock_size()):
-            self.nextdatablock()
-    def read(self, size):
-        """Read no more than size bytes -- but at least one byte,
-        unless _pos is already at the end of the stream.
-        """
-        if size == 0:
-            return ''
-        self.really_seek()
-        while self._pos >= self._current_datablock_pos + self.current_datablock_size():
-            self.nextdatablock()
-            if self._current_datablock_index >= len(self.data_locators):
-                return None
-        data = self.current_datablock_data()[self._pos - self._current_datablock_pos : self._pos - self._current_datablock_pos + size]
-        self._pos += len(data)
-        return data
-
-class CollectionReader(object):
-    def __init__(self, manifest_locator_or_text):
-        if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
-            self._manifest_text = manifest_locator_or_text
-            self._manifest_locator = None
-        else:
-            self._manifest_locator = manifest_locator_or_text
-            self._manifest_text = None
-        self._streams = None
-    def __enter__(self):
-        pass
-    def __exit__(self):
-        pass
-    def _populate(self):
-        if self._streams != None:
-            return
-        if not self._manifest_text:
-            self._manifest_text = Keep.get(self._manifest_locator)
-        self._streams = []
-        for stream_line in self._manifest_text.split("\n"):
-            if stream_line != '':
-                stream_tokens = stream_line.split()
-                self._streams += [stream_tokens]
-    def all_streams(self):
-        self._populate()
-        resp = []
-        for s in self._streams:
-            resp += [StreamReader(s)]
-        return resp
-    def all_files(self):
-        for s in self.all_streams():
-            for f in s.all_files():
-                yield f
-    def manifest_text(self):
-        self._populate()
-        return self._manifest_text
-
-class CollectionWriter(object):
-    KEEP_BLOCK_SIZE = 2**26
-    def __init__(self):
-        self._data_buffer = []
-        self._data_buffer_len = 0
-        self._current_stream_files = []
-        self._current_stream_length = 0
-        self._current_stream_locators = []
-        self._current_stream_name = '.'
-        self._current_file_name = None
-        self._current_file_pos = 0
-        self._finished_streams = []
-    def __enter__(self):
-        pass
-    def __exit__(self):
-        self.finish()
-    def write_directory_tree(self,
-                             path, stream_name='.', max_manifest_depth=-1):
-        self.start_new_stream(stream_name)
-        todo = []
-        if max_manifest_depth == 0:
-            dirents = sorted(util.listdir_recursive(path))
-        else:
-            dirents = sorted(os.listdir(path))
-        for dirent in dirents:
-            target = os.path.join(path, dirent)
-            if os.path.isdir(target):
-                todo += [[target,
-                          os.path.join(stream_name, dirent),
-                          max_manifest_depth-1]]
-            else:
-                self.start_new_file(dirent)
-                with open(target, 'rb') as f:
-                    while True:
-                        buf = f.read(2**26)
-                        if len(buf) == 0:
-                            break
-                        self.write(buf)
-        self.finish_current_stream()
-        map(lambda x: self.write_directory_tree(*x), todo)
-
-    def write(self, newdata):
-        if hasattr(newdata, '__iter__'):
-            for s in newdata:
-                self.write(s)
-            return
-        self._data_buffer += [newdata]
-        self._data_buffer_len += len(newdata)
-        self._current_stream_length += len(newdata)
-        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
-            self.flush_data()
-    def flush_data(self):
-        data_buffer = ''.join(self._data_buffer)
-        if data_buffer != '':
-            self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
-            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
-            self._data_buffer_len = len(self._data_buffer[0])
-    def start_new_file(self, newfilename=None):
-        self.finish_current_file()
-        self.set_current_file_name(newfilename)
-    def set_current_file_name(self, newfilename):
-        if re.search(r'[\t\n]', newfilename):
-            raise errors.AssertionError(
-                "Manifest filenames cannot contain whitespace: %s" %
-                newfilename)
-        self._current_file_name = newfilename
-    def current_file_name(self):
-        return self._current_file_name
-    def finish_current_file(self):
-        if self._current_file_name == None:
-            if self._current_file_pos == self._current_stream_length:
-                return
-            raise errors.AssertionError(
-                "Cannot finish an unnamed file " +
-                "(%d bytes at offset %d in '%s' stream)" %
-                (self._current_stream_length - self._current_file_pos,
-                 self._current_file_pos,
-                 self._current_stream_name))
-        self._current_stream_files += [[self._current_file_pos,
-                                       self._current_stream_length - self._current_file_pos,
-                                       self._current_file_name]]
-        self._current_file_pos = self._current_stream_length
-    def start_new_stream(self, newstreamname='.'):
-        self.finish_current_stream()
-        self.set_current_stream_name(newstreamname)
-    def set_current_stream_name(self, newstreamname):
-        if re.search(r'[\t\n]', newstreamname):
-            raise errors.AssertionError(
-                "Manifest stream names cannot contain whitespace")
-        self._current_stream_name = '.' if newstreamname=='' else newstreamname
-    def current_stream_name(self):
-        return self._current_stream_name
-    def finish_current_stream(self):
-        self.finish_current_file()
-        self.flush_data()
-        if len(self._current_stream_files) == 0:
-            pass
-        elif self._current_stream_name == None:
-            raise errors.AssertionError(
-                "Cannot finish an unnamed stream (%d bytes in %d files)" %
-                (self._current_stream_length, len(self._current_stream_files)))
-        else:
-            if len(self._current_stream_locators) == 0:
-                self._current_stream_locators += [EMPTY_BLOCK_LOCATOR]
-            self._finished_streams += [[self._current_stream_name,
-                                       self._current_stream_locators,
-                                       self._current_stream_files]]
-        self._current_stream_files = []
-        self._current_stream_length = 0
-        self._current_stream_locators = []
-        self._current_stream_name = None
-        self._current_file_pos = 0
-        self._current_file_name = None
-    def finish(self):
-        return Keep.put(self.manifest_text())
-    def manifest_text(self):
-        self.finish_current_stream()
-        manifest = ''
-        for stream in self._finished_streams:
-            if not re.search(r'^\.(/.*)?$', stream[0]):
-                manifest += './'
-            manifest += stream[0].replace(' ', '\\040')
-            for locator in stream[1]:
-                manifest += " %s" % locator
-            for sfile in stream[2]:
-                manifest += " %d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040'))
-            manifest += "\n"
-        return manifest
-    def data_locators(self):
-        ret = []
-        for name, locators, files in self._finished_streams:
-            ret += locators
-        return ret
-
-global_client_object = None
-
-class Keep:
-    @staticmethod
-    def global_client_object():
-        global global_client_object
-        if global_client_object == None:
-            global_client_object = KeepClient()
-        return global_client_object
-
-    @staticmethod
-    def get(locator, **kwargs):
-        return Keep.global_client_object().get(locator, **kwargs)
-
-    @staticmethod
-    def put(data, **kwargs):
-        return Keep.global_client_object().put(data, **kwargs)
-
-class KeepClient(object):
-
-    class ThreadLimiter(object):
-        """
-        Limit the number of threads running at a given time to
-        {desired successes} minus {successes reported}. When successes
-        reported == desired, wake up the remaining threads and tell
-        them to quit.
-
-        Should be used in a "with" block.
-        """
-        def __init__(self, todo):
-            self._todo = todo
-            self._done = 0
-            self._todo_lock = threading.Semaphore(todo)
-            self._done_lock = threading.Lock()
-        def __enter__(self):
-            self._todo_lock.acquire()
-            return self
-        def __exit__(self, type, value, traceback):
-            self._todo_lock.release()
-        def shall_i_proceed(self):
-            """
-            Return true if the current thread should do stuff. Return
-            false if the current thread should just stop.
-            """
-            with self._done_lock:
-                return (self._done < self._todo)
-        def increment_done(self):
-            """
-            Report that the current thread was successful.
-            """
-            with self._done_lock:
-                self._done += 1
-        def done(self):
-            """
-            Return how many successes were reported.
-            """
-            with self._done_lock:
-                return self._done
-
-    class KeepWriterThread(threading.Thread):
-        """
-        Write a blob of data to the given Keep server. Call
-        increment_done() of the given ThreadLimiter if the write
-        succeeds.
-        """
-        def __init__(self, **kwargs):
-            super(KeepClient.KeepWriterThread, self).__init__()
-            self.args = kwargs
-        def run(self):
-            global config
-            with self.args['thread_limiter'] as limiter:
-                if not limiter.shall_i_proceed():
-                    # My turn arrived, but the job has been done without
-                    # me.
-                    return
-                logging.debug("KeepWriterThread %s proceeding %s %s" %
-                              (str(threading.current_thread()),
-                               self.args['data_hash'],
-                               self.args['service_root']))
-                h = httplib2.Http()
-                url = self.args['service_root'] + self.args['data_hash']
-                api_token = config['ARVADOS_API_TOKEN']
-                headers = {'Authorization': "OAuth2 %s" % api_token}
-                try:
-                    resp, content = h.request(url.encode('utf-8'), 'PUT',
-                                              headers=headers,
-                                              body=self.args['data'])
-                    if (resp['status'] == '401' and
-                        re.match(r'Timestamp verification failed', content)):
-                        body = KeepClient.sign_for_old_server(
-                            self.args['data_hash'],
-                            self.args['data'])
-                        h = httplib2.Http()
-                        resp, content = h.request(url.encode('utf-8'), 'PUT',
-                                                  headers=headers,
-                                                  body=body)
-                    if re.match(r'^2\d\d$', resp['status']):
-                        logging.debug("KeepWriterThread %s succeeded %s %s" %
-                                      (str(threading.current_thread()),
-                                       self.args['data_hash'],
-                                       self.args['service_root']))
-                        return limiter.increment_done()
-                    logging.warning("Request fail: PUT %s => %s %s" %
-                                    (url, resp['status'], content))
-                except (httplib2.HttpLib2Error, httplib.HTTPException) as e:
-                    logging.warning("Request fail: PUT %s => %s: %s" %
-                                    (url, type(e), str(e)))
-
-    def __init__(self):
-        self.lock = threading.Lock()
-        self.service_roots = None
-
-    def shuffled_service_roots(self, hash):
-        if self.service_roots == None:
-            self.lock.acquire()
-            keep_disks = api().keep_disks().list().execute()['items']
-            roots = (("http%s://%s:%d/" %
-                      ('s' if f['service_ssl_flag'] else '',
-                       f['service_host'],
-                       f['service_port']))
-                     for f in keep_disks)
-            self.service_roots = sorted(set(roots))
-            logging.debug(str(self.service_roots))
-            self.lock.release()
-        seed = hash
-        pool = self.service_roots[:]
-        pseq = []
-        while len(pool) > 0:
-            if len(seed) < 8:
-                if len(pseq) < len(hash) / 4: # first time around
-                    seed = hash[-4:] + hash
-                else:
-                    seed += hash
-            probe = int(seed[0:8], 16) % len(pool)
-            pseq += [pool[probe]]
-            pool = pool[:probe] + pool[probe+1:]
-            seed = seed[8:]
-        logging.debug(str(pseq))
-        return pseq
-
-    def get(self, locator):
-        global config
-        if re.search(r',', locator):
-            return ''.join(self.get(x) for x in locator.split(','))
-        if 'KEEP_LOCAL_STORE' in os.environ:
-            return KeepClient.local_store_get(locator)
-        expect_hash = re.sub(r'\+.*', '', locator)
-        for service_root in self.shuffled_service_roots(expect_hash):
-            h = httplib2.Http()
-            url = service_root + expect_hash
-            api_token = config['ARVADOS_API_TOKEN']
-            headers = {'Authorization': "OAuth2 %s" % api_token,
-                       'Accept': 'application/octet-stream'}
-            try:
-                resp, content = h.request(url.encode('utf-8'), 'GET',
-                                          headers=headers)
-                if re.match(r'^2\d\d$', resp['status']):
-                    m = hashlib.new('md5')
-                    m.update(content)
-                    md5 = m.hexdigest()
-                    if md5 == expect_hash:
-                        return content
-                    logging.warning("Checksum fail: md5(%s) = %s" % (url, md5))
-            except (httplib2.HttpLib2Error, httplib.ResponseNotReady) as e:
-                logging.info("Request fail: GET %s => %s: %s" %
-                             (url, type(e), str(e)))
-        raise errors.NotFoundError("Block not found: %s" % expect_hash)
-
-    def put(self, data, **kwargs):
-        if 'KEEP_LOCAL_STORE' in os.environ:
-            return KeepClient.local_store_put(data)
-        m = hashlib.new('md5')
-        m.update(data)
-        data_hash = m.hexdigest()
-        have_copies = 0
-        want_copies = kwargs.get('copies', 2)
-        if not (want_copies > 0):
-            return data_hash
-        threads = []
-        thread_limiter = KeepClient.ThreadLimiter(want_copies)
-        for service_root in self.shuffled_service_roots(data_hash):
-            t = KeepClient.KeepWriterThread(data=data,
-                                            data_hash=data_hash,
-                                            service_root=service_root,
-                                            thread_limiter=thread_limiter)
-            t.start()
-            threads += [t]
-        for t in threads:
-            t.join()
-        have_copies = thread_limiter.done()
-        if have_copies == want_copies:
-            return (data_hash + '+' + str(len(data)))
-        raise errors.KeepWriteError(
-            "Write fail for %s: wanted %d but wrote %d" %
-            (data_hash, want_copies, have_copies))
-
-    @staticmethod
-    def sign_for_old_server(data_hash, data):
-        return (("-----BEGIN PGP SIGNED MESSAGE-----\n\n\n%d %s\n-----BEGIN PGP SIGNATURE-----\n\n-----END PGP SIGNATURE-----\n" % (int(time.time()), data_hash)) + data)
-
-
-    @staticmethod
-    def local_store_put(data):
-        m = hashlib.new('md5')
-        m.update(data)
-        md5 = m.hexdigest()
-        locator = '%s+%d' % (md5, len(data))
-        with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'), 'w') as f:
-            f.write(data)
-        os.rename(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'),
-                  os.path.join(os.environ['KEEP_LOCAL_STORE'], md5))
-        return locator
-    @staticmethod
-    def local_store_get(locator):
-        r = re.search('^([0-9a-f]{32,})', locator)
-        if not r:
-            raise errors.NotFoundError(
-                "Invalid data locator: '%s'" % locator)
-        if r.group(0) == EMPTY_BLOCK_LOCATOR.split('+')[0]:
-            return ''
-        with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], r.group(0)), 'r') as f:
-            return f.read()
-
-# We really shouldn't do this but some clients still use
-# arvados.service.* directly instead of arvados.api().*
-service = api()
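Taken together, these hunks move the Collection, Stream and Keep code into submodules (re-exported at the top of the package via the new from stream/collection/keep import * lines), defer reading the config file to the first api() call, add an on-disk cache for the API discovery document, and drop the module-level service = api() so that importing arvados no longer requires credentials. A minimal sketch of the resulting behavior, assuming ARVADOS_API_HOST and ARVADOS_API_TOKEN are set in the environment or supplied via the settings under $HOME/.config/arvados:

import arvados                      # no config read, no API call at import time

# Settings are loaded on the first api() call; the discovery document is
# cached under ~/.cache/arvados/discovery when that directory is writable.
api_client = arvados.api()
print arvados.config.get('ARVADOS_API_HOST')

# Classes that moved into submodules remain reachable from the package root;
# constructing a reader performs no I/O until its contents are requested.
reader = arvados.CollectionReader('c1bad4b39ca5a924e481008009d94e32+210')
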
diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
new file mode 100644 (file)
index 0000000..dc2f0f8
--- /dev/null
@@ -0,0 +1,215 @@
+import gflags
+import httplib
+import httplib2
+import logging
+import os
+import pprint
+import sys
+import types
+import subprocess
+import json
+import UserDict
+import re
+import hashlib
+import string
+import bz2
+import zlib
+import fcntl
+import time
+import threading
+
+from stream import *
+from keep import *
+
+class CollectionReader(object):
+    def __init__(self, manifest_locator_or_text):
+        if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
+            self._manifest_text = manifest_locator_or_text
+            self._manifest_locator = None
+        else:
+            self._manifest_locator = manifest_locator_or_text
+            self._manifest_text = None
+        self._streams = None
+
+    def __enter__(self):
+        pass
+
+    def __exit__(self):
+        pass
+
+    def _populate(self):
+        if self._streams != None:
+            return
+        if not self._manifest_text:
+            self._manifest_text = Keep.get(self._manifest_locator)
+        self._streams = []
+        for stream_line in self._manifest_text.split("\n"):
+            if stream_line != '':
+                stream_tokens = stream_line.split()
+                self._streams += [stream_tokens]
+
+    def all_streams(self):
+        self._populate()
+        resp = []
+        for s in self._streams:
+            resp += [StreamReader(s)]
+        return resp
+
+    def all_files(self):
+        for s in self.all_streams():
+            for f in s.all_files():
+                yield f
+
+    def manifest_text(self):
+        self._populate()
+        return self._manifest_text
+
+class CollectionWriter(object):
+    KEEP_BLOCK_SIZE = 2**26
+
+    def __init__(self):
+        self._data_buffer = []
+        self._data_buffer_len = 0
+        self._current_stream_files = []
+        self._current_stream_length = 0
+        self._current_stream_locators = []
+        self._current_stream_name = '.'
+        self._current_file_name = None
+        self._current_file_pos = 0
+        self._finished_streams = []
+
+    def __enter__(self):
+        pass
+
+    def __exit__(self):
+        self.finish()
+
+    def write_directory_tree(self,
+                             path, stream_name='.', max_manifest_depth=-1):
+        self.start_new_stream(stream_name)
+        todo = []
+        if max_manifest_depth == 0:
+            dirents = sorted(util.listdir_recursive(path))
+        else:
+            dirents = sorted(os.listdir(path))
+        for dirent in dirents:
+            target = os.path.join(path, dirent)
+            if os.path.isdir(target):
+                todo += [[target,
+                          os.path.join(stream_name, dirent),
+                          max_manifest_depth-1]]
+            else:
+                self.start_new_file(dirent)
+                with open(target, 'rb') as f:
+                    while True:
+                        buf = f.read(2**26)
+                        if len(buf) == 0:
+                            break
+                        self.write(buf)
+        self.finish_current_stream()
+        map(lambda x: self.write_directory_tree(*x), todo)
+
+    def write(self, newdata):
+        if hasattr(newdata, '__iter__'):
+            for s in newdata:
+                self.write(s)
+            return
+        self._data_buffer += [newdata]
+        self._data_buffer_len += len(newdata)
+        self._current_stream_length += len(newdata)
+        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
+            self.flush_data()
+
+    def flush_data(self):
+        data_buffer = ''.join(self._data_buffer)
+        if data_buffer != '':
+            self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
+            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
+            self._data_buffer_len = len(self._data_buffer[0])
+
+    def start_new_file(self, newfilename=None):
+        self.finish_current_file()
+        self.set_current_file_name(newfilename)
+
+    def set_current_file_name(self, newfilename):
+        if re.search(r'[\t\n]', newfilename):
+            raise errors.AssertionError(
+                "Manifest filenames cannot contain whitespace: %s" %
+                newfilename)
+        self._current_file_name = newfilename
+
+    def current_file_name(self):
+        return self._current_file_name
+
+    def finish_current_file(self):
+        if self._current_file_name == None:
+            if self._current_file_pos == self._current_stream_length:
+                return
+            raise errors.AssertionError(
+                "Cannot finish an unnamed file " +
+                "(%d bytes at offset %d in '%s' stream)" %
+                (self._current_stream_length - self._current_file_pos,
+                 self._current_file_pos,
+                 self._current_stream_name))
+        self._current_stream_files += [[self._current_file_pos,
+                                       self._current_stream_length - self._current_file_pos,
+                                       self._current_file_name]]
+        self._current_file_pos = self._current_stream_length
+
+    def start_new_stream(self, newstreamname='.'):
+        self.finish_current_stream()
+        self.set_current_stream_name(newstreamname)
+
+    def set_current_stream_name(self, newstreamname):
+        if re.search(r'[\t\n]', newstreamname):
+            raise errors.AssertionError(
+                "Manifest stream names cannot contain whitespace")
+        self._current_stream_name = '.' if newstreamname=='' else newstreamname
+
+    def current_stream_name(self):
+        return self._current_stream_name
+
+    def finish_current_stream(self):
+        self.finish_current_file()
+        self.flush_data()
+        if len(self._current_stream_files) == 0:
+            pass
+        elif self._current_stream_name == None:
+            raise errors.AssertionError(
+                "Cannot finish an unnamed stream (%d bytes in %d files)" %
+                (self._current_stream_length, len(self._current_stream_files)))
+        else:
+            if len(self._current_stream_locators) == 0:
+                self._current_stream_locators += [EMPTY_BLOCK_LOCATOR]
+            self._finished_streams += [[self._current_stream_name,
+                                       self._current_stream_locators,
+                                       self._current_stream_files]]
+        self._current_stream_files = []
+        self._current_stream_length = 0
+        self._current_stream_locators = []
+        self._current_stream_name = None
+        self._current_file_pos = 0
+        self._current_file_name = None
+
+    def finish(self):
+        return Keep.put(self.manifest_text())
+
+    def manifest_text(self):
+        self.finish_current_stream()
+        manifest = ''
+        for stream in self._finished_streams:
+            if not re.search(r'^\.(/.*)?$', stream[0]):
+                manifest += './'
+            manifest += stream[0].replace(' ', '\\040')
+            for locator in stream[1]:
+                manifest += " %s" % locator
+            for sfile in stream[2]:
+                manifest += " %d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040'))
+            manifest += "\n"
+        return manifest
+
+    def data_locators(self):
+        ret = []
+        for name, locators, files in self._finished_streams:
+            ret += locators
+        return ret
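
The collection writer above composes as: name a stream, name a file, write
bytes (buffered and flushed to Keep in KEEP_BLOCK_SIZE units), then render or
store the manifest. A minimal usage sketch under stated assumptions: that the
class is exposed as arvados.CollectionWriter with a zero-argument constructor
(neither is shown in this hunk), that the arvados package imports without a
fully configured API connection, and that KEEP_LOCAL_STORE is set so flushed
blocks land in a local directory instead of a real Keep server.

    import os, tempfile
    os.environ['KEEP_LOCAL_STORE'] = tempfile.mkdtemp()
    import arvados

    cw = arvados.CollectionWriter()      # hypothetical constructor call
    cw.start_new_stream('.')             # '.' is also the default stream name
    cw.start_new_file('hello.txt')
    cw.write('hello world\n')            # buffered until a Keep block fills
    print cw.manifest_text()             # ". <locator> 0:12:hello.txt"
    locator = cw.finish()                # Keep.put() of the manifest text itself
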
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
new file mode 100644 (file)
index 0000000..62e9d08
--- /dev/null
@@ -0,0 +1,251 @@
+import gflags
+import httplib
+import httplib2
+import logging
+import os
+import pprint
+import sys
+import types
+import subprocess
+import json
+import UserDict
+import re
+import hashlib
+import string
+import bz2
+import zlib
+import fcntl
+import time
+import threading
+
+global_client_object = None
+
+from arvados import *
+
+class Keep:
+    @staticmethod
+    def global_client_object():
+        global global_client_object
+        if global_client_object is None:
+            global_client_object = KeepClient()
+        return global_client_object
+
+    @staticmethod
+    def get(locator, **kwargs):
+        return Keep.global_client_object().get(locator, **kwargs)
+
+    @staticmethod
+    def put(data, **kwargs):
+        return Keep.global_client_object().put(data, **kwargs)
+
+class KeepClient(object):
+
+    class ThreadLimiter(object):
+        """
+        Limit the number of threads running at a given time to
+        {desired successes} minus {successes reported}. When successes
+        reported == desired, wake up the remaining threads and tell
+        them to quit.
+
+        Should be used in a "with" block.
+        """
+        def __init__(self, todo):
+            self._todo = todo
+            self._done = 0
+            self._todo_lock = threading.Semaphore(todo)
+            self._done_lock = threading.Lock()
+
+        def __enter__(self):
+            self._todo_lock.acquire()
+            return self
+
+        def __exit__(self, type, value, traceback):
+            self._todo_lock.release()
+
+        def shall_i_proceed(self):
+            """
+            Return True if the current thread should attempt its write.
+            Return False if enough writes have already succeeded and this
+            thread should exit without doing anything.
+            """
+            with self._done_lock:
+                return (self._done < self._todo)
+
+        def increment_done(self):
+            """
+            Report that the current thread was successful.
+            """
+            with self._done_lock:
+                self._done += 1
+
+        def done(self):
+            """
+            Return how many successes were reported.
+            """
+            with self._done_lock:
+                return self._done
+
+    class KeepWriterThread(threading.Thread):
+        """
+        Write a blob of data to the given Keep server. Call
+        increment_done() of the given ThreadLimiter if the write
+        succeeds.
+        """
+        def __init__(self, **kwargs):
+            super(KeepClient.KeepWriterThread, self).__init__()
+            self.args = kwargs
+
+        def run(self):
+            global config
+            with self.args['thread_limiter'] as limiter:
+                if not limiter.shall_i_proceed():
+                    # My turn arrived, but the job has been done without
+                    # me.
+                    return
+                logging.debug("KeepWriterThread %s proceeding %s %s" %
+                              (str(threading.current_thread()),
+                               self.args['data_hash'],
+                               self.args['service_root']))
+                h = httplib2.Http()
+                url = self.args['service_root'] + self.args['data_hash']
+                api_token = config['ARVADOS_API_TOKEN']
+                headers = {'Authorization': "OAuth2 %s" % api_token}
+                try:
+                    resp, content = h.request(url.encode('utf-8'), 'PUT',
+                                              headers=headers,
+                                              body=self.args['data'])
+                    if (resp['status'] == '401' and
+                        re.match(r'Timestamp verification failed', content)):
+                        body = KeepClient.sign_for_old_server(
+                            self.args['data_hash'],
+                            self.args['data'])
+                        h = httplib2.Http()
+                        resp, content = h.request(url.encode('utf-8'), 'PUT',
+                                                  headers=headers,
+                                                  body=body)
+                    if re.match(r'^2\d\d$', resp['status']):
+                        logging.debug("KeepWriterThread %s succeeded %s %s" %
+                                      (str(threading.current_thread()),
+                                       self.args['data_hash'],
+                                       self.args['service_root']))
+                        return limiter.increment_done()
+                    logging.warning("Request fail: PUT %s => %s %s" %
+                                    (url, resp['status'], content))
+                except (httplib2.HttpLib2Error, httplib.HTTPException) as e:
+                    logging.warning("Request fail: PUT %s => %s: %s" %
+                                    (url, type(e), str(e)))
+
+    def __init__(self):
+        self.lock = threading.Lock()
+        self.service_roots = None
+
+    def shuffled_service_roots(self, hash):
+        if self.service_roots is None:
+            # Hold the lock as a context manager so an API error cannot
+            # leave it held, and re-check under the lock so concurrent
+            # callers do not fetch the service list twice.
+            with self.lock:
+                if self.service_roots is None:
+                    keep_disks = api().keep_disks().list().execute()['items']
+                    roots = (("http%s://%s:%d/" %
+                              ('s' if f['service_ssl_flag'] else '',
+                               f['service_host'],
+                               f['service_port']))
+                             for f in keep_disks)
+                    self.service_roots = sorted(set(roots))
+                    logging.debug(str(self.service_roots))
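+        # Probe order: take the block hash eight hex digits at a time,
+        # use each chunk modulo the number of remaining services to pick
+        # the next one, and re-seed from the hash when the digits run
+        # out. Every client derives the same order for a given block, so
+        # readers probe the same services that writers targeted.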
+        seed = hash
+        pool = self.service_roots[:]
+        pseq = []
+        while len(pool) > 0:
+            if len(seed) < 8:
+                if len(pseq) < len(hash) / 4: # first time around
+                    seed = hash[-4:] + hash
+                else:
+                    seed += hash
+            probe = int(seed[0:8], 16) % len(pool)
+            pseq += [pool[probe]]
+            pool = pool[:probe] + pool[probe+1:]
+            seed = seed[8:]
+        logging.debug(str(pseq))
+        return pseq
+
+    def get(self, locator):
+        global config
+        if re.search(r',', locator):
+            return ''.join(self.get(x) for x in locator.split(','))
+        if 'KEEP_LOCAL_STORE' in os.environ:
+            return KeepClient.local_store_get(locator)
+        expect_hash = re.sub(r'\+.*', '', locator)
+        for service_root in self.shuffled_service_roots(expect_hash):
+            h = httplib2.Http()
+            url = service_root + expect_hash
+            api_token = config['ARVADOS_API_TOKEN']
+            headers = {'Authorization': "OAuth2 %s" % api_token,
+                       'Accept': 'application/octet-stream'}
+            try:
+                resp, content = h.request(url.encode('utf-8'), 'GET',
+                                          headers=headers)
+                if re.match(r'^2\d\d$', resp['status']):
+                    m = hashlib.new('md5')
+                    m.update(content)
+                    md5 = m.hexdigest()
+                    if md5 == expect_hash:
+                        return content
+                    logging.warning("Checksum fail: md5(%s) = %s" % (url, md5))
+            except (httplib2.HttpLib2Error, httplib.ResponseNotReady) as e:
+                logging.info("Request fail: GET %s => %s: %s" %
+                             (url, type(e), str(e)))
+        raise errors.NotFoundError("Block not found: %s" % expect_hash)
+
+    def put(self, data, **kwargs):
+        if 'KEEP_LOCAL_STORE' in os.environ:
+            return KeepClient.local_store_put(data)
+        m = hashlib.new('md5')
+        m.update(data)
+        data_hash = m.hexdigest()
+        have_copies = 0
+        want_copies = kwargs.get('copies', 2)
+        if not (want_copies > 0):
+            return data_hash
+        threads = []
+        thread_limiter = KeepClient.ThreadLimiter(want_copies)
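+        # One writer thread per service, started in probe order; the
+        # shared ThreadLimiter lets threads proceed only until
+        # want_copies successful PUTs have been reported.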
+        for service_root in self.shuffled_service_roots(data_hash):
+            t = KeepClient.KeepWriterThread(data=data,
+                                            data_hash=data_hash,
+                                            service_root=service_root,
+                                            thread_limiter=thread_limiter)
+            t.start()
+            threads += [t]
+        for t in threads:
+            t.join()
+        have_copies = thread_limiter.done()
+        if have_copies == want_copies:
+            return (data_hash + '+' + str(len(data)))
+        raise errors.KeepWriteError(
+            "Write fail for %s: wanted %d but wrote %d" %
+            (data_hash, want_copies, have_copies))
+
+    @staticmethod
+    def sign_for_old_server(data_hash, data):
+        return (("-----BEGIN PGP SIGNED MESSAGE-----\n\n\n%d %s\n-----BEGIN PGP SIGNATURE-----\n\n-----END PGP SIGNATURE-----\n" % (int(time.time()), data_hash)) + data)
+
+
+    @staticmethod
+    def local_store_put(data):
+        m = hashlib.new('md5')
+        m.update(data)
+        md5 = m.hexdigest()
+        locator = '%s+%d' % (md5, len(data))
+        with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'), 'w') as f:
+            f.write(data)
+        os.rename(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'),
+                  os.path.join(os.environ['KEEP_LOCAL_STORE'], md5))
+        return locator
+
+    @staticmethod
+    def local_store_get(locator):
+        r = re.search('^([0-9a-f]{32,})', locator)
+        if not r:
+            raise errors.NotFoundError(
+                "Invalid data locator: '%s'" % locator)
+        if r.group(0) == EMPTY_BLOCK_LOCATOR.split('+')[0]:
+            return ''
+        with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], r.group(0)), 'r') as f:
+            return f.read()
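
Keep.put() returns a "{md5}+{size}" locator once the requested number of
copies has been written, and Keep.get() fetches a block and verifies its
checksum before returning it. A minimal round trip, sketched under the
assumption that the module imports without API credentials and that setting
KEEP_LOCAL_STORE (the testing mode above) is enough to avoid needing real
Keep servers:

    import os, tempfile
    os.environ['KEEP_LOCAL_STORE'] = tempfile.mkdtemp()

    from arvados.keep import Keep

    locator = Keep.put('foo')     # 'acbd18db4cc2f85cedef654fccc4a4d8+3'
    assert Keep.get(locator) == 'foo'
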
diff --git a/sdk/python/arvados/stream.py b/sdk/python/arvados/stream.py
new file mode 100644 (file)
index 0000000..8570b97
--- /dev/null
@@ -0,0 +1,225 @@
+import gflags
+import httplib
+import httplib2
+import logging
+import os
+import pprint
+import sys
+import types
+import subprocess
+import json
+import UserDict
+import re
+import hashlib
+import string
+import bz2
+import zlib
+import fcntl
+import time
+import threading
+
+from keep import *
+
+class StreamFileReader(object):
+    def __init__(self, stream, pos, size, name):
+        self._stream = stream
+        self._pos = pos
+        self._size = size
+        self._name = name
+        self._filepos = 0
+
+    def name(self):
+        return self._name
+
+    def decompressed_name(self):
+        return re.sub(r'\.(bz2|gz)$', '', self._name)
+
+    def size(self):
+        return self._size
+
+    def stream_name(self):
+        return self._stream.name()
+
+    def read(self, size, **kwargs):
+        self._stream.seek(self._pos + self._filepos)
+        data = self._stream.read(min(size, self._size - self._filepos))
+        self._filepos += len(data)
+        return data
+
+    def readall(self, size=2**20, **kwargs):
+        while True:
+            data = self.read(size, **kwargs)
+            if data == '':
+                break
+            yield data
+
+    def bunzip2(self, size):
+        decompressor = bz2.BZ2Decompressor()
+        for chunk in self.readall(size):
+            data = decompressor.decompress(chunk)
+            if data and data != '':
+                yield data
+
+    def gunzip(self, size):
+        decompressor = zlib.decompressobj(16+zlib.MAX_WBITS)
+        for chunk in self.readall(size):
+            data = decompressor.decompress(decompressor.unconsumed_tail + chunk)
+            if data and data != '':
+                yield data
+
+    def readall_decompressed(self, size=2**20):
+        self._stream.seek(self._pos + self._filepos)
+        if re.search(r'\.bz2$', self._name):
+            return self.bunzip2(size)
+        elif re.search(r'\.gz$', self._name):
+            return self.gunzip(size)
+        else:
+            return self.readall(size)
+
+    def readlines(self, decompress=True):
+        if decompress:
+            datasource = self.readall_decompressed()
+        else:
+            self._stream.seek(self._pos + self._filepos)
+            datasource = self.readall()
+        data = ''
+        for newdata in datasource:
+            data += newdata
+            sol = 0
+            while True:
+                eol = string.find(data, "\n", sol)
+                if eol < 0:
+                    break
+                yield data[sol:eol+1]
+                sol = eol+1
+            data = data[sol:]
+        if data != '':
+            yield data
+
+    def as_manifest(self):
+        if self.size() == 0:
+            return ("%s %s 0:0:%s\n"
+                    % (self._stream.name(), EMPTY_BLOCK_LOCATOR, self.name()))
+        return string.join(self._stream.tokens_for_range(self._pos, self._size),
+                           " ") + "\n"
+
+class StreamReader(object):
+    def __init__(self, tokens):
+        self._tokens = tokens
+        self._current_datablock_data = None
+        self._current_datablock_pos = 0
+        self._current_datablock_index = -1
+        self._pos = 0
+
+        self._stream_name = None
+        self.data_locators = []
+        self.files = []
+
+        for tok in self._tokens:
+            if self._stream_name is None:
+                self._stream_name = tok.replace('\\040', ' ')
+            elif re.search(r'^[0-9a-f]{32}(\+\S+)*$', tok):
+                self.data_locators += [tok]
+            elif re.search(r'^\d+:\d+:\S+', tok):
+                pos, size, name = tok.split(':',2)
+                self.files += [[int(pos), int(size), name.replace('\\040', ' ')]]
+            else:
+                raise errors.SyntaxError("Invalid manifest format")
+
+    def tokens(self):
+        return self._tokens
+
+    def tokens_for_range(self, range_start, range_size):
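+        # Build a manifest fragment for one byte range of this stream:
+        # the stream name, every locator whose block overlaps the range,
+        # and the file tokens for overlapping files with their offsets
+        # adjusted for any blocks that were skipped entirely.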
+        resp = [self._stream_name]
+        return_all_tokens = False
+        block_start = 0
+        token_bytes_skipped = 0
+        for locator in self.data_locators:
+            sizehint = re.search(r'\+(\d+)', locator)
+            if not sizehint:
+                return_all_tokens = True
+            if return_all_tokens:
+                resp += [locator]
+                continue
+            blocksize = int(sizehint.group(1))
+            if range_start + range_size <= block_start:
+                break
+            if range_start < block_start + blocksize:
+                resp += [locator]
+            else:
+                token_bytes_skipped += blocksize
+            block_start += blocksize
+        for f in self.files:
+            if ((f[0] < range_start + range_size)
+                and
+                (f[0] + f[1] > range_start)
+                and
+                f[1] > 0):
+                resp += ["%d:%d:%s" % (f[0] - token_bytes_skipped, f[1], f[2])]
+        return resp
+
+    def name(self):
+        return self._stream_name
+
+    def all_files(self):
+        for f in self.files:
+            pos, size, name = f
+            yield StreamFileReader(self, pos, size, name)
+
+    def nextdatablock(self):
+        if self._current_datablock_index < 0:
+            self._current_datablock_pos = 0
+            self._current_datablock_index = 0
+        else:
+            self._current_datablock_pos += self.current_datablock_size()
+            self._current_datablock_index += 1
+        self._current_datablock_data = None
+
+    def current_datablock_data(self):
+        if self._current_datablock_data is None:
+            self._current_datablock_data = Keep.get(self.data_locators[self._current_datablock_index])
+        return self._current_datablock_data
+
+    def current_datablock_size(self):
+        if self._current_datablock_index < 0:
+            self.nextdatablock()
+        sizehint = re.search(r'\+(\d+)', self.data_locators[self._current_datablock_index])
+        if sizehint:
+            return int(sizehint.group(1))
+        return len(self.current_datablock_data())
+
+    def seek(self, pos):
+        """Set the position of the next read operation."""
+        self._pos = pos
+
+    def really_seek(self):
+        """Find and load the appropriate data block, so the byte at
+        _pos is in memory.
+        """
+        if self._pos == self._current_datablock_pos:
+            return True
+        if (self._current_datablock_pos is not None and
+            self._pos >= self._current_datablock_pos and
+            self._pos <= self._current_datablock_pos + self.current_datablock_size()):
+            return True
+        if self._pos < self._current_datablock_pos:
+            self._current_datablock_index = -1
+            self.nextdatablock()
+        while (self._pos > self._current_datablock_pos and
+               self._pos > self._current_datablock_pos + self.current_datablock_size()):
+            self.nextdatablock()
+
+    def read(self, size):
+        """Read no more than size bytes -- but at least one byte,
+        unless _pos is already at the end of the stream.
+        """
+        if size == 0:
+            return ''
+        self.really_seek()
+        while self._pos >= self._current_datablock_pos + self.current_datablock_size():
+            self.nextdatablock()
+            if self._current_datablock_index >= len(self.data_locators):
+                return None
+        data = self.current_datablock_data()[self._pos - self._current_datablock_pos : self._pos - self._current_datablock_pos + size]
+        self._pos += len(data)
+        return data
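
A stream in a manifest is one whitespace-delimited line: a stream name, one
or more block locators, then {position}:{size}:{filename} tokens.
StreamReader consumes that token list, and all_files() yields one
StreamFileReader per file token. A sketch of reading a file back out of a
single-stream manifest, again assuming KEEP_LOCAL_STORE local-store mode so
the locator resolves without a Keep server:

    import os, tempfile
    os.environ['KEEP_LOCAL_STORE'] = tempfile.mkdtemp()

    from arvados.keep import Keep
    from arvados.stream import StreamReader

    locator = Keep.put('hello world\n')
    manifest_line = ". %s 0:12:hello.txt" % locator
    stream = StreamReader(manifest_line.split())
    for f in stream.all_files():
        print f.name(), f.size()      # hello.txt 12
        print f.read(f.size())        # hello world
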
index 4b63747d5b1fe12f78609fe7a6912e51a5404b61..05da5fef0374b5332b487127ec05c6eb38360be5 100644 (file)
@@ -49,4 +49,25 @@ class Arvados::V1::CollectionsController < ApplicationController
     end
     show
   end
+
+  protected
+
+  def find_object_by_uuid
+    super
+    if !@object and !params[:uuid].match(/^[0-9a-f]+\+\d+$/)
+      # Normalize the given uuid and search again.
+      hash_part = params[:uuid].match(/^([0-9a-f]*)/)[1]
+      collection = Collection.where('uuid like ?', hash_part + '+%').first
+      if collection
+        # We know the collection exists, and what its real uuid is in
+        # the database. Now, throw out @objects and repeat the usual
+        # lookup procedure. (Returning the collection at this point
+        # would bypass permission checks.)
+        @objects = nil
+        @where = { uuid: collection.uuid }
+        find_objects_for_index
+        @object = @objects.first
+      end
+    end
+  end
 end
index d04448229cd7e6de158970aa7fd0fbf32eb7c358..8f5b097ccea9e3d5b222ae69c213c2941acea2d5 100644 (file)
@@ -1,5 +1,6 @@
 class Arvados::V1::NodesController < ApplicationController
   skip_before_filter :require_auth_scope_all, :only => :ping
+  skip_before_filter :find_object_by_uuid, :only => :ping
 
   def create
     @object = Node.new
@@ -12,6 +13,10 @@ class Arvados::V1::NodesController < ApplicationController
     { ping_secret: true }
   end
   def ping
+    @object = Node.where(uuid: (params[:id] || params[:uuid])).first
+    if !@object
+      return render_not_found
+    end
     @object.ping({ ip: params[:local_ipv4] || request.env['REMOTE_ADDR'],
                    ping_secret: params[:ping_secret],
                    ec2_instance_id: params[:instance_id] })
index 8a16356c0ddcd6958c9a606c6e43dc00f7d328c8..e57ca52081969ed31aaf07109eafd83e9bfe455d 100644 (file)
@@ -23,6 +23,7 @@ class Arvados::V1::SchemaController < ApplicationController
   end
 
   def discovery_rest_description
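+    # Let clients and intermediate caches reuse the discovery document
+    # for a day instead of refetching it on every request.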
+    expires_in 24.hours, public: true
     discovery = Rails.cache.fetch 'arvados_v1_rest_discovery' do
       Rails.application.eager_load!
       discovery = {
index 863e2cb6b27e903cabdf927c893b453575c64f39..03e5e4ef44c32948e3f923ad3b849f0d71e28c8e 100644 (file)
@@ -28,7 +28,9 @@ class Collection < ArvadosModel
     if self.manifest_text.nil? and self.uuid.nil?
       super
     elsif self.manifest_text and self.uuid
-      if self.uuid.gsub(/\+[^,]+/,'') == Digest::MD5.hexdigest(self.manifest_text)
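+      # Normalize the uuid to {hash}+{size}: strip any locator hints,
+      # verify the hash against manifest_text, then append its length.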
+      self.uuid.gsub!(/\+.*/, '')
+      if self.uuid == Digest::MD5.hexdigest(self.manifest_text)
+        self.uuid.gsub!(/$/, '+' + self.manifest_text.length.to_s)
         true
       else
         errors.add :uuid, 'uuid does not match checksum of manifest_text'
diff --git a/services/api/db/migrate/20140117231056_normalize_collection_uuid.rb b/services/api/db/migrate/20140117231056_normalize_collection_uuid.rb
new file mode 100644 (file)
index 0000000..bec7ec7
--- /dev/null
@@ -0,0 +1,91 @@
+class NormalizeCollectionUuid < ActiveRecord::Migration
+  def count_orphans
+    %w(head tail).each do |ht|
+      results = ActiveRecord::Base.connection.execute(<<-EOS)
+SELECT COUNT(links.*)
+ FROM links
+ LEFT JOIN collections c
+   ON links.#{ht}_uuid = c.uuid
+ WHERE (#{ht}_kind='arvados#collection' or #{ht}_uuid ~ '^[0-9a-f]{32,}')
+   AND #{ht}_uuid IS NOT NULL
+   AND #{ht}_uuid NOT IN (SELECT uuid FROM collections)
+EOS
+      puts "#{results.first['count'].to_i} links with #{ht}_uuid pointing nowhere."
+    end
+  end
+
+  def up
+    # Normalize uuids in the collections table to
+    # {hash}+{size}. Existing uuids might be {hash},
+    # {hash}+{size}+K@{instance-name}, {hash}+K@{instance-name}, etc.
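+    # For example, "d41d8cd98f00b204e9800998ecf8427e+K@qr1hi" would become
+    # "d41d8cd98f00b204e9800998ecf8427e+0" when manifest_text is empty.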
+
+    count_orphans
+    puts "Normalizing collection UUIDs."
+
+    update_sql <<-EOS
+UPDATE collections
+ SET uuid = regexp_replace(uuid,'\\+.*','') || '+' || length(manifest_text)
+ WHERE uuid !~ '^[0-9a-f]{32,}\\+[0-9]+'
+   AND (regexp_replace(uuid,'\\+.*','') || '+' || length(manifest_text))
+     NOT IN (SELECT uuid FROM collections)
+EOS
+
+    count_orphans
+    puts "Updating links by stripping +K@.* from *_uuid attributes."
+
+    update_sql <<-EOS
+UPDATE links
+ SET head_uuid = regexp_replace(head_uuid,'\\+K@.*','')
+ WHERE head_uuid like '%+K@%'
+EOS
+    update_sql <<-EOS
+UPDATE links
+ SET tail_uuid = regexp_replace(tail_uuid,'\\+K@.*','')
+ WHERE tail_uuid like '%+K@%'
+EOS
+
+    count_orphans
+    puts "Updating links by searching bare collection hashes using regexp."
+
+    # Next, update {hash} (and any other non-normalized forms) to
+    # {hash}+{size}. This can only work where the corresponding
+    # collection is found in the collections table (otherwise we can't
+    # know the size).
+    %w(head tail).each do |ht|
+      update_sql <<-EOS
+UPDATE links
+ SET #{ht}_uuid = c.uuid
+ FROM collections c
+ WHERE #{ht}_uuid IS NOT NULL
+   AND (#{ht}_kind='arvados#collection' or #{ht}_uuid ~ '^[0-9a-f]{32,}')
+   AND #{ht}_uuid NOT IN (SELECT uuid FROM collections)
+   AND regexp_replace(#{ht}_uuid,'\\+.*','') = regexp_replace(c.uuid,'\\+.*','')
+   AND c.uuid ~ '^[0-9a-f]{32,}\\+[0-9]+'
+EOS
+    end
+
+    count_orphans
+    puts "Stripping \"+K@.*\" from jobs.output, jobs.log, job_tasks.output."
+
+    update_sql <<-EOS
+UPDATE jobs
+ SET output = regexp_replace(output,'\\+K@.*','')
+ WHERE output ~ '^[0-9a-f]{32,}\\+[0-9]+\\+K@\\w+$'
+EOS
+    update_sql <<-EOS
+UPDATE jobs
+ SET log = regexp_replace(log,'\\+K@.*','')
+ WHERE log ~ '^[0-9a-f]{32,}\\+[0-9]+\\+K@\\w+$'
+EOS
+    update_sql <<-EOS
+UPDATE job_tasks
+ SET output = regexp_replace(output,'\\+K@.*','')
+ WHERE output ~ '^[0-9a-f]{32,}\\+[0-9]+\\+K@\\w+$'
+EOS
+
+    puts "Done."
+  end
+
+  def down
+  end
+end