The methods are relative to the base URI, e.g., @/arvados/v1/resource_type@. For arguments specifying a *Location* of @path@, the value of the argument is incorporated into the path portion of the URI. For example, a @uuid@ of @aaaaa-bbbbb-ccccccccccccccc@ in a path position yields a URI of @/arvados/v1/resource_type/aaaaa-bbbbb-ccccccccccccccc@.
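For illustration only, the path-position rule can be sketched in Go (@resourceURI@ is a hypothetical helper, not part of any Arvados SDK):

```go
package main

import (
	"fmt"
	"net/url"
)

// resourceURI joins the base path for a resource type with an
// optional uuid argument placed in the path portion of the URI.
func resourceURI(resourceType, uuid string) string {
	p := "/arvados/v1/" + resourceType
	if uuid != "" {
		p += "/" + url.PathEscape(uuid)
	}
	return p
}

func main() {
	fmt.Println(resourceURI("resource_type", "aaaaa-bbbbb-ccccccccccccccc"))
	// /arvados/v1/resource_type/aaaaa-bbbbb-ccccccccccccccc
}
```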
-Arguments specifying a *Location* of @query@ are incorporated into the query portion of the URI or request body. For example, @/arvados/v1/resource_type?resource_type={}@.
+Arguments specifying a *Location* of "query" are incorporated into the query portion of the URI or request body. For example, @/arvados/v1/resource_type?count=none@.
+
+Certain method calls on certain object types support "federation":{{site.baseurl}}/architecture/federation.html, that is, the ability to operate on objects owned by different clusters. API pages for specific object types list which federated operations are supported for that type (if any) in the "Methods" section.
h2. create
This method corresponds to the HTTP request @POST /arvados/v1/resource_type@. A successful create call returns a copy of the new object.
+To create an object on a remote cluster (federated create), provide the @cluster_id@ of the target cluster.
+
Arguments:
table(table table-bordered table-condensed).
|_. Argument |_. Type |_. Description |_. Location |
-|{resource_type}|object|Name is the singular form of the resource type, e.g., for the "collections" resource, this argument is "collection"|query||
+|{resource_type}|object|Name is the singular form of the resource type, e.g., for the "collections" resource, this argument is "collection"|body||
+|{cluster_id}|string|Optional. The cluster on which to create the object, if not the current cluster.|query||
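As a sketch of how these arguments combine (illustrative Go; @createURL@ is a hypothetical helper, not SDK code), the object itself goes in the request body while the optional @cluster_id@ rides in the query string:

```go
package main

import (
	"fmt"
	"net/url"
)

// createURL builds the request URL for a create call. The new
// object is sent in the request body; the optional cluster_id,
// naming the cluster that should own the object, is a query
// parameter.
func createURL(resourceType, clusterID string) string {
	u := url.URL{Path: "/arvados/v1/" + resourceType}
	if clusterID != "" {
		q := url.Values{}
		q.Set("cluster_id", clusterID)
		u.RawQuery = q.Encode()
	}
	return u.String()
}

func main() {
	fmt.Println(createURL("container_requests", "zzzzz"))
	// /arvados/v1/container_requests?cluster_id=zzzzz
}
```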
h2. delete
The @delete@ method deletes an object of the specified type. It corresponds to the HTTP request @DELETE /arvados/v1/resource_type/uuid@. A successful delete call returns a copy of the deleted object.
+The cluster id portion of the @uuid@ determines which cluster owns the object; a federated delete request will be routed to that cluster.
+
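The routing rule can be sketched as follows (@clusterIDFromUUID@ is a hypothetical helper, shown only to illustrate how the five-character prefix is read; it is not SDK code):

```go
package main

import (
	"fmt"
	"strings"
)

// clusterIDFromUUID returns the five-character cluster id prefix of
// an Arvados uuid (the part before the first "-"), which determines
// where a federated request is routed. It returns "" if the string
// does not look like a uuid.
func clusterIDFromUUID(uuid string) string {
	parts := strings.SplitN(uuid, "-", 2)
	if len(parts) != 2 || len(parts[0]) != 5 {
		return ""
	}
	return parts[0]
}

func main() {
	fmt.Println(clusterIDFromUUID("zzzzz-dz642-queuedcontainer"))
	// zzzzz
}
```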
Arguments:
table(table table-bordered table-condensed).
The @get@ method gets a single object with the specified @uuid@. It corresponds to the HTTP request @GET /arvados/v1/resource_type/uuid@.
+The cluster id portion of the @uuid@ determines which cluster owns the object; a federated get request will be routed to that cluster.
+
Arguments:
table(table table-bordered table-condensed).
Note that exclusion filters @!=@ and @not in@ will return records for which the property is not defined at all. To restrict filtering to records on which the subproperty is defined, combine with an @exists@ filter.
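For example, a filter set that matches records whose @properties.color@ subproperty is defined and not equal to "red" can be built like this (illustrative Go, not SDK code; the subproperty name is a made-up example):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// colorFilter returns a filters parameter combining an exists
// clause with an exclusion clause, so that records lacking the
// subproperty entirely are not matched by the != filter.
func colorFilter() string {
	filters := [][]interface{}{
		{"properties.color", "exists", true},
		{"properties.color", "!=", "red"},
	}
	b, _ := json.Marshal(filters)
	return string(b)
}

func main() {
	fmt.Println(colorFilter())
	// [["properties.color","exists",true],["properties.color","!=","red"]]
}
```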
+h4. Federated listing
+
+Federated listing forwards a request to multiple clusters and combines the results. Currently only a very restricted form of the @list@ method is supported.
+
+To query multiple clusters, the list request must:
+
+* Have filters only matching @[["uuid", "in", [...]]]@ or @[["uuid", "=", "..."]]@
+* Specify @count=none@
+* If @select@ is specified, it must include @uuid@
+* Not specify @limit@, @offset@ or @order@
+* Not request more items than the maximum response size
+
+This form may be used to request a specific list of objects by uuid that are owned by multiple clusters.
+
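Putting the restrictions together, a compliant multi-cluster list request can be sketched like this (illustrative Go; @uuidFilter@ is a hypothetical helper and the uuids are placeholders):

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/url"
)

// uuidFilter returns the filters parameter for a federated list
// request: a single "uuid in [...]" clause, as required above.
func uuidFilter(uuids []string) string {
	f, _ := json.Marshal(uuids)
	return fmt.Sprintf(`[["uuid","in",%s]]`, f)
}

func main() {
	v := url.Values{}
	v.Set("count", "none") // required for federated listing
	v.Set("filters", uuidFilter([]string{
		"zzzzz-dz642-000000000000000",
		"zbbbb-dz642-000000000000000",
	}))
	// No limit, offset, or order parameters are set.
	fmt.Println("/arvados/v1/containers?" + v.Encode())
}
```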
h3. Results of list method
A successful call to list will return the following object.
The @update@ method updates fields on the object with the specified @uuid@. It corresponds to the HTTP request @PUT /arvados/v1/resource_type/uuid@. Note that only the listed attributes (and "standard metadata":resources.html) are updated, unset attributes will retain their previous values, and the attributes of a given resource type are fixed (you cannot introduce new toplevel attributes). Also note that updates replace the value of the attribute, so if an attribute has an object value, the entire object is replaced. A successful update call returns the updated copy of the object.
+The cluster id portion of the @uuid@ determines which cluster owns the object; a federated update request will be routed to that cluster.
+
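Because updates replace attribute values wholesale, sending a new hash for an attribute discards any keys that are not re-sent. A minimal sketch of that replace-not-merge behavior (illustrative only, not server code):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// applyUpdate mimics the update semantics described above: each
// attribute named in the update replaces the stored value entirely,
// with no deep merge of hash attributes.
func applyUpdate(stored, update map[string]interface{}) map[string]interface{} {
	for k, v := range update {
		stored[k] = v // whole value replaced, not merged
	}
	return stored
}

func main() {
	stored := map[string]interface{}{
		"name":       "example",
		"properties": map[string]interface{}{"color": "red", "size": "large"},
	}
	update := map[string]interface{}{
		"properties": map[string]interface{}{"color": "blue"},
	}
	b, _ := json.Marshal(applyUpdate(stored, update))
	// "size" is gone because the whole properties hash was replaced:
	fmt.Println(string(b))
	// {"name":"example","properties":{"color":"blue"}}
}
```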
table(table table-bordered table-condensed).
|_. Argument |_. Type |_. Description |_. Location |
{background:#ccffcc}.|uuid|string|The UUID of the resource in question.|path||
Required arguments are displayed in %{background:#ccffcc}green%.
+Supports federated @get@ only, which may be called with either a uuid or a portable data hash. When requesting a portable data hash which is not available on the home cluster, the query is forwarded to all the clusters listed in @RemoteClusters@ and returns the first successful result.
+
h3. create
Create a new Collection.
h3. get
-Gets a Collection's metadata by UUID.
+Gets a Collection's metadata by UUID or portable data hash. When making a request by portable data hash, the returned record will include only the @portable_data_hash@ and @manifest_text@ attributes.
Arguments:
Required arguments are displayed in %{background:#ccffcc}green%.
+Supports federated @create@, @delete@, @get@, @list@, and @update@.
+
h2(#create). create
Create a new container request.
|state|string|The allowed states are "Queued", "Locked", "Running", "Cancelled" and "Complete".|See "Container states":#container_states for more details.|
|started_at|datetime|When this container started running.|Null if container has not yet started.|
|finished_at|datetime|When this container finished.|Null if container has not yet finished.|
-|log|string|Portable data hash of the collection containing logs from a completed container run.|Null if the container is not yet finished.|
+|log|string|UUID or portable data hash of a collection containing the log messages produced when executing the container.|PDH after the container is finished, otherwise UUID or null.|
|environment|hash|Environment variables and values that should be set in the container environment (@docker run --env@). This augments and (when conflicts exist) overrides environment variables given in the image's Dockerfile.|Must be equal to a ContainerRequest's environment in order to satisfy the ContainerRequest.|
|cwd|string|Initial working directory.|Must be equal to a ContainerRequest's cwd in order to satisfy the ContainerRequest|
|command|array of strings|Command to execute.| Must be equal to a ContainerRequest's command in order to satisfy the ContainerRequest.|
Required arguments are displayed in %{background:#ccffcc}green%.
+Supports federated @get@ and @list@.
+
h2(#create). create
Create a new Container.
|order|array|Attributes to use as sort keys to determine the order resources are returned, each optionally followed by @asc@ or @desc@ to indicate ascending or descending order. Sort within a resource type by prefixing the attribute with the resource name and a period.|query|@["collections.modified_at desc"]@|
|filters|array|Conditions for filtering items.|query|@[["uuid", "is_a", "arvados#job"]]@|
|recursive|boolean (default false)|Include items owned by subprojects.|query|@true@|
+|exclude_home_project|boolean (default false)|Only return items which are visible to the user but not accessible within the user's home project. Use this to get a list of items that are shared with the user.|query|@true@|
Note: Because adding access tokens to manifests can be computationally expensive, the @manifest_text@ field is not included in listed collections. If you need it, request a "list of collections":{{site.baseurl}}/api/methods/collections.html with the filter @["owner_uuid", "=", GROUP_UUID]@, and @"manifest_text"@ listed in the select parameter.
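The note above can be followed with a list request built like this sketch (illustrative Go; @collectionListParams@ is a hypothetical helper and the group uuid is a placeholder):

```go
package main

import (
	"fmt"
	"net/url"
)

// collectionListParams builds the parameters for listing a group's
// collections with manifest_text explicitly selected, since it is
// omitted from group contents listings by default.
func collectionListParams(groupUUID string) url.Values {
	v := url.Values{}
	v.Set("filters", fmt.Sprintf(`[["owner_uuid","=","%s"]]`, groupUUID))
	v.Set("select", `["uuid","name","manifest_text"]`)
	return v
}

func main() {
	// "zzzzz-j7d0g-000000000000000" is a placeholder group uuid.
	fmt.Println("/arvados/v1/collections?" +
		collectionListParams("zzzzz-j7d0g-000000000000000").Encode())
}
```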
Required arguments are displayed in %{background:#ccffcc}green%.
+Supports federated @create@, @delete@, @get@, @list@, and @update@.
+
h3. create
Create a new Workflow.
<p><strong>What is Arvados</strong>
<p><a href="https://arvados.org/">Arvados</a> enables you to quickly begin using cloud computing resources in your data science work. It allows you to track your methods and datasets, share them securely, and easily re-run analyses.
</p>
- <p><strong>Communications</strong>
- <p>Read our <a href="https://dev.arvados.org/projects/arvados/blogs">blog updates</a> or look through our <a href="https://dev.arvados.org/projects/arvados/activity">recent developer activity</a>.
- </p>
- <p>Questions? Email <a href="http://lists.arvados.org/mailman/listinfo/arvados">the mailing list</a>, or chat with us on IRC: <a href="irc://irc.oftc.net:6667/#arvados">#arvados</a> @ OFTC (you can <a href="https://webchat.oftc.net/?channels=arvados">join in your browser</a>).
+
+ <a name="Support"></a>
+ <p><strong>Support and Community</strong></p>
+
+<p>The recommended place to ask a question about Arvados is on Biostars. After you have <a href="//www.biostars.org/t/arvados/">read previous questions and answers</a>, you can <a href="https://www.biostars.org/p/new/post/?tag_val=arvados">post your question using the 'arvados' tag</a>.</p>
+
+ <p>There is a <a href="http://lists.arvados.org/mailman/listinfo/arvados">mailing list</a>, and chat on IRC: <a href="irc://irc.oftc.net:6667/#arvados">#arvados</a> @ OFTC (you can <a href="https://webchat.oftc.net/?channels=arvados">join in your browser</a>).
</p>
- <p><strong>Want to contribute?</strong></p>
- <p>Check out our <a href="https://dev.arvados.org/projects/arvados">developer site</a>. We're open source, check out our code on <a href="https://github.com/curoverse/arvados">github</a>.
+
+ <p>Curoverse, a Veritas Genetics company, provides managed Arvados installations as well as commercial support for Arvados. Please visit <a href="https://curoverse.com">curoverse.com</a> or contact <a href="mailto:researchsales@veritasgenetics.com">researchsales@veritasgenetics.com</a> for more information.</p>
+
+ <p><strong>Contributing</strong></p>
+ <p>Please visit the <a href="https://dev.arvados.org/projects/arvados/wiki/Wiki#Contributing-and-hacking">developer site</a>. Arvados is 100% free and open source software; check out the code on <a href="https://github.com/curoverse/arvados">github</a>.</p>
+
+ <p>Arvados is under active development; see the <a href="https://dev.arvados.org/projects/arvados/activity">recent developer activity</a>.
</p>
<p><strong>License</strong></p>
- <p>Arvados is under the copyleft <a href="{{ site.baseurl }}/user/copying/agpl-3.0.html">GNU AGPL v3</a>, with our SDKs under <a href="{{ site.baseurl }}/user/copying/LICENSE-2.0.html">Apache License 2.0</a> (so that you can incorporate proprietary toolchains into your pipelines).
+ <p>Most of Arvados is licensed under the <a href="{{ site.baseurl }}/user/copying/agpl-3.0.html">GNU AGPL v3</a>. The SDKs are licensed under the <a href="{{ site.baseurl }}/user/copying/LICENSE-2.0.html">Apache License 2.0</a> so that they can be incorporated into proprietary code. See the <a href="https://github.com/curoverse/arvados/blob/master/COPYING">COPYING file</a> for more information.
</p>
</div>
"git.curoverse.com/arvados.git/sdk/go/keepclient"
)
-var pathPattern = `^/arvados/v1/%s(/([0-9a-z]{5})-%s-)?.*$`
+var pathPattern = `^/arvados/v1/%s(/([0-9a-z]{5})-%s-[0-9a-z]{15})?(.*)$`
var wfRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "workflows", "7fd4e"))
var containersRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "containers", "dz642"))
var containerRequestsRe = regexp.MustCompile(fmt.Sprintf(pathPattern, "container_requests", "xvhdp"))
h.proxy.Do(w, req, urlOut, client, filter)
}
-func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
- m := h.matcher.FindStringSubmatch(req.URL.Path)
- clusterId := ""
+// Buffer request body, parse form parameters in request, and then
+// replace original body with the buffer so it can be re-read by
+// downstream proxy steps.
+func loadParamsFromForm(req *http.Request) error {
+ var postBody *bytes.Buffer
+ if req.Body != nil && req.Header.Get("Content-Type") == "application/x-www-form-urlencoded" {
+ var cl int64
+ if req.ContentLength > 0 {
+ cl = req.ContentLength
+ }
+ postBody = bytes.NewBuffer(make([]byte, 0, cl))
+ originalBody := req.Body
+ defer originalBody.Close()
+ req.Body = ioutil.NopCloser(io.TeeReader(req.Body, postBody))
+ }
- if len(m) == 3 {
- clusterId = m[2]
+ err := req.ParseForm()
+ if err != nil {
+ return err
+ }
+
+ if req.Body != nil && postBody != nil {
+ req.Body = ioutil.NopCloser(postBody)
}
+ return nil
+}
+
+type multiClusterQueryResponseCollector struct {
+ responses []map[string]interface{}
+ error error
+ kind string
+ clusterID string
+}
+
+func (c *multiClusterQueryResponseCollector) collectResponse(resp *http.Response,
+ requestError error) (newResponse *http.Response, err error) {
+ if requestError != nil {
+ c.error = requestError
+ return nil, nil
+ }
+
+ defer resp.Body.Close()
+ var loadInto struct {
+ Kind string `json:"kind"`
+ Items []map[string]interface{} `json:"items"`
+ Errors []string `json:"errors"`
+ }
+ err = json.NewDecoder(resp.Body).Decode(&loadInto)
+
+ if err != nil {
+ c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, err)
+ return nil, nil
+ }
+ if resp.StatusCode != http.StatusOK {
+ c.error = fmt.Errorf("error fetching from %v (%v): %v", c.clusterID, resp.Status, loadInto.Errors)
+ return nil, nil
+ }
+
+ c.responses = loadInto.Items
+ c.kind = loadInto.Kind
+
+ return nil, nil
+}
- if clusterId == "" {
- if values, err := url.ParseQuery(req.URL.RawQuery); err == nil {
- if len(values["cluster_id"]) == 1 {
- clusterId = values["cluster_id"][0]
+func (h *genericFederatedRequestHandler) remoteQueryUUIDs(w http.ResponseWriter,
+ req *http.Request,
+ clusterID string, uuids []string) (rp []map[string]interface{}, kind string, err error) {
+
+ found := make(map[string]bool)
+ prev_len_uuids := len(uuids) + 1
+	// Loop while:
+	// (1) there are more uuids to query, and
+	// (2) we're making progress - on each iteration the set of
+	// uuids we are waiting for must shrink.
+ for len(uuids) > 0 && len(uuids) < prev_len_uuids {
+ var remoteReq http.Request
+ remoteReq.Header = req.Header
+ remoteReq.Method = "POST"
+ remoteReq.URL = &url.URL{Path: req.URL.Path}
+ remoteParams := make(url.Values)
+ remoteParams.Set("_method", "GET")
+ remoteParams.Set("count", "none")
+ if req.Form.Get("select") != "" {
+ remoteParams.Set("select", req.Form.Get("select"))
+ }
+ content, err := json.Marshal(uuids)
+ if err != nil {
+ return nil, "", err
+ }
+ remoteParams["filters"] = []string{fmt.Sprintf(`[["uuid", "in", %s]]`, content)}
+ enc := remoteParams.Encode()
+ remoteReq.Body = ioutil.NopCloser(bytes.NewBufferString(enc))
+
+ rc := multiClusterQueryResponseCollector{clusterID: clusterID}
+
+ if clusterID == h.handler.Cluster.ClusterID {
+ h.handler.localClusterRequest(w, &remoteReq,
+ rc.collectResponse)
+ } else {
+ h.handler.remoteClusterRequest(clusterID, w, &remoteReq,
+ rc.collectResponse)
+ }
+ if rc.error != nil {
+ return nil, "", rc.error
+ }
+
+ kind = rc.kind
+
+ if len(rc.responses) == 0 {
+ // We got zero responses, no point in doing
+ // another query.
+ return rp, kind, nil
+ }
+
+ rp = append(rp, rc.responses...)
+
+ // Go through the responses and determine what was
+ // returned. If there are remaining items, loop
+ // around and do another request with just the
+ // stragglers.
+ for _, i := range rc.responses {
+ uuid, ok := i["uuid"].(string)
+ if ok {
+ found[uuid] = true
}
}
+
+ l := []string{}
+ for _, u := range uuids {
+ if !found[u] {
+ l = append(l, u)
+ }
+ }
+ prev_len_uuids = len(uuids)
+ uuids = l
}
- if clusterId == "" && req.Method == "POST" && req.Header.Get("Content-Type") == "application/json" {
- var hasClusterId struct {
- ClusterID string `json:"cluster_id"`
+ return rp, kind, nil
+}
+
+func (h *genericFederatedRequestHandler) handleMultiClusterQuery(w http.ResponseWriter,
+ req *http.Request, clusterId *string) bool {
+
+ var filters [][]interface{}
+ err := json.Unmarshal([]byte(req.Form.Get("filters")), &filters)
+ if err != nil {
+ httpserver.Error(w, err.Error(), http.StatusBadRequest)
+ return true
+ }
+
+ // Split the list of uuids by prefix
+ queryClusters := make(map[string][]string)
+ expectCount := 0
+ for _, filter := range filters {
+ if len(filter) != 3 {
+ return false
}
- var cl int64
- if req.ContentLength > 0 {
- cl = req.ContentLength
+
+ if lhs, ok := filter[0].(string); !ok || lhs != "uuid" {
+ return false
+ }
+
+ op, ok := filter[1].(string)
+ if !ok {
+ return false
+ }
+
+ if op == "in" {
+ if rhs, ok := filter[2].([]interface{}); ok {
+ for _, i := range rhs {
+ if u, ok := i.(string); ok {
+ *clusterId = u[0:5]
+ queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
+ expectCount += 1
+ }
+ }
+ }
+ } else if op == "=" {
+ if u, ok := filter[2].(string); ok {
+ *clusterId = u[0:5]
+ queryClusters[u[0:5]] = append(queryClusters[u[0:5]], u)
+ expectCount += 1
+ }
+ } else {
+ return false
}
- postBody := bytes.NewBuffer(make([]byte, 0, cl))
- defer req.Body.Close()
- rdr := io.TeeReader(req.Body, postBody)
+ }
+
+ if len(queryClusters) <= 1 {
+ // Query does not search for uuids across multiple
+ // clusters.
+ return false
+ }
- err := json.NewDecoder(rdr).Decode(&hasClusterId)
+ // Validations
+ count := req.Form.Get("count")
+ if count != "" && count != `none` && count != `"none"` {
+ httpserver.Error(w, "Federated multi-object query must have 'count=none'", http.StatusBadRequest)
+ return true
+ }
+ if req.Form.Get("limit") != "" || req.Form.Get("offset") != "" || req.Form.Get("order") != "" {
+		httpserver.Error(w, "Federated multi-object request may not provide 'limit', 'offset' or 'order'.", http.StatusBadRequest)
+ return true
+ }
+ if expectCount > h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse() {
+ httpserver.Error(w, fmt.Sprintf("Federated multi-object request for %v objects which is more than max page size %v.",
+ expectCount, h.handler.Cluster.RequestLimits.GetMaxItemsPerResponse()), http.StatusBadRequest)
+ return true
+ }
+ if req.Form.Get("select") != "" {
+ foundUUID := false
+ var selects []string
+ err := json.Unmarshal([]byte(req.Form.Get("select")), &selects)
if err != nil {
httpserver.Error(w, err.Error(), http.StatusBadRequest)
- return
+ return true
}
- req.Body = ioutil.NopCloser(postBody)
- clusterId = hasClusterId.ClusterID
+ for _, r := range selects {
+ if r == "uuid" {
+ foundUUID = true
+ break
+ }
+ }
+ if !foundUUID {
+ httpserver.Error(w, "Federated multi-object request must include 'uuid' in 'select'", http.StatusBadRequest)
+ return true
+ }
+ }
+
+ // Perform concurrent requests to each cluster
+
+ // use channel as a semaphore to limit the number of concurrent
+ // requests at a time
+ sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
+ defer close(sem)
+ wg := sync.WaitGroup{}
+
+ req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
+ mtx := sync.Mutex{}
+ errors := []error{}
+ var completeResponses []map[string]interface{}
+ var kind string
+
+ for k, v := range queryClusters {
+ if len(v) == 0 {
+ // Nothing to query
+ continue
+ }
+
+ // blocks until it can put a value into the
+ // channel (which has a max queue capacity)
+ sem <- true
+ wg.Add(1)
+ go func(k string, v []string) {
+ rp, kn, err := h.remoteQueryUUIDs(w, req, k, v)
+ mtx.Lock()
+ if err == nil {
+ completeResponses = append(completeResponses, rp...)
+ kind = kn
+ } else {
+ errors = append(errors, err)
+ }
+ mtx.Unlock()
+ wg.Done()
+ <-sem
+ }(k, v)
+ }
+ wg.Wait()
+
+ if len(errors) > 0 {
+ var strerr []string
+ for _, e := range errors {
+ strerr = append(strerr, e.Error())
+ }
+ httpserver.Errors(w, strerr, http.StatusBadGateway)
+ return true
+ }
+
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+ itemList := make(map[string]interface{})
+ itemList["items"] = completeResponses
+ itemList["kind"] = kind
+ json.NewEncoder(w).Encode(itemList)
+
+ return true
+}
+
+func (h *genericFederatedRequestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+ m := h.matcher.FindStringSubmatch(req.URL.Path)
+ clusterId := ""
+
+ if len(m) > 0 && m[2] != "" {
+ clusterId = m[2]
+ }
+
+ // Get form parameters from URL and form body (if POST).
+ if err := loadParamsFromForm(req); err != nil {
+ httpserver.Error(w, err.Error(), http.StatusBadRequest)
+ return
+ }
+
+ // Check if the parameters have an explicit cluster_id
+ if req.Form.Get("cluster_id") != "" {
+ clusterId = req.Form.Get("cluster_id")
+ }
+
+ // Handle the POST-as-GET special case (workaround for large
+ // GET requests that potentially exceed maximum URL length,
+ // like multi-object queries where the filter has 100s of
+ // items)
+ effectiveMethod := req.Method
+ if req.Method == "POST" && req.Form.Get("_method") != "" {
+ effectiveMethod = req.Form.Get("_method")
+ }
+
+ if effectiveMethod == "GET" &&
+ clusterId == "" &&
+ req.Form.Get("filters") != "" &&
+ h.handleMultiClusterQuery(w, req, &clusterId) {
+ return
}
if clusterId == "" || clusterId == h.handler.Cluster.ClusterID {
return resp, nil
}
-type searchLocalClusterForPDH struct {
- sentResponse bool
-}
-
-func (s *searchLocalClusterForPDH) filterLocalClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
+func filterLocalClusterResponse(resp *http.Response, requestError error) (newResponse *http.Response, err error) {
if requestError != nil {
return resp, requestError
}
if resp.StatusCode == 404 {
// Suppress returning this result, because we want to
// search the federation.
- s.sentResponse = false
return nil, nil
}
- s.sentResponse = true
return resp, nil
}
m = collectionRe.FindStringSubmatch(req.URL.Path)
clusterId := ""
- if len(m) == 3 {
+ if len(m) > 0 {
clusterId = m[2]
}
// Request for collection by PDH. Search the federation.
// First, query the local cluster.
- urlOut, insecure, err := findRailsAPI(h.handler.Cluster, h.handler.NodeProfile)
- if err != nil {
- httpserver.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
-
- urlOut = &url.URL{
- Scheme: urlOut.Scheme,
- Host: urlOut.Host,
- Path: req.URL.Path,
- RawPath: req.URL.RawPath,
- RawQuery: req.URL.RawQuery,
- }
- client := h.handler.secureClient
- if insecure {
- client = h.handler.insecureClient
- }
- sf := &searchLocalClusterForPDH{}
- h.handler.proxy.Do(w, req, urlOut, client, sf.filterLocalClusterResponse)
- if sf.sentResponse {
+ if h.handler.localClusterRequest(w, req, filterLocalClusterResponse) {
return
}
var errors []string
var errorCode int = 404
- // use channel as a semaphore to limit it to 4
- // parallel requests at a time
- sem := make(chan bool, 4)
+ // use channel as a semaphore to limit the number of concurrent
+ // requests at a time
+ sem := make(chan bool, h.handler.Cluster.RequestLimits.GetMultiClusterRequestConcurrency())
defer close(sem)
for remoteID := range h.handler.Cluster.RemoteClusters {
// blocks until it can put a value into the
mux := http.NewServeMux()
mux.Handle("/arvados/v1/workflows", &genericFederatedRequestHandler{next, h, wfRe})
mux.Handle("/arvados/v1/workflows/", &genericFederatedRequestHandler{next, h, wfRe})
- mux.Handle("/arvados/v1/containers", next)
+ mux.Handle("/arvados/v1/containers", &genericFederatedRequestHandler{next, h, containersRe})
mux.Handle("/arvados/v1/containers/", &genericFederatedRequestHandler{next, h, containersRe})
mux.Handle("/arvados/v1/container_requests", &genericFederatedRequestHandler{next, h, containerRequestsRe})
mux.Handle("/arvados/v1/container_requests/", &genericFederatedRequestHandler{next, h, containerRequestsRe})
import (
"encoding/json"
+ "fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
NodeProfiles: map[string]arvados.NodeProfile{
"*": nodeProfile,
},
+ RequestLimits: arvados.RequestLimits{
+ MaxItemsPerResponse: 1000,
+ MultiClusterRequestConcurrency: 4,
+ },
}, NodeProfile: &nodeProfile}
s.testServer = newServerFromIntegrationTestEnv(c)
s.testServer.Server.Handler = httpserver.AddRequestIDs(httpserver.LogRequests(s.log, s.testHandler))
func (s *FederationSuite) TestRemoteWithTokenInQuery(c *check.C) {
req := httptest.NewRequest("GET", "/arvados/v1/workflows/"+strings.Replace(arvadostest.WorkflowWithDefinitionYAMLUUID, "zzzzz-", "zmock-", 1)+"?api_token="+arvadostest.ActiveToken, nil)
s.testRequest(req)
- c.Assert(len(s.remoteMockRequests), check.Equals, 1)
+ c.Assert(s.remoteMockRequests, check.HasLen, 1)
pr := s.remoteMockRequests[0]
// Token is salted and moved from query to Authorization header.
c.Check(pr.URL.String(), check.Not(check.Matches), `.*api_token=.*`)
req := httptest.NewRequest("GET", "/arvados/v1/workflows/"+strings.Replace(arvadostest.WorkflowWithDefinitionYAMLUUID, "zzzzz-", "zmock-", 1), nil)
req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
s.testRequest(req)
- c.Assert(len(s.remoteMockRequests), check.Equals, 1)
+ c.Assert(s.remoteMockRequests, check.HasLen, 1)
pr := s.remoteMockRequests[0]
// The salted token here has a "zzzzz-" UUID instead of a
// "ztest-" UUID because ztest's local database has the
req := httptest.NewRequest("GET", "/arvados/v1/workflows/"+strings.Replace(arvadostest.WorkflowWithDefinitionYAMLUUID, "zzzzz-", "zmock-", 1), nil)
req.Header.Set("Authorization", "Bearer "+remoteToken)
s.testRequest(req)
- c.Assert(len(s.remoteMockRequests), check.Equals, 1)
+ c.Assert(s.remoteMockRequests, check.HasLen, 1)
pr := s.remoteMockRequests[0]
c.Check(pr.Header.Get("Authorization"), check.Equals, "Bearer "+remoteToken)
}
var jresp httpserver.ErrorResponse
err := json.NewDecoder(resp.Body).Decode(&jresp)
c.Check(err, check.IsNil)
- c.Assert(len(jresp.Errors), check.Equals, 1)
+ c.Assert(jresp.Errors, check.HasLen, 1)
c.Check(jresp.Errors[0], check.Matches, re)
}
-func (s *FederationSuite) localServiceReturns404(c *check.C) *httpserver.Server {
+func (s *FederationSuite) localServiceHandler(c *check.C, h http.Handler) *httpserver.Server {
srv := &httpserver.Server{
Server: http.Server{
- Handler: http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
- w.WriteHeader(404)
- }),
+ Handler: h,
},
}
return srv
}
+func (s *FederationSuite) localServiceReturns404(c *check.C) *httpserver.Server {
+ return s.localServiceHandler(c, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+ w.WriteHeader(404)
+ }))
+}
+
func (s *FederationSuite) TestGetLocalCollection(c *check.C) {
np := arvados.NodeProfile{
Controller: arvados.SystemServiceInstance{Listen: ":"},
c.Check(cr.Priority, check.Equals, 696)
}
-func (s *FederationSuite) TestCreateRemoteContainerRequest1(c *check.C) {
- defer s.localServiceReturns404(c).Close()
- req := httptest.NewRequest("POST", "/arvados/v1/container_requests",
- strings.NewReader(`{
- "cluster_id": "zzzzz",
- "container_request": {
- "name": "hello world",
- "state": "Uncommitted",
- "output_path": "/",
- "container_image": "123",
- "command": ["abc"]
- }
-}
-`))
- req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
- req.Header.Set("Content-type", "application/json")
- resp := s.testRequest(req)
- c.Check(resp.StatusCode, check.Equals, http.StatusOK)
- var cr arvados.ContainerRequest
- c.Check(json.NewDecoder(resp.Body).Decode(&cr), check.IsNil)
- c.Check(cr.Name, check.Equals, "hello world")
- c.Check(strings.HasPrefix(cr.UUID, "zzzzz-"), check.Equals, true)
-}
-
-func (s *FederationSuite) TestCreateRemoteContainerRequest2(c *check.C) {
+func (s *FederationSuite) TestCreateRemoteContainerRequest(c *check.C) {
defer s.localServiceReturns404(c).Close()
// pass cluster_id via query parameter, this allows arvados-controller
// to avoid parsing the body
c.Check(json.NewDecoder(resp.Body).Decode(&cn), check.IsNil)
c.Check(cn.UUID, check.Equals, arvadostest.QueuedContainerUUID)
}
+
+func (s *FederationSuite) TestListRemoteContainer(c *check.C) {
+ defer s.localServiceReturns404(c).Close()
+ req := httptest.NewRequest("GET", "/arvados/v1/containers?count=none&filters="+
+ url.QueryEscape(fmt.Sprintf(`[["uuid", "in", ["%v"]]]`, arvadostest.QueuedContainerUUID)), nil)
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+ resp := s.testRequest(req)
+ c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+ var cn arvados.ContainerList
+ c.Check(json.NewDecoder(resp.Body).Decode(&cn), check.IsNil)
+ c.Check(cn.Items[0].UUID, check.Equals, arvadostest.QueuedContainerUUID)
+}
+
+func (s *FederationSuite) TestListMultiRemoteContainers(c *check.C) {
+ defer s.localServiceHandler(c, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+ bd, _ := ioutil.ReadAll(req.Body)
+ c.Check(string(bd), check.Equals, `_method=GET&count=none&filters=%5B%5B%22uuid%22%2C+%22in%22%2C+%5B%22zhome-xvhdp-cr5queuedcontnr%22%5D%5D%5D&select=%5B%22uuid%22%2C+%22command%22%5D`)
+ w.WriteHeader(200)
+ w.Write([]byte(`{"kind": "arvados#containerList", "items": [{"uuid": "zhome-xvhdp-cr5queuedcontnr", "command": ["abc"]}]}`))
+ })).Close()
+ req := httptest.NewRequest("GET", fmt.Sprintf("/arvados/v1/containers?count=none&filters=%s&select=%s",
+ url.QueryEscape(fmt.Sprintf(`[["uuid", "in", ["%v", "zhome-xvhdp-cr5queuedcontnr"]]]`,
+ arvadostest.QueuedContainerUUID)),
+ url.QueryEscape(`["uuid", "command"]`)),
+ nil)
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+ resp := s.testRequest(req)
+ c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+ var cn arvados.ContainerList
+ c.Check(json.NewDecoder(resp.Body).Decode(&cn), check.IsNil)
+ c.Check(cn.Items, check.HasLen, 2)
+ mp := make(map[string]arvados.Container)
+ for _, cr := range cn.Items {
+ mp[cr.UUID] = cr
+ }
+ c.Check(mp[arvadostest.QueuedContainerUUID].Command, check.DeepEquals, []string{"echo", "hello"})
+ c.Check(mp[arvadostest.QueuedContainerUUID].ContainerImage, check.Equals, "")
+ c.Check(mp["zhome-xvhdp-cr5queuedcontnr"].Command, check.DeepEquals, []string{"abc"})
+ c.Check(mp["zhome-xvhdp-cr5queuedcontnr"].ContainerImage, check.Equals, "")
+}
+
+func (s *FederationSuite) TestListMultiRemoteContainerError(c *check.C) {
+ defer s.localServiceReturns404(c).Close()
+ req := httptest.NewRequest("GET", fmt.Sprintf("/arvados/v1/containers?count=none&filters=%s&select=%s",
+ url.QueryEscape(fmt.Sprintf(`[["uuid", "in", ["%v", "zhome-xvhdp-cr5queuedcontnr"]]]`,
+ arvadostest.QueuedContainerUUID)),
+ url.QueryEscape(`["uuid", "command"]`)),
+ nil)
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+ resp := s.testRequest(req)
+ c.Check(resp.StatusCode, check.Equals, http.StatusBadGateway)
+ s.checkJSONErrorMatches(c, resp, `error fetching from zhome \(404 Not Found\): EOF`)
+}
+
+func (s *FederationSuite) TestListMultiRemoteContainersPaged(c *check.C) {
+
+ callCount := 0
+ defer s.localServiceHandler(c, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+ bd, _ := ioutil.ReadAll(req.Body)
+ if callCount == 0 {
+ c.Check(string(bd), check.Equals, `_method=GET&count=none&filters=%5B%5B%22uuid%22%2C+%22in%22%2C+%5B%22zhome-xvhdp-cr5queuedcontnr%22%2C%22zhome-xvhdp-cr6queuedcontnr%22%5D%5D%5D`)
+ w.WriteHeader(200)
+ w.Write([]byte(`{"kind": "arvados#containerList", "items": [{"uuid": "zhome-xvhdp-cr5queuedcontnr", "command": ["abc"]}]}`))
+ } else if callCount == 1 {
+ c.Check(string(bd), check.Equals, `_method=GET&count=none&filters=%5B%5B%22uuid%22%2C+%22in%22%2C+%5B%22zhome-xvhdp-cr6queuedcontnr%22%5D%5D%5D`)
+ w.WriteHeader(200)
+ w.Write([]byte(`{"kind": "arvados#containerList", "items": [{"uuid": "zhome-xvhdp-cr6queuedcontnr", "command": ["efg"]}]}`))
+ }
+ callCount += 1
+ })).Close()
+ req := httptest.NewRequest("GET", fmt.Sprintf("/arvados/v1/containers?count=none&filters=%s",
+ url.QueryEscape(fmt.Sprintf(`[["uuid", "in", ["%v", "zhome-xvhdp-cr5queuedcontnr", "zhome-xvhdp-cr6queuedcontnr"]]]`,
+ arvadostest.QueuedContainerUUID))),
+ nil)
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+ resp := s.testRequest(req)
+ c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+ c.Check(callCount, check.Equals, 2)
+ var cn arvados.ContainerList
+ c.Check(json.NewDecoder(resp.Body).Decode(&cn), check.IsNil)
+ c.Check(cn.Items, check.HasLen, 3)
+ mp := make(map[string]arvados.Container)
+ for _, cr := range cn.Items {
+ mp[cr.UUID] = cr
+ }
+ c.Check(mp[arvadostest.QueuedContainerUUID].Command, check.DeepEquals, []string{"echo", "hello"})
+ c.Check(mp["zhome-xvhdp-cr5queuedcontnr"].Command, check.DeepEquals, []string{"abc"})
+ c.Check(mp["zhome-xvhdp-cr6queuedcontnr"].Command, check.DeepEquals, []string{"efg"})
+}
+
+func (s *FederationSuite) TestListMultiRemoteContainersMissing(c *check.C) {
+ callCount := 0
+ defer s.localServiceHandler(c, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+ bd, _ := ioutil.ReadAll(req.Body)
+ if callCount == 0 {
+ c.Check(string(bd), check.Equals, `_method=GET&count=none&filters=%5B%5B%22uuid%22%2C+%22in%22%2C+%5B%22zhome-xvhdp-cr5queuedcontnr%22%2C%22zhome-xvhdp-cr6queuedcontnr%22%5D%5D%5D`)
+ w.WriteHeader(200)
+ w.Write([]byte(`{"kind": "arvados#containerList", "items": [{"uuid": "zhome-xvhdp-cr6queuedcontnr", "command": ["efg"]}]}`))
+ } else if callCount == 1 {
+ c.Check(string(bd), check.Equals, `_method=GET&count=none&filters=%5B%5B%22uuid%22%2C+%22in%22%2C+%5B%22zhome-xvhdp-cr5queuedcontnr%22%5D%5D%5D`)
+ w.WriteHeader(200)
+ w.Write([]byte(`{"kind": "arvados#containerList", "items": []}`))
+ }
+ callCount += 1
+ })).Close()
+ req := httptest.NewRequest("GET", fmt.Sprintf("/arvados/v1/containers?count=none&filters=%s",
+ url.QueryEscape(fmt.Sprintf(`[["uuid", "in", ["%v", "zhome-xvhdp-cr5queuedcontnr", "zhome-xvhdp-cr6queuedcontnr"]]]`,
+ arvadostest.QueuedContainerUUID))),
+ nil)
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+ resp := s.testRequest(req)
+ c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+ c.Check(callCount, check.Equals, 2)
+ var cn arvados.ContainerList
+ c.Check(json.NewDecoder(resp.Body).Decode(&cn), check.IsNil)
+ c.Check(cn.Items, check.HasLen, 2)
+ mp := make(map[string]arvados.Container)
+ for _, cr := range cn.Items {
+ mp[cr.UUID] = cr
+ }
+ c.Check(mp[arvadostest.QueuedContainerUUID].Command, check.DeepEquals, []string{"echo", "hello"})
+ c.Check(mp["zhome-xvhdp-cr6queuedcontnr"].Command, check.DeepEquals, []string{"efg"})
+}
+
+func (s *FederationSuite) TestListMultiRemoteContainerPageSizeError(c *check.C) {
+ s.testHandler.Cluster.RequestLimits.MaxItemsPerResponse = 1
+ req := httptest.NewRequest("GET", fmt.Sprintf("/arvados/v1/containers?count=none&filters=%s",
+ url.QueryEscape(fmt.Sprintf(`[["uuid", "in", ["%v", "zhome-xvhdp-cr5queuedcontnr"]]]`,
+ arvadostest.QueuedContainerUUID))),
+ nil)
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+ resp := s.testRequest(req)
+ c.Check(resp.StatusCode, check.Equals, http.StatusBadRequest)
+ s.checkJSONErrorMatches(c, resp, `Federated multi-object request for 2 objects which is more than max page size 1.`)
+}
+
+func (s *FederationSuite) TestListMultiRemoteContainerLimitError(c *check.C) {
+ req := httptest.NewRequest("GET", fmt.Sprintf("/arvados/v1/containers?count=none&filters=%s&limit=1",
+ url.QueryEscape(fmt.Sprintf(`[["uuid", "in", ["%v", "zhome-xvhdp-cr5queuedcontnr"]]]`,
+ arvadostest.QueuedContainerUUID))),
+ nil)
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+ resp := s.testRequest(req)
+ c.Check(resp.StatusCode, check.Equals, http.StatusBadRequest)
+ s.checkJSONErrorMatches(c, resp, `Federated multi-object may not provide 'limit', 'offset' or 'order'.`)
+}
+
+func (s *FederationSuite) TestListMultiRemoteContainerOffsetError(c *check.C) {
+ req := httptest.NewRequest("GET", fmt.Sprintf("/arvados/v1/containers?count=none&filters=%s&offset=1",
+ url.QueryEscape(fmt.Sprintf(`[["uuid", "in", ["%v", "zhome-xvhdp-cr5queuedcontnr"]]]`,
+ arvadostest.QueuedContainerUUID))),
+ nil)
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+ resp := s.testRequest(req)
+ c.Check(resp.StatusCode, check.Equals, http.StatusBadRequest)
+ s.checkJSONErrorMatches(c, resp, `Federated multi-object may not provide 'limit', 'offset' or 'order'.`)
+}
+
+func (s *FederationSuite) TestListMultiRemoteContainerOrderError(c *check.C) {
+ req := httptest.NewRequest("GET", fmt.Sprintf("/arvados/v1/containers?count=none&filters=%s&order=uuid",
+ url.QueryEscape(fmt.Sprintf(`[["uuid", "in", ["%v", "zhome-xvhdp-cr5queuedcontnr"]]]`,
+ arvadostest.QueuedContainerUUID))),
+ nil)
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+ resp := s.testRequest(req)
+ c.Check(resp.StatusCode, check.Equals, http.StatusBadRequest)
+ s.checkJSONErrorMatches(c, resp, `Federated multi-object may not provide 'limit', 'offset' or 'order'.`)
+}
+
+func (s *FederationSuite) TestListMultiRemoteContainerSelectError(c *check.C) {
+ req := httptest.NewRequest("GET", fmt.Sprintf("/arvados/v1/containers?count=none&filters=%s&select=%s",
+ url.QueryEscape(fmt.Sprintf(`[["uuid", "in", ["%v", "zhome-xvhdp-cr5queuedcontnr"]]]`,
+ arvadostest.QueuedContainerUUID)),
+ url.QueryEscape(`["command"]`)),
+ nil)
+ req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
+ resp := s.testRequest(req)
+ c.Check(resp.StatusCode, check.Equals, http.StatusBadRequest)
+ s.checkJSONErrorMatches(c, resp, `Federated multi-object request must include 'uuid' in 'select'`)
+}
})
}
-func (h *Handler) proxyRailsAPI(w http.ResponseWriter, req *http.Request, next http.Handler) {
+// localClusterRequest sets up a request so it can be proxied to the
+// local API server using proxy.Do(). Returns true if a response was
+// written, false if not.
+func (h *Handler) localClusterRequest(w http.ResponseWriter, req *http.Request, filter ResponseFilter) bool {
urlOut, insecure, err := findRailsAPI(h.Cluster, h.NodeProfile)
if err != nil {
httpserver.Error(w, err.Error(), http.StatusInternalServerError)
- return
+ return true
}
urlOut = &url.URL{
Scheme: urlOut.Scheme,
if insecure {
client = h.insecureClient
}
- h.proxy.Do(w, req, urlOut, client, nil)
+ return h.proxy.Do(w, req, urlOut, client, filter)
+}
+
+func (h *Handler) proxyRailsAPI(w http.ResponseWriter, req *http.Request, next http.Handler) {
+ if !h.localClusterRequest(w, req, nil) && next != nil {
+ next.ServeHTTP(w, req)
+ }
}
// For now, findRailsAPI always uses the rails API running on this
type ResponseFilter func(*http.Response, error) (*http.Response, error)
+// Do sends a request, passes the result to the filter (if provided),
+// and then, unless the filter suppresses the result, writes the
+// response to the ResponseWriter. Returns true if a response was
+// written, false if not.
func (p *proxy) Do(w http.ResponseWriter,
reqIn *http.Request,
urlOut *url.URL,
client *http.Client,
- filter ResponseFilter) {
+ filter ResponseFilter) bool {
// Copy headers from incoming request, then add/replace proxy
// headers like Via and X-Forwarded-For.
resp, err := client.Do(reqOut)
if filter == nil && err != nil {
httpserver.Error(w, err.Error(), http.StatusBadGateway)
- return
+ return true
}
// make sure original response body gets closed
if err != nil {
httpserver.Error(w, err.Error(), http.StatusBadGateway)
- return
+ return true
}
if resp == nil {
// filter() returned a nil response, this means suppress
// writing a response, for the case where there might
// be multiple response writers.
- return
+ return false
}
// the filter gave us a new response body, make sure that gets closed too.
if err != nil {
httpserver.Logger(reqIn).WithError(err).WithField("bytesCopied", n).Error("error copying response body")
}
+ return true
}
}
}
+type RequestLimits struct {
+ MaxItemsPerResponse int
+ MultiClusterRequestConcurrency int
+}
+
type Cluster struct {
ClusterID string `json:"-"`
ManagementToken string
HTTPRequestTimeout Duration
RemoteClusters map[string]RemoteCluster
PostgreSQL PostgreSQL
+ RequestLimits RequestLimits
}
type PostgreSQL struct {
}
}
+// GetMultiClusterRequestConcurrency returns the maximum number of
+// concurrent requests to send to remote clusters when satisfying a
+// federated multi-object request, defaulting to 4.
+func (h RequestLimits) GetMultiClusterRequestConcurrency() int {
+ if h.MultiClusterRequestConcurrency == 0 {
+ return 4
+ }
+ return h.MultiClusterRequestConcurrency
+}
+
+// GetMaxItemsPerResponse returns the maximum number of items to
+// return in a single list response, defaulting to 1000.
+func (h RequestLimits) GetMaxItemsPerResponse() int {
+ if h.MaxItemsPerResponse == 0 {
+ return 1000
+ }
+ return h.MaxItemsPerResponse
+}
+
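The getters above use a common Go configuration idiom: the zero value of a field means "use the built-in default", so an omitted config key needs no special handling. A self-contained sketch of the same pattern (types copied for illustration):

```go
package main

import "fmt"

// RequestLimits: zero-valued fields fall back to defaults via getters.
type RequestLimits struct {
	MaxItemsPerResponse            int
	MultiClusterRequestConcurrency int
}

func (l RequestLimits) GetMaxItemsPerResponse() int {
	if l.MaxItemsPerResponse == 0 {
		return 1000
	}
	return l.MaxItemsPerResponse
}

func (l RequestLimits) GetMultiClusterRequestConcurrency() int {
	if l.MultiClusterRequestConcurrency == 0 {
		return 4
	}
	return l.MultiClusterRequestConcurrency
}

func main() {
	var unset RequestLimits // zero value: defaults apply
	fmt.Println(unset.GetMaxItemsPerResponse())            // 1000
	fmt.Println(unset.GetMultiClusterRequestConcurrency()) // 4

	set := RequestLimits{MaxItemsPerResponse: 1}
	fmt.Println(set.GetMaxItemsPerResponse()) // 1
}
```

Callers always go through the getters, so an explicit config value and "no value configured" behave uniformly.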
type SystemServiceInstance struct {
Listen string
TLS bool
// Prefix (normally ".") is a top level directory, effectively
// prepended to all paths in the returned manifest.
MarshalManifest(prefix string) (string, error)
+
+ // Total data bytes in all files.
+ Size() int64
}
type collectionFileSystem struct {
return fs.fileSystem.root.(*dirnode).marshalManifest(prefix)
}
+func (fs *collectionFileSystem) Size() int64 {
+ return fs.fileSystem.root.(*dirnode).TreeSize()
+}
+
// filenodePtr is an offset into a file that is (usually) efficient to
// seek to. Specifically, if filenode.repacked==filenodePtr.repacked
// then
return dn.treenode.Child(name, replace)
}
-// sync flushes in-memory data (for all files in the tree rooted at
-// dn) to persistent storage. Caller must hold dn.Lock().
-func (dn *dirnode) sync() error {
+// sync flushes in-memory data (for the children with the given names,
+// which must be children of dn) to persistent storage. Caller must
+// have write lock on dn and the named children.
+func (dn *dirnode) sync(names []string) error {
type shortBlock struct {
fn *filenode
idx int
return nil
}
- names := make([]string, 0, len(dn.inodes))
- for name := range dn.inodes {
- names = append(names, name)
- }
- sort.Strings(names)
-
for _, name := range names {
fn, ok := dn.inodes[name].(*filenode)
if !ok {
continue
}
- fn.Lock()
- defer fn.Unlock()
for idx, seg := range fn.segments {
seg, ok := seg.(*memSegment)
if !ok {
var subdirs string
var blocks []string
- if err := dn.sync(); err != nil {
- return "", err
- }
-
names := make([]string, 0, len(dn.inodes))
- for name, node := range dn.inodes {
+ for name := range dn.inodes {
names = append(names, name)
+ }
+ sort.Strings(names)
+ for _, name := range names {
+ node := dn.inodes[name]
node.Lock()
defer node.Unlock()
}
- sort.Strings(names)
-
+ if err := dn.sync(names); err != nil {
+ return "", err
+ }
for _, name := range names {
switch node := dn.inodes[name].(type) {
case *dirnode:
return
}
+// TreeSize returns the total size in bytes of all file data in the
+// tree rooted at dn.
+func (dn *dirnode) TreeSize() (bytes int64) {
+ dn.RLock()
+ defer dn.RUnlock()
+ for _, i := range dn.inodes {
+ switch i := i.(type) {
+ case *filenode:
+ bytes += i.Size()
+ case *dirnode:
+ bytes += i.TreeSize()
+ }
+ }
+ return
+}
+
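The @TreeSize@ pattern above (take the directory's read lock, then recurse into children, each of which takes its own lock) can be sketched in miniature; the types here are simplified stand-ins, not the real Arvados SDK nodes:

```go
package main

import (
	"fmt"
	"sync"
)

// inode is a minimal stand-in for a filesystem node.
type inode interface{ Size() int64 }

type filenode struct{ size int64 }

func (f *filenode) Size() int64 { return f.size }

type dirnode struct {
	sync.RWMutex
	inodes map[string]inode
}

// Size takes this directory's read lock and sums child sizes;
// subdirectories recurse under their own locks (cf. TreeSize above).
func (d *dirnode) Size() (bytes int64) {
	d.RLock()
	defer d.RUnlock()
	for _, i := range d.inodes {
		bytes += i.Size()
	}
	return
}

func main() {
	root := &dirnode{inodes: map[string]inode{
		"foo": &filenode{size: 3},
		"dir1": &dirnode{inodes: map[string]inode{
			"bar": &filenode{size: 6},
		}},
	}}
	fmt.Println(root.Size()) // 9
}
```

Because each directory guards only its own map, the traversal never holds more than one lock per tree level at a time.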
type segment interface {
io.ReaderAt
Len() int
c.Check(err, check.IsNil)
m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 25d55ad283aa400af464c76d713c07ad+8 3:3:bar 6:3:foo\n")
+ c.Check(s.fs.Size(), check.Equals, int64(6))
}
func (s *CollectionFSSuite) TestSeekSparse(c *check.C) {
c.Assert(err, check.IsNil)
defer f.Close()
for i := 0; i < 6502; i++ {
- switch rand.Int() & 3 {
- case 0:
+ r := rand.Uint32()
+ switch {
+ case r%11 == 0:
+ _, err := s.fs.MarshalManifest(".")
+ c.Check(err, check.IsNil)
+ case r&3 == 0:
f.Truncate(int64(rand.Intn(64)))
- case 1:
+ case r&3 == 1:
f.Seek(int64(rand.Intn(64)), io.SeekStart)
- case 2:
+ case r&3 == 2:
_, err := f.Write([]byte("beep boop"))
c.Check(err, check.IsNil)
- case 3:
+ case r&3 == 3:
_, err := ioutil.ReadAll(f)
c.Check(err, check.IsNil)
}
f, err := coll.FileSystem(nil, nil)
c.Check(err, check.IsNil)
c.Logf("%s loaded", time.Now())
+ c.Check(f.Size(), check.Equals, int64(42*dirCount*fileCount))
for i := 0; i < dirCount; i++ {
for j := 0; j < fileCount; j++ {
gem 'protected_attributes'
group :test, :development do
- gem 'factory_girl_rails'
+ gem 'factory_bot_rails'
gem 'database_cleaner'
gem 'ruby-prof'
 # Note: "require: false" here tells bundler not to automatically
eventmachine (1.2.6)
execjs (2.7.0)
extlib (0.9.16)
- factory_girl (4.9.0)
+ factory_bot (4.11.1)
activesupport (>= 3.0.0)
- factory_girl_rails (4.9.0)
- factory_girl (~> 4.9.0)
+ factory_bot_rails (4.11.1)
+ factory_bot (~> 4.11.1)
railties (>= 3.0.0)
faraday (0.12.2)
multipart-post (>= 1.2, < 3)
arvados-cli
coffee-rails (~> 4.0)
database_cleaner
- factory_girl_rails
+ factory_bot_rails
faye-websocket
httpclient
jquery-rails
def contents
load_searchable_objects
- send_json({
+ list = {
:kind => "arvados#objectList",
:etag => "",
:self_link => "",
:limit => @limit,
:items_available => @items_available,
:items => @objects.as_api_response(nil)
- })
+ }
+ if @extra_included
+ list[:included] = @extra_included.as_api_response(nil, {select: @select})
+ end
+ send_json(list)
end
def shared
# This also returns (in the "included" field) the objects that own
# those projects (users or non-project groups).
#
- # select groups that are readable by current user AND
- # the owner_uuid is a user (but not the current user) OR
- # the owner_uuid is not readable by the current user
- # the owner_uuid is a group but group_class is not a project
#
# The intended use of this endpoint is to support clients which
# wish to browse those projects which are visible to the user but
load_limit_offset_order_params
load_filters_param
- read_parent_check = if current_user.is_admin
- ""
- else
- "NOT EXISTS(SELECT 1 FROM #{PERMISSION_VIEW} WHERE "+
- "user_uuid=(:user_uuid) AND target_uuid=groups.owner_uuid AND perm_level >= 1) OR "
- end
+ @objects = exclude_home Group.readable_by(*@read_users), Group
- @objects = Group.readable_by(*@read_users).where("groups.owner_uuid IN (SELECT users.uuid FROM users WHERE users.uuid != (:user_uuid)) OR "+
- read_parent_check+
- "EXISTS(SELECT 1 FROM groups as gp where gp.uuid=groups.owner_uuid and gp.group_class != 'project')",
- user_uuid: current_user.uuid)
apply_where_limit_order_params
- owners = @objects.map(&:owner_uuid).to_a
-
if params["include"] == "owner_uuid"
+ owners = @objects.map(&:owner_uuid).to_set
@extra_included = []
[Group, User].each do |klass|
- @extra_included += klass.readable_by(*@read_users).where(uuid: owners).to_a
+ @extra_included += klass.readable_by(*@read_users).where(uuid: owners.to_a).to_a
end
end
else
filter_by_owner[:owner_uuid] = @object.uuid
end
+
+ if params['exclude_home_project']
+ raise ArgumentError.new "Cannot use 'exclude_home_project' with a parent object"
+ end
end
+ included_by_uuid = {}
+
seen_last_class = false
klasses.each do |klass|
@offset = 0 if seen_last_class # reset offset for the new next type being processed
@objects = klass.readable_by(*@read_users, {:include_trash => params[:include_trash]}).
order(request_order).where(where_conds)
+ if params['exclude_home_project']
+ @objects = exclude_home @objects, klass
+ end
+
klass_limit = limit_all - all_objects.count
@limit = klass_limit
apply_where_limit_order_params klass
# with limit=0 and just accumulate items_available.
limit_all = all_objects.count
end
+
+ if params["include"] == "owner_uuid"
+ owners = klass_object_list[:items].map {|i| i[:owner_uuid]}.to_set
+ [Group, User].each do |ownerklass|
+ ownerklass.readable_by(*@read_users).where(uuid: owners.to_a).each do |ow|
+ included_by_uuid[ow.uuid] = ow
+ end
+ end
+ end
+ end
+
+ if params["include"]
+ @extra_included = included_by_uuid.values
end
@objects = all_objects
@offset = offset_all
end
+ protected
+
+ def exclude_home objectlist, klass
+    # select records that are readable by current user AND
+    #   (the owner_uuid is a user (but not the current user) OR
+    #    the owner_uuid is not readable by the current user OR
+    #    the owner_uuid is a group whose group_class is not a project)
+
+ read_parent_check = if current_user.is_admin
+ ""
+ else
+ "NOT EXISTS(SELECT 1 FROM #{PERMISSION_VIEW} WHERE "+
+ "user_uuid=(:user_uuid) AND target_uuid=#{klass.table_name}.owner_uuid AND perm_level >= 1) OR "
+ end
+
+ objectlist.where("#{klass.table_name}.owner_uuid IN (SELECT users.uuid FROM users WHERE users.uuid != (:user_uuid)) OR "+
+ read_parent_check+
+ "EXISTS(SELECT 1 FROM groups as gp where gp.uuid=#{klass.table_name}.owner_uuid and gp.group_class != 'project')",
+ user_uuid: current_user.uuid)
+ end
+
end
crunchLogThrottleLines: Rails.application.config.crunch_log_throttle_lines,
crunchLimitLogBytesPerJob: Rails.application.config.crunch_limit_log_bytes_per_job,
crunchLogPartialLineThrottlePeriod: Rails.application.config.crunch_log_partial_line_throttle_period,
+ crunchLogUpdatePeriod: Rails.application.config.crunch_log_update_period,
+ crunchLogUpdateSize: Rails.application.config.crunch_log_update_size,
remoteHosts: Rails.configuration.remote_hosts,
remoteHostsViaDNS: Rails.configuration.remote_hosts_via_dns,
websocketUrl: Rails.application.config.websocket_address,
before_save :update_secret_mounts_md5
before_save :scrub_secret_mounts
before_save :clear_runtime_status_when_queued
+ after_save :update_cr_logs
after_save :handle_completed
after_save :propagate_priority
after_commit { UpdatePriority.run_update_thread }
current_user.andand.is_admin
end
- def permission_to_update
- # Override base permission check to allow auth_uuid to set progress and
- # output (only). Whether it is legal to set progress and output in the current
- # state has already been checked in validate_change.
- current_user.andand.is_admin ||
- (!current_api_client_authorization.nil? and
- [self.auth_uuid, self.locked_by_uuid].include? current_api_client_authorization.uuid)
- end
-
def ensure_owner_uuid_is_permitted
- # Override base permission check to allow auth_uuid to set progress and
- # output (only). Whether it is legal to set progress and output in the current
- # state has already been checked in validate_change.
- if !current_api_client_authorization.nil? and self.auth_uuid == current_api_client_authorization.uuid
- check_update_whitelist [:progress, :output]
- else
- super
- end
+ # validate_change ensures owner_uuid can't be changed at all --
+ # except during create, which requires admin privileges. Checking
+ # permission here would be superfluous.
+ true
end
def set_timestamps
def validate_change
permitted = [:state]
+ progress_attrs = [:progress, :runtime_status, :log, :output]
+ final_attrs = [:exit_code, :finished_at]
if self.new_record?
permitted.push(:owner_uuid, :command, :container_image, :cwd,
case self.state
when Locked
- permitted.push :priority, :runtime_status
+ permitted.push :priority, :runtime_status, :log
when Queued
permitted.push :priority
when Running
- permitted.push :priority, :progress, :output, :runtime_status
+ permitted.push :priority, *progress_attrs
if self.state_changed?
permitted.push :started_at
end
when Complete
if self.state_was == Running
- permitted.push :finished_at, :output, :log, :exit_code
+ permitted.push *final_attrs, *progress_attrs
end
when Cancelled
case self.state_was
when Running
- permitted.push :finished_at, :output, :log
+ permitted.push :finished_at, *progress_attrs
when Queued, Locked
permitted.push :finished_at, :log
end
return false
end
+ if current_api_client_authorization.andand.uuid.andand == self.auth_uuid
+ # The contained process itself can update progress indicators,
+ # but can't change priority etc.
+ permitted = permitted & (progress_attrs + final_attrs + [:state] - [:log])
+ elsif self.locked_by_uuid && self.locked_by_uuid != current_api_client_authorization.andand.uuid
+ # When locked, progress fields cannot be updated by the wrong
+ # dispatcher, even though it has admin privileges.
+ permitted = permitted - progress_attrs
+ end
check_update_whitelist permitted
end
end
end
+ def update_cr_logs
+ # If self.final?, this update is superfluous: the final log/output
+ # update will be done when handle_completed calls finalize! on
+ # each requesting CR.
+ return if self.final? || !self.log_changed?
+ leave_modified_by_user_alone do
+ ContainerRequest.where(container_uuid: self.uuid).each do |cr|
+ cr.update_collections(container: self, collections: ['log'])
+ cr.save!
+ end
+ end
+ end
+
def assign_auth
if self.auth_uuid_changed?
return errors.add :auth_uuid, 'is readonly'
# Finalize the container request after the container has
# finished/cancelled.
def finalize!
- out_coll = nil
- log_coll = nil
- c = Container.find_by_uuid(container_uuid)
- ['output', 'log'].each do |out_type|
- pdh = c.send(out_type)
+ update_collections(container: Container.find_by_uuid(container_uuid))
+ update_attributes!(state: Final)
+ end
+
+ def update_collections(container:, collections: ['log', 'output'])
+ collections.each do |out_type|
+ pdh = container.send(out_type)
next if pdh.nil?
coll_name = "Container #{out_type} for request #{uuid}"
trash_at = nil
end
manifest = Collection.where(portable_data_hash: pdh).first.manifest_text
- coll = Collection.new(owner_uuid: owner_uuid,
- manifest_text: manifest,
- portable_data_hash: pdh,
- name: coll_name,
- trash_at: trash_at,
- delete_at: trash_at,
- properties: {
- 'type' => out_type,
- 'container_request' => uuid,
- })
- coll.save_with_unique_name!
- if out_type == 'output'
- out_coll = coll.uuid
- else
- log_coll = coll.uuid
+ coll_uuid = self.send(out_type + '_uuid')
+ coll = coll_uuid.nil? ? nil : Collection.where(uuid: coll_uuid).first
+ if !coll
+ coll = Collection.new(
+ owner_uuid: self.owner_uuid,
+ name: coll_name,
+ properties: {
+ 'type' => out_type,
+ 'container_request' => uuid,
+ })
end
+ coll.assign_attributes(
+ portable_data_hash: pdh,
+ manifest_text: manifest,
+ trash_at: trash_at,
+ delete_at: trash_at)
+ coll.save_with_unique_name!
+ self.send(out_type + '_uuid=', coll.uuid)
end
- update_attributes!(state: Final, output_uuid: out_coll, log_uuid: log_coll)
end
def self.full_text_searchable_columns
permitted.push :container_count
end
+ if current_user.andand.is_admin
+ permitted.push :log_uuid
+ end
+
when Final
if self.state_was == Committed
# "Cancel" means setting priority=0, state=Committed
crunch_log_partial_line_throttle_period: 5
+ # Container logs are written to Keep and saved in a collection,
+ # which is updated periodically while the container runs. This
+ # value sets the interval (given in seconds) between collection
+ # updates.
+ crunch_log_update_period: 1800
+
+ # The log collection is also updated when the specified amount of
+ # log data (given in bytes) is produced in less than one update
+ # period.
+ crunch_log_update_size: 33554432
+
# Attributes to suppress in events and audit logs. Notably,
# specifying ["manifest_text"] here typically makes the database
# smaller and faster.
#
# SPDX-License-Identifier: AGPL-3.0
-FactoryGirl.define do
+FactoryBot.define do
factory :api_client do
- is_trusted false
+ is_trusted { false }
to_create do |instance|
CurrentApiClientHelper.act_as_system_user do
instance.save!
#
# SPDX-License-Identifier: AGPL-3.0
-FactoryGirl.define do
+FactoryBot.define do
factory :api_client_authorization do
api_client
- scopes ['all']
+ scopes { ['all'] }
trait :trusted do
association :api_client, factory: :api_client, is_trusted: true
#
# SPDX-License-Identifier: AGPL-3.0
-FactoryGirl.define do
+FactoryBot.define do
factory :group do
end
end
#
# SPDX-License-Identifier: AGPL-3.0
-FactoryGirl.define do
+FactoryBot.define do
factory :link do
factory :permission_link do
- link_class 'permission'
+ link_class { 'permission' }
end
end
end
extend CurrentApiClient
end
-FactoryGirl.define do
+FactoryBot.define do
factory :user do
transient do
- join_groups []
+ join_groups { [] }
end
after :create do |user, evaluator|
CurrentApiClientHelper.act_as_system_user do
end
end
end
- first_name "Factory"
- last_name "Factory"
+ first_name { "Factory" }
+ last_name { "Factory" }
identity_url do
"https://example.com/#{rand(2**24).to_s(36)}"
end
factory :active_user do
- is_active true
+ is_active { true }
after :create do |user|
CurrentApiClientHelper.act_as_system_user do
Link.create!(tail_uuid: user.uuid,
assert_equal json_response['included'][0]["uuid"], groups(:group_for_sharing_tests).uuid
end
+ ### contents with exclude_home_project
+
+ test 'contents, exclude home owned by another user' do
+ authorize_with :user_bar_in_sharing_group
+
+ act_as_system_user do
+ Link.create!(
+ tail_uuid: users(:user_bar_in_sharing_group).uuid,
+ link_class: 'permission',
+ name: 'can_read',
+ head_uuid: groups(:project_owned_by_foo).uuid)
+ Link.create!(
+ tail_uuid: users(:user_bar_in_sharing_group).uuid,
+ link_class: 'permission',
+ name: 'can_read',
+ head_uuid: collections(:collection_owned_by_foo).uuid)
+ end
+
+ get :contents, {:include => "owner_uuid", :exclude_home_project => true}
+
+ assert_equal 2, json_response['items'].length
+ assert_equal json_response['items'][0]["uuid"], groups(:project_owned_by_foo).uuid
+ assert_equal json_response['items'][1]["uuid"], collections(:collection_owned_by_foo).uuid
+
+ assert_equal 1, json_response['included'].length
+ assert_equal json_response['included'][0]["uuid"], users(:user_foo_in_sharing_group).uuid
+ end
+
+ test 'contents, exclude home, owned by unreadable project' do
+ authorize_with :user_bar_in_sharing_group
+
+ act_as_system_user do
+ Group.find_by_uuid(groups(:project_owned_by_foo).uuid).update!(owner_uuid: groups(:aproject).uuid)
+ Link.create!(
+ tail_uuid: users(:user_bar_in_sharing_group).uuid,
+ link_class: 'permission',
+ name: 'can_read',
+ head_uuid: groups(:project_owned_by_foo).uuid)
+ end
+
+ get :contents, {:include => "owner_uuid", :exclude_home_project => true}
+
+ assert_equal 1, json_response['items'].length
+ assert_equal json_response['items'][0]["uuid"], groups(:project_owned_by_foo).uuid
+
+ assert_equal 0, json_response['included'].length
+ end
+
+ test 'contents, exclude home, owned by non-project' do
+ authorize_with :user_bar_in_sharing_group
+
+ act_as_system_user do
+ Group.find_by_uuid(groups(:project_owned_by_foo).uuid).update!(owner_uuid: groups(:group_for_sharing_tests).uuid)
+ end
+
+ get :contents, {:include => "owner_uuid", :exclude_home_project => true}
+
+ assert_equal 1, json_response['items'].length
+ assert_equal json_response['items'][0]["uuid"], groups(:project_owned_by_foo).uuid
+
+ assert_equal 1, json_response['included'].length
+ assert_equal json_response['included'][0]["uuid"], groups(:group_for_sharing_tests).uuid
+ end
+
+ test 'contents, exclude home, with parent specified' do
+ authorize_with :active
+
+ get :contents, {id: groups(:aproject).uuid, :include => "owner_uuid", :exclude_home_project => true}
+
+ assert_response 422
+ end
+
end
require File.expand_path('../../config/environment', __FILE__)
require 'rails/test_help'
require 'mocha'
-require 'mocha/mini_test'
+require 'mocha/minitest'
module ArvadosTestSupport
def json_response
end
class ActiveSupport::TestCase
- include FactoryGirl::Syntax::Methods
+ include FactoryBot::Syntax::Methods
fixtures :all
include ArvadosTestSupport
assert c.update_attributes(exit_code: 1, state: Container::Complete)
end
- test "locked_by_uuid can set output on running container" do
- c, _ = minimal_new
+ test "locked_by_uuid can update log when locked/running, and output when running" do
+ logcoll = collections(:real_log_collection)
+ c, cr1 = minimal_new
+ cr2 = ContainerRequest.new(DEFAULT_ATTRS)
+ cr2.state = ContainerRequest::Committed
+ act_as_user users(:active) do
+ cr2.save!
+ end
+ assert_equal cr1.container_uuid, cr2.container_uuid
+
+ logpdh_time1 = logcoll.portable_data_hash
+
set_user_from_auth :dispatch1
c.lock
- c.update_attributes! state: Container::Running
-
assert_equal c.locked_by_uuid, Thread.current[:api_client_authorization].uuid
-
- assert c.update_attributes output: collections(:collection_owned_by_active).portable_data_hash
- assert c.update_attributes! state: Container::Complete
- end
-
- test "auth_uuid can set output on running container, but not change container state" do
+ c.update_attributes!(log: logpdh_time1)
+ c.update_attributes!(state: Container::Running)
+ cr1.reload
+ cr2.reload
+ cr1log_uuid = cr1.log_uuid
+ cr2log_uuid = cr2.log_uuid
+ assert_not_nil cr1log_uuid
+ assert_not_nil cr2log_uuid
+ assert_not_equal logcoll.uuid, cr1log_uuid
+ assert_not_equal logcoll.uuid, cr2log_uuid
+ assert_not_equal cr1log_uuid, cr2log_uuid
+
+ logcoll.update_attributes!(manifest_text: logcoll.manifest_text + ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n")
+ logpdh_time2 = logcoll.portable_data_hash
+
+ assert c.update_attributes(output: collections(:collection_owned_by_active).portable_data_hash)
+ assert c.update_attributes(log: logpdh_time2)
+ assert c.update_attributes(state: Container::Complete, log: logcoll.portable_data_hash)
+ c.reload
+ assert_equal collections(:collection_owned_by_active).portable_data_hash, c.output
+ assert_equal logpdh_time2, c.log
+ refute c.update_attributes(output: nil)
+ refute c.update_attributes(log: nil)
+ cr1.reload
+ cr2.reload
+ assert_equal cr1log_uuid, cr1.log_uuid
+ assert_equal cr2log_uuid, cr2.log_uuid
+ assert_equal [logpdh_time2], Collection.where(uuid: [cr1log_uuid, cr2log_uuid]).to_a.collect(&:portable_data_hash).uniq
+ end
+
+ test "auth_uuid can set output, progress, runtime_status, state on running container -- but not log" do
c, _ = minimal_new
set_user_from_auth :dispatch1
c.lock
c.update_attributes! state: Container::Running
- Thread.current[:api_client_authorization] = ApiClientAuthorization.find_by_uuid(c.auth_uuid)
- Thread.current[:user] = User.find_by_id(Thread.current[:api_client_authorization].user_id)
- assert c.update_attributes output: collections(:collection_owned_by_active).portable_data_hash
+ auth = ApiClientAuthorization.find_by_uuid(c.auth_uuid)
+ Thread.current[:api_client_authorization] = auth
+ Thread.current[:api_client] = auth.api_client
+ Thread.current[:token] = auth.token
+ Thread.current[:user] = auth.user
- assert_raises ArvadosModel::PermissionDeniedError do
- # auth_uuid cannot set container state
- c.update_attributes state: Container::Complete
- end
+ assert c.update_attributes(output: collections(:collection_owned_by_active).portable_data_hash)
+ assert c.update_attributes(runtime_status: {'warning' => 'something happened'})
+ assert c.update_attributes(progress: 0.5)
+ refute c.update_attributes(log: collections(:real_log_collection).portable_data_hash)
+ c.reload
+ assert c.update_attributes(state: Container::Complete, exit_code: 0)
end
test "not allowed to set output that is not readable by current user" do
c.update_attributes! state: Container::Running
set_user_from_auth :running_to_be_deleted_container_auth
- assert_raises ArvadosModel::PermissionDeniedError do
- c.update_attributes! output: collections(:foo_file).portable_data_hash
- end
+ refute c.update_attributes(output: collections(:foo_file).portable_data_hash)
end
test "can set trashed output on running container" do
// ContainerRunner is the main stateful struct used for a single execution of a
// container.
type ContainerRunner struct {
- Docker ThinDockerClient
- client *arvados.Client
- ArvClient IArvadosClient
- Kc IKeepClient
- arvados.Container
+ Docker ThinDockerClient
+ client *arvados.Client
+ ArvClient IArvadosClient
+ Kc IKeepClient
+ Container arvados.Container
ContainerConfig dockercontainer.Config
- dockercontainer.HostConfig
- token string
- ContainerID string
- ExitCode *int
- NewLogWriter
- loggingDone chan bool
- CrunchLog *ThrottledLogger
- Stdout io.WriteCloser
- Stderr io.WriteCloser
- LogCollection arvados.CollectionFileSystem
- LogsPDH *string
- RunArvMount
- MkTempDir
- ArvMount *exec.Cmd
- ArvMountPoint string
- HostOutputDir string
- Binds []string
- Volumes map[string]struct{}
- OutputPDH *string
- SigChan chan os.Signal
- ArvMountExit chan error
- SecretMounts map[string]arvados.Mount
- MkArvClient func(token string) (IArvadosClient, error)
- finalState string
- parentTemp string
+ HostConfig dockercontainer.HostConfig
+ token string
+ ContainerID string
+ ExitCode *int
+ NewLogWriter NewLogWriter
+ loggingDone chan bool
+ CrunchLog *ThrottledLogger
+ Stdout io.WriteCloser
+ Stderr io.WriteCloser
+ logUUID string
+ logMtx sync.Mutex
+ LogCollection arvados.CollectionFileSystem
+ LogsPDH *string
+ RunArvMount RunArvMount
+ MkTempDir MkTempDir
+ ArvMount *exec.Cmd
+ ArvMountPoint string
+ HostOutputDir string
+ Binds []string
+ Volumes map[string]struct{}
+ OutputPDH *string
+ SigChan chan os.Signal
+ ArvMountExit chan error
+ SecretMounts map[string]arvados.Mount
+ MkArvClient func(token string) (IArvadosClient, error)
+ finalState string
+ parentTemp string
ListProcesses func() ([]PsProcess, error)
}
}
+func (runner *ContainerRunner) updateLogs() {
+ ticker := time.NewTicker(crunchLogUpdatePeriod / 360)
+ defer ticker.Stop()
+
+ sigusr1 := make(chan os.Signal, 1)
+ signal.Notify(sigusr1, syscall.SIGUSR1)
+ defer signal.Stop(sigusr1)
+
+ saveAtTime := time.Now().Add(crunchLogUpdatePeriod)
+ saveAtSize := crunchLogUpdateSize
+ var savedSize int64
+ for {
+ select {
+ case <-ticker.C:
+ case <-sigusr1:
+ saveAtTime = time.Now()
+ }
+ runner.logMtx.Lock()
+ done := runner.LogsPDH != nil
+ runner.logMtx.Unlock()
+ if done {
+ return
+ }
+ size := runner.LogCollection.Size()
+ if size == savedSize || (time.Now().Before(saveAtTime) && size < saveAtSize) {
+ continue
+ }
+ saveAtTime = time.Now().Add(crunchLogUpdatePeriod)
+ saveAtSize = runner.LogCollection.Size() + crunchLogUpdateSize
+ saved, err := runner.saveLogCollection(false)
+ if err != nil {
+ runner.CrunchLog.Printf("error updating log collection: %s", err)
+ continue
+ }
+
+ var updated arvados.Container
+ err = runner.ArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+ "container": arvadosclient.Dict{"log": saved.PortableDataHash},
+ }, &updated)
+ if err != nil {
+ runner.CrunchLog.Printf("error updating container log to %s: %s", saved.PortableDataHash, err)
+ continue
+ }
+
+ savedSize = size
+ }
+}
+
// CaptureOutput saves data from the container's output directory if
// needed, and updates the container output accordingly.
func (runner *ContainerRunner) CaptureOutput() error {
// -- it exists only to send logs to other channels.
return nil
}
+ saved, err := runner.saveLogCollection(true)
+ if err != nil {
+ return fmt.Errorf("error saving log collection: %s", err)
+ }
+ runner.logMtx.Lock()
+ defer runner.logMtx.Unlock()
+ runner.LogsPDH = &saved.PortableDataHash
+ return nil
+}
+func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.Collection, err error) {
+ runner.logMtx.Lock()
+ defer runner.logMtx.Unlock()
+ if runner.LogsPDH != nil {
+ // Already finalized.
+ return
+ }
mt, err := runner.LogCollection.MarshalManifest(".")
if err != nil {
- return fmt.Errorf("While creating log manifest: %v", err)
- }
-
- var response arvados.Collection
- err = runner.ArvClient.Create("collections",
- arvadosclient.Dict{
- "ensure_unique_name": true,
- "collection": arvadosclient.Dict{
- "is_trashed": true,
- "name": "logs for " + runner.Container.UUID,
- "manifest_text": mt}},
- &response)
+ err = fmt.Errorf("error creating log manifest: %v", err)
+ return
+ }
+ updates := arvadosclient.Dict{
+ "name": "logs for " + runner.Container.UUID,
+ "manifest_text": mt,
+ }
+ if final {
+ updates["is_trashed"] = true
+ } else {
+ exp := time.Now().Add(crunchLogUpdatePeriod * 24)
+ updates["trash_at"] = exp
+ updates["delete_at"] = exp
+ }
+ reqBody := arvadosclient.Dict{"collection": updates}
+ if runner.logUUID == "" {
+ reqBody["ensure_unique_name"] = true
+ err = runner.ArvClient.Create("collections", reqBody, &response)
+ } else {
+ err = runner.ArvClient.Update("collections", runner.logUUID, reqBody, &response)
+ }
if err != nil {
- return fmt.Errorf("While creating log collection: %v", err)
+ return
}
- runner.LogsPDH = &response.PortableDataHash
- return nil
+ runner.logUUID = response.UUID
+ return
}
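
`saveLogCollection` creates the log collection on the first save and updates the same object afterwards, keyed on whether `logUUID` is still empty. A minimal sketch of that upsert pattern against a fake in-memory store (the `store` type and its names are hypothetical, for illustration only):

```go
package main

import "fmt"

// store stands in for the collections API: create assigns a new UUID,
// update overwrites the existing object.
type store struct {
	next int
	data map[string]string
}

func (s *store) save(uuid, body string) string {
	if uuid == "" { // first save: create, remember the new UUID
		s.next++
		uuid = fmt.Sprintf("obj-%d", s.next)
	}
	s.data[uuid] = body // later saves: update the same object in place
	return uuid
}

func main() {
	s := &store{data: map[string]string{}}
	uuid := ""
	uuid = s.save(uuid, "partial logs")
	uuid = s.save(uuid, "final logs")
	fmt.Println(uuid, len(s.data)) // one object, updated twice
}
```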
// UpdateContainerRunning updates the container state to "Running"
cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
loadLogThrottleParams(api)
+ go cr.updateLogs()
return cr, nil
}
mt := parameters["collection"].(arvadosclient.Dict)["manifest_text"].(string)
outmap := output.(*arvados.Collection)
outmap.PortableDataHash = fmt.Sprintf("%x+%d", md5.Sum([]byte(mt)), len(mt))
+ outmap.UUID = fmt.Sprintf("zzzzz-4zz18-%15.15x", md5.Sum([]byte(mt)))
}
return nil
if parameters["container"].(arvadosclient.Dict)["state"] == "Running" {
client.WasSetRunning = true
}
+ } else if resourceType == "collections" {
+ mt := parameters["collection"].(arvadosclient.Dict)["manifest_text"].(string)
+ output.(*arvados.Collection).UUID = uuid
+ output.(*arvados.Collection).PortableDataHash = fmt.Sprintf("%x", md5.Sum([]byte(mt)))
}
return nil
}
cr.ArvMountPoint = ""
cr.Container.Mounts = make(map[string]arvados.Mount)
cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"}
- cr.OutputPath = "/tmp"
+ cr.Container.OutputPath = "/tmp"
cr.statInterval = 5 * time.Second
err := cr.SetupMounts()
c.Check(err, IsNil)
cr.Container.Mounts = make(map[string]arvados.Mount)
cr.Container.Mounts["/out"] = arvados.Mount{Kind: "tmp"}
cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"}
- cr.OutputPath = "/out"
+ cr.Container.OutputPath = "/out"
err := cr.SetupMounts()
c.Check(err, IsNil)
cr.ArvMountPoint = ""
cr.Container.Mounts = make(map[string]arvados.Mount)
cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"}
- cr.OutputPath = "/tmp"
+ cr.Container.OutputPath = "/tmp"
apiflag := true
cr.Container.RuntimeConstraints.API = &apiflag
cr.Container.Mounts = map[string]arvados.Mount{
"/keeptmp": {Kind: "collection", Writable: true},
}
- cr.OutputPath = "/keeptmp"
+ cr.Container.OutputPath = "/keeptmp"
os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm)
"/keepinp": {Kind: "collection", PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53"},
"/keepout": {Kind: "collection", Writable: true},
}
- cr.OutputPath = "/keepout"
+ cr.Container.OutputPath = "/keepout"
os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm)
"/keepinp": {Kind: "collection", PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53"},
"/keepout": {Kind: "collection", Writable: true},
}
- cr.OutputPath = "/keepout"
+ cr.Container.OutputPath = "/keepout"
os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm)
"/tmp": {Kind: "tmp"},
"/tmp/foo": {Kind: "collection"},
}
- cr.OutputPath = "/tmp"
+ cr.Container.OutputPath = "/tmp"
os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm)
Path: "baz",
Writable: true},
}
- cr.OutputPath = "/tmp"
+ cr.Container.OutputPath = "/tmp"
os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541d+53/baz", os.ModePerm)
"/tmp": {Kind: "tmp"},
"/tmp/foo": {Kind: "tmp"},
}
- cr.OutputPath = "/tmp"
+ cr.Container.OutputPath = "/tmp"
err := cr.SetupMounts()
c.Check(err, NotNil)
Path: "/",
},
}
- cr.OutputPath = "/tmp"
+ cr.Container.OutputPath = "/tmp"
err := cr.SetupMounts()
c.Check(err, IsNil)
var crunchLogThrottleLines int64 = 1024
var crunchLogPartialLineThrottlePeriod time.Duration = time.Second * 5
var crunchLogBytesPerEvent int64 = 4096
-var crunchLogSecondsBetweenEvents time.Duration = time.Second * 1
+var crunchLogSecondsBetweenEvents = time.Second
+var crunchLogUpdatePeriod = time.Hour / 2
+var crunchLogUpdateSize = int64(1 << 25)
// ArvLogWriter is an io.WriteCloser that processes each write by
// writing it through to another io.WriteCloser (typically a
// load the rate limit discovery config parameters
func loadLogThrottleParams(clnt IArvadosClient) {
- param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
- if err == nil {
- crunchLimitLogBytesPerJob = int64(param.(float64))
- }
-
- param, err = clnt.Discovery("crunchLogThrottleBytes")
- if err == nil {
- crunchLogThrottleBytes = int64(param.(float64))
- }
-
- param, err = clnt.Discovery("crunchLogThrottlePeriod")
- if err == nil {
- crunchLogThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
- }
-
- param, err = clnt.Discovery("crunchLogThrottleLines")
- if err == nil {
- crunchLogThrottleLines = int64(param.(float64))
+ loadDuration := func(dst *time.Duration, key string) {
+ if param, err := clnt.Discovery(key); err != nil {
+ return
+ } else if d, ok := param.(float64); !ok {
+ return
+ } else {
+ *dst = time.Duration(d) * time.Second
+ }
}
-
- param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
- if err == nil {
- crunchLogPartialLineThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
+ loadInt64 := func(dst *int64, key string) {
+ if param, err := clnt.Discovery(key); err != nil {
+ return
+ } else if val, ok := param.(float64); !ok {
+ return
+ } else {
+ *dst = int64(val)
+ }
}
- param, err = clnt.Discovery("crunchLogBytesPerEvent")
- if err == nil {
- crunchLogBytesPerEvent = int64(param.(float64))
- }
+ loadInt64(&crunchLimitLogBytesPerJob, "crunchLimitLogBytesPerJob")
+ loadInt64(&crunchLogThrottleBytes, "crunchLogThrottleBytes")
+ loadDuration(&crunchLogThrottlePeriod, "crunchLogThrottlePeriod")
+ loadInt64(&crunchLogThrottleLines, "crunchLogThrottleLines")
+ loadDuration(&crunchLogPartialLineThrottlePeriod, "crunchLogPartialLineThrottlePeriod")
+ loadInt64(&crunchLogBytesPerEvent, "crunchLogBytesPerEvent")
+ loadDuration(&crunchLogSecondsBetweenEvents, "crunchLogSecondsBetweenEvents")
+ loadInt64(&crunchLogUpdateSize, "crunchLogUpdateSize")
+ loadDuration(&crunchLogUpdatePeriod, "crunchLogUpdatePeriod")
- param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
- if err == nil {
- crunchLogSecondsBetweenEvents = time.Duration(float64(time.Second) * param.(float64))
- }
}
func (s *LoggingTestSuite) SetUpTest(c *C) {
s.client = arvados.NewClientFromEnv()
+ crunchLogUpdatePeriod = time.Hour * 24 * 365
+ crunchLogUpdateSize = 1 << 50
}
func (s *LoggingTestSuite) TestWriteLogs(c *C) {
c.Check(mt, Equals, ". 48f9023dc683a850b1c9b482b14c4b97+163 0:83:crunch-run.txt 83:80:stdout.txt\n")
}
+func (s *LoggingTestSuite) TestLogUpdate(c *C) {
+ for _, trial := range []struct {
+ maxBytes int64
+ maxDuration time.Duration
+ }{
+ {1000, 10 * time.Second},
+ {1000000, time.Millisecond},
+ } {
+ c.Logf("max %d bytes, %s", trial.maxBytes, trial.maxDuration)
+ crunchLogUpdateSize = trial.maxBytes
+ crunchLogUpdatePeriod = trial.maxDuration
+
+ api := &ArvTestClient{}
+ kc := &KeepTestClient{}
+ defer kc.Close()
+ cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
+ ts := &TestTimestamper{}
+ cr.CrunchLog.Timestamper = ts.Timestamp
+ w, err := cr.NewLogWriter("stdout")
+ c.Assert(err, IsNil)
+ stdout := NewThrottledLogger(w)
+ stdout.Timestamper = ts.Timestamp
+
+ c.Check(cr.logUUID, Equals, "")
+ cr.CrunchLog.Printf("Hello %1000s", "space")
+ for i, t := 0, time.NewTicker(time.Millisecond); i < 5000 && cr.logUUID == ""; i++ {
+ <-t.C
+ }
+ c.Check(cr.logUUID, Not(Equals), "")
+ cr.CrunchLog.Print("Goodbye")
+ fmt.Fprint(stdout, "Goodbye\n")
+ cr.CrunchLog.Close()
+ stdout.Close()
+ w.Close()
+
+ mt, err := cr.LogCollection.MarshalManifest(".")
+ c.Check(err, IsNil)
+ // Block packing depends on whether there's an update
+ // between the two Goodbyes -- either way the first
+ // block will be 4dc76.
+ c.Check(mt, Matches, `. 4dc76e0a212bfa30c39d76d8c16da0c0\+1038 (afc503bc1b9a828b4bb543cb629e936c\+78|90699dc22545cd74a0664303f70bc05a\+39 276b49339fd5203d15a93ff3de11bfb9\+39) 0:1077:crunch-run.txt 1077:39:stdout.txt\n`)
+ }
+}
+
func (s *LoggingTestSuite) TestWriteLogsWithRateLimitThrottleBytes(c *C) {
s.testWriteLogsWithRateLimit(c, "crunchLogThrottleBytes", 50, 65536, "Exceeded rate 50 bytes per 60 seconds")
}
export HOME=/var/lib/arvados
+chown arvbox /dev/stderr
+
if test -z "$1" ; then
exec chpst -u arvbox:arvbox:docker $0-service
else
from __future__ import print_function
import arvados
+import itertools
import Queue
import threading
-import _strptime
from crunchstat_summary import logger
def __init__(self, collection_id):
self._collection_id = collection_id
self._label = collection_id
- self._reader = None
+ self._readers = []
def __str__(self):
return self._label
logger.debug('load collection %s', self._collection_id)
collection = arvados.collection.CollectionReader(self._collection_id)
filenames = [filename for filename in collection]
- if len(filenames) == 1:
- filename = filenames[0]
- else:
- filename = 'crunchstat.txt'
- self._label = "{}/{}".format(self._collection_id, filename)
- self._reader = collection.open(filename)
- return iter(self._reader)
+ # Crunch2 has multiple stats files
+ if len(filenames) > 1:
+ filenames = ['crunchstat.txt', 'arv-mount.txt']
+ for filename in filenames:
+ try:
+ self._readers.append(collection.open(filename))
+ except IOError:
+ logger.warning('Unable to open %s', filename)
+ self._label = "{}/{}".format(self._collection_id, filenames[0])
+ return itertools.chain(*[iter(reader) for reader in self._readers])
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
- if self._reader:
- self._reader.close()
- self._reader = None
+ if self._readers:
+ for reader in self._readers:
+ reader.close()
+ self._readers = []
class LiveLogReader(object):
class Task(object):
def __init__(self):
self.starttime = None
+ self.finishtime = None
self.series = collections.defaultdict(list)
logger.debug('%s: done %s', self.label, uuid)
continue
- m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr crunchstat: (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
+ # 2017-12-02_17:15:08 e51c5-8i9sb-mfp68stkxnqdd6m 63676 0 stderr crunchstat: keepcalls 0 put 2576 get -- interval 10.0000 seconds 0 put 2576 get
+ m = re.search(r'^(?P<timestamp>[^\s.]+)(\.\d+)? (?P<job_uuid>\S+) \d+ (?P<seq>\d+) stderr (?P<crunchstat>crunchstat: )(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
if not m:
continue
else:
# crunch2
- m = re.search(r'^(?P<timestamp>\S+) (?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
+ # 2017-12-01T16:56:24.723509200Z crunchstat: keepcalls 0 put 3 get -- interval 10.0000 seconds 0 put 3 get
+ m = re.search(r'^(?P<timestamp>\S+) (?P<crunchstat>crunchstat: )?(?P<category>\S+) (?P<current>.*?)( -- interval (?P<interval>.*))?\n$', line)
if not m:
continue
raise ValueError("Cannot parse timestamp {!r}".format(
timestamp))
- if not task.starttime:
- task.starttime = timestamp
+ if task.starttime is None:
logger.debug('%s: task %s starttime %s',
self.label, task_id, timestamp)
- task.finishtime = timestamp
+ if task.starttime is None or timestamp < task.starttime:
+ task.starttime = timestamp
+ if task.finishtime is None or timestamp > task.finishtime:
+ task.finishtime = timestamp
- if not self.starttime:
+ if self.starttime is None or timestamp < self.starttime:
self.starttime = timestamp
- self.finishtime = timestamp
+ if self.finishtime is None or timestamp > self.finishtime:
+ self.finishtime = timestamp
+
+ if (not self.detected_crunch1) and task.starttime is not None and task.finishtime is not None:
+ elapsed = (task.finishtime - task.starttime).seconds
+ self.task_stats[task_id]['time'] = {'elapsed': elapsed}
+ if elapsed > self.stats_max['time']['elapsed']:
+ self.stats_max['time']['elapsed'] = elapsed
this_interval_s = None
for group in ['current', 'interval']:
else:
stats[stat] = int(val)
except ValueError as e:
- logger.warning(
- 'Error parsing value %r (stat %r, category %r): %r',
- val, stat, category, e)
- logger.warning('%s', line)
+ # If the line doesn't start with 'crunchstat:' we
+ # might have mistaken an error message for a
+ # structured crunchstat line.
+ if m.group("crunchstat") is None or m.group("category") == "crunchstat":
+ logger.warning("%s: log contains message\n %s", self.label, line)
+ else:
+ logger.warning(
+ '%s: Error parsing value %r (stat %r, category %r): %r',
+ self.label, val, stat, category, e)
+ logger.warning('%s', line)
continue
if 'user' in stats or 'sys' in stats:
stats['user+sys'] = stats.get('user', 0) + stats.get('sys', 0)
--- /dev/null
+category metric task_max task_max_rate job_total
+blkio:0:0 read 0 0 0
+blkio:0:0 write 0 0 0
+fuseops read 0 0 0
+fuseops write 0 0 0
+keepcache hit 0 0 0
+keepcache miss 0 0 0
+keepcalls get 0 0 0
+keepcalls put 0 0 0
+net:keep0 rx 0 0 0
+net:keep0 tx 0 0 0
+net:keep0 tx+rx 0 0 0
+time elapsed 10 - 10
+# Number of tasks: 1
+# Max CPU time spent by a single task: 0s
+# Max CPU usage in a single interval: 0%
+# Overall CPU usage: 0%
+# Max memory used by a single task: 0.00GB
+# Max network traffic in a single task: 0.00GB
+# Max network speed in a single interval: 0.00MB/s
+# Keep cache miss rate 0.00%
+# Keep cache utilization 0.00%
+#!! container max CPU usage was 0% -- try runtime_constraints "vcpus":1
+#!! container max RSS was 0 MiB -- try runtime_constraints "ram":0
--- /dev/null
+category metric task_max task_max_rate job_total
+cpu cpus 20 - -
+cpu sys 0.39 0.04 0.39
+cpu user 2.06 0.20 2.06
+cpu user+sys 2.45 0.24 2.45
+mem cache 172032 - -
+mem pgmajfault 0 - 0
+mem rss 69525504 - -
+mem swap 0 - -
+net:eth0 rx 859480 1478.97 859480
+net:eth0 tx 55888 395.71 55888
+net:eth0 tx+rx 915368 1874.69 915368
+statfs available 397744787456 - 397744787456
+statfs total 402611240960 - 402611240960
+statfs used 4870303744 52426.18 4866453504
+time elapsed 20 - 20
+# Number of tasks: 1
+# Max CPU time spent by a single task: 2.45s
+# Max CPU usage in a single interval: 23.70%
+# Overall CPU usage: 12.25%
+# Max memory used by a single task: 0.07GB
+# Max network traffic in a single task: 0.00GB
+# Max network speed in a single interval: 0.00MB/s
+# Keep cache miss rate 0.00%
+# Keep cache utilization 0.00%
+#!! container max CPU usage was 24% -- try runtime_constraints "vcpus":1
+#!! container max RSS was 67 MiB -- try runtime_constraints "ram":1020054732
--- /dev/null
+category metric task_max task_max_rate job_total
+blkio:0:0 read 0 0 0
+blkio:0:0 write 0 0 0
+cpu cpus 20 - -
+cpu sys 0.39 0.04 0.39
+cpu user 2.06 0.20 2.06
+cpu user+sys 2.45 0.24 2.45
+fuseops read 0 0 0
+fuseops write 0 0 0
+keepcache hit 0 0 0
+keepcache miss 0 0 0
+keepcalls get 0 0 0
+keepcalls put 0 0 0
+mem cache 172032 - -
+mem pgmajfault 0 - 0
+mem rss 69525504 - -
+mem swap 0 - -
+net:eth0 rx 859480 1478.97 859480
+net:eth0 tx 55888 395.71 55888
+net:eth0 tx+rx 915368 1874.69 915368
+net:keep0 rx 0 0 0
+net:keep0 tx 0 0 0
+net:keep0 tx+rx 0 0 0
+statfs available 397744787456 - 397744787456
+statfs total 402611240960 - 402611240960
+statfs used 4870303744 52426.18 4866453504
+time elapsed 20 - 20
+# Number of tasks: 1
+# Max CPU time spent by a single task: 2.45s
+# Max CPU usage in a single interval: 23.70%
+# Overall CPU usage: 12.25%
+# Max memory used by a single task: 0.07GB
+# Max network traffic in a single task: 0.00GB
+# Max network speed in a single interval: 0.00MB/s
+# Keep cache miss rate 0.00%
+# Keep cache utilization 0.00%
+#!! container max CPU usage was 24% -- try runtime_constraints "vcpus":1
+#!! container max RSS was 67 MiB -- try runtime_constraints "ram":1020054732
+++ /dev/null
-category metric task_max task_max_rate job_total
-cpu cpus 20 - -
-cpu sys 0.82 0.08 0.82
-cpu user 2.31 0.22 2.31
-cpu user+sys 3.13 0.30 3.13
-mem cache 23846912 - -
-mem pgmajfault 121 - 121
-mem rss 65470464 - -
-mem swap 0 - -
-net:eth0 rx 500762 951.15 500762
-net:eth0 tx 36242 226.61 36242
-net:eth0 tx+rx 537004 1177.76 537004
-# Number of tasks: 1
-# Max CPU time spent by a single task: 3.13s
-# Max CPU usage in a single interval: 29.89%
-# Overall CPU usage: 0%
-# Max memory used by a single task: 0.07GB
-# Max network traffic in a single task: 0.00GB
-# Max network speed in a single interval: 0.00MB/s
-# Keep cache miss rate 0.00%
-# Keep cache utilization 0.00%
-#!! container max CPU usage was 30% -- try runtime_constraints "vcpus":1
-#!! container max RSS was 63 MiB -- try runtime_constraints "ram":1020054732
class SummarizeContainer(ReportDiff):
fake_container = {
- 'uuid': '9tee4-dz642-mjfb0i5hzojp16a',
+ 'uuid': '9tee4-dz642-lymtndkpy39eibk',
'created_at': '2017-08-18T14:27:25.371388141',
'log': '9tee4-4zz18-ihyzym9tcwjwg4r',
}
'created_at': '2017-08-18T14:27:25.242339223Z',
'container_uuid': fake_container['uuid'],
}
+ reportfile = os.path.join(
+ TESTS_DIR, 'container_9tee4-dz642-lymtndkpy39eibk.txt.gz')
logfile = os.path.join(
- TESTS_DIR, 'container_9tee4-dz642-mjfb0i5hzojp16a-crunchstat.txt.gz')
+ TESTS_DIR, 'container_9tee4-dz642-lymtndkpy39eibk-crunchstat.txt.gz')
+ arvmountlog = os.path.join(
+ TESTS_DIR, 'container_9tee4-dz642-lymtndkpy39eibk-arv-mount.txt.gz')
@mock.patch('arvados.collection.CollectionReader')
@mock.patch('arvados.api')
mock_api().containers().get().execute.return_value = self.fake_container
mock_cr().__iter__.return_value = [
'crunch-run.txt', 'stderr.txt', 'node-info.txt',
- 'container.json', 'crunchstat.txt']
- mock_cr().open.return_value = gzip.open(self.logfile)
+ 'container.json', 'crunchstat.txt', 'arv-mount.txt']
+ def _open(n):
+ if n == "crunchstat.txt":
+ return gzip.open(self.logfile)
+ elif n == "arv-mount.txt":
+ return gzip.open(self.arvmountlog)
+ mock_cr().open.side_effect = _open
args = crunchstat_summary.command.ArgumentParser().parse_args(
['--job', self.fake_request['uuid']])
cmd = crunchstat_summary.command.Command(args)
cmd.run()
- self.diff_known_report(self.logfile, cmd)
+ self.diff_known_report(self.reportfile, cmd)
class SummarizeJob(ReportDiff):