-#
-# Demonstrate Arvados federation features. This performs a parallel
-# scatter over some arbitrary number of files and federated clusters,
-# then joins the results.
-#
+# Demonstrate Arvados federation features. This example searches a
+# list of CSV files that are hosted on different Arvados clusters.
+# For each file, send a task to the remote cluster which will scan
+# the file and extract the rows where the column "select_column" has
+# one of the values appearing in the "select_values" file. The home
+# cluster then runs a task which pulls the results from the remote
+# clusters and merges them to produce a final report.
+
cwlVersion: v1.0
class: Workflow
$namespaces:
dockerPull: arvados/jobs
# Define a record type so we can conveniently associate the input
- # file, the cluster on which the file lives, and the project on that
- # cluster that will own the container requests and intermediate
- # outputs.
+ # file and the cluster where the task should run.
+ # The FileOnCluster record itself is declared in FileOnCluster.yml
+ # and pulled in by the $import below.
SchemaDefRequirement:
types:
- - name: FileOnCluster
- type: record
- fields:
- file: File
- cluster: string
- project: string
+ - $import: FileOnCluster.yml
inputs:
- # Expect an array of FileOnCluster records (defined above)
- # as our input.
- shards:
+ # Name of the CSV column to filter on, and a file listing the values
+ # to keep; both are passed unchanged to every extract task.
+ select_column: string
+ select_values: File
+
+ # The CSV files to search, each paired with the cluster where its
+ # task should run (FileOnCluster.yml#FileOnCluster records).
+ datasets:
type:
type: array
- items: FileOnCluster
+ items: FileOnCluster.yml#FileOnCluster
+
+ # One project UUID per entry in "datasets"; each is used as the
+ # project_uuid of the matching scatter task (paired by dotproduct),
+ # so this array must have the same length as "datasets".
+ intermediate_projects: string[]
outputs:
# Will produce an output file with the results of the distributed
- # analysis jobs joined together.
+ # analysis jobs merged together.
+ # The workflow output keeps the name "joined"; its value is the
+ # "out" port of the gather-results (merge.cwl) step.
joined:
type: File
- outputSource: gather-results/joined
+ outputSource: gather-results/out
steps:
distributed-analysis:
in:
- # Take "shards" array as input, we scatter over it below.
- shard: shards
-
- # Use an expression to extract the "file" field to assign to the
- # "inp" parameter of the tool.
- inp: {valueFrom: $(inputs.shard.file)}
+ select_column: select_column
+ select_values: select_values
+ dataset: datasets
+ intermediate_projects: intermediate_projects
# Scatter over shards, this means creating a parallel job for each
# element in the "shards" array. Expressions are evaluated for
# each element.
- scatter: shard
+ # The scatter is over "dataset" and "intermediate_projects"; with
+ # dotproduct the N-th task gets the N-th element of each array, so
+ # both arrays must have the same length. select_column and
+ # select_values are not scattered and are given to every task.
+ scatter: [dataset, intermediate_projects]
+ scatterMethod: dotproduct
- # Specify the cluster target for this job. This means each
- # separate scatter job will execute on the cluster that was
+ # Specify the cluster target for this task. This means each
+ # separate scatter task will execute on the cluster that was
# specified in the "cluster" field.
#
# Arvados handles streaming data between clusters, for example,
# the federation.
hints:
arv:ClusterTarget:
- cluster_id: $(inputs.shard.cluster)
- project_uuid: $(inputs.shard.project)
+ # cluster_id comes from this task's dataset record; project_uuid is
+ # the element of intermediate_projects paired with that dataset.
+ cluster_id: $(inputs.dataset.cluster)
+ project_uuid: $(inputs.intermediate_projects)
out: [out]
- run: md5sum.cwl
+ run: extract.cwl
# Collect the results of the distributed step and join them into a
# single output file. Arvados handles streaming inputs,
# intermediate results, and outputs between clusters on demand.
gather-results:
in:
- inp: distributed-analysis/out
- out: [joined]
- run: cat.cwl
+ # distributed-analysis is scattered, so its "out" is an array with
+ # one entry per dataset; merge.cwl combines them.
+ dataset: distributed-analysis/out
+ out: [out]
+ run: merge.cwl