- Concepts
- API Methods
- Schema
+ sdk:
+ - Python
\ No newline at end of file
<div class="span3">
<div class="affix-top">
<div class="well sidebar-nav">
- {% if page.navsection == 'userguide' or page.navsection == 'api' %}
+ {% if page.navsection == 'userguide' or page.navsection == 'api' or page.navsection == 'sdk' %}
<ol class="nav nav-list">
{% for menu_item in site.navbar[page.navsection] %}
<li>{{ menu_item }}
<div class="nav-collapse collapse">
<ul class="nav">
<li {% if page.navsection == 'userguide' %} class="active" {% endif %}><a href="{{ site.baseurl }}/user/">User Guide</a></li>
+ <li {% if page.navsection == 'sdk' %} class="active" {% endif %}><a href="{{ site.baseurl }}/sdk/">SDK Reference</a></li>
<li {% if page.navsection == 'api' %} class="active" {% endif %}><a href="{{ site.baseurl }}/api/">API Reference</a></li>
<li {% if page.navsection == 'adminguide' %} class="active" {% endif %}><a href="{{ site.baseurl }}/admin/">Admin Guide</a></li>
<li {% if page.navsection == 'installguide' %} class="active" {% endif %}><a href="{{ site.baseurl }}/install/">Install Guide</a></li>
--- /dev/null
+#!/usr/bin/env python
+
+import arvados
+
+arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True)
+this_task = arvados.current_task()
+
+# Get the input collection for this task
+this_task_input = this_task['parameters']['input']
+
+# Create a CollectionReader to access the collection
+input_collection = arvados.CollectionReader(this_task_input)
+
+# Get the name of the first file in the collection
+input_file = list(input_collection.all_files())[0].name()
+
+# Extract the file to a temporary directory
+# Returns the directory that the file was written to
+input_dir = arvados.util.collection_extract(this_task_input,
+ 'tmp',
+ files=[input_file],
+ decompress=False)
+
+# Run the 'md5sum' command on the input file, with the current working
+# directory set to the location the input file was extracted to.
+stdoutdata, stderrdata = arvados.util.run_command(
+ ['md5sum', input_file],
+ cwd=input_dir)
+
+# Save the standard output (stdoutdata) to "md5sum.txt" in the output collection
+out = arvados.CollectionWriter()
+out.set_current_file_name("md5sum.txt")
+out.write(stdoutdata)
+
+this_task.set_output(out.finish())
<link rel="shortcut icon" href="{{ site.baseurl }}/images/favicon.ico" type="image/x-icon">
<link href="{{ site.baseurl }}/css/bootstrap.css" rel="stylesheet">
<style>
+ html {
+ height:100%;
+ }
body {
padding-top: 41px;
+ height: 90%; /* If calc() is not supported */
+ height: calc(100% - 46px); /* Sets the body full height minus the padding for the menu bar */
}
div.frontpagehero {
background: #fff;
<body class="nopad">
{% include navbar_top.html %}
- {% if page.navsection == 'top' %}
+ {% if page.navsection == 'top' or page.no_nav_left %}
{{ content }}
{% else %}
*margin-right: .3em;
line-height: 14px;
vertical-align: text-top;
- background-image: url("../img/glyphicons-halflings.png");
+ background-image: url("../images/glyphicons-halflings.png");
background-position: 14px 14px;
background-repeat: no-repeat;
margin-top: 1px;
.dropdown-submenu:focus > a > [class^="icon-"],
.dropdown-submenu:hover > a > [class*=" icon-"],
.dropdown-submenu:focus > a > [class*=" icon-"] {
- background-image: url("../img/glyphicons-halflings-white.png");
+ background-image: url("../images/glyphicons-halflings-white.png");
}
.icon-glass {
background-position: 0 0;
<div style="width: 50px; display: table-cell; border-left:1px solid #bbb;">
</div>
- <div style="margin-left: 0px; width: 450px; display: table-cell;">
+ <div style="margin-left: 0px; width: auto; display: table-cell;">
<div>
<p>
<a href="{{ site.baseurl }}/user/">User Guide</a> — How to manage data and do analysis with Arvados.
</p>
<p>
- <a href="{{ site.baseurl }}/api/">API Reference</a> — Details about the Arvados APIs.
+            <a href="{{ site.baseurl }}/sdk/">SDK Reference</a> — Details about accessing Arvados from various programming languages.
+ </p>
+ <p>
+            <a href="{{ site.baseurl }}/api/">API Reference</a> — Details about the Arvados REST API.
</p>
<p>
<a href="{{ site.baseurl }}/admin/">Admin Guide</a> — How to administer an Arvados system.
}
EOF
</pre>
+
+h3. Keep disks
+
+Currently, you need to tell Arvados about Keep disks manually.
+
+<pre>
+secret=`ruby -e 'print rand(2**512).to_s(36)[0..49]'`
+arv keep_disk create --keep-disk <<EOF
+{
+ "service_host":"keep0.xyzzy.arvadosapi.com",
+ "service_port":25107,
+ "service_ssl_flag":false,
+ "ping_secret":"$secret"
+}
+EOF
+</pre>
--- /dev/null
+#!/bin/sh
+
+mkdir -p _site/sdk/python/arvados
+cd _site/sdk/python/arvados
+epydoc --html -o . "arvados"
--- /dev/null
+---
+layout: default
+navsection: sdk
+title: "SDK Reference"
+navorder: 0
+---
+
+h1. Arvados SDK Reference
+
+This section documents how to access the Arvados API and Keep using various programming languages.
+
+* "Python SDK":python/sdk-python.html
---
layout: default
-navsection: userguide
-navmenu: Reference
+navsection: sdk
+navmenu: Python
title: "Crunch utility libraries"
-navorder: 31
+navorder: 20
---
h1. Crunch utility libraries
Several utility libraries are included with Arvados. They are intended to make it quicker and easier to write your own crunch scripts.
-h4. Python SDK extras
+* "Python SDK extras":#pythonsdk
+* "Toolkit wrappers":#toolkit_wrappers
-The Python SDK adds some convenience features that are particularly useful in crunch scripts, in addition to the standard set of API calls.
+h2(#pythonsdk). Python SDK extras
-<div class="offset1">
+The Python SDK adds some convenience features that are particularly useful in crunch scripts, in addition to the standard set of API calls.
In a crunch job, the environment variables @ARVADOS_API_HOST@ and @ARVADOS_API_TOKEN@ will be set up so the job has the privileges of the user who submitted the job.
my_uuid = my_user['uuid']
</pre>
-h4. Get the current job and task parameters
+h3. Get the current job and task parameters
@arvados.current_job()@ and @arvados.current_task()@ are convenient ways to retrieve the current Job and Task, using the @JOB_UUID@ and @TASK_UUID@ environment variables provided to each crunch task process.
this_task_input = this_task['parameters']['input']
</pre>
-h4(#one_task_per_input). Queue a task for each input file
+h3(#one_task_per_input). Queue a task for each input file
A common pattern for a crunch job is to run one task to scan the input, and one task per input file to do the work.
my_input = this_task['parameters']['input']
</pre>
-h4. Set the current task's output and success flag
+h3. Set the current task's output and success flag
Each task in a crunch job must make an API call to record its output and set its @success@ attribute to True. The object returned by @current_task()@ has a @set_output()@ method to make the process more succinct.
arvados.current_task().set_output(my_output_locator)
</pre>
-</div>
-
-h4. arvados_ipc.py
+h3. arvados_ipc.py
Manage child processes and FIFOs (pipes).
-<div class="offset1">
This module makes it easier to check the exit status of every child process you start, and close the unused end of each FIFO at the appropriate time.
The "crunch scripts" included with Arvados include some more examples of using the arvados_ipc module.
-</div>
-
-h3. Toolkit wrappers
+h2(#toolkit_wrappers). Toolkit wrappers
The following *arvados-∗.py* modules provide "extract, build, run" helpers to make it easy to incorporate common analysis tools in your crunch scripts.
-h4. arvados_bwa.py
+h3. arvados_bwa.py
Build and run the "bwa":http://bio-bwa.sourceforge.net/bwa.shtml program.
-<div class="offset1">
-
The module retrieves the bwa source code from Keep, using the job's @bwa_tbz@ parameter.
<pre>
}
</pre>
-</div>
-
-h4. arvados_gatk2.py
+h3. arvados_gatk2.py
Extract and run the "Genome Analysis Toolkit":http://www.broadinstitute.org/gatk/ programs.
-<div class="offset1">
-
The module retrieves the binary distribution tarball from Keep, using the job's @gatk_tbz@ parameter.
<pre>
}
</pre>
-</div>
-
-h4. arvados_samtools.py
+h3. arvados_samtools.py
Build and run the "samtools":http://samtools.sourceforge.net/samtools.shtml program.
-<div class="offset1">
The module retrieves the samtools source code from Keep, using the job's @samtools_tgz@ parameter.
}
</pre>
-</div>
-h4. arvados_picard.py
+h3. arvados_picard.py
Build and run the "picard":http://picard.sourceforge.net/command-line-overview.shtml program.
-<div class="offset1">
The module retrieves the picard binary distribution from Keep, using the job's @picard_zip@ parameter.
}
</pre>
-</div>
--- /dev/null
+---
+layout: default
+navsection: sdk
+navmenu: Python
+title: "PyDoc Reference"
+navorder: 30
+no_nav_left: true
+---
+
+notextile. <iframe src="arvados/" style="width:100%; height:100%; border:none" />
---
layout: default
-navsection: userguide
-navmenu: Reference
+navsection: sdk
+navmenu: Python
title: "Python SDK"
-navorder: 23
+navorder: 10
---
-h1. Reference: Python SDK
+h1. Python SDK
The Python SDK provides a generic set of wrappers so you can make API calls easily. It performs some validation before connecting to the API server: for example, it refuses to do an API call if a required parameter is missing.
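+
+For example (a minimal sketch; the resource and method shown are illustrative, and the exact exception may vary), calling a method without one of its required parameters fails locally, before any request is sent:
+
+<pre>
+import arvados
+
+arv = arvados.api()
+
+try:
+    # collections().get() requires a 'uuid' argument; omitting it is
+    # rejected by the SDK without contacting the API server.
+    arv.collections().get().execute()
+except TypeError as e:
+    print "Rejected locally:", e
+</pre>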
</pre>
</notextile>
-If the SDK is installed and your @ARVADOS_API_HOST@ and @ARVADOS_API_TOKEN@ environment variables are set up correctly (see "api-tokens":api-tokens.html for details), @import arvados@ should produce no errors:
+If the SDK is installed and your @ARVADOS_API_HOST@ and @ARVADOS_API_TOKEN@ environment variables are set up correctly (see "api-tokens":{{site.basedoc}}/user/reference/api-tokens.html for details), @import arvados@ should produce no errors:
<notextile>
<pre>$ <code class="userinput">python</code>
layout: default
navsection: userguide
navmenu: Reference
-title: "Command line SDK"
+title: "Command line interface"
navorder: 22
---
-h1. Reference: Command line SDK
+h1. Reference: Command Line Interface
If you are logged in to an Arvados VM, the command line SDK should be installed. Try:
--- /dev/null
+---
+layout: default
+navsection: userguide
+navmenu: Tutorials
+title: "Running external programs"
+navorder: 18
+---
+
+h1. Running external programs
+
+This tutorial demonstrates how to use Crunch to run an external program by writing a wrapper using the Python SDK.
+
+*This tutorial assumes that you are "logged into an Arvados VM instance":{{site.basedoc}}/user/getting_started/ssh-access.html#login, and have a "working environment.":{{site.basedoc}}/user/getting_started/check-environment.html*
+
+Start by entering the @crunch_scripts@ directory of your git repository:
+
+<notextile>
+<pre><code>$ <span class="userinput">cd you/crunch_scripts</span>
+</code></pre>
+</notextile>
+
+Next, using your favorite text editor, create a new file called @run-md5sum.py@ in the @crunch_scripts@ directory. Add the following code, which uses the external @md5sum@ program to compute the hash of each file in a collection:
+
+<pre><code class="userinput">{% include run-md5sum.py %}</code></pre>
+
+Make the file executable:
+
+notextile. <pre><code>$ <span class="userinput">chmod +x run-md5sum.py</span></code></pre>
+
+Next, add the file to @git@ staging, commit and push:
+
+<notextile>
+<pre><code>$ <span class="userinput">git add run-md5sum.py</span>
+$ <span class="userinput">git commit -m"run external md5sum program"</span>
+$ <span class="userinput">git push origin master</span>
+</code></pre>
+</notextile>
+
+You should now be able to run your new script using Crunch, with the "script" job parameter set to our new "run-md5sum.py" script.
+
+<notextile>
+<pre><code>$ <span class="userinput">cat >the_job <<EOF
+{
+ "script": "run-md5sum.py",
+ "script_version": "you:master",
+ "script_parameters":
+ {
+ "input": "c1bad4b39ca5a924e481008009d94e32+210"
+ }
+}
+EOF</span>
+$ <span class="userinput">arv -h job create --job "$(cat the_job)"</span>
+{
+ ...
+ "uuid":"qr1hi-xxxxx-xxxxxxxxxxxxxxx"
+ ...
+}
+$ <span class="userinput">arv -h job get --uuid qr1hi-xxxxx-xxxxxxxxxxxxxxx</span>
+{
+ ...
+ "output":"4d164b1658c261b9afc6b479130016a3+54",
+ ...
+}
+</code></pre>
+</notextile>
navorder: 13
---
-h1. Tutorial: Writing a Crunch script
+h1. Writing a Crunch script
In this tutorial, we will write the "hash" script demonstrated in the first tutorial.
navorder: 16
---
-h1. Tutorial: Using GATK with Arvados
+h1. Using GATK with Arvados
This tutorial demonstrates how to use the Genome Analysis Toolkit (GATK) with Arvados. In this example we will install GATK and then create a VariantFiltration job to assign pass/fail scores to variants in a VCF file.
Once the job completes, the output can be found in hu34D5B9-exome-filtered.vcf:
-<notextile>
-$ <span class="userinput">arv keep ls bedd6ff56b3ae9f90d873b1fcb72f9a3+91</span>
+<notextile><pre><code>$ <span class="userinput">arv keep ls bedd6ff56b3ae9f90d873b1fcb72f9a3+91</span>
hu34D5B9-exome-filtered.vcf
</code></pre>
</notextile>
navorder: 14
---
-h1. Tutorial: Debugging a Crunch script
+h1. Debugging a Crunch script
To test a change to a script by running a job, the change must be pushed into @git@, the job is queued asynchronously, and the actual execution may happen on any compute server. As a result, debugging a script can be difficult and time consuming. This tutorial demonstrates using @arv-crunch-job@ to run your job in your local VM, which avoids the job queue and allows you to execute the script from your uncommitted git tree.
navorder: 12
---
-h1. Tutorial: Running a crunch job
+h1. Running a crunch job
This tutorial introduces the concepts and use of the Crunch job system using the @arv@ command line tool and Arvados Workbench.
navorder: 11
---
-h1. Tutorial: Storing and Retrieving data using Arvados Keep
+h1. Storing and Retrieving data using Arvados Keep
This tutorial introduces you to the Arvados file storage system.
navorder: 15
---
-h1. Tutorial: Constructing a Crunch pipeline
+h1. Constructing a Crunch pipeline
A pipeline in Arvados is a collection of crunch scripts, in which the output from one script may be used as the input to another script.
navorder: 15
---
-h1. Tutorial: Parallel Crunch tasks
+h1. Parallel Crunch tasks
In the tutorial "writing a crunch script,":tutorial-firstscript.html our script used a "for" loop to compute the md5 hashes for each file in sequence. This approach, while simple, is not able to take advantage of the compute cluster with multiple nodes and cores to speed up computation by running tasks in parallel. This tutorial will demonstrate how to create parallel Crunch tasks.
h2. The one job per file pattern
-This example demonstrates how to schedule a new task per file. Because this is a common pattern, the Crunch Python API contains a convenience function to "queue a task for each input file":{{site.basedoc}}/user/reference/crunch-utility-libraries.html#one_task_per_input which reduces the amount of boilerplate code required to handle parallel jobs.
+This example demonstrates how to schedule a new task per input file. Because this is a common pattern, the Crunch Python API provides a convenience function to "queue a task for each input file":{{site.basedoc}}/sdk/python/crunch-utility-libraries.html#one_task_per_input, which reduces the amount of boilerplate code required to handle parallel jobs.
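+
+A minimal sketch of this pattern, assuming the job's @input@ parameter names a collection, looks like this:
+
+<pre>
+import arvados
+
+# Queue one new task per file in the input collection, then end the
+# current (sequence 0) task.
+arvados.job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True)
+
+# Each queued task then sees its own input file in its task parameters.
+this_task = arvados.current_task()
+this_task_input = this_task['parameters']['input']
+</pre>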
navorder: 16
---
-h1. Tutorial: Querying the Metadata Database
+h1. Querying the Metadata Database
This tutorial introduces the Arvados Metadata Database. The Metadata Database stores information about files in Keep. This example will use the Python SDK to find public WGS (Whole Genome Sequencing) data for people who have reported a certain medical condition.
*This tutorial assumes that you are "logged into an Arvados VM instance":{{site.basedoc}}/user/getting_started/ssh-access.html#login, and have a "working environment.":{{site.basedoc}}/user/getting_started/check-environment.html*
-In tutorial example, three angle brackets (>>>) will be used to denote code to enter at the Python prompt.
+In the tutorial examples, three angle brackets (>>>) will be used to denote code to enter at the interactive Python prompt.
Start by running Python.
import apiclient
import apiclient.discovery
+config = None
+EMPTY_BLOCK_LOCATOR = 'd41d8cd98f00b204e9800998ecf8427e+0'
+services = {}
+
+from stream import *
+from collection import *
+from keep import *
+
+
# Arvados configuration settings are taken from $HOME/.config/arvados.
# Environment variables override settings in the config file.
#
if var.startswith('ARVADOS_'):
self[var] = os.environ[var]
-
-config = ArvadosConfig(os.environ['HOME'] + '/.config/arvados/settings.conf')
-
-if 'ARVADOS_DEBUG' in config:
- logging.basicConfig(level=logging.DEBUG)
-
-EMPTY_BLOCK_LOCATOR = 'd41d8cd98f00b204e9800998ecf8427e+0'
-
-services = {}
-
class errors:
class SyntaxError(Exception):
pass
return _cast_orig(value, schema_type)
apiclient.discovery._cast = _cast_objects_too
+def http_cache(data_type):
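+    # Return a per-user cache directory for the given data type (for
+    # example 'discovery'), creating it if necessary. Returns None if the
+    # directory cannot be created, which disables caching.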
+ path = os.environ['HOME'] + '/.cache/arvados/' + data_type
+ try:
+ util.mkdir_dash_p(path)
+ except OSError:
+ path = None
+ return path
+
def api(version=None):
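+    # Return a (cached) apiclient service object for the requested API
+    # version, loading configuration and building the client on first use.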
global services, config
+
+ if not config:
+ config = ArvadosConfig(os.environ['HOME'] + '/.config/arvados')
+ if 'ARVADOS_DEBUG' in config:
+ logging.basicConfig(level=logging.DEBUG)
+
if not services.get(version):
apiVersion = version
if not version:
if not os.path.exists(ca_certs):
ca_certs = None # use httplib2 default
- http = httplib2.Http(ca_certs=ca_certs)
+ http = httplib2.Http(ca_certs=ca_certs,
+ cache=http_cache('discovery'))
http = credentials.authorize(http)
if re.match(r'(?i)^(true|1|yes)$',
config.get('ARVADOS_API_HOST_INSECURE', 'no')):
allfiles += [ent_base]
return allfiles
-class StreamFileReader(object):
- def __init__(self, stream, pos, size, name):
- self._stream = stream
- self._pos = pos
- self._size = size
- self._name = name
- self._filepos = 0
- def name(self):
- return self._name
- def decompressed_name(self):
- return re.sub('\.(bz2|gz)$', '', self._name)
- def size(self):
- return self._size
- def stream_name(self):
- return self._stream.name()
- def read(self, size, **kwargs):
- self._stream.seek(self._pos + self._filepos)
- data = self._stream.read(min(size, self._size - self._filepos))
- self._filepos += len(data)
- return data
- def readall(self, size=2**20, **kwargs):
- while True:
- data = self.read(size, **kwargs)
- if data == '':
- break
- yield data
- def bunzip2(self, size):
- decompressor = bz2.BZ2Decompressor()
- for chunk in self.readall(size):
- data = decompressor.decompress(chunk)
- if data and data != '':
- yield data
- def gunzip(self, size):
- decompressor = zlib.decompressobj(16+zlib.MAX_WBITS)
- for chunk in self.readall(size):
- data = decompressor.decompress(decompressor.unconsumed_tail + chunk)
- if data and data != '':
- yield data
- def readall_decompressed(self, size=2**20):
- self._stream.seek(self._pos + self._filepos)
- if re.search('\.bz2$', self._name):
- return self.bunzip2(size)
- elif re.search('\.gz$', self._name):
- return self.gunzip(size)
- else:
- return self.readall(size)
- def readlines(self, decompress=True):
- if decompress:
- datasource = self.readall_decompressed()
- else:
- self._stream.seek(self._pos + self._filepos)
- datasource = self.readall()
- data = ''
- for newdata in datasource:
- data += newdata
- sol = 0
- while True:
- eol = string.find(data, "\n", sol)
- if eol < 0:
- break
- yield data[sol:eol+1]
- sol = eol+1
- data = data[sol:]
- if data != '':
- yield data
- def as_manifest(self):
- if self.size() == 0:
- return ("%s %s 0:0:%s\n"
- % (self._stream.name(), EMPTY_BLOCK_LOCATOR, self.name()))
- return string.join(self._stream.tokens_for_range(self._pos, self._size),
- " ") + "\n"
-
-class StreamReader(object):
- def __init__(self, tokens):
- self._tokens = tokens
- self._current_datablock_data = None
- self._current_datablock_pos = 0
- self._current_datablock_index = -1
- self._pos = 0
-
- self._stream_name = None
- self.data_locators = []
- self.files = []
-
- for tok in self._tokens:
- if self._stream_name == None:
- self._stream_name = tok.replace('\\040', ' ')
- elif re.search(r'^[0-9a-f]{32}(\+\S+)*$', tok):
- self.data_locators += [tok]
- elif re.search(r'^\d+:\d+:\S+', tok):
- pos, size, name = tok.split(':',2)
- self.files += [[int(pos), int(size), name.replace('\\040', ' ')]]
- else:
- raise errors.SyntaxError("Invalid manifest format")
-
- def tokens(self):
- return self._tokens
- def tokens_for_range(self, range_start, range_size):
- resp = [self._stream_name]
- return_all_tokens = False
- block_start = 0
- token_bytes_skipped = 0
- for locator in self.data_locators:
- sizehint = re.search(r'\+(\d+)', locator)
- if not sizehint:
- return_all_tokens = True
- if return_all_tokens:
- resp += [locator]
- next
- blocksize = int(sizehint.group(0))
- if range_start + range_size <= block_start:
- break
- if range_start < block_start + blocksize:
- resp += [locator]
- else:
- token_bytes_skipped += blocksize
- block_start += blocksize
- for f in self.files:
- if ((f[0] < range_start + range_size)
- and
- (f[0] + f[1] > range_start)
- and
- f[1] > 0):
- resp += ["%d:%d:%s" % (f[0] - token_bytes_skipped, f[1], f[2])]
- return resp
- def name(self):
- return self._stream_name
- def all_files(self):
- for f in self.files:
- pos, size, name = f
- yield StreamFileReader(self, pos, size, name)
- def nextdatablock(self):
- if self._current_datablock_index < 0:
- self._current_datablock_pos = 0
- self._current_datablock_index = 0
- else:
- self._current_datablock_pos += self.current_datablock_size()
- self._current_datablock_index += 1
- self._current_datablock_data = None
- def current_datablock_data(self):
- if self._current_datablock_data == None:
- self._current_datablock_data = Keep.get(self.data_locators[self._current_datablock_index])
- return self._current_datablock_data
- def current_datablock_size(self):
- if self._current_datablock_index < 0:
- self.nextdatablock()
- sizehint = re.search('\+(\d+)', self.data_locators[self._current_datablock_index])
- if sizehint:
- return int(sizehint.group(0))
- return len(self.current_datablock_data())
- def seek(self, pos):
- """Set the position of the next read operation."""
- self._pos = pos
- def really_seek(self):
- """Find and load the appropriate data block, so the byte at
- _pos is in memory.
- """
- if self._pos == self._current_datablock_pos:
- return True
- if (self._current_datablock_pos != None and
- self._pos >= self._current_datablock_pos and
- self._pos <= self._current_datablock_pos + self.current_datablock_size()):
- return True
- if self._pos < self._current_datablock_pos:
- self._current_datablock_index = -1
- self.nextdatablock()
- while (self._pos > self._current_datablock_pos and
- self._pos > self._current_datablock_pos + self.current_datablock_size()):
- self.nextdatablock()
- def read(self, size):
- """Read no more than size bytes -- but at least one byte,
- unless _pos is already at the end of the stream.
- """
- if size == 0:
- return ''
- self.really_seek()
- while self._pos >= self._current_datablock_pos + self.current_datablock_size():
- self.nextdatablock()
- if self._current_datablock_index >= len(self.data_locators):
- return None
- data = self.current_datablock_data()[self._pos - self._current_datablock_pos : self._pos - self._current_datablock_pos + size]
- self._pos += len(data)
- return data
-
-class CollectionReader(object):
- def __init__(self, manifest_locator_or_text):
- if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
- self._manifest_text = manifest_locator_or_text
- self._manifest_locator = None
- else:
- self._manifest_locator = manifest_locator_or_text
- self._manifest_text = None
- self._streams = None
- def __enter__(self):
- pass
- def __exit__(self):
- pass
- def _populate(self):
- if self._streams != None:
- return
- if not self._manifest_text:
- self._manifest_text = Keep.get(self._manifest_locator)
- self._streams = []
- for stream_line in self._manifest_text.split("\n"):
- if stream_line != '':
- stream_tokens = stream_line.split()
- self._streams += [stream_tokens]
- def all_streams(self):
- self._populate()
- resp = []
- for s in self._streams:
- resp += [StreamReader(s)]
- return resp
- def all_files(self):
- for s in self.all_streams():
- for f in s.all_files():
- yield f
- def manifest_text(self):
- self._populate()
- return self._manifest_text
-
-class CollectionWriter(object):
- KEEP_BLOCK_SIZE = 2**26
- def __init__(self):
- self._data_buffer = []
- self._data_buffer_len = 0
- self._current_stream_files = []
- self._current_stream_length = 0
- self._current_stream_locators = []
- self._current_stream_name = '.'
- self._current_file_name = None
- self._current_file_pos = 0
- self._finished_streams = []
- def __enter__(self):
- pass
- def __exit__(self):
- self.finish()
- def write_directory_tree(self,
- path, stream_name='.', max_manifest_depth=-1):
- self.start_new_stream(stream_name)
- todo = []
- if max_manifest_depth == 0:
- dirents = sorted(util.listdir_recursive(path))
- else:
- dirents = sorted(os.listdir(path))
- for dirent in dirents:
- target = os.path.join(path, dirent)
- if os.path.isdir(target):
- todo += [[target,
- os.path.join(stream_name, dirent),
- max_manifest_depth-1]]
- else:
- self.start_new_file(dirent)
- with open(target, 'rb') as f:
- while True:
- buf = f.read(2**26)
- if len(buf) == 0:
- break
- self.write(buf)
- self.finish_current_stream()
- map(lambda x: self.write_directory_tree(*x), todo)
-
- def write(self, newdata):
- if hasattr(newdata, '__iter__'):
- for s in newdata:
- self.write(s)
- return
- self._data_buffer += [newdata]
- self._data_buffer_len += len(newdata)
- self._current_stream_length += len(newdata)
- while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
- self.flush_data()
- def flush_data(self):
- data_buffer = ''.join(self._data_buffer)
- if data_buffer != '':
- self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
- self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
- self._data_buffer_len = len(self._data_buffer[0])
- def start_new_file(self, newfilename=None):
- self.finish_current_file()
- self.set_current_file_name(newfilename)
- def set_current_file_name(self, newfilename):
- if re.search(r'[\t\n]', newfilename):
- raise errors.AssertionError(
- "Manifest filenames cannot contain whitespace: %s" %
- newfilename)
- self._current_file_name = newfilename
- def current_file_name(self):
- return self._current_file_name
- def finish_current_file(self):
- if self._current_file_name == None:
- if self._current_file_pos == self._current_stream_length:
- return
- raise errors.AssertionError(
- "Cannot finish an unnamed file " +
- "(%d bytes at offset %d in '%s' stream)" %
- (self._current_stream_length - self._current_file_pos,
- self._current_file_pos,
- self._current_stream_name))
- self._current_stream_files += [[self._current_file_pos,
- self._current_stream_length - self._current_file_pos,
- self._current_file_name]]
- self._current_file_pos = self._current_stream_length
- def start_new_stream(self, newstreamname='.'):
- self.finish_current_stream()
- self.set_current_stream_name(newstreamname)
- def set_current_stream_name(self, newstreamname):
- if re.search(r'[\t\n]', newstreamname):
- raise errors.AssertionError(
- "Manifest stream names cannot contain whitespace")
- self._current_stream_name = '.' if newstreamname=='' else newstreamname
- def current_stream_name(self):
- return self._current_stream_name
- def finish_current_stream(self):
- self.finish_current_file()
- self.flush_data()
- if len(self._current_stream_files) == 0:
- pass
- elif self._current_stream_name == None:
- raise errors.AssertionError(
- "Cannot finish an unnamed stream (%d bytes in %d files)" %
- (self._current_stream_length, len(self._current_stream_files)))
- else:
- if len(self._current_stream_locators) == 0:
- self._current_stream_locators += [EMPTY_BLOCK_LOCATOR]
- self._finished_streams += [[self._current_stream_name,
- self._current_stream_locators,
- self._current_stream_files]]
- self._current_stream_files = []
- self._current_stream_length = 0
- self._current_stream_locators = []
- self._current_stream_name = None
- self._current_file_pos = 0
- self._current_file_name = None
- def finish(self):
- return Keep.put(self.manifest_text())
- def manifest_text(self):
- self.finish_current_stream()
- manifest = ''
- for stream in self._finished_streams:
- if not re.search(r'^\.(/.*)?$', stream[0]):
- manifest += './'
- manifest += stream[0].replace(' ', '\\040')
- for locator in stream[1]:
- manifest += " %s" % locator
- for sfile in stream[2]:
- manifest += " %d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040'))
- manifest += "\n"
- return manifest
- def data_locators(self):
- ret = []
- for name, locators, files in self._finished_streams:
- ret += locators
- return ret
-
-global_client_object = None
-
-class Keep:
- @staticmethod
- def global_client_object():
- global global_client_object
- if global_client_object == None:
- global_client_object = KeepClient()
- return global_client_object
-
- @staticmethod
- def get(locator, **kwargs):
- return Keep.global_client_object().get(locator, **kwargs)
-
- @staticmethod
- def put(data, **kwargs):
- return Keep.global_client_object().put(data, **kwargs)
-
-class KeepClient(object):
-
- class ThreadLimiter(object):
- """
- Limit the number of threads running at a given time to
- {desired successes} minus {successes reported}. When successes
- reported == desired, wake up the remaining threads and tell
- them to quit.
-
- Should be used in a "with" block.
- """
- def __init__(self, todo):
- self._todo = todo
- self._done = 0
- self._todo_lock = threading.Semaphore(todo)
- self._done_lock = threading.Lock()
- def __enter__(self):
- self._todo_lock.acquire()
- return self
- def __exit__(self, type, value, traceback):
- self._todo_lock.release()
- def shall_i_proceed(self):
- """
- Return true if the current thread should do stuff. Return
- false if the current thread should just stop.
- """
- with self._done_lock:
- return (self._done < self._todo)
- def increment_done(self):
- """
- Report that the current thread was successful.
- """
- with self._done_lock:
- self._done += 1
- def done(self):
- """
- Return how many successes were reported.
- """
- with self._done_lock:
- return self._done
-
- class KeepWriterThread(threading.Thread):
- """
- Write a blob of data to the given Keep server. Call
- increment_done() of the given ThreadLimiter if the write
- succeeds.
- """
- def __init__(self, **kwargs):
- super(KeepClient.KeepWriterThread, self).__init__()
- self.args = kwargs
- def run(self):
- global config
- with self.args['thread_limiter'] as limiter:
- if not limiter.shall_i_proceed():
- # My turn arrived, but the job has been done without
- # me.
- return
- logging.debug("KeepWriterThread %s proceeding %s %s" %
- (str(threading.current_thread()),
- self.args['data_hash'],
- self.args['service_root']))
- h = httplib2.Http()
- url = self.args['service_root'] + self.args['data_hash']
- api_token = config['ARVADOS_API_TOKEN']
- headers = {'Authorization': "OAuth2 %s" % api_token}
- try:
- resp, content = h.request(url.encode('utf-8'), 'PUT',
- headers=headers,
- body=self.args['data'])
- if (resp['status'] == '401' and
- re.match(r'Timestamp verification failed', content)):
- body = KeepClient.sign_for_old_server(
- self.args['data_hash'],
- self.args['data'])
- h = httplib2.Http()
- resp, content = h.request(url.encode('utf-8'), 'PUT',
- headers=headers,
- body=body)
- if re.match(r'^2\d\d$', resp['status']):
- logging.debug("KeepWriterThread %s succeeded %s %s" %
- (str(threading.current_thread()),
- self.args['data_hash'],
- self.args['service_root']))
- return limiter.increment_done()
- logging.warning("Request fail: PUT %s => %s %s" %
- (url, resp['status'], content))
- except (httplib2.HttpLib2Error, httplib.HTTPException) as e:
- logging.warning("Request fail: PUT %s => %s: %s" %
- (url, type(e), str(e)))
-
- def __init__(self):
- self.lock = threading.Lock()
- self.service_roots = None
-
- def shuffled_service_roots(self, hash):
- if self.service_roots == None:
- self.lock.acquire()
- keep_disks = api().keep_disks().list().execute()['items']
- roots = (("http%s://%s:%d/" %
- ('s' if f['service_ssl_flag'] else '',
- f['service_host'],
- f['service_port']))
- for f in keep_disks)
- self.service_roots = sorted(set(roots))
- logging.debug(str(self.service_roots))
- self.lock.release()
- seed = hash
- pool = self.service_roots[:]
- pseq = []
- while len(pool) > 0:
- if len(seed) < 8:
- if len(pseq) < len(hash) / 4: # first time around
- seed = hash[-4:] + hash
- else:
- seed += hash
- probe = int(seed[0:8], 16) % len(pool)
- pseq += [pool[probe]]
- pool = pool[:probe] + pool[probe+1:]
- seed = seed[8:]
- logging.debug(str(pseq))
- return pseq
-
- def get(self, locator):
- global config
- if re.search(r',', locator):
- return ''.join(self.get(x) for x in locator.split(','))
- if 'KEEP_LOCAL_STORE' in os.environ:
- return KeepClient.local_store_get(locator)
- expect_hash = re.sub(r'\+.*', '', locator)
- for service_root in self.shuffled_service_roots(expect_hash):
- h = httplib2.Http()
- url = service_root + expect_hash
- api_token = config['ARVADOS_API_TOKEN']
- headers = {'Authorization': "OAuth2 %s" % api_token,
- 'Accept': 'application/octet-stream'}
- try:
- resp, content = h.request(url.encode('utf-8'), 'GET',
- headers=headers)
- if re.match(r'^2\d\d$', resp['status']):
- m = hashlib.new('md5')
- m.update(content)
- md5 = m.hexdigest()
- if md5 == expect_hash:
- return content
- logging.warning("Checksum fail: md5(%s) = %s" % (url, md5))
- except (httplib2.HttpLib2Error, httplib.ResponseNotReady) as e:
- logging.info("Request fail: GET %s => %s: %s" %
- (url, type(e), str(e)))
- raise errors.NotFoundError("Block not found: %s" % expect_hash)
-
- def put(self, data, **kwargs):
- if 'KEEP_LOCAL_STORE' in os.environ:
- return KeepClient.local_store_put(data)
- m = hashlib.new('md5')
- m.update(data)
- data_hash = m.hexdigest()
- have_copies = 0
- want_copies = kwargs.get('copies', 2)
- if not (want_copies > 0):
- return data_hash
- threads = []
- thread_limiter = KeepClient.ThreadLimiter(want_copies)
- for service_root in self.shuffled_service_roots(data_hash):
- t = KeepClient.KeepWriterThread(data=data,
- data_hash=data_hash,
- service_root=service_root,
- thread_limiter=thread_limiter)
- t.start()
- threads += [t]
- for t in threads:
- t.join()
- have_copies = thread_limiter.done()
- if have_copies == want_copies:
- return (data_hash + '+' + str(len(data)))
- raise errors.KeepWriteError(
- "Write fail for %s: wanted %d but wrote %d" %
- (data_hash, want_copies, have_copies))
-
- @staticmethod
- def sign_for_old_server(data_hash, data):
- return (("-----BEGIN PGP SIGNED MESSAGE-----\n\n\n%d %s\n-----BEGIN PGP SIGNATURE-----\n\n-----END PGP SIGNATURE-----\n" % (int(time.time()), data_hash)) + data)
-
-
- @staticmethod
- def local_store_put(data):
- m = hashlib.new('md5')
- m.update(data)
- md5 = m.hexdigest()
- locator = '%s+%d' % (md5, len(data))
- with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'), 'w') as f:
- f.write(data)
- os.rename(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'),
- os.path.join(os.environ['KEEP_LOCAL_STORE'], md5))
- return locator
- @staticmethod
- def local_store_get(locator):
- r = re.search('^([0-9a-f]{32,})', locator)
- if not r:
- raise errors.NotFoundError(
- "Invalid data locator: '%s'" % locator)
- if r.group(0) == EMPTY_BLOCK_LOCATOR.split('+')[0]:
- return ''
- with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], r.group(0)), 'r') as f:
- return f.read()
-
-# We really shouldn't do this but some clients still use
-# arvados.service.* directly instead of arvados.api().*
-service = api()
--- /dev/null
+import gflags
+import httplib
+import httplib2
+import logging
+import os
+import pprint
+import sys
+import types
+import subprocess
+import json
+import UserDict
+import re
+import hashlib
+import string
+import bz2
+import zlib
+import fcntl
+import time
+import threading
+
+from stream import *
+from keep import *
+
+class CollectionReader(object):
+ def __init__(self, manifest_locator_or_text):
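+        # Accept either literal manifest text or a collection locator; if
+        # the argument doesn't look like manifest text, treat it as a
+        # locator to be fetched from Keep when the collection is read.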
+ if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
+ self._manifest_text = manifest_locator_or_text
+ self._manifest_locator = None
+ else:
+ self._manifest_locator = manifest_locator_or_text
+ self._manifest_text = None
+ self._streams = None
+
+ def __enter__(self):
+ pass
+
+ def __exit__(self):
+ pass
+
+ def _populate(self):
+ if self._streams != None:
+ return
+ if not self._manifest_text:
+ self._manifest_text = Keep.get(self._manifest_locator)
+ self._streams = []
+ for stream_line in self._manifest_text.split("\n"):
+ if stream_line != '':
+ stream_tokens = stream_line.split()
+ self._streams += [stream_tokens]
+
+ def all_streams(self):
+ self._populate()
+ resp = []
+ for s in self._streams:
+ resp += [StreamReader(s)]
+ return resp
+
+ def all_files(self):
+ for s in self.all_streams():
+ for f in s.all_files():
+ yield f
+
+ def manifest_text(self):
+ self._populate()
+ return self._manifest_text
+
+class CollectionWriter(object):
+ KEEP_BLOCK_SIZE = 2**26
+
+ def __init__(self):
+ self._data_buffer = []
+ self._data_buffer_len = 0
+ self._current_stream_files = []
+ self._current_stream_length = 0
+ self._current_stream_locators = []
+ self._current_stream_name = '.'
+ self._current_file_name = None
+ self._current_file_pos = 0
+ self._finished_streams = []
+
+ def __enter__(self):
+ pass
+
+ def __exit__(self):
+ self.finish()
+
+ def write_directory_tree(self,
+ path, stream_name='.', max_manifest_depth=-1):
+ self.start_new_stream(stream_name)
+ todo = []
+ if max_manifest_depth == 0:
+ dirents = sorted(util.listdir_recursive(path))
+ else:
+ dirents = sorted(os.listdir(path))
+ for dirent in dirents:
+ target = os.path.join(path, dirent)
+ if os.path.isdir(target):
+ todo += [[target,
+ os.path.join(stream_name, dirent),
+ max_manifest_depth-1]]
+ else:
+ self.start_new_file(dirent)
+ with open(target, 'rb') as f:
+ while True:
+ buf = f.read(2**26)
+ if len(buf) == 0:
+ break
+ self.write(buf)
+ self.finish_current_stream()
+ map(lambda x: self.write_directory_tree(*x), todo)
+
+ def write(self, newdata):
+ if hasattr(newdata, '__iter__'):
+ for s in newdata:
+ self.write(s)
+ return
+ self._data_buffer += [newdata]
+ self._data_buffer_len += len(newdata)
+ self._current_stream_length += len(newdata)
+ while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
+ self.flush_data()
+
+ def flush_data(self):
+ data_buffer = ''.join(self._data_buffer)
+ if data_buffer != '':
+ self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
+ self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
+ self._data_buffer_len = len(self._data_buffer[0])
+
+ def start_new_file(self, newfilename=None):
+ self.finish_current_file()
+ self.set_current_file_name(newfilename)
+
+ def set_current_file_name(self, newfilename):
+ if re.search(r'[\t\n]', newfilename):
+ raise errors.AssertionError(
+ "Manifest filenames cannot contain whitespace: %s" %
+ newfilename)
+ self._current_file_name = newfilename
+
+ def current_file_name(self):
+ return self._current_file_name
+
+ def finish_current_file(self):
+ if self._current_file_name == None:
+ if self._current_file_pos == self._current_stream_length:
+ return
+ raise errors.AssertionError(
+ "Cannot finish an unnamed file " +
+ "(%d bytes at offset %d in '%s' stream)" %
+ (self._current_stream_length - self._current_file_pos,
+ self._current_file_pos,
+ self._current_stream_name))
+ self._current_stream_files += [[self._current_file_pos,
+ self._current_stream_length - self._current_file_pos,
+ self._current_file_name]]
+ self._current_file_pos = self._current_stream_length
+
+ def start_new_stream(self, newstreamname='.'):
+ self.finish_current_stream()
+ self.set_current_stream_name(newstreamname)
+
+ def set_current_stream_name(self, newstreamname):
+ if re.search(r'[\t\n]', newstreamname):
+ raise errors.AssertionError(
+ "Manifest stream names cannot contain whitespace")
+ self._current_stream_name = '.' if newstreamname=='' else newstreamname
+
+ def current_stream_name(self):
+ return self._current_stream_name
+
+ def finish_current_stream(self):
+ self.finish_current_file()
+ self.flush_data()
+ if len(self._current_stream_files) == 0:
+ pass
+ elif self._current_stream_name == None:
+ raise errors.AssertionError(
+ "Cannot finish an unnamed stream (%d bytes in %d files)" %
+ (self._current_stream_length, len(self._current_stream_files)))
+ else:
+ if len(self._current_stream_locators) == 0:
+ self._current_stream_locators += [EMPTY_BLOCK_LOCATOR]
+ self._finished_streams += [[self._current_stream_name,
+ self._current_stream_locators,
+ self._current_stream_files]]
+ self._current_stream_files = []
+ self._current_stream_length = 0
+ self._current_stream_locators = []
+ self._current_stream_name = None
+ self._current_file_pos = 0
+ self._current_file_name = None
+
+ def finish(self):
+ return Keep.put(self.manifest_text())
+
+ def manifest_text(self):
+ self.finish_current_stream()
+ manifest = ''
+ for stream in self._finished_streams:
+ if not re.search(r'^\.(/.*)?$', stream[0]):
+ manifest += './'
+ manifest += stream[0].replace(' ', '\\040')
+ for locator in stream[1]:
+ manifest += " %s" % locator
+ for sfile in stream[2]:
+ manifest += " %d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040'))
+ manifest += "\n"
+ return manifest
+
+ def data_locators(self):
+ ret = []
+ for name, locators, files in self._finished_streams:
+ ret += locators
+ return ret
--- /dev/null
+import gflags
+import httplib
+import httplib2
+import logging
+import os
+import pprint
+import sys
+import types
+import subprocess
+import json
+import UserDict
+import re
+import hashlib
+import string
+import bz2
+import zlib
+import fcntl
+import time
+import threading
+
+global_client_object = None
+
+from arvados import *
+
+class Keep:
+ @staticmethod
+ def global_client_object():
+ global global_client_object
+ if global_client_object == None:
+ global_client_object = KeepClient()
+ return global_client_object
+
+ @staticmethod
+ def get(locator, **kwargs):
+ return Keep.global_client_object().get(locator, **kwargs)
+
+ @staticmethod
+ def put(data, **kwargs):
+ return Keep.global_client_object().put(data, **kwargs)
+
+class KeepClient(object):
+
+ class ThreadLimiter(object):
+ """
+ Limit the number of threads running at a given time to
+ {desired successes} minus {successes reported}. When successes
+ reported == desired, wake up the remaining threads and tell
+ them to quit.
+
+ Should be used in a "with" block.
+ """
+ def __init__(self, todo):
+ self._todo = todo
+ self._done = 0
+ self._todo_lock = threading.Semaphore(todo)
+ self._done_lock = threading.Lock()
+
+ def __enter__(self):
+ self._todo_lock.acquire()
+ return self
+
+ def __exit__(self, type, value, traceback):
+ self._todo_lock.release()
+
+ def shall_i_proceed(self):
+ """
+ Return true if the current thread should do stuff. Return
+ false if the current thread should just stop.
+ """
+ with self._done_lock:
+ return (self._done < self._todo)
+
+ def increment_done(self):
+ """
+ Report that the current thread was successful.
+ """
+ with self._done_lock:
+ self._done += 1
+
+ def done(self):
+ """
+ Return how many successes were reported.
+ """
+ with self._done_lock:
+ return self._done
+
+ class KeepWriterThread(threading.Thread):
+ """
+ Write a blob of data to the given Keep server. Call
+ increment_done() of the given ThreadLimiter if the write
+ succeeds.
+ """
+ def __init__(self, **kwargs):
+ super(KeepClient.KeepWriterThread, self).__init__()
+ self.args = kwargs
+
+ def run(self):
+ global config
+ with self.args['thread_limiter'] as limiter:
+ if not limiter.shall_i_proceed():
+ # My turn arrived, but the job has been done without
+ # me.
+ return
+ logging.debug("KeepWriterThread %s proceeding %s %s" %
+ (str(threading.current_thread()),
+ self.args['data_hash'],
+ self.args['service_root']))
+ h = httplib2.Http()
+ url = self.args['service_root'] + self.args['data_hash']
+ api_token = config['ARVADOS_API_TOKEN']
+ headers = {'Authorization': "OAuth2 %s" % api_token}
+ try:
+ resp, content = h.request(url.encode('utf-8'), 'PUT',
+ headers=headers,
+ body=self.args['data'])
+ if (resp['status'] == '401' and
+ re.match(r'Timestamp verification failed', content)):
+ body = KeepClient.sign_for_old_server(
+ self.args['data_hash'],
+ self.args['data'])
+ h = httplib2.Http()
+ resp, content = h.request(url.encode('utf-8'), 'PUT',
+ headers=headers,
+ body=body)
+ if re.match(r'^2\d\d$', resp['status']):
+ logging.debug("KeepWriterThread %s succeeded %s %s" %
+ (str(threading.current_thread()),
+ self.args['data_hash'],
+ self.args['service_root']))
+ return limiter.increment_done()
+ logging.warning("Request fail: PUT %s => %s %s" %
+ (url, resp['status'], content))
+ except (httplib2.HttpLib2Error, httplib.HTTPException) as e:
+ logging.warning("Request fail: PUT %s => %s: %s" %
+ (url, type(e), str(e)))
+
+ def __init__(self):
+ self.lock = threading.Lock()
+ self.service_roots = None
+
+ def shuffled_service_roots(self, hash):
+ if self.service_roots == None:
+ self.lock.acquire()
+ keep_disks = api().keep_disks().list().execute()['items']
+ roots = (("http%s://%s:%d/" %
+ ('s' if f['service_ssl_flag'] else '',
+ f['service_host'],
+ f['service_port']))
+ for f in keep_disks)
+ self.service_roots = sorted(set(roots))
+ logging.debug(str(self.service_roots))
+ self.lock.release()
+ seed = hash
+ pool = self.service_roots[:]
+ pseq = []
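+        # Build a deterministic probe order: consume the block hash 8 hex
+        # digits at a time to choose the next service, so every client
+        # probes the Keep services in the same order for a given block.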
+ while len(pool) > 0:
+ if len(seed) < 8:
+ if len(pseq) < len(hash) / 4: # first time around
+ seed = hash[-4:] + hash
+ else:
+ seed += hash
+ probe = int(seed[0:8], 16) % len(pool)
+ pseq += [pool[probe]]
+ pool = pool[:probe] + pool[probe+1:]
+ seed = seed[8:]
+ logging.debug(str(pseq))
+ return pseq
+
+ def get(self, locator):
+ global config
+ if re.search(r',', locator):
+ return ''.join(self.get(x) for x in locator.split(','))
+ if 'KEEP_LOCAL_STORE' in os.environ:
+ return KeepClient.local_store_get(locator)
+ expect_hash = re.sub(r'\+.*', '', locator)
+ for service_root in self.shuffled_service_roots(expect_hash):
+ h = httplib2.Http()
+ url = service_root + expect_hash
+ api_token = config['ARVADOS_API_TOKEN']
+ headers = {'Authorization': "OAuth2 %s" % api_token,
+ 'Accept': 'application/octet-stream'}
+ try:
+ resp, content = h.request(url.encode('utf-8'), 'GET',
+ headers=headers)
+ if re.match(r'^2\d\d$', resp['status']):
+ m = hashlib.new('md5')
+ m.update(content)
+ md5 = m.hexdigest()
+ if md5 == expect_hash:
+ return content
+ logging.warning("Checksum fail: md5(%s) = %s" % (url, md5))
+ except (httplib2.HttpLib2Error, httplib.ResponseNotReady) as e:
+ logging.info("Request fail: GET %s => %s: %s" %
+ (url, type(e), str(e)))
+ raise errors.NotFoundError("Block not found: %s" % expect_hash)
+
+ def put(self, data, **kwargs):
+ if 'KEEP_LOCAL_STORE' in os.environ:
+ return KeepClient.local_store_put(data)
+ m = hashlib.new('md5')
+ m.update(data)
+ data_hash = m.hexdigest()
+ have_copies = 0
+ want_copies = kwargs.get('copies', 2)
+ if not (want_copies > 0):
+ return data_hash
+ threads = []
+ thread_limiter = KeepClient.ThreadLimiter(want_copies)
+ for service_root in self.shuffled_service_roots(data_hash):
+ t = KeepClient.KeepWriterThread(data=data,
+ data_hash=data_hash,
+ service_root=service_root,
+ thread_limiter=thread_limiter)
+ t.start()
+ threads += [t]
+ for t in threads:
+ t.join()
+ have_copies = thread_limiter.done()
+ if have_copies == want_copies:
+ return (data_hash + '+' + str(len(data)))
+ raise errors.KeepWriteError(
+ "Write fail for %s: wanted %d but wrote %d" %
+ (data_hash, want_copies, have_copies))
+
+ @staticmethod
+ def sign_for_old_server(data_hash, data):
+ return (("-----BEGIN PGP SIGNED MESSAGE-----\n\n\n%d %s\n-----BEGIN PGP SIGNATURE-----\n\n-----END PGP SIGNATURE-----\n" % (int(time.time()), data_hash)) + data)
+
+
+ @staticmethod
+ def local_store_put(data):
+ m = hashlib.new('md5')
+ m.update(data)
+ md5 = m.hexdigest()
+ locator = '%s+%d' % (md5, len(data))
+ with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'), 'w') as f:
+ f.write(data)
+ os.rename(os.path.join(os.environ['KEEP_LOCAL_STORE'], md5 + '.tmp'),
+ os.path.join(os.environ['KEEP_LOCAL_STORE'], md5))
+ return locator
+
+ @staticmethod
+ def local_store_get(locator):
+ r = re.search('^([0-9a-f]{32,})', locator)
+ if not r:
+ raise errors.NotFoundError(
+ "Invalid data locator: '%s'" % locator)
+ if r.group(0) == EMPTY_BLOCK_LOCATOR.split('+')[0]:
+ return ''
+ with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], r.group(0)), 'r') as f:
+ return f.read()
--- /dev/null
+import gflags
+import httplib
+import httplib2
+import logging
+import os
+import pprint
+import sys
+import types
+import subprocess
+import json
+import UserDict
+import re
+import hashlib
+import string
+import bz2
+import zlib
+import fcntl
+import time
+import threading
+
+from keep import *
+
+class StreamFileReader(object):
+ def __init__(self, stream, pos, size, name):
+ self._stream = stream
+ self._pos = pos
+ self._size = size
+ self._name = name
+ self._filepos = 0
+
+ def name(self):
+ return self._name
+
+ def decompressed_name(self):
+ return re.sub('\.(bz2|gz)$', '', self._name)
+
+ def size(self):
+ return self._size
+
+ def stream_name(self):
+ return self._stream.name()
+
+ def read(self, size, **kwargs):
+ self._stream.seek(self._pos + self._filepos)
+ data = self._stream.read(min(size, self._size - self._filepos))
+ self._filepos += len(data)
+ return data
+
+ def readall(self, size=2**20, **kwargs):
+ while True:
+ data = self.read(size, **kwargs)
+ if data == '':
+ break
+ yield data
+
+ def bunzip2(self, size):
+ decompressor = bz2.BZ2Decompressor()
+ for chunk in self.readall(size):
+ data = decompressor.decompress(chunk)
+ if data and data != '':
+ yield data
+
+ def gunzip(self, size):
+ decompressor = zlib.decompressobj(16+zlib.MAX_WBITS)
+ for chunk in self.readall(size):
+ data = decompressor.decompress(decompressor.unconsumed_tail + chunk)
+ if data and data != '':
+ yield data
+
+ def readall_decompressed(self, size=2**20):
+ self._stream.seek(self._pos + self._filepos)
+ if re.search('\.bz2$', self._name):
+ return self.bunzip2(size)
+ elif re.search('\.gz$', self._name):
+ return self.gunzip(size)
+ else:
+ return self.readall(size)
+
+ def readlines(self, decompress=True):
+ if decompress:
+ datasource = self.readall_decompressed()
+ else:
+ self._stream.seek(self._pos + self._filepos)
+ datasource = self.readall()
+ data = ''
+ for newdata in datasource:
+ data += newdata
+ sol = 0
+ while True:
+ eol = string.find(data, "\n", sol)
+ if eol < 0:
+ break
+ yield data[sol:eol+1]
+ sol = eol+1
+ data = data[sol:]
+ if data != '':
+ yield data
+
+ def as_manifest(self):
+ if self.size() == 0:
+ return ("%s %s 0:0:%s\n"
+ % (self._stream.name(), EMPTY_BLOCK_LOCATOR, self.name()))
+ return string.join(self._stream.tokens_for_range(self._pos, self._size),
+ " ") + "\n"
+
+class StreamReader(object):
+ def __init__(self, tokens):
+ self._tokens = tokens
+ self._current_datablock_data = None
+ self._current_datablock_pos = 0
+ self._current_datablock_index = -1
+ self._pos = 0
+
+ self._stream_name = None
+ self.data_locators = []
+ self.files = []
+
+ for tok in self._tokens:
+ if self._stream_name == None:
+ self._stream_name = tok.replace('\\040', ' ')
+ elif re.search(r'^[0-9a-f]{32}(\+\S+)*$', tok):
+ self.data_locators += [tok]
+ elif re.search(r'^\d+:\d+:\S+', tok):
+ pos, size, name = tok.split(':',2)
+ self.files += [[int(pos), int(size), name.replace('\\040', ' ')]]
+ else:
+ raise errors.SyntaxError("Invalid manifest format")
+
+ def tokens(self):
+ return self._tokens
+
+ def tokens_for_range(self, range_start, range_size):
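+        # Return the manifest tokens (stream name, block locators, file
+        # entries) needed to cover the byte range [range_start,
+        # range_start + range_size) of this stream, adjusting file
+        # offsets for any skipped leading blocks.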
+ resp = [self._stream_name]
+ return_all_tokens = False
+ block_start = 0
+ token_bytes_skipped = 0
+ for locator in self.data_locators:
+ sizehint = re.search(r'\+(\d+)', locator)
+ if not sizehint:
+ return_all_tokens = True
+ if return_all_tokens:
+ resp += [locator]
+                continue
+ blocksize = int(sizehint.group(0))
+ if range_start + range_size <= block_start:
+ break
+ if range_start < block_start + blocksize:
+ resp += [locator]
+ else:
+ token_bytes_skipped += blocksize
+ block_start += blocksize
+ for f in self.files:
+ if ((f[0] < range_start + range_size)
+ and
+ (f[0] + f[1] > range_start)
+ and
+ f[1] > 0):
+ resp += ["%d:%d:%s" % (f[0] - token_bytes_skipped, f[1], f[2])]
+ return resp
+
+ def name(self):
+ return self._stream_name
+
+ def all_files(self):
+ for f in self.files:
+ pos, size, name = f
+ yield StreamFileReader(self, pos, size, name)
+
+ def nextdatablock(self):
+ if self._current_datablock_index < 0:
+ self._current_datablock_pos = 0
+ self._current_datablock_index = 0
+ else:
+ self._current_datablock_pos += self.current_datablock_size()
+ self._current_datablock_index += 1
+ self._current_datablock_data = None
+
+ def current_datablock_data(self):
+ if self._current_datablock_data == None:
+ self._current_datablock_data = Keep.get(self.data_locators[self._current_datablock_index])
+ return self._current_datablock_data
+
+ def current_datablock_size(self):
+ if self._current_datablock_index < 0:
+ self.nextdatablock()
+ sizehint = re.search('\+(\d+)', self.data_locators[self._current_datablock_index])
+ if sizehint:
+ return int(sizehint.group(0))
+ return len(self.current_datablock_data())
+
+ def seek(self, pos):
+ """Set the position of the next read operation."""
+ self._pos = pos
+
+ def really_seek(self):
+ """Find and load the appropriate data block, so the byte at
+ _pos is in memory.
+ """
+ if self._pos == self._current_datablock_pos:
+ return True
+ if (self._current_datablock_pos != None and
+ self._pos >= self._current_datablock_pos and
+ self._pos <= self._current_datablock_pos + self.current_datablock_size()):
+ return True
+ if self._pos < self._current_datablock_pos:
+ self._current_datablock_index = -1
+ self.nextdatablock()
+ while (self._pos > self._current_datablock_pos and
+ self._pos > self._current_datablock_pos + self.current_datablock_size()):
+ self.nextdatablock()
+
+ def read(self, size):
+ """Read no more than size bytes -- but at least one byte,
+ unless _pos is already at the end of the stream.
+ """
+ if size == 0:
+ return ''
+ self.really_seek()
+ while self._pos >= self._current_datablock_pos + self.current_datablock_size():
+ self.nextdatablock()
+ if self._current_datablock_index >= len(self.data_locators):
+ return None
+ data = self.current_datablock_data()[self._pos - self._current_datablock_pos : self._pos - self._current_datablock_pos + size]
+ self._pos += len(data)
+ return data
end
show
end
+
+ protected
+
+ def find_object_by_uuid
+ super
+ if !@object and !params[:uuid].match(/^[0-9a-f]+\+\d+$/)
+ # Normalize the given uuid and search again.
+ hash_part = params[:uuid].match(/^([0-9a-f]*)/)[1]
+ collection = Collection.where('uuid like ?', hash_part + '+%').first
+ if collection
+ # We know the collection exists, and what its real uuid is in
+ # the database. Now, throw out @objects and repeat the usual
+ # lookup procedure. (Returning the collection at this point
+ # would bypass permission checks.)
+ @objects = nil
+ @where = { uuid: collection.uuid }
+ find_objects_for_index
+ @object = @objects.first
+ end
+ end
+ end
end
class Arvados::V1::NodesController < ApplicationController
skip_before_filter :require_auth_scope_all, :only => :ping
+ skip_before_filter :find_object_by_uuid, :only => :ping
def create
@object = Node.new
{ ping_secret: true }
end
def ping
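+    # find_object_by_uuid is skipped for this action (see above), so look
+    # up the node explicitly before recording the ping.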
+ @object = Node.where(uuid: (params[:id] || params[:uuid])).first
+ if !@object
+ return render_not_found
+ end
@object.ping({ ip: params[:local_ipv4] || request.env['REMOTE_ADDR'],
ping_secret: params[:ping_secret],
ec2_instance_id: params[:instance_id] })
end
def discovery_rest_description
+ expires_in 24.hours, public: true
discovery = Rails.cache.fetch 'arvados_v1_rest_discovery' do
Rails.application.eager_load!
discovery = {
if self.manifest_text.nil? and self.uuid.nil?
super
elsif self.manifest_text and self.uuid
- if self.uuid.gsub(/\+[^,]+/,'') == Digest::MD5.hexdigest(self.manifest_text)
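+      # Strip any size/hint suffix from the supplied uuid, check the bare
+      # hash against the manifest, then append the manifest length as the
+      # size hint.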
+ self.uuid.gsub! /\+.*/, ''
+ if self.uuid == Digest::MD5.hexdigest(self.manifest_text)
+ self.uuid.gsub! /$/, '+' + self.manifest_text.length.to_s
true
else
errors.add :uuid, 'uuid does not match checksum of manifest_text'
--- /dev/null
+class NormalizeCollectionUuid < ActiveRecord::Migration
+ def count_orphans
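+    # Report how many links have a head_uuid or tail_uuid that refers to
+    # a collection but does not match any row in the collections table.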
+ %w(head tail).each do |ht|
+ results = ActiveRecord::Base.connection.execute(<<-EOS)
+SELECT COUNT(links.*)
+ FROM links
+ LEFT JOIN collections c
+ ON links.#{ht}_uuid = c.uuid
+ WHERE (#{ht}_kind='arvados#collection' or #{ht}_uuid ~ '^[0-9a-f]{32,}')
+ AND #{ht}_uuid IS NOT NULL
+ AND #{ht}_uuid NOT IN (SELECT uuid FROM collections)
+EOS
+ puts "#{results.first['count'].to_i} links with #{ht}_uuid pointing nowhere."
+ end
+ end
+
+ def up
+ # Normalize uuids in the collections table to
+ # {hash}+{size}. Existing uuids might be {hash},
+ # {hash}+{size}+K@{instance-name}, {hash}+K@{instance-name}, etc.
+
+ count_orphans
+ puts "Normalizing collection UUIDs."
+
+ update_sql <<-EOS
+UPDATE collections
+ SET uuid = regexp_replace(uuid,'\\+.*','') || '+' || length(manifest_text)
+ WHERE uuid !~ '^[0-9a-f]{32,}\\+[0-9]+'
+ AND (regexp_replace(uuid,'\\+.*','') || '+' || length(manifest_text))
+ NOT IN (SELECT uuid FROM collections)
+EOS
+
+ count_orphans
+ puts "Updating links by stripping +K@.* from *_uuid attributes."
+
+ update_sql <<-EOS
+UPDATE links
+ SET head_uuid = regexp_replace(head_uuid,'\\+K@.*','')
+ WHERE head_uuid like '%+K@%'
+EOS
+ update_sql <<-EOS
+UPDATE links
+ SET tail_uuid = regexp_replace(tail_uuid,'\\+K@.*','')
+ WHERE tail_uuid like '%+K@%'
+EOS
+
+ count_orphans
+ puts "Updating links by searching bare collection hashes using regexp."
+
+ # Next, update {hash} (and any other non-normalized forms) to
+ # {hash}+{size}. This can only work where the corresponding
+ # collection is found in the collections table (otherwise we can't
+ # know the size).
+ %w(head tail).each do |ht|
+ update_sql <<-EOS
+UPDATE links
+ SET #{ht}_uuid = c.uuid
+ FROM collections c
+ WHERE #{ht}_uuid IS NOT NULL
+ AND (#{ht}_kind='arvados#collection' or #{ht}_uuid ~ '^[0-9a-f]{32,}')
+ AND #{ht}_uuid NOT IN (SELECT uuid FROM collections)
+ AND regexp_replace(#{ht}_uuid,'\\+.*','') = regexp_replace(c.uuid,'\\+.*','')
+ AND c.uuid ~ '^[0-9a-f]{32,}\\+[0-9]+'
+EOS
+ end
+
+ count_orphans
+ puts "Stripping \"+K@.*\" from jobs.output, jobs.log, job_tasks.output."
+
+ update_sql <<-EOS
+UPDATE jobs
+ SET output = regexp_replace(output,'\\+K@.*','')
+ WHERE output ~ '^[0-9a-f]{32,}\\+[0-9]+\\+K@\\w+$'
+EOS
+ update_sql <<-EOS
+UPDATE jobs
+ SET log = regexp_replace(log,'\\+K@.*','')
+ WHERE log ~ '^[0-9a-f]{32,}\\+[0-9]+\\+K@\\w+$'
+EOS
+ update_sql <<-EOS
+UPDATE job_tasks
+ SET output = regexp_replace(output,'\\+K@.*','')
+ WHERE output ~ '^[0-9a-f]{32,}\\+[0-9]+\\+K@\\w+$'
+EOS
+
+ puts "Done."
+ end
+
+ def down
+ end
+end