"dataclass": "Collection"
}
},
- "script_version":"<b>you</b>:master",
+ "repository":"<b>you</b>",
- "script_version":"master"
++ "script_version":"master",
+ "output_is_persistent":true
}
}
}
* @"components"@ is a set of scripts that make up the pipeline
* The component is listed with a human-readable name (@"do_hash"@ in this example)
* @"script"@ specifies the name of the script to run. The script is searched for in the "crunch_scripts/" subdirectory of the @git@ checkout specified by @"script_version"@.
- * @"script_version"@ specifies the version of the script that you wish to run. This can be in the form of an explicit @git@ revision hash, or in the form "repository:branch" (in which case it will take the HEAD of the specified branch). Arvados logs the script version that was used in the run, enabling you to go back and re-run any past job with the guarantee that the exact same code will be used as was used in the previous run. You can access a list of available @git@ repositories on the Arvados workbench under "Compute %(rarr)→% Code repositories":http://{{site.arvados_workbench_host}}//repositories .
+ * @"repository"@ is the git repository to search for the script version. You can access a list of available @git@ repositories on the Arvados workbench under "Compute %(rarr)→% Code repositories":https://{{site.arvados_workbench_host}}//repositories .
+ * @"script_version"@ specifies the version of the script that you wish to run. This can be in the form of an explicit @git@ revision hash, a tag, or a branch (in which case it will take the HEAD of the specified branch). Arvados logs the script version that was used in the run, enabling you to go back and re-run any past job with the guarantee that the exact same code will be used as was used in the previous run.
* @"script_parameters"@ describes the parameters for the script. In this example, there is one parameter called @input@ which is @required@ and is a @Collection@.
+* @"output_is_persistent"@ indicates whether the output of the job is considered valuable. If this value is false (or not given), the output will be treated as intermediate data and eventually deleted to reclaim disk space.
Now, use @arv pipeline_template create@ to tell Arvados about your pipeline template:
moretodo = false
@components.each do |cname, c|
job = nil
-
+ c_already_finished = (c[:job] &&
+ c[:job][:uuid] &&
+ !c[:job][:success].nil?)
if !c[:job] and
- c[:script_parameters].select { |pname, p| p.is_a? Hash }.empty?
- # Job is fully specified (all parameter values are present) but
- # no particular job has been found.
-
- debuglog "component #{cname} ready to satisfy."
-
- c.delete :wait
- second_place_job = nil # satisfies component, but not finished yet
-
- (@options[:no_reuse] ? [] : JobCache.
- where(script: c[:script],
- script_parameters: c[:script_parameters],
- script_version_descends_from: c[:script_version])
- ).each do |candidate_job|
- candidate_params_downcase = Hash[candidate_job[:script_parameters].
- map { |k,v| [k.downcase,v] }]
- c_params_downcase = Hash[c[:script_parameters].
- map { |k,v| [k.downcase,v] }]
-
- debuglog "component #{cname} considering job #{candidate_job[:uuid]} version #{candidate_job[:script_version]} parameters #{candidate_params_downcase.inspect}", 3
-
- unless candidate_params_downcase == c_params_downcase
- next
- end
-
- if c[:script_version] !=
- candidate_job[:script_version][0,c[:script_version].length]
- debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if script_version matched.", 2
- next
- end
-
- unless candidate_job[:success] || candidate_job[:running] ||
- (!candidate_job[:started_at] && !candidate_job[:cancelled_at])
- debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if it were running or successful.", 2
- next
- end
-
- if candidate_job[:success]
- unless @options[:no_reuse_finished]
- job = candidate_job
- $stderr.puts "using #{job[:uuid]} (finished at #{job[:finished_at]}) for component #{cname}"
- c[:job] = job
- end
- else
- second_place_job ||= candidate_job
- end
- break
- end
- if not c[:job] and second_place_job
- job = second_place_job
- $stderr.puts "using #{job[:uuid]} (running since #{job[:started_at]}) for component #{cname}"
+ c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
+ # No job is associated with this component yet, and the component's
+ # inputs are fully specified (any output_of script_parameters have
+ # been resolved to real values).
+ job = JobCache.create({:script => c[:script],
+ :script_parameters => c[:script_parameters],
+ :script_version => c[:script_version],
+ :repository => c[:repository],
+ :minimum_script_version => c[:minimum_script_version],
+ :exclude_script_versions => c[:exclude_minimum_script_versions],
+ :nondeterministic => c[:nondeterministic],
- :no_reuse => @options[:no_reuse]})
++ :no_reuse => @options[:no_reuse],
++ :output_is_persistent => c[:output_is_persistent] || false})
+ if job
+ debuglog "component #{cname} new job #{job[:uuid]}"
c[:job] = job
+ else
+ debuglog "component #{cname} new job failed"
end
- if not c[:job]
- debuglog "component #{cname} not satisfied by any existing job."
- if !@options[:dry_run]
- debuglog "component #{cname} new job."
- job = JobCache.create(:script => c[:script],
- :script_parameters => c[:script_parameters],
- :runtime_constraints => c[:runtime_constraints] || {},
- :script_version => c[:script_version] || 'master',
- :output_is_persistent => c[:output_is_persistent] || false)
- if job
- debuglog "component #{cname} new job #{job[:uuid]}"
- c[:job] = job
- else
- debuglog "component #{cname} new job failed"
- end
- end
- end
- else
- c[:wait] = true
end
+
if c[:job] and c[:job][:uuid]
if (c[:job][:running] or
not (c[:job][:finished_at] or c[:job][:cancelled_at]))
end
end
end
+ unless c_already_finished
+ if c[:output_is_persistent]
+ # This is my first time discovering that the job
+ # succeeded. I need to make sure a resources/wants
+ # link is in place to protect the output from garbage
+ # collection. (Normally Crunch does this for me, but
+ # here I might be reusing the output of someone else's
+ # job and I need to make sure it's understood that the
+ # output is valuable to me, too.)
+ wanted = c[:job][:output]
+ debuglog "checking for existing persistence link for #{wanted}"
+ @my_user_uuid ||= $arv.user.current[:uuid]
+ links = $arv.link.list(limit: 1,
+ filters:
+ [%w(link_class = resources),
+ %w(name = wants),
+ %w(tail_uuid =) + [@my_user_uuid],
+ %w(head_uuid =) + [wanted]
+ ])[:items]
+ if links.any?
+ debuglog "link already exists, uuid #{links.first[:uuid]}"
+ else
+ newlink = $arv.link.create link: \
+ {
+ link_class: 'resources',
+ name: 'wants',
+ tail_kind: 'arvados#user',
+ tail_uuid: @my_user_uuid,
+ head_kind: 'arvados#collection',
+ head_uuid: wanted
+ }
+ debuglog "added link, uuid #{newlink[:uuid]}"
+ end
+ end
+ end
elsif c[:job][:running] ||
(!c[:job][:started_at] && !c[:job][:cancelled_at])
+ # Job is still running
moretodo = true
elsif c[:job][:cancelled_at]
debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."