|minimum_script_version |string |Git branch, tag, or commit hash specifying the minimum acceptable script version (earliest ancestor) to consider when deciding whether to re-use a past job.[1]|query|@"c3e86c9"@|
|exclude_script_versions|array of strings|Git branches, tags, or commit hashes to exclude when deciding whether to re-use a past job.|query|@["badtag1","badtag2"]@|
+|filters|array|Conditions to find Jobs to reuse.|query||
|find_or_create |boolean |Before creating, look for an existing job that has identical script, script_version, and script_parameters to those in the present job, has nondeterministic=false, and did not fail (it could be queued, running, or completed). If such a job exists, respond with the existing job instead of submitting a new one.|query|@false@|
When a job is submitted to the queue using the **create** method, the @script_version@ attribute is updated to a full 40-character Git commit hash based on the current content of the specified repository. If @script_version@ cannot be resolved, the job submission is rejected.
fn1. See the "note about specifying Git commits on the Job resource page":{{site.baseurl}}/api/schema/Job.html#script_version for more detail.
+h3. Specialized filters
+
+Special filter operations are available for specific Job columns.
+
+* @script_version@ @in git@ @REFSPEC@<br>Resolve @REFSPEC@ to a list of git commits, and match jobs with a @script_version@ in that list. The create method will find commits between @REFSPEC@ and the submitted job's @script_version@; the list method will search between @REFSPEC@ and HEAD. This list may include parallel branches if there is more than one path between @REFSPEC@ and the end commit in the graph. Use @not in@ or @not in git@ filters (below) to blacklist specific commits.
+
+* @script_version@ @not in git@ @REFSPEC@<br>Resolve @REFSPEC@ to a list of git commits, and match jobs with a @script_version@ not in that list.
+
+* @docker_image_locator@ @in docker@ @SEARCH@<br>@SEARCH@ can be a Docker image hash, a repository name, or a repository name and tag separated by a colon (@:@). The server will find collections that contain a Docker image matching those search criteria, then match jobs with a @docker_image_locator@ in that list.
+
+* @docker_image_locator@ @not in docker@ @SEARCH@<br>Negate the @in docker@ filter.
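As an illustration of how these operators combine in a single request, the sketch below builds a @filters@ array for a @create@ call with @find_or_create@. The helper @job_reuse_filters@ is hypothetical (it is not part of Arvados); it only assembles the @[attribute, operator, operand]@ triples described above, with values borrowed from the test suite.

```ruby
# Hypothetical helper (not part of Arvados): assembles a filters list
# for create with find_or_create, using the Job-specific operators.
def job_reuse_filters(repository:, script:, min_version: nil,
                      exclude_versions: [], docker_search: nil)
  # create requires filters on both repository and script.
  filters = [
    ["repository", "=", repository],
    ["script", "=", script],
  ]
  # "in git": commits between min_version and the job's script_version.
  filters << ["script_version", "in git", min_version] if min_version
  # "not in git": drop specific commits from that range.
  if exclude_versions.any?
    filters << ["script_version", "not in git", exclude_versions]
  end
  # "in docker": an image hash, a repository name, or repository:tag.
  if docker_search
    filters << ["docker_image_locator", "in docker", docker_search]
  end
  filters
end

filters = job_reuse_filters(repository: "foo", script: "hash",
                            min_version: "tag1",
                            exclude_versions: ["badtag1"],
                            docker_search: "arvados/apitestfixture")
filters.each { |f| p f }
```

The resulting array would be sent as the @filters@ parameter alongside the job attributes and @find_or_create: true@.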
+
h3. Reusing jobs
-Because Arvados records the exact version of the script, input parameters, and runtime environment [1] that was used to run the job, if the script is deterministic (meaning that the same code version is guaranteed to produce the same outputs from the same inputs) then it is possible to re-use the results of past jobs, and avoid re-running the computation to save time. Arvados uses the following algorithm to determine if a past job can be re-used:
+Because Arvados records the exact version of the script, input parameters, and runtime environment that were used to run the job, it is possible to re-use the results of past jobs and avoid re-running the computation, provided the script is deterministic (meaning that the same code version is guaranteed to produce the same outputs from the same inputs). Arvados uses the following algorithm to determine whether a past job can be re-used:
notextile. <div class="spaced-out">
# If @find_or_create@ is false or omitted, create a new job and skip the rest of these steps.
-# Find a list of acceptable values for @script_version@. If @minimum_script_version@ is specified, this is the set of all revisions in the Git commit graph between @minimum_script_version@ and the @script_version@ in the submitted "job object":{{site.baseurl}}/api/schema/Job.html (inclusive)[2]. If @minimum_script_version@ is not specified, only @script_version@ is added to the list. If @exclude_script_versions@ is specified, the listed versions are excluded from the list.
-# Select jobs whose @script@ and @script_parameters@ attributes match those in the submitted "job object":{{site.baseurl}}/api/schema/Job.html, and whose @script_version@ attribute is in the list of acceptable versions. Exclude jobs that failed or set @nondeterministic@ to true.
-# If more than one of the candidate jobs has finished, check that all such jobs actually did produce the same output.
-# If existing jobs exist and do not disagree with one another about the correct output, return one of the selected past jobs instead of creating a new job. If there is more than one match, which job will be returned is undefined.
-# If an existing job could not be chosen this way, create a new job.
-
-fn1. As of this writing, versioning the runtime environment is still under development.
-
-fn2. This may include parallel branches if there is more than one path between @minimum_script_version@ and the submitted job's @script_version@ in the Git commit graph. Use @exclude_script_versions@ to blacklist specific commits.
+# If @filters@ are specified, find jobs that match those filters. The filters *must* include conditions on the @repository@ and @script@ attributes; if either is missing, an error is returned.
+# If @filters@ are not specified, find jobs with the same @repository@ and @script@, a @script_version@ between @minimum_script_version@ and @script_version@ (excluding any @exclude_script_versions@), and a @docker_image_locator@ equal to the latest Collection that matches the submitted job's @docker_image@ constraint (or no @docker_image_locator@ if the job has no @docker_image@ constraint).
+# If the found jobs include a completed job, and all found completed jobs have consistent output, return one of them. Which specific job is returned is undefined.
+# If the found jobs only include incomplete jobs, return one of them. Which specific job is returned is undefined.
+# If no job has been returned so far, create and return a new job.
</div>
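The selection in the last three steps can be sketched in plain Ruby as follows. This is a simplified model, not the controller code: @CandidateJob@ and @choose_reusable_job@ are hypothetical names, candidates are assumed to have already passed the filter step, and the per-job condition mirrors the one visible in the controller (deterministic, matching @script_parameters@, and either finished successfully with output or currently running).

```ruby
# Hypothetical stand-in for a Job record that already matched the filters.
CandidateJob = Struct.new(:uuid, :success, :running, :output,
                          :nondeterministic, :script_parameters)

# Returns a job to reuse, or nil when a new job should be created.
def choose_reusable_job(candidates, script_parameters)
  usable = candidates.select do |j|
    j.nondeterministic != true &&
      ((j.success == true && !j.output.nil?) || j.running == true) &&
      j.script_parameters == script_parameters
  end
  completed = usable.select { |j| j.success == true && !j.output.nil? }
  if completed.any?
    # Reuse only when every completed job agrees on the output.
    return completed.map(&:output).uniq.size == 1 ? completed.first : nil
  end
  # Only incomplete (running) jobs matched: reuse one of them, if any.
  usable.first
end

params = { "input" => "fa7aeb5140e2848d39b416daeef4ffc5+45" }
done = CandidateJob.new("job-a", true, false, "out1", false, params)
busy = CandidateJob.new("job-b", nil, true, nil, false, params)
p choose_reusable_job([busy, done], params).uuid
```

Which matching job is returned is undefined in the API; this sketch simply takes the first one.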
|order|string|Order in which to return matching jobs.|query||
|filters|array|Conditions for filtering jobs.|query||
+See the create method documentation for more information about Job-specific filters.
+
h2. log_tail_follow
log_tail_follow jobs
apply_where_limit_order_params
end
- def apply_where_limit_order_params
- ar_table_name = @objects.table_name
-
- ft = record_filters @filters, ar_table_name
+ def apply_filters
+ ft = record_filters @filters, @objects.table_name
if ft[:cond_out].any?
@objects = @objects.where(ft[:cond_out].join(' AND '), *ft[:param_out])
end
+ end
+ def apply_where_limit_order_params
+ apply_filters
+
+ ar_table_name = @objects.table_name
if @where.is_a? Hash and @where.any?
conditions = ['1=1']
@where.each do |attr,value|
end
if params[:find_or_create]
- r = Commit.find_commit_range(current_user,
- resource_attrs[:repository],
- params[:minimum_script_version],
- resource_attrs[:script_version],
- params[:exclude_script_versions])
- # Search for jobs whose script_version is in the list of commits
- # returned by find_commit_range
+ load_filters_param
+ if @filters.empty? # Translate older creation parameters into filters.
+ @filters = [:repository, :script].map do |attrsym|
+ [attrsym.to_s, "=", resource_attrs[attrsym]]
+ end
+ @filters.append(["script_version", "in",
+ Commit.find_commit_range(current_user,
+ resource_attrs[:repository],
+ params[:minimum_script_version],
+ resource_attrs[:script_version],
+ params[:exclude_script_versions])])
+ if image_search = resource_attrs[:runtime_constraints].andand["docker_image"]
+ image_tag = resource_attrs[:runtime_constraints]["docker_image_tag"]
+ image_locator = Collection.
+ uuids_for_docker_image(image_search, image_tag, @read_users).last
+ return super if image_locator.nil? # We won't find anything to reuse.
+ @filters.append(["docker_image_locator", "=", image_locator])
+ else
+ @filters.append(["docker_image_locator", "=", nil])
+ end
+ else # Check specified filters for some reasonableness.
+ filter_names = @filters.map { |f| f.first }.uniq
+ ["repository", "script"].each do |req_filter|
+ if not filter_names.include?(req_filter)
+ raise ArgumentError.new("#{req_filter} filter required")
+ end
+ end
+ end
+
+ # Search for a reusable Job, and return it if found.
+ @objects = Job.readable_by(current_user)
+ apply_filters
@object = nil
incomplete_job = nil
- Job.readable_by(current_user).where(script: resource_attrs[:script],
- script_version: r).
- each do |j|
+ @objects.each do |j|
if j.nondeterministic != true and
((j.success == true and j.output != nil) or j.running == true) and
j.script_parameters == resource_attrs[:script_parameters]
def self._queue_requires_parameters
self._index_requires_parameters
end
+
+ protected
+
+ def load_filters_param
+ # Convert Job-specific git and Docker filters into normal SQL filters.
+ super
+ script_info = {"repository" => nil, "script" => nil}
+ script_range = {"exclude_versions" => []}
+ @filters.select! do |filter|
+ if (script_info.has_key? filter[0]) and (filter[1] == "=")
+ if script_info[filter[0]].nil?
+ script_info[filter[0]] = filter[2]
+ elsif script_info[filter[0]] != filter[2]
+ raise ArgumentError.new("incompatible #{filter[0]} filters")
+ end
+ end
+ case filter[0..1]
+ when ["script_version", "in git"]
+ script_range["min_version"] = filter.last
+ false
+ when ["script_version", "not in git"]
+ begin
+ script_range["exclude_versions"] += filter.last
+ rescue TypeError
+ script_range["exclude_versions"] << filter.last
+ end
+ false
+ when ["docker_image_locator", "in docker"], ["docker_image_locator", "not in docker"]
+ filter[1].sub!(/ docker$/, '')
+ image_search, image_tag = filter[2].split(':', 2)
+ filter[2] = Collection.
+ uuids_for_docker_image(image_search, image_tag, @read_users)
+ true
+ else
+ true
+ end
+ end
+
+ # Build a real script_version filter from any "not? in git" filters.
+ if (script_range.size > 1) or script_range["exclude_versions"].any?
+ script_info.each_pair do |key, value|
+ if value.nil?
+ raise ArgumentError.new("script_version filter needs #{key} filter")
+ end
+ end
+ last_version = begin resource_attrs[:script_version] rescue "HEAD" end
+ @filters.append(["script_version", "in",
+ Commit.find_commit_range(current_user,
+ script_info["repository"],
+ script_range["min_version"],
+ last_version,
+ script_range["exclude_versions"])])
+ end
+ end
end
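The @load_filters_param@ override above pulls the @in git@ and @not in git@ filters out of @@filters@ and remembers their operands before calling @Commit.find_commit_range@. The standalone sketch below isolates just that partitioning step. @partition_git_filters@ is a hypothetical name, the commit-range lookup is not reproduced, and @Array()@ is used here as an alternative to the @begin@/@rescue TypeError@ idiom for accepting either one refspec or a list of them.

```ruby
# Hypothetical sketch: separate "in git" / "not in git" filters from the
# ordinary ones, collecting the range information they carry.
def partition_git_filters(filters)
  range = { min_version: nil, exclude_versions: [] }
  remaining = filters.reject do |attr, op, operand|
    next false unless attr == "script_version"
    case op
    when "in git"
      range[:min_version] = operand
      true
    when "not in git"
      # Array() wraps a single refspec and leaves a list unchanged.
      range[:exclude_versions].concat(Array(operand))
      true
    else
      false # e.g. a plain "=" filter on script_version is kept
    end
  end
  [remaining, range]
end

remaining, range = partition_git_filters([
  ["repository", "=", "foo"],
  ["script_version", "in git", "tag1"],
  ["script_version", "not in git", "badtag1"],
])
p remaining
p range
```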
[hash_part, size_part].compact.join '+'
end
- def self.for_latest_docker_image(search_term, search_tag=nil, readers=nil)
+ def self.uuids_for_docker_image(search_term, search_tag=nil, readers=nil)
readers ||= [Thread.current[:user]]
base_search = Link.
readable_by(*readers).
coll_matches = base_search.
where(link_class: "docker_image_hash", collections: {uuid: search_term})
if match = coll_matches.first
- return find_by_uuid(match.head_uuid)
+ return [match.head_uuid]
end
# Find Collections with matching Docker image repository+tag pairs.
"docker_image_hash", "#{search_term}%")
end
- # Select the image that was created most recently. Note that the
- # SQL search order and fallback timestamp values are chosen so
- # that if image timestamps are missing, we use the image with the
- # newest link.
- latest_image_link = nil
- latest_image_timestamp = "1900-01-01T00:00:00Z"
+ # Generate an order key for each result. We want to order the results
+ # so that anything with an image timestamp is considered more recent than
+ # anything without; then we use the link's created_at as a tiebreaker.
+ results = {}
matches.find_each do |link|
- link_timestamp = link.properties.fetch("image_timestamp",
- "1900-01-01T00:00:01Z")
- if link_timestamp > latest_image_timestamp
- latest_image_link = link
- latest_image_timestamp = link_timestamp
+ sort_key = []
+ if timestamp = link.properties["image_timestamp"]
+ sort_key.push("Z", timestamp.to_s)
end
+ sort_key.push("Y", link.created_at.to_s(:db))
+ results[link] = sort_key.join("")
+ end
+ results.keys.sort_by { |link| results[link] }.map { |link| link.head_uuid }
+ end
+
+ def self.for_latest_docker_image(search_term, search_tag=nil, readers=nil)
+ image_uuid = uuids_for_docker_image(search_term, search_tag, readers).last
+ if image_uuid.nil?
+ nil
+ else
+ find_by_uuid(image_uuid)
end
- latest_image_link.nil? ? nil : find_by_uuid(latest_image_link.head_uuid)
end
end
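@uuids_for_docker_image@ orders matching image collections oldest first, so that @for_latest_docker_image@ can take @.last@. The sketch below expresses the same ordering intent with an array sort key instead of concatenated strings: links with an @image_timestamp@ sort after links without one, and @created_at@ breaks ties. @ImageLink@ and @ordered_image_uuids@ are hypothetical, and the sketch assumes all timestamps share one ISO 8601 format so that string comparison is chronological.

```ruby
# Hypothetical stand-in for a docker_image_hash Link record.
ImageLink = Struct.new(:head_uuid, :properties, :created_at)

# Oldest first: untimestamped links, then timestamped ones;
# created_at breaks ties within each group.
def ordered_image_uuids(links)
  links.sort_by do |link|
    ts = link.properties["image_timestamp"]
    [ts ? 1 : 0, ts.to_s, link.created_at]
  end.map(&:head_uuid)
end

links = [
  ImageLink.new("with-ts", { "image_timestamp" => "2014-02-01T00:00:00Z" },
                Time.utc(2013, 1, 1)),
  ImageLink.new("new-no-ts", {}, Time.utc(2014, 6, 1)),
  ImageLink.new("old-no-ts", {}, Time.utc(2014, 1, 1)),
]
p ordered_image_uuids(links)
p ordered_image_uuids(links).last
```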
previous_job_run:
uuid: zzzzz-8i9sb-cjs4pklxxjykqqq
owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ repository: foo
script: hash
script_version: 4fe459abe02d9b365932b8f5dc419439ab4e2577
script_parameters:
success: true
output: ea10d51bcf88862dbcc36eb292017dfd+45
+previous_docker_job_run:
+ uuid: zzzzz-8i9sb-k6emstgk4kw4yhi
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ repository: foo
+ script: hash
+ script_version: 4fe459abe02d9b365932b8f5dc419439ab4e2577
+ script_parameters:
+ input: fa7aeb5140e2848d39b416daeef4ffc5+45
+ an_integer: "1"
+ success: true
+ output: ea10d51bcf88862dbcc36eb292017dfd+45
+ docker_image_locator: fa3c1a9cb6783f85f2ecda037e07b8c3+167
+
previous_job_run_no_output:
uuid: zzzzz-8i9sb-cjs4pklxxjykppp
owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ repository: foo
script: hash
script_version: 4fe459abe02d9b365932b8f5dc419439ab4e2577
script_parameters:
nondeterminisic_job_run:
uuid: zzzzz-8i9sb-cjs4pklxxjykyyy
owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ repository: foo
script: hash2
script_version: 4fe459abe02d9b365932b8f5dc419439ab4e2577
script_parameters:
assert_equal '077ba2ad3ea24a929091a9e6ce545c93199b8e57', new_job['script_version']
end
+ BASE_FILTERS = {
+ 'repository' => ['=', 'foo'],
+ 'script' => ['=', 'hash'],
+ 'script_version' => ['in git', 'master'],
+ 'docker_image_locator' => ['=', nil],
+ }
+
+ def filters_from_hash(hash)
+ hash.each_pair.map { |name, filter| [name] + filter }
+ end
+
+ test "can reuse a Job based on filters" do
+ filter_h = BASE_FILTERS.
+ merge('script_version' => ['in git', 'tag1'])
+ post(:create, {
+ job: {
+ script: "hash",
+ script_version: "master",
+ repository: "foo",
+ script_parameters: {
+ input: 'fa7aeb5140e2848d39b416daeef4ffc5+45',
+ an_integer: '1'
+ }
+ },
+ filters: filters_from_hash(filter_h),
+ find_or_create: true,
+ })
+ assert_response :success
+ assert_not_nil assigns(:object)
+ new_job = JSON.parse(@response.body)
+ assert_equal 'zzzzz-8i9sb-cjs4pklxxjykqqq', new_job['uuid']
+ assert_equal '4fe459abe02d9b365932b8f5dc419439ab4e2577', new_job['script_version']
+ end
+
+ test "can not reuse a Job based on filters" do
+ filter_a = filters_from_hash(BASE_FILTERS.reject { |k| k == 'script_version' })
+ filter_a += [["script_version", "in git",
+ "31ce37fe365b3dc204300a3e4c396ad333ed0556"],
+ ["script_version", "not in git", ["tag1"]]]
+ post(:create, {
+ job: {
+ script: "hash",
+ script_version: "master",
+ repository: "foo",
+ script_parameters: {
+ input: 'fa7aeb5140e2848d39b416daeef4ffc5+45',
+ an_integer: '1'
+ }
+ },
+ filters: filter_a,
+ find_or_create: true,
+ })
+ assert_response :success
+ assert_not_nil assigns(:object)
+ new_job = JSON.parse(@response.body)
+ assert_not_equal 'zzzzz-8i9sb-cjs4pklxxjykqqq', new_job['uuid']
+ assert_equal '077ba2ad3ea24a929091a9e6ce545c93199b8e57', new_job['script_version']
+ end
+
+ test "can not reuse a Job based on arbitrary filters" do
+ filter_h = BASE_FILTERS.
+ merge("created_at" => ["<", "2010-01-01T00:00:00Z"])
+ post(:create, {
+ job: {
+ script: "hash",
+ script_version: "4fe459abe02d9b365932b8f5dc419439ab4e2577",
+ repository: "foo",
+ script_parameters: {
+ input: 'fa7aeb5140e2848d39b416daeef4ffc5+45',
+ an_integer: '1'
+ }
+ },
+ filters: filters_from_hash(filter_h),
+ find_or_create: true,
+ })
+ assert_response :success
+ assert_not_nil assigns(:object)
+ new_job = JSON.parse(@response.body)
+ assert_not_equal 'zzzzz-8i9sb-cjs4pklxxjykqqq', new_job['uuid']
+ assert_equal '4fe459abe02d9b365932b8f5dc419439ab4e2577', new_job['script_version']
+ end
+
+ test "can reuse a Job with a Docker image" do
+ post(:create, {
+ job: {
+ script: "hash",
+ script_version: "4fe459abe02d9b365932b8f5dc419439ab4e2577",
+ repository: "foo",
+ script_parameters: {
+ input: 'fa7aeb5140e2848d39b416daeef4ffc5+45',
+ an_integer: '1'
+ },
+ runtime_constraints: {
+ docker_image: 'arvados/apitestfixture',
+ }
+ },
+ find_or_create: true,
+ })
+ assert_response :success
+ new_job = assigns(:object)
+ assert_not_nil new_job
+ target_job = jobs(:previous_docker_job_run)
+ [:uuid, :script_version, :docker_image_locator].each do |attr|
+ assert_equal(target_job.send(attr), new_job.send(attr))
+ end
+ end
+
+ test "can reuse a Job with a Docker image hash filter" do
+ filter_h = BASE_FILTERS.
+ merge("script_version" =>
+ ["=", "4fe459abe02d9b365932b8f5dc419439ab4e2577"],
+ "docker_image_locator" =>
+ ["in docker", links(:docker_image_collection_hash).name])
+ post(:create, {
+ job: {
+ script: "hash",
+ script_version: "4fe459abe02d9b365932b8f5dc419439ab4e2577",
+ repository: "foo",
+ script_parameters: {
+ input: 'fa7aeb5140e2848d39b416daeef4ffc5+45',
+ an_integer: '1'
+ },
+ },
+ filters: filters_from_hash(filter_h),
+ find_or_create: true,
+ })
+ assert_response :success
+ new_job = assigns(:object)
+ assert_not_nil new_job
+ target_job = jobs(:previous_docker_job_run)
+ [:uuid, :script_version, :docker_image_locator].each do |attr|
+ assert_equal(target_job.send(attr), new_job.send(attr))
+ end
+ end
+
+ test "reuse Job with Docker image repo+tag" do
+ filter_h = BASE_FILTERS.
+ merge("script_version" =>
+ ["=", "4fe459abe02d9b365932b8f5dc419439ab4e2577"],
+ "docker_image_locator" =>
+ ["in docker", links(:docker_image_collection_tag2).name])
+ post(:create, {
+ job: {
+ script: "hash",
+ script_version: "4fe459abe02d9b365932b8f5dc419439ab4e2577",
+ repository: "foo",
+ script_parameters: {
+ input: 'fa7aeb5140e2848d39b416daeef4ffc5+45',
+ an_integer: '1'
+ },
+ },
+ filters: filters_from_hash(filter_h),
+ find_or_create: true,
+ })
+ assert_response :success
+ new_job = assigns(:object)
+ assert_not_nil new_job
+ target_job = jobs(:previous_docker_job_run)
+ [:uuid, :script_version, :docker_image_locator].each do |attr|
+ assert_equal(target_job.send(attr), new_job.send(attr))
+ end
+ end
+
+ test "new job with unknown Docker image filter" do
+ filter_h = BASE_FILTERS.
+ merge("docker_image_locator" => ["in docker", "_nonesuchname_"])
+ post(:create, {
+ job: {
+ script: "hash",
+ script_version: "4fe459abe02d9b365932b8f5dc419439ab4e2577",
+ repository: "foo",
+ script_parameters: {
+ input: 'fa7aeb5140e2848d39b416daeef4ffc5+45',
+ an_integer: '1'
+ },
+ },
+ filters: filters_from_hash(filter_h),
+ find_or_create: true,
+ })
+ assert_response :success
+ new_job = assigns(:object)
+ assert_not_nil new_job
+ assert_not_equal(jobs(:previous_docker_job_run).uuid, new_job.uuid)
+ end
+
+ ["repository", "script"].each do |skip_key|
+ test "missing #{skip_key} filter raises an error" do
+ filter_a = filters_from_hash(BASE_FILTERS.reject { |k| k == skip_key })
+ post(:create, {
+ job: {
+ script: "hash",
+ script_version: "master",
+ repository: "foo",
+ script_parameters: {
+ input: 'fa7aeb5140e2848d39b416daeef4ffc5+45',
+ an_integer: '1'
+ }
+ },
+ filters: filter_a,
+ find_or_create: true,
+ })
+ assert_includes(405..599, @response.code.to_i,
+ "bad status code with missing #{skip_key} filter")
+ end
+ end
+
+ test "find Job with script version range" do
+ get :index, filters: [["repository", "=", "foo"],
+ ["script", "=", "hash"],
+ ["script_version", "in git", "tag1"]]
+ assert_response :success
+ assert_not_nil assigns(:objects)
+ assert_includes(assigns(:objects).map { |job| job.uuid },
+ jobs(:previous_job_run).uuid)
+ end
+
+ test "find Job with script version range exclusions" do
+ get :index, filters: [["repository", "=", "foo"],
+ ["script", "=", "hash"],
+ ["script_version", "not in git", "tag1"]]
+ assert_response :success
+ assert_not_nil assigns(:objects)
+ refute_includes(assigns(:objects).map { |job| job.uuid },
+ jobs(:previous_job_run).uuid)
+ end
+
+ test "find Job with Docker image range" do
+ get :index, filters: [["docker_image_locator", "in docker",
+ "arvados/apitestfixture"]]
+ assert_response :success
+ assert_not_nil assigns(:objects)
+ assert_includes(assigns(:objects).map { |job| job.uuid },
+ jobs(:previous_docker_job_run).uuid)
+ refute_includes(assigns(:objects).map { |job| job.uuid },
+ jobs(:previous_job_run).uuid)
+ end
+
+ test "find Job with Docker image using reader tokens" do
+ authorize_with :inactive
+ get(:index, {
+ filters: [["docker_image_locator", "in docker",
+ "arvados/apitestfixture"]],
+ reader_tokens: [api_token(:active)],
+ })
+ assert_response :success
+ assert_not_nil assigns(:objects)
+ assert_includes(assigns(:objects).map { |job| job.uuid },
+ jobs(:previous_docker_job_run).uuid)
+ refute_includes(assigns(:objects).map { |job| job.uuid },
+ jobs(:previous_job_run).uuid)
+ end
end