sdk/cwl/tests/wf/indir1/hello2.txt
sdk/cwl/tests/chipseq/data/Genomes/*
CITATION.cff
+SECURITY.md
\ No newline at end of file
--- /dev/null
+# Arvados Project Security Policy
+
+## Supported Versions
+
+The Arvados project will issue security fixes by making point releases
+on the current stable release series (X.Y.0, X.Y.1, X.Y.2, etc).
+
+The most recent stable release version, along with release notes and
+upgrade notes documenting security fixes, can be found at these
+locations:
+
+https://arvados.org/releases/
+
+https://doc.arvados.org/admin/upgrading.html
+
+The Arvados project does not support versions older than the current
+stable release except by special arrangement (contact info@curii.com).
+
+Release announcements, including notification of security fixes, are
+sent to the Arvados announcement list:
+
+https://lists.arvados.org/mailman/listinfo/arvados
+
+## Reporting Security Issues
+
+If you believe you have found a security vulnerability in any Arvados-owned repository, please report it to us through coordinated disclosure.
+
+**Please do not report security vulnerabilities through public GitHub issues, discussions, or pull requests.**
+
+Instead, please send an email to dev@curii.com.
+
+Please include as much of the information listed below as you can to help us better understand and resolve the issue:
+
+ * The type of issue (e.g., remote code execution, SQL injection, or cross-site scripting)
+ * Full paths of source file(s) related to the manifestation of the issue
+ * The location of the affected source code (tag/branch/commit or direct URL)
+ * Any special configuration required to reproduce the issue
+ * Step-by-step instructions to reproduce the issue
+ * Proof-of-concept or exploit code (if possible)
+ * Impact of the issue, including how an attacker might exploit the issue
+
+This information will help us triage your report more quickly.
flash = {}
# set owner_uuid to current project, provided it is writable
- action_data = Oj.load(params['action_data'] || "{}")
+ action_data = Oj.safe_load(params['action_data'] || "{}")
if action_data['current_project_uuid'] and
current_project = Group.find?(action_data['current_project_uuid']) and
current_project.writable_by.andand.include?(current_user.uuid)
if params[:filters]
filters = params[:filters]
if filters.is_a? String
- filters = Oj.load filters
+ filters = Oj.safe_load filters
elsif filters.is_a? Array
filters = filters.collect do |filter|
if filter.is_a? String
# Accept filters[]=["foo","=","bar"]
- Oj.load filter
+ Oj.safe_load filter
else
# Accept filters=[["foo","=","bar"]]
filter
@updates.keys.each do |attr|
if @object.send(attr).is_a? Hash
if @updates[attr].is_a? String
- @updates[attr] = Oj.load @updates[attr]
+ @updates[attr] = Oj.safe_load @updates[attr]
end
if params[:merge] || params["merge_#{attr}".to_sym]
# Merge provided Hash with current Hash, instead of
"data-emptytext" => "none",
"data-placement" => "bottom",
"data-type" => "select",
- "data-source" => (opt_empty_selection + primary_type[:symbols].map {|i| {:value => i, :text => i} }).to_json,
+ "data-source" => (opt_empty_selection + primary_type[:symbols].map {|i| {:value => cwl_shortname(i), :text => cwl_shortname(i)} }).to_json,
"data-url" => url_for(action: "update", id: object.uuid, controller: object.class.to_s.pluralize.underscore, merge: true),
"data-title" => "Set value for #{cwl_shortname(input_schema[:id])}",
"data-name" => dn,
def initialize(request_url, api_response)
@api_status = api_response.status_code
@api_response_s = api_response.content
- @api_response = Oj.load(@api_response_s, :symbol_keys => true)
+ @api_response = Oj.strict_load(@api_response_s, :symbol_keys => true)
errors = @api_response[:errors]
if errors.respond_to?(:join)
errors = errors.join("\n\n")
end
begin
- resp = Oj.load(msg.content, :symbol_keys => true)
+ resp = Oj.strict_load(msg.content, :symbol_keys => true)
rescue Oj::ParseError
resp = nil
end
}
end
def json_response
- Oj.load(@response.body)
+ Oj.safe_load(@response.body)
end
end
fmt.Fprintf(stderr, "target UUID is not a container or container request UUID: %s\n", targetUUID)
return 1
}
- sshconn, err := rpcconn.ContainerSSH(context.TODO(), arvados.ContainerSSHOptions{
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ sshconn, err := rpcconn.ContainerSSH(ctx, arvados.ContainerSSHOptions{
UUID: targetUUID,
DetachKeys: *detachKeys,
LoginUsername: loginUsername,
return 0
}
- ctx, cancel := context.WithCancel(context.Background())
go func() {
defer cancel()
_, err := io.Copy(stdout, sshconn.Conn)
"git.arvados.org/arvados.git/lib/crunchrun"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/httpserver"
check "gopkg.in/check.v1"
)
ContainerUUID: uuid,
Address: "0.0.0.0:0",
AuthSecret: authSecret,
+ Log: ctxlog.TestLogger(c),
// Just forward connections to localhost instead of a
// container, so we can test without running a
// container.
cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arvadostest.ActiveTokenV2)
cmd.Stdout = &stdout
cmd.Stderr = &stderr
+ stdin, err := cmd.StdinPipe()
+ c.Assert(err, check.IsNil)
+ go fmt.Fprintln(stdin, "data appears on stdin, but stdin does not close; cmd should exit anyway, not hang")
+ time.AfterFunc(5*time.Second, func() {
+ c.Errorf("timed out -- remote end is probably hung waiting for us to close stdin")
+ stdin.Close()
+ })
c.Check(cmd.Run(), check.IsNil)
c.Check(stdout.String(), check.Equals, "ok\n")
"git.arvados.org/arvados.git/lib/service"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/health"
+ dispatchslurm "git.arvados.org/arvados.git/services/crunch-dispatch-slurm"
"git.arvados.org/arvados.git/services/githttpd"
keepbalance "git.arvados.org/arvados.git/services/keep-balance"
keepweb "git.arvados.org/arvados.git/services/keep-web"
"crunch-run": crunchrun.Command,
"dispatch-cloud": dispatchcloud.Command,
"dispatch-lsf": lsf.DispatchCommand,
+ "dispatch-slurm": dispatchslurm.Command,
"git-httpd": githttpd.Command,
"health": healthCommand,
"install": install.Command,
- Computation with Crunch:
- api/execution.html.textile.liquid
- architecture/dispatchcloud.html.textile.liquid
+ - architecture/hpc.html.textile.liquid
- architecture/singularity.html.textile.liquid
- Other:
- api/permission-model.html.textile.liquid
table(table table-bordered table-condensed).
|_.Service |_.ExternalURL required? |_.InternalURLs required?|_.InternalURLs must be reachable from other cluster nodes?|_.Note|
|railsapi |no |yes|no ^1^|InternalURLs only used by Controller|
-|controller |yes |yes|no ^2^|InternalURLs only used by reverse proxy (e.g. Nginx)|
+|controller |yes |yes|yes ^2,4^|InternalURLs used by reverse proxy and container shell connections|
|arvados-dispatch-cloud|no |yes|no ^3^|InternalURLs only used to expose Prometheus metrics|
|arvados-dispatch-lsf|no |yes|no ^3^|InternalURLs only used to expose Prometheus metrics|
|git-http |yes |yes|no ^2^|InternalURLs only used by reverse proxy (e.g. Nginx)|
^1^ If @Controller@ runs on a different host than @RailsAPI@, the @InternalURLs@ will need to be reachable from the host that runs @Controller@.
^2^ If the reverse proxy (e.g. Nginx) does not run on the same host as the Arvados service it fronts, the @InternalURLs@ will need to be reachable from the host that runs the reverse proxy.
^3^ If the Prometheus metrics are not collected from the same machine that runs the service, the @InternalURLs@ will need to be reachable from the host that collects the metrics.
+^4^ If dispatching containers to HPC (Slurm/LSF) and there are multiple @Controller@ services, they must be able to connect to one another using their InternalURLs, otherwise the "tunnel connections":{{site.baseurl}}/architecture/hpc.html enabling "container shell access":{{site.baseurl}}/install/container-shell-access.html will not work.
When @InternalURLs@ do not need to be reachable from other nodes, it is most secure to use loopback addresses as @InternalURLs@, e.g. @http://127.0.0.1:9005@.
<div class="releasenotes">
</notextile>
-h2(#main). development main (as of 2022-06-02)
+h2(#main). development main (as of 2022-08-09)
+
+"previous: Upgrading to 2.4.2":#v2_4_2
+
+h2(#v2_4_2). v2.4.2 (2022-08-09)
"previous: Upgrading to 2.4.1":#v2_4_1
+h3. GHSL-2022-063
+
+GitHub Security Lab (GHSL) reported a remote code execution (RCE) vulnerability in the Arvados Workbench that allows authenticated attackers to execute arbitrary code via specially crafted JSON payloads.
+
+This vulnerability is fixed in 2.4.2 ("#19316":https://dev.arvados.org/issues/19316).
+
+It is likely that this vulnerability exists in all versions of Arvados up to 2.4.1.
+
+This vulnerability is specific to the Ruby on Rails Workbench application ("Workbench 1"). We do not believe any other Arvados components, including the TypeScript browser-based Workbench application ("Workbench 2") or API Server, are vulnerable to this attack.
+
+h3. CVE-2022-31163 and CVE-2022-32224
+
+As a precaution, Arvados 2.4.2 includes security updates for Ruby on Rails and the TZInfo Ruby gem. However, there are no known exploits in Arvados based on these CVEs.
+
+h3. Disable Sharing URLs UI
+
+There is now a configuration option @Workbench.DisableSharingURLsUI@ for admins to disable the user interface for the "sharing link" feature (URLs which can be sent to users to access the data in a specific collection in Arvados without an Arvados account), for organizations where sharing links violate their data sharing policy.
+
h2(#v2_4_1). v2.4.1 (2022-06-02)
"previous: Upgrading to 2.4.0":#v2_4_0
|runtime_auth_scopes|array of string|The scopes associated with the auth token used to run this container.||
|output_storage_classes|array of strings|The storage classes that will be used for the log and output collections of this container request|default is ["default"]|
|output_properties|hash|User metadata properties to set on the output collection. The output collection will also have default properties "type" ("intermediate" or "output") and "container_request" (the uuid of container request that produced the collection).|
+|cumulative_cost|number|Estimated cost of the cloud VMs used to satisfy the request, including retried attempts and completed subrequests, but not including reused containers.|0 if container was reused or VM price information was not available.|
h2(#priority). Priority
|interactive_session_started|boolean|Indicates whether @arvados-client shell@ has been used to run commands in the container, which may have altered the container's behavior and output.||
|output_storage_classes|array of strings|The storage classes that will be used for the log and output collections of this container||
|output_properties|hash|User metadata properties to set on the output collection.|
+|cost|number|Estimated cost of the cloud VM used to run the container.|0 if not available.|
+|subrequests_cost|number|Total estimated cumulative cost of container requests submitted by this container.|0 if not available.|
h2(#container_states). Container states
--- /dev/null
+---
+layout: default
+navsection: architecture
+title: Dispatching containers to HPC
+...
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+Arvados can be configured to run containers on an HPC cluster using Slurm or LSF, as an alternative to "dispatching to cloud VMs":dispatchcloud.html.
+
+In this configuration, the appropriate Arvados dispatcher service -- @crunch-dispatch-slurm@ or @arvados-dispatch-lsf@ -- picks up each container as it appears in the Arvados queue and submits a short shell script as a batch job to the HPC job queue. The shell script executes the @crunch-run@ container supervisor which retrieves the container specification from the Arvados controller, starts an arv-mount process, runs the container using @docker exec@ or @singularity exec@, and sends updates (logs, outputs, exit code, etc.) back to the Arvados controller.
+
+h2. Container communication channel (reverse https tunnel)
+
+The crunch-run program runs a gateway server to facilitate the “container shell” feature. However, depending on the site's network topology, the Arvados controller may not be able to connect directly to the compute node where a given crunch-run process is running.
+
+Instead, in the HPC configuration, crunch-run connects to the Arvados controller at startup and sets up a multiplexed tunnel, allowing the controller process to connect to crunch-run's gateway server without initiating a connection to the compute node, or even knowing the compute node's IP address.
+
+This means that when a client requests a container shell connection, the traffic goes through two or three servers:
+# The client connects to a controller host C1.
+# If the multiplexed tunnel is connected to a different controller host C2, then C1 proxies the incoming request to C2, using C2's InternalURL.
+# The controller host (C1 or C2) uses the multiplexed tunnel to connect to crunch-run's container gateway.
+
+h2. Scaling
+
+The @API.MaxConcurrentRequests@ configuration should not be set too low, or the long-lived tunnel connections can starve other clients.
<pre>
$ curl -O https://git.arvados.org/arvados.git/blob_plain/refs/heads/main:/tools/arvbox/bin/arvbox
$ chmod +x arvbox
-$ ./arvbox start localdemo latest
+$ ./arvbox start localdemo
+$ ./arvbox root-cert
$ ./arvbox adduser demouser demo@example.com
</pre>
-You can now log in as @demouser@ using the password you selected.
+You will then need to "install the arvbox root certificate":#root-cert . After that, you can log in to Workbench as @demouser@ with the password you selected.
h2. Requirements
-* Linux 3.x+ and Docker 1.9+
+* Linux 3.x+ and Docker 1.10+
* Minimum of 3 GiB of RAM + additional memory to run jobs
* Minimum of 3 GiB of disk + storage for actual data
listusers list user logins
</pre>
-h2. Install root certificate
+h2(#root-cert). Install root certificate
Arvbox creates root certificate to authorize Arvbox services. Installing the root certificate into your web browser will prevent security errors when accessing Arvbox services with your web browser. Every Arvbox instance generates a new root signing key.
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/protobuf v1.5.0 // indirect
github.com/googleapis/gax-go/v2 v2.0.5 // indirect
+ github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 // indirect
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect
github.com/kevinburke/ssh_config v0.0.0-20171013211458-802051befeb5 // indirect
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
+github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87 h1:xixZ2bWeofWV68J+x6AzmKuVM/JWCQwkWm6GW/MUR6I=
+github.com/hashicorp/yamux v0.0.0-20211028200310-0bc27b27de87/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
RAM: hostram,
Scratch: scratch,
IncludedScratch: scratch,
+ Price: 1.0,
}}
cfg.Clusters[id] = cc
}
return conn.chooseBackend(options.UUID).ContainerUnlock(ctx, options)
}
-func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSHOptions) (arvados.ContainerSSHConnection, error) {
+func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSHOptions) (arvados.ConnectionResponse, error) {
return conn.chooseBackend(options.UUID).ContainerSSH(ctx, options)
}
+func (conn *Conn) ContainerGatewayTunnel(ctx context.Context, options arvados.ContainerGatewayTunnelOptions) (arvados.ConnectionResponse, error) {
+ return conn.chooseBackend(options.UUID).ContainerGatewayTunnel(ctx, options)
+}
+
func (conn *Conn) ContainerRequestList(ctx context.Context, options arvados.ListOptions) (arvados.ContainerRequestList, error) {
return conn.generated_ContainerRequestList(ctx, options)
}
time.Sleep(time.Second / 2)
}
}
+ c.Logf("cr.CumulativeCost == %f", cr.CumulativeCost)
+ c.Check(cr.CumulativeCost, check.Not(check.Equals), 0.0)
if expectExitCode >= 0 {
c.Check(ctr.State, check.Equals, arvados.ContainerStateComplete)
c.Check(ctr.ExitCode, check.Equals, expectExitCode)
"net/http"
"os"
"strings"
+ "sync"
"time"
"git.arvados.org/arvados.git/lib/controller/railsproxy"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/httpserver"
+ "github.com/hashicorp/yamux"
"github.com/sirupsen/logrus"
)
lastVocabularyRefreshCheck time.Time
lastVocabularyError error
loginController
+ gwTunnels map[string]*yamux.Session
+ gwTunnelsLock sync.Mutex
}
func NewConn(cluster *arvados.Cluster) *Conn {
import (
"bufio"
+ "bytes"
"context"
"crypto/hmac"
"crypto/sha256"
+ "crypto/subtle"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
+ "io"
+ "io/ioutil"
+ "net"
"net/http"
"net/url"
"strings"
+ "git.arvados.org/arvados.git/lib/controller/rpc"
+ "git.arvados.org/arvados.git/lib/service"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/auth"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/httpserver"
+ "github.com/hashicorp/yamux"
+)
+
+var (
+ forceProxyForTest = false
+ forceInternalURLForTest *arvados.URL
)
// ContainerSSH returns a connection to the SSH server in the
//
// If the returned error is nil, the caller is responsible for closing
// sshconn.Conn.
-func (conn *Conn) ContainerSSH(ctx context.Context, opts arvados.ContainerSSHOptions) (sshconn arvados.ContainerSSHConnection, err error) {
+func (conn *Conn) ContainerSSH(ctx context.Context, opts arvados.ContainerSSHOptions) (sshconn arvados.ConnectionResponse, err error) {
user, err := conn.railsProxy.UserGetCurrent(ctx, arvados.GetOptions{})
if err != nil {
- return
+ return sshconn, err
}
ctr, err := conn.railsProxy.ContainerGet(ctx, arvados.GetOptions{UUID: opts.UUID})
if err != nil {
- return
+ return sshconn, err
}
ctxRoot := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{conn.cluster.SystemRootToken}})
if !user.IsAdmin || !conn.cluster.Containers.ShellAccess.Admin {
if !conn.cluster.Containers.ShellAccess.User {
- err = httpserver.ErrorWithStatus(errors.New("shell access is disabled in config"), http.StatusServiceUnavailable)
- return
+ return sshconn, httpserver.ErrorWithStatus(errors.New("shell access is disabled in config"), http.StatusServiceUnavailable)
}
- var crs arvados.ContainerRequestList
- crs, err = conn.railsProxy.ContainerRequestList(ctxRoot, arvados.ListOptions{Limit: -1, Filters: []arvados.Filter{{"container_uuid", "=", opts.UUID}}})
+ crs, err := conn.railsProxy.ContainerRequestList(ctxRoot, arvados.ListOptions{Limit: -1, Filters: []arvados.Filter{{"container_uuid", "=", opts.UUID}}})
if err != nil {
- return
+ return sshconn, err
}
for _, cr := range crs.Items {
if cr.ModifiedByUserUUID != user.UUID {
- err = httpserver.ErrorWithStatus(errors.New("permission denied: container is associated with requests submitted by other users"), http.StatusForbidden)
- return
+ return sshconn, httpserver.ErrorWithStatus(errors.New("permission denied: container is associated with requests submitted by other users"), http.StatusForbidden)
}
}
if crs.ItemsAvailable != len(crs.Items) {
- err = httpserver.ErrorWithStatus(errors.New("incomplete response while checking permission"), http.StatusInternalServerError)
- return
+ return sshconn, httpserver.ErrorWithStatus(errors.New("incomplete response while checking permission"), http.StatusInternalServerError)
}
}
- switch ctr.State {
- case arvados.ContainerStateQueued, arvados.ContainerStateLocked:
- err = httpserver.ErrorWithStatus(fmt.Errorf("container is not running yet (state is %q)", ctr.State), http.StatusServiceUnavailable)
- return
- case arvados.ContainerStateRunning:
- if ctr.GatewayAddress == "" {
- err = httpserver.ErrorWithStatus(errors.New("container is running but gateway is not available -- installation problem or feature not supported"), http.StatusServiceUnavailable)
- return
+ conn.gwTunnelsLock.Lock()
+ tunnel := conn.gwTunnels[opts.UUID]
+ conn.gwTunnelsLock.Unlock()
+
+ if ctr.State == arvados.ContainerStateQueued || ctr.State == arvados.ContainerStateLocked {
+ return sshconn, httpserver.ErrorWithStatus(fmt.Errorf("container is not running yet (state is %q)", ctr.State), http.StatusServiceUnavailable)
+ } else if ctr.State != arvados.ContainerStateRunning {
+ return sshconn, httpserver.ErrorWithStatus(fmt.Errorf("container has ended (state is %q)", ctr.State), http.StatusGone)
+ }
+
+ // targetHost is the value we'll use in the Host header in our
+ // "Upgrade: ssh" http request. It's just a placeholder
+ // "localhost", unless we decide to connect directly, in which
+ // case we'll set it to the gateway's external ip:host. (The
+ // gateway doesn't even look at it, but we might as well.)
+ targetHost := "localhost"
+ myURL, _ := service.URLFromContext(ctx)
+
+ var rawconn net.Conn
+ if host, _, splitErr := net.SplitHostPort(ctr.GatewayAddress); splitErr == nil && host != "" && host != "127.0.0.1" {
+ // If crunch-run provided a GatewayAddress like
+ // "ipaddr:port", that means "ipaddr" is one of the
+ // external interfaces where the gateway is
+ // listening. In that case, it's the most
+ // reliable/direct option, so we use it even if a
+ // tunnel might also be available.
+ targetHost = ctr.GatewayAddress
+ rawconn, err = net.Dial("tcp", ctr.GatewayAddress)
+ if err != nil {
+ return sshconn, httpserver.ErrorWithStatus(err, http.StatusServiceUnavailable)
}
- default:
- err = httpserver.ErrorWithStatus(fmt.Errorf("container has ended (state is %q)", ctr.State), http.StatusGone)
- return
+ } else if tunnel != nil && !(forceProxyForTest && !opts.NoForward) {
+ // If we can't connect directly, and the gateway has
+ // established a yamux tunnel with us, connect through
+ // the tunnel.
+ //
+ // ...except: forceProxyForTest means we are emulating
+ // a situation where the gateway has established a
+ // yamux tunnel with controller B, and the
+ // ContainerSSH request arrives at controller A. If
+ // opts.NoForward==false then we are acting as A, so
+ // we pretend not to have a tunnel, and fall through
+ // to the "tunurl" case below. If opts.NoForward==true
+ // then the client is A and we are acting as B, so we
+ // connect to our tunnel.
+ rawconn, err = tunnel.Open()
+ if err != nil {
+ return sshconn, httpserver.ErrorWithStatus(err, http.StatusServiceUnavailable)
+ }
+ } else if ctr.GatewayAddress == "" {
+ return sshconn, httpserver.ErrorWithStatus(errors.New("container is running but gateway is not available"), http.StatusServiceUnavailable)
+ } else if tunurl := strings.TrimPrefix(ctr.GatewayAddress, "tunnel "); tunurl != ctr.GatewayAddress &&
+ tunurl != "" &&
+ tunurl != myURL.String() &&
+ !opts.NoForward {
+ // If crunch-run provided a GatewayAddress like
+ // "tunnel https://10.0.0.10:1010/", that means the
+ // gateway has established a yamux tunnel with the
+ // controller process at the indicated InternalURL
+ // (which isn't us, otherwise we would have had
+ // "tunnel != nil" above). We need to proxy through to
+ // the other controller process in order to use the
+ // tunnel.
+ for u := range conn.cluster.Services.Controller.InternalURLs {
+ if u.String() == tunurl {
+ ctxlog.FromContext(ctx).Debugf("proxying ContainerSSH request to other controller at %s", u)
+ u := url.URL(u)
+ arpc := rpc.NewConn(conn.cluster.ClusterID, &u, conn.cluster.TLS.Insecure, rpc.PassthroughTokenProvider)
+ opts.NoForward = true
+ return arpc.ContainerSSH(ctx, opts)
+ }
+ }
+ ctxlog.FromContext(ctx).Warnf("container gateway provided a tunnel endpoint %s that is not one of Services.Controller.InternalURLs", tunurl)
+ return sshconn, httpserver.ErrorWithStatus(errors.New("container gateway is running but tunnel endpoint is invalid"), http.StatusServiceUnavailable)
+ } else {
+ return sshconn, httpserver.ErrorWithStatus(errors.New("container gateway is running but tunnel is down"), http.StatusServiceUnavailable)
}
+
// crunch-run uses a self-signed / unverifiable TLS
// certificate, so we use the following scheme to ensure we're
// not talking to a MITM.
// X-Arvados-Authorization-Response header, proving that the
// server knows ctrKey.
var requestAuth, respondAuth string
- netconn, err := tls.Dial("tcp", ctr.GatewayAddress, &tls.Config{
+ tlsconn := tls.Client(rawconn, &tls.Config{
InsecureSkipVerify: true,
VerifyPeerCertificate: func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
if len(rawCerts) == 0 {
return nil
},
})
+ err = tlsconn.HandshakeContext(ctx)
if err != nil {
- err = httpserver.ErrorWithStatus(err, http.StatusBadGateway)
- return
+ return sshconn, httpserver.ErrorWithStatus(fmt.Errorf("TLS handshake failed: %w", err), http.StatusBadGateway)
}
if respondAuth == "" {
- err = httpserver.ErrorWithStatus(errors.New("BUG: no respondAuth"), http.StatusInternalServerError)
- return
+ tlsconn.Close()
+ return sshconn, httpserver.ErrorWithStatus(errors.New("BUG: no respondAuth"), http.StatusInternalServerError)
}
- bufr := bufio.NewReader(netconn)
- bufw := bufio.NewWriter(netconn)
+ bufr := bufio.NewReader(tlsconn)
+ bufw := bufio.NewWriter(tlsconn)
u := url.URL{
Scheme: "http",
- Host: ctr.GatewayAddress,
+ Host: targetHost,
Path: "/ssh",
}
- bufw.WriteString("GET " + u.String() + " HTTP/1.1\r\n")
+ postform := url.Values{
+ "uuid": {opts.UUID},
+ "detach_keys": {opts.DetachKeys},
+ "login_username": {opts.LoginUsername},
+ "no_forward": {fmt.Sprintf("%v", opts.NoForward)},
+ }
+ postdata := postform.Encode()
+ bufw.WriteString("POST " + u.String() + " HTTP/1.1\r\n")
bufw.WriteString("Host: " + u.Host + "\r\n")
bufw.WriteString("Upgrade: ssh\r\n")
- bufw.WriteString("X-Arvados-Target-Uuid: " + opts.UUID + "\r\n")
bufw.WriteString("X-Arvados-Authorization: " + requestAuth + "\r\n")
- bufw.WriteString("X-Arvados-Detach-Keys: " + opts.DetachKeys + "\r\n")
- bufw.WriteString("X-Arvados-Login-Username: " + opts.LoginUsername + "\r\n")
+ bufw.WriteString("Content-Type: application/x-www-form-urlencoded\r\n")
+ fmt.Fprintf(bufw, "Content-Length: %d\r\n", len(postdata))
bufw.WriteString("\r\n")
+ bufw.WriteString(postdata)
bufw.Flush()
- resp, err := http.ReadResponse(bufr, &http.Request{Method: "GET"})
+ resp, err := http.ReadResponse(bufr, &http.Request{Method: "POST"})
if err != nil {
- err = httpserver.ErrorWithStatus(fmt.Errorf("error reading http response from gateway: %w", err), http.StatusBadGateway)
- netconn.Close()
- return
+ tlsconn.Close()
+ return sshconn, httpserver.ErrorWithStatus(fmt.Errorf("error reading http response from gateway: %w", err), http.StatusBadGateway)
}
- if resp.Header.Get("X-Arvados-Authorization-Response") != respondAuth {
- err = httpserver.ErrorWithStatus(errors.New("bad X-Arvados-Authorization-Response header"), http.StatusBadGateway)
- netconn.Close()
- return
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusSwitchingProtocols {
+ body, _ := ioutil.ReadAll(io.LimitReader(resp.Body, 1000))
+ tlsconn.Close()
+ return sshconn, httpserver.ErrorWithStatus(fmt.Errorf("unexpected status %s %q", resp.Status, body), http.StatusBadGateway)
}
if strings.ToLower(resp.Header.Get("Upgrade")) != "ssh" ||
strings.ToLower(resp.Header.Get("Connection")) != "upgrade" {
- err = httpserver.ErrorWithStatus(errors.New("bad upgrade"), http.StatusBadGateway)
- netconn.Close()
- return
+ tlsconn.Close()
+ return sshconn, httpserver.ErrorWithStatus(errors.New("bad upgrade"), http.StatusBadGateway)
+ }
+ if resp.Header.Get("X-Arvados-Authorization-Response") != respondAuth {
+ tlsconn.Close()
+ return sshconn, httpserver.ErrorWithStatus(errors.New("bad X-Arvados-Authorization-Response header"), http.StatusBadGateway)
}
if !ctr.InteractiveSessionStarted {
},
})
if err != nil {
- netconn.Close()
- return
+ tlsconn.Close()
+ return sshconn, httpserver.ErrorWithStatus(err, http.StatusInternalServerError)
}
}
- sshconn.Conn = netconn
+ sshconn.Conn = tlsconn
sshconn.Bufrw = &bufio.ReadWriter{Reader: bufr, Writer: bufw}
sshconn.Logger = ctxlog.FromContext(ctx)
+ sshconn.Header = http.Header{"Upgrade": {"ssh"}}
+ return sshconn, nil
+}
+
+// ContainerGatewayTunnel sets up a tunnel enabling us (controller) to
+// connect to the caller's (crunch-run's) gateway server.
+func (conn *Conn) ContainerGatewayTunnel(ctx context.Context, opts arvados.ContainerGatewayTunnelOptions) (resp arvados.ConnectionResponse, err error) {
+ h := hmac.New(sha256.New, []byte(conn.cluster.SystemRootToken))
+ fmt.Fprint(h, opts.UUID)
+ authSecret := fmt.Sprintf("%x", h.Sum(nil))
+ if subtle.ConstantTimeCompare([]byte(authSecret), []byte(opts.AuthSecret)) != 1 {
+ ctxlog.FromContext(ctx).Info("received incorrect auth_secret")
+ return resp, httpserver.ErrorWithStatus(errors.New("authentication error"), http.StatusUnauthorized)
+ }
+
+ muxconn, clientconn := net.Pipe()
+ tunnel, err := yamux.Server(muxconn, nil)
+ if err != nil {
+ clientconn.Close()
+ return resp, httpserver.ErrorWithStatus(err, http.StatusInternalServerError)
+ }
+
+ conn.gwTunnelsLock.Lock()
+ if conn.gwTunnels == nil {
+ conn.gwTunnels = map[string]*yamux.Session{opts.UUID: tunnel}
+ } else {
+ conn.gwTunnels[opts.UUID] = tunnel
+ }
+ conn.gwTunnelsLock.Unlock()
+
+ go func() {
+ <-tunnel.CloseChan()
+ conn.gwTunnelsLock.Lock()
+ if conn.gwTunnels[opts.UUID] == tunnel {
+ delete(conn.gwTunnels, opts.UUID)
+ }
+ conn.gwTunnelsLock.Unlock()
+ }()
+
+ // Assuming we're acting as the backend of an http server,
+ // lib/controller/router will call resp's ServeHTTP handler,
+ // which upgrades the incoming http connection to a raw socket
+ // and connects it to our yamux.Server through our net.Pipe().
+ resp.Conn = clientconn
+ resp.Bufrw = &bufio.ReadWriter{Reader: bufio.NewReader(&bytes.Buffer{}), Writer: bufio.NewWriter(&bytes.Buffer{})}
+ resp.Logger = ctxlog.FromContext(ctx)
+ resp.Header = http.Header{"Upgrade": {"tunnel"}}
+ if u, ok := service.URLFromContext(ctx); ok {
+ resp.Header.Set("X-Arvados-Internal-Url", u.String())
+ } else if forceInternalURLForTest != nil {
+ resp.Header.Set("X-Arvados-Internal-Url", forceInternalURLForTest.String())
+ }
return
}
"io"
"io/ioutil"
"net"
+ "net/http/httptest"
+ "net/url"
+ "strings"
"time"
"git.arvados.org/arvados.git/lib/config"
+ "git.arvados.org/arvados.git/lib/controller/router"
+ "git.arvados.org/arvados.git/lib/controller/rpc"
"git.arvados.org/arvados.git/lib/crunchrun"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
fmt.Fprint(h, s.ctrUUID)
authKey := fmt.Sprintf("%x", h.Sum(nil))
+ rtr := router.New(s.localdb, router.Config{})
+ srv := httptest.NewUnstartedServer(rtr)
+ srv.StartTLS()
+ // the test setup doesn't use lib/service so
+ // service.URLFromContext() returns nothing -- instead, this
+ // is how we advertise our internal URL and enable
+	// proxy-to-other-controller mode.
+ forceInternalURLForTest = &arvados.URL{Scheme: "https", Host: srv.Listener.Addr().String()}
+ ac := &arvados.Client{
+ APIHost: srv.Listener.Addr().String(),
+ AuthToken: arvadostest.Dispatch1Token,
+ Insecure: true,
+ }
s.gw = &crunchrun.Gateway{
ContainerUUID: s.ctrUUID,
AuthSecret: authKey,
Address: "localhost:0",
Log: ctxlog.TestLogger(c),
Target: crunchrun.GatewayTargetStub{},
+ ArvadosClient: ac,
}
c.Assert(s.gw.Start(), check.IsNil)
rootctx := auth.NewContext(context.Background(), &auth.Credentials{Tokens: []string{s.cluster.SystemRootToken}})
Attrs: map[string]interface{}{
"state": arvados.ContainerStateLocked}})
c.Assert(err, check.IsNil)
- _, err = s.localdb.ContainerUpdate(rootctx, arvados.UpdateOptions{
+}
+
+func (s *ContainerGatewaySuite) SetUpTest(c *check.C) {
+ // clear any tunnel sessions started by previous test cases
+ s.localdb.gwTunnelsLock.Lock()
+ s.localdb.gwTunnels = nil
+ s.localdb.gwTunnelsLock.Unlock()
+
+ rootctx := auth.NewContext(context.Background(), &auth.Credentials{Tokens: []string{s.cluster.SystemRootToken}})
+ _, err := s.localdb.ContainerUpdate(rootctx, arvados.UpdateOptions{
UUID: s.ctrUUID,
Attrs: map[string]interface{}{
"state": arvados.ContainerStateRunning,
"gateway_address": s.gw.Address}})
c.Assert(err, check.IsNil)
-}
-func (s *ContainerGatewaySuite) SetUpTest(c *check.C) {
s.cluster.Containers.ShellAccess.Admin = true
s.cluster.Containers.ShellAccess.User = true
- _, err := arvadostest.DB(c, s.cluster).Exec(`update containers set interactive_session_started=$1 where uuid=$2`, false, s.ctrUUID)
+ _, err = arvadostest.DB(c, s.cluster).Exec(`update containers set interactive_session_started=$1 where uuid=$2`, false, s.ctrUUID)
c.Check(err, check.IsNil)
}
_, err = s.localdb.ContainerSSH(ctx, arvados.ContainerSSHOptions{UUID: s.ctrUUID})
c.Check(err, check.ErrorMatches, `.* 404 .*`)
}
+
+func (s *ContainerGatewaySuite) TestCreateTunnel(c *check.C) {
+ // no AuthSecret
+ conn, err := s.localdb.ContainerGatewayTunnel(s.ctx, arvados.ContainerGatewayTunnelOptions{
+ UUID: s.ctrUUID,
+ })
+ c.Check(err, check.ErrorMatches, `authentication error`)
+ c.Check(conn.Conn, check.IsNil)
+
+ // bogus AuthSecret
+ conn, err = s.localdb.ContainerGatewayTunnel(s.ctx, arvados.ContainerGatewayTunnelOptions{
+ UUID: s.ctrUUID,
+ AuthSecret: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
+ })
+ c.Check(err, check.ErrorMatches, `authentication error`)
+ c.Check(conn.Conn, check.IsNil)
+
+ // good AuthSecret
+ conn, err = s.localdb.ContainerGatewayTunnel(s.ctx, arvados.ContainerGatewayTunnelOptions{
+ UUID: s.ctrUUID,
+ AuthSecret: s.gw.AuthSecret,
+ })
+ c.Check(err, check.IsNil)
+ c.Check(conn.Conn, check.NotNil)
+}
+
+func (s *ContainerGatewaySuite) TestConnectThroughTunnelWithProxyOK(c *check.C) {
+ forceProxyForTest = true
+ defer func() { forceProxyForTest = false }()
+ s.cluster.Services.Controller.InternalURLs[*forceInternalURLForTest] = arvados.ServiceInstance{}
+ defer delete(s.cluster.Services.Controller.InternalURLs, *forceInternalURLForTest)
+ s.testConnectThroughTunnel(c, "")
+}
+
+func (s *ContainerGatewaySuite) TestConnectThroughTunnelWithProxyError(c *check.C) {
+ forceProxyForTest = true
+ defer func() { forceProxyForTest = false }()
+ // forceInternalURLForTest shouldn't be used because it isn't
+ // listed in s.cluster.Services.Controller.InternalURLs
+ s.testConnectThroughTunnel(c, `.*tunnel endpoint is invalid.*`)
+}
+
+func (s *ContainerGatewaySuite) TestConnectThroughTunnelNoProxyOK(c *check.C) {
+ s.testConnectThroughTunnel(c, "")
+}
+
+func (s *ContainerGatewaySuite) testConnectThroughTunnel(c *check.C, expectErrorMatch string) {
+ rootctx := auth.NewContext(context.Background(), &auth.Credentials{Tokens: []string{s.cluster.SystemRootToken}})
+ // Until the tunnel starts up, set gateway_address to a value
+ // that can't work. We want to ensure the only way we can
+ // reach the gateway is through the tunnel.
+ gwaddr := "127.0.0.1:0"
+ tungw := &crunchrun.Gateway{
+ ContainerUUID: s.ctrUUID,
+ AuthSecret: s.gw.AuthSecret,
+ Log: ctxlog.TestLogger(c),
+ Target: crunchrun.GatewayTargetStub{},
+ ArvadosClient: s.gw.ArvadosClient,
+ UpdateTunnelURL: func(url string) {
+ c.Logf("UpdateTunnelURL(%q)", url)
+ gwaddr = "tunnel " + url
+ s.localdb.ContainerUpdate(rootctx, arvados.UpdateOptions{
+ UUID: s.ctrUUID,
+ Attrs: map[string]interface{}{
+ "gateway_address": gwaddr}})
+ },
+ }
+ c.Assert(tungw.Start(), check.IsNil)
+
+ // We didn't supply an external hostname in the Address field,
+ // so Start() should assign a local address.
+ host, _, err := net.SplitHostPort(tungw.Address)
+ c.Assert(err, check.IsNil)
+ c.Check(host, check.Equals, "127.0.0.1")
+
+ _, err = s.localdb.ContainerUpdate(rootctx, arvados.UpdateOptions{
+ UUID: s.ctrUUID,
+ Attrs: map[string]interface{}{
+ "state": arvados.ContainerStateRunning,
+ "gateway_address": gwaddr}})
+ c.Assert(err, check.IsNil)
+
+ for deadline := time.Now().Add(5 * time.Second); time.Now().Before(deadline); time.Sleep(time.Second / 2) {
+ ctr, err := s.localdb.ContainerGet(s.ctx, arvados.GetOptions{UUID: s.ctrUUID})
+ c.Assert(err, check.IsNil)
+ c.Check(ctr.InteractiveSessionStarted, check.Equals, false)
+ c.Logf("ctr.GatewayAddress == %s", ctr.GatewayAddress)
+ if strings.HasPrefix(ctr.GatewayAddress, "tunnel ") {
+ break
+ }
+ }
+
+ c.Log("connecting to gateway through tunnel")
+ arpc := rpc.NewConn("", &url.URL{Scheme: "https", Host: s.gw.ArvadosClient.APIHost}, true, rpc.PassthroughTokenProvider)
+ sshconn, err := arpc.ContainerSSH(s.ctx, arvados.ContainerSSHOptions{UUID: s.ctrUUID})
+ if expectErrorMatch != "" {
+ c.Check(err, check.ErrorMatches, expectErrorMatch)
+ return
+ }
+ c.Assert(err, check.IsNil)
+ c.Assert(sshconn.Conn, check.NotNil)
+ defer sshconn.Conn.Close()
+
+ done := make(chan struct{})
+ go func() {
+ defer close(done)
+
+ // Receive text banner
+ buf := make([]byte, 12)
+ _, err := io.ReadFull(sshconn.Conn, buf)
+ c.Check(err, check.IsNil)
+ c.Check(string(buf), check.Equals, "SSH-2.0-Go\r\n")
+
+ // Send text banner
+ _, err = sshconn.Conn.Write([]byte("SSH-2.0-Fake\r\n"))
+ c.Check(err, check.IsNil)
+
+ // Receive binary
+ _, err = io.ReadFull(sshconn.Conn, buf[:4])
+ c.Check(err, check.IsNil)
+
+ // If we can get this far into an SSH handshake...
+ c.Logf("was able to read %x -- success, tunnel is working", buf[:4])
+ }()
+ select {
+ case <-done:
+ case <-time.After(time.Second):
+ c.Fail()
+ }
+ ctr, err := s.localdb.ContainerGet(s.ctx, arvados.GetOptions{UUID: s.ctrUUID})
+ c.Check(err, check.IsNil)
+ c.Check(ctr.InteractiveSessionStarted, check.Equals, true)
+}
hdrOut[k] = v
}
}
- xff := reqIn.RemoteAddr
- if xffIn := reqIn.Header.Get("X-Forwarded-For"); xffIn != "" {
- xff = xffIn + "," + xff
+ xff := ""
+ for _, xffIn := range reqIn.Header["X-Forwarded-For"] {
+ if xffIn != "" {
+ xff += xffIn + ","
+ }
}
+ xff += reqIn.RemoteAddr
hdrOut.Set("X-Forwarded-For", xff)
if hdrOut.Get("X-Forwarded-Proto") == "" {
hdrOut.Set("X-Forwarded-Proto", reqIn.URL.Scheme)
"bypass_federation": true,
"recursive": true,
"exclude_home_project": true,
+ "no_forward": true,
}
func stringToBool(s string) bool {
return rtr.backend.ContainerSSH(ctx, *opts.(*arvados.ContainerSSHOptions))
},
},
+ {
+ // arvados-client built before commit
+ // bdc29d3129f6d75aa9ce0a24ffb849a272b06f08
+ // used GET with params in headers instead of
+ // POST form
+ arvados.APIEndpoint{"GET", "arvados/v1/connect/{uuid}/ssh", ""},
+ func() interface{} { return &arvados.ContainerSSHOptions{} },
+ func(ctx context.Context, opts interface{}) (interface{}, error) {
+ return nil, httpError(http.StatusGone, fmt.Errorf("API endpoint is obsolete -- please upgrade your arvados-client program"))
+ },
+ },
+ {
+ arvados.EndpointContainerGatewayTunnel,
+ func() interface{} { return &arvados.ContainerGatewayTunnelOptions{} },
+ func(ctx context.Context, opts interface{}) (interface{}, error) {
+ return rtr.backend.ContainerGatewayTunnel(ctx, *opts.(*arvados.ContainerGatewayTunnelOptions))
+ },
+ },
{
arvados.EndpointGroupCreate,
func() interface{} { return &arvados.CreateOptions{} },
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/auth"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/httpserver"
)
// ContainerSSH returns a connection to the out-of-band SSH server for
// a running container. If the returned error is nil, the caller is
// responsible for closing sshconn.Conn.
-func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSHOptions) (sshconn arvados.ContainerSSHConnection, err error) {
+func (conn *Conn) ContainerSSH(ctx context.Context, options arvados.ContainerSSHOptions) (sshconn arvados.ConnectionResponse, err error) {
+ u, err := conn.baseURL.Parse("/" + strings.Replace(arvados.EndpointContainerSSH.Path, "{uuid}", options.UUID, -1))
+ if err != nil {
+ err = fmt.Errorf("url.Parse: %w", err)
+ return
+ }
+ return conn.socket(ctx, u, "ssh", url.Values{
+ "detach_keys": {options.DetachKeys},
+ "login_username": {options.LoginUsername},
+ "no_forward": {fmt.Sprintf("%v", options.NoForward)},
+ })
+}
+
+// ContainerGatewayTunnel returns a connection to a yamux session on
+// the controller. The caller should connect the returned resp.Conn to
+// a client-side yamux session.
+func (conn *Conn) ContainerGatewayTunnel(ctx context.Context, options arvados.ContainerGatewayTunnelOptions) (tunnelconn arvados.ConnectionResponse, err error) {
+ u, err := conn.baseURL.Parse("/" + strings.Replace(arvados.EndpointContainerGatewayTunnel.Path, "{uuid}", options.UUID, -1))
+ if err != nil {
+ err = fmt.Errorf("url.Parse: %w", err)
+ return
+ }
+ return conn.socket(ctx, u, "tunnel", url.Values{
+ "auth_secret": {options.AuthSecret},
+ })
+}
+
+// socket sets up a socket using the specified API endpoint and
+// upgrade header.
+func (conn *Conn) socket(ctx context.Context, u *url.URL, upgradeHeader string, postform url.Values) (connresp arvados.ConnectionResponse, err error) {
addr := conn.baseURL.Host
if strings.Index(addr, ":") < 1 || (strings.Contains(addr, "::") && addr[0] != '[') {
// hostname or ::1 or 1::1
}
netconn, err := tls.Dial("tcp", addr, &tls.Config{InsecureSkipVerify: insecure})
if err != nil {
- err = fmt.Errorf("tls.Dial: %w", err)
- return
+ return connresp, fmt.Errorf("tls.Dial: %w", err)
}
defer func() {
if err != nil {
bufr := bufio.NewReader(netconn)
bufw := bufio.NewWriter(netconn)
- u, err := conn.baseURL.Parse("/" + strings.Replace(arvados.EndpointContainerSSH.Path, "{uuid}", options.UUID, -1))
- if err != nil {
- err = fmt.Errorf("tls.Dial: %w", err)
- return
- }
- u.RawQuery = url.Values{
- "detach_keys": {options.DetachKeys},
- "login_username": {options.LoginUsername},
- }.Encode()
tokens, err := conn.tokenProvider(ctx)
if err != nil {
- return
+ return connresp, err
} else if len(tokens) < 1 {
- err = httpserver.ErrorWithStatus(errors.New("unauthorized"), http.StatusUnauthorized)
- return
+ return connresp, httpserver.ErrorWithStatus(errors.New("unauthorized"), http.StatusUnauthorized)
}
- bufw.WriteString("GET " + u.String() + " HTTP/1.1\r\n")
+ postdata := postform.Encode()
+ bufw.WriteString("POST " + u.String() + " HTTP/1.1\r\n")
bufw.WriteString("Authorization: Bearer " + tokens[0] + "\r\n")
bufw.WriteString("Host: " + u.Host + "\r\n")
- bufw.WriteString("Upgrade: ssh\r\n")
+ bufw.WriteString("Upgrade: " + upgradeHeader + "\r\n")
+ bufw.WriteString("Content-Type: application/x-www-form-urlencoded\r\n")
+ fmt.Fprintf(bufw, "Content-Length: %d\r\n", len(postdata))
bufw.WriteString("\r\n")
+ bufw.WriteString(postdata)
bufw.Flush()
- resp, err := http.ReadResponse(bufr, &http.Request{Method: "GET"})
+ resp, err := http.ReadResponse(bufr, &http.Request{Method: "POST"})
if err != nil {
- err = fmt.Errorf("http.ReadResponse: %w", err)
- return
+ return connresp, fmt.Errorf("http.ReadResponse: %w", err)
}
+ defer resp.Body.Close()
if resp.StatusCode != http.StatusSwitchingProtocols {
- defer resp.Body.Close()
- body, _ := ioutil.ReadAll(resp.Body)
+ ctxlog.FromContext(ctx).Infof("rpc.Conn.socket: server %s did not switch protocols, got status %s", u.String(), resp.Status)
+ body, _ := ioutil.ReadAll(io.LimitReader(resp.Body, 10000))
var message string
var errDoc httpserver.ErrorResponse
if err := json.Unmarshal(body, &errDoc); err == nil {
} else {
message = fmt.Sprintf("%q", body)
}
- err = fmt.Errorf("server did not provide a tunnel: %s (HTTP %d)", message, resp.StatusCode)
- return
+ return connresp, fmt.Errorf("server did not provide a tunnel: %s: %s", resp.Status, message)
}
- if strings.ToLower(resp.Header.Get("Upgrade")) != "ssh" ||
+ if strings.ToLower(resp.Header.Get("Upgrade")) != upgradeHeader ||
strings.ToLower(resp.Header.Get("Connection")) != "upgrade" {
- err = fmt.Errorf("bad response from server: Upgrade %q Connection %q", resp.Header.Get("Upgrade"), resp.Header.Get("Connection"))
- return
+ return connresp, fmt.Errorf("bad response from server: Upgrade %q Connection %q", resp.Header.Get("Upgrade"), resp.Header.Get("Connection"))
}
- sshconn.Conn = netconn
- sshconn.Bufrw = &bufio.ReadWriter{Reader: bufr, Writer: bufw}
- return
+ connresp.Conn = netconn
+ connresp.Bufrw = &bufio.ReadWriter{Reader: bufr, Writer: bufw}
+ connresp.Header = resp.Header
+ return connresp, nil
}
func (conn *Conn) ContainerRequestCreate(ctx context.Context, options arvados.CreateOptions) (arvados.ContainerRequest, error) {
"io"
"net"
"net/http"
+ "net/url"
"os"
"os/exec"
"sync"
"syscall"
+ "time"
+ "git.arvados.org/arvados.git/lib/controller/rpc"
"git.arvados.org/arvados.git/lib/selfsigned"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/auth"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/httpserver"
"github.com/creack/pty"
"github.com/google/shlex"
+ "github.com/hashicorp/yamux"
"golang.org/x/crypto/ssh"
"golang.org/x/net/context"
)
type Gateway struct {
ContainerUUID string
- Address string // listen host:port; if port=0, Start() will change it to the selected port
- AuthSecret string
- Target GatewayTarget
- Log interface {
+ // Caller should set Address to "", or "host:0" or "host:port"
+ // where host is a known external IP address; port is a
+ // desired port number to listen on; and ":0" chooses an
+ // available dynamic port.
+ //
+ // If Address is "", Start() listens only on the loopback
+ // interface (and changes Address to "127.0.0.1:port").
+ // Otherwise it listens on all interfaces.
+ //
+ // If Address is "host:0", Start() updates Address to
+ // "host:port".
+ Address string
+ AuthSecret string
+ Target GatewayTarget
+ Log interface {
Printf(fmt string, args ...interface{})
}
+ // If non-nil, set up a ContainerGatewayTunnel, so that the
+ // controller can connect to us even if our external IP
+ // address is unknown or not routable from controller.
+ ArvadosClient *arvados.Client
+
+ // When a tunnel is connected or reconnected, this func (if
+ // not nil) will be called with the InternalURL of the
+ // controller process at the other end of the tunnel.
+ UpdateTunnelURL func(url string)
sshConfig ssh.ServerConfig
requestAuth string
// from arvados-controller, and PORT is either the desired
// port where we should run our gateway server, or "0" if we
// should choose an available port.
- host, port, err := net.SplitHostPort(gw.Address)
+ extAddr := gw.Address
+ // Generally we can't know which local interface corresponds
+ // to an externally reachable IP address, so if we expect to
+ // be reachable by external hosts, we listen on all
+ // interfaces.
+ listenHost := ""
+ if extAddr == "" {
+ // If the dispatcher doesn't tell us our external IP
+ // address, controller will only be able to connect
+ // through the tunnel (see runTunnel), so our gateway
+ // server only needs to listen on the loopback
+ // interface.
+ extAddr = "127.0.0.1:0"
+ listenHost = "127.0.0.1"
+ }
+ extHost, extPort, err := net.SplitHostPort(extAddr)
if err != nil {
return err
}
Certificates: []tls.Certificate{cert},
},
},
- Addr: ":" + port,
+ Addr: net.JoinHostPort(listenHost, extPort),
}
err = srv.Start()
if err != nil {
return err
}
- // Get the port number we are listening on (the port might be
+ go func() {
+ err := srv.Wait()
+ gw.Log.Printf("gateway server stopped: %s", err)
+ }()
+ // Get the port number we are listening on (extPort might be
// "0" or a port name, in which case this will be different).
- _, port, err = net.SplitHostPort(srv.Addr)
+ _, listenPort, err := net.SplitHostPort(srv.Addr)
if err != nil {
return err
}
- // When changing state to Running, we will set
- // gateway_address to "HOST:PORT" where HOST is our
- // external hostname/IP as provided by arvados-dispatch-cloud,
- // and PORT is the port number we ended up listening on.
- gw.Address = net.JoinHostPort(host, port)
+ // When changing state to Running, the caller will want to set
+ // gateway_address to a "HOST:PORT" that, if controller
+ // connects to it, will reach this gateway server.
+ //
+ // The most likely thing to work is: HOST is our external
+ // hostname/IP as provided by the caller
+ // (arvados-dispatch-cloud) or 127.0.0.1 to indicate
+ // non-tunnel connections aren't available; and PORT is the
+ // port number we are listening on.
+ gw.Address = net.JoinHostPort(extHost, listenPort)
+ gw.Log.Printf("gateway server listening at %s", gw.Address)
+ if gw.ArvadosClient != nil {
+ go gw.maintainTunnel(gw.Address)
+ }
return nil
}
+func (gw *Gateway) maintainTunnel(addr string) {
+ for ; ; time.Sleep(5 * time.Second) {
+ err := gw.runTunnel(addr)
+ gw.Log.Printf("runTunnel: %s", err)
+ }
+}
+
+// runTunnel connects to controller and sets up a tunnel through
+// which controller can connect to the gateway server at the given
+// addr.
+func (gw *Gateway) runTunnel(addr string) error {
+ ctx := auth.NewContext(context.Background(), auth.NewCredentials(gw.ArvadosClient.AuthToken))
+ arpc := rpc.NewConn("", &url.URL{Scheme: "https", Host: gw.ArvadosClient.APIHost}, gw.ArvadosClient.Insecure, rpc.PassthroughTokenProvider)
+ tun, err := arpc.ContainerGatewayTunnel(ctx, arvados.ContainerGatewayTunnelOptions{
+ UUID: gw.ContainerUUID,
+ AuthSecret: gw.AuthSecret,
+ })
+ if err != nil {
+ return fmt.Errorf("error creating gateway tunnel: %s", err)
+ }
+ mux, err := yamux.Client(tun.Conn, nil)
+ if err != nil {
+ return fmt.Errorf("error setting up mux client end: %s", err)
+ }
+ if url := tun.Header.Get("X-Arvados-Internal-Url"); url != "" && gw.UpdateTunnelURL != nil {
+ gw.UpdateTunnelURL(url)
+ }
+ for {
+ muxconn, err := mux.AcceptStream()
+ if err != nil {
+ return err
+ }
+ gw.Log.Printf("tunnel connection %d started", muxconn.StreamID())
+ go func() {
+ defer muxconn.Close()
+ gwconn, err := net.Dial("tcp", addr)
+ if err != nil {
+ gw.Log.Printf("tunnel connection %d: error connecting to %s: %s", muxconn.StreamID(), addr, err)
+ return
+ }
+ defer gwconn.Close()
+ var wg sync.WaitGroup
+ wg.Add(2)
+ go func() {
+ defer wg.Done()
+ _, err := io.Copy(gwconn, muxconn)
+ if err != nil {
+ gw.Log.Printf("tunnel connection %d: mux end: %s", muxconn.StreamID(), err)
+ }
+ gwconn.Close()
+ }()
+ go func() {
+ defer wg.Done()
+ _, err := io.Copy(muxconn, gwconn)
+ if err != nil {
+ gw.Log.Printf("tunnel connection %d: gateway end: %s", muxconn.StreamID(), err)
+ }
+ muxconn.Close()
+ }()
+ wg.Wait()
+ gw.Log.Printf("tunnel connection %d finished", muxconn.StreamID())
+ }()
+ }
+}
+
// handleSSH connects to an SSH server that allows the caller to run
// interactive commands as root (or any other desired user) inside the
// container. The tunnel itself can only be created by an
// In future we'll handle browser traffic too, but for now the
// only traffic we expect is an SSH tunnel from
// (*lib/controller/localdb.Conn)ContainerSSH()
- if req.Method != "GET" || req.Header.Get("Upgrade") != "ssh" {
+ if req.Method != "POST" || req.Header.Get("Upgrade") != "ssh" {
http.Error(w, "path not found", http.StatusNotFound)
return
}
- if want := req.Header.Get("X-Arvados-Target-Uuid"); want != gw.ContainerUUID {
+ req.ParseForm()
+ if want := req.Form.Get("uuid"); want != gw.ContainerUUID {
http.Error(w, fmt.Sprintf("misdirected request: meant for %q but received by crunch-run %q", want, gw.ContainerUUID), http.StatusBadGateway)
return
}
http.Error(w, "bad X-Arvados-Authorization header", http.StatusUnauthorized)
return
}
- detachKeys := req.Header.Get("X-Arvados-Detach-Keys")
- username := req.Header.Get("X-Arvados-Login-Username")
+ detachKeys := req.Form.Get("detach_keys")
+ username := req.Form.Get("login_username")
if username == "" {
username = "root"
}
ctx := req.Context()
conn, newchans, reqs, err := ssh.NewServerConn(netconn, &gw.sshConfig)
- if err != nil {
+ if err == io.EOF {
+ return
+ } else if err != nil {
gw.Log.Printf("ssh.NewServerConn: %s", err)
return
}
func (gw *Gateway) handleSession(ctx context.Context, newch ssh.NewChannel, detachKeys, username string) {
ch, reqs, err := newch.Accept()
if err != nil {
- gw.Log.Printf("accept session channel: %s", err)
+ gw.Log.Printf("error accepting session channel: %s", err)
return
}
+ defer ch.Close()
+
var pty0, tty0 *os.File
// Where to send errors/messages for the client to see
logw := io.Writer(ch.Stderr())
eol := "\n"
// Env vars to add to child process
termEnv := []string(nil)
- for req := range reqs {
+
+ started := 0
+ wantClose := make(chan struct{})
+ for {
+ var req *ssh.Request
+ select {
+ case r, ok := <-reqs:
+ if !ok {
+ return
+ }
+ req = r
+ case <-wantClose:
+ return
+ }
ok := false
switch req.Type {
case "shell", "exec":
+ if started++; started != 1 {
+ // RFC 4254 6.5: "Only one of these
+ // requests can succeed per channel."
+ break
+ }
ok = true
var payload struct {
Command string
}
defer func() {
ch.SendRequest("exit-status", false, ssh.Marshal(&resp))
- ch.Close()
+ close(wantClose)
}()
cmd, err := gw.Target.InjectCommand(ctx, detachKeys, username, tty0 != nil, execargs)
resp.Status = 1
return
}
- cmd.Stdin = ch
- cmd.Stdout = ch
- cmd.Stderr = ch.Stderr()
if tty0 != nil {
cmd.Stdin = tty0
cmd.Stdout = tty0
cmd.Stderr = tty0
- var wg sync.WaitGroup
- defer wg.Wait()
- wg.Add(2)
- go func() { io.Copy(ch, pty0); wg.Done() }()
- go func() { io.Copy(pty0, ch); wg.Done() }()
+ go io.Copy(ch, pty0)
+ go io.Copy(pty0, ch)
// Send our own debug messages to tty as well.
logw = tty0
+ } else {
+ // StdinPipe may seem
+ // superfluous here, but it's
+ // not: it causes cmd.Run() to
+ // return when the subprocess
+ // exits. Without it, Run()
+ // waits for stdin to close,
+ // which causes "ssh ... echo
+ // ok" (with the client's
+ // stdin connected to a
+ // terminal or something) to
+ // hang.
+ stdin, err := cmd.StdinPipe()
+ if err != nil {
+ fmt.Fprintln(ch.Stderr(), err)
+ ch.CloseWrite()
+ resp.Status = 1
+ return
+ }
+ go func() {
+ io.Copy(stdin, ch)
+ stdin.Close()
+ }()
+ cmd.Stdout = ch
+ cmd.Stderr = ch.Stderr()
}
cmd.SysProcAttr = &syscall.SysProcAttr{
Setctty: tty0 != nil,
// would be a gaping security
// hole).
default:
- // fmt.Fprintf(logw, "declining %q req"+eol, req.Type)
+ // fmt.Fprintf(logw, "declined request %q on ssh channel"+eol, req.Type)
}
if req.WantReply {
req.Reply(ok, nil)
MkArvClient func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
finalState string
parentTemp string
+ costStartTime time.Time
keepstoreLogger io.WriteCloser
keepstoreLogbuf *bufThenWrite
if runner.finalState == "Complete" && runner.OutputPDH != nil {
update["output"] = *runner.OutputPDH
}
+ var it arvados.InstanceType
+ if j := os.Getenv("InstanceType"); j != "" && json.Unmarshal([]byte(j), &it) == nil && it.Price > 0 {
+ update["cost"] = it.Price * time.Now().Sub(runner.costStartTime).Seconds() / time.Hour.Seconds()
+ }
return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
}
runner.CrunchLog.Printf("Using FUSE mount: %s", v)
runner.CrunchLog.Printf("Using container runtime: %s", runner.executor.Runtime())
runner.CrunchLog.Printf("Executing container: %s", runner.Container.UUID)
+ runner.costStartTime = time.Now()
hostname, hosterr := os.Hostname()
if hosterr != nil {
runtimeEngine := flags.String("runtime-engine", "docker", "container runtime: docker or singularity")
brokenNodeHook := flags.String("broken-node-hook", "", "script to run if node is detected to be broken (for example, Docker daemon is not running)")
flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
+ version := flags.Bool("version", false, "Write version information to stdout and exit 0.")
ignoreDetachFlag := false
if len(args) > 0 && args[0] == "-no-detach" {
if ok, code := cmd.ParseFlags(flags, prog, args, "container-uuid", stderr); !ok {
return code
+ } else if *version {
+ fmt.Fprintln(stdout, prog, cmd.Version.String())
+ return 0
} else if !*list && flags.NArg() != 1 {
fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
return 2
// not safe to run a gateway service without an auth
// secret
cr.CrunchLog.Printf("Not starting a gateway server (GatewayAuthSecret was not provided by dispatcher)")
- } else if gwListen := os.Getenv("GatewayAddress"); gwListen == "" {
- // dispatcher did not tell us which external IP
- // address to advertise --> no gateway service
- cr.CrunchLog.Printf("Not starting a gateway server (GatewayAddress was not provided by dispatcher)")
} else {
+ gwListen := os.Getenv("GatewayAddress")
cr.gateway = Gateway{
Address: gwListen,
AuthSecret: gwAuthSecret,
Target: cr.executor,
Log: cr.CrunchLog,
}
+ if gwListen == "" {
+ // Direct connection won't work, so we use the
+ // gateway_address field to indicate the
+ // internalURL of the controller process that
+ // has the current tunnel connection.
+ cr.gateway.ArvadosClient = cr.dispatcherClient
+ cr.gateway.UpdateTunnelURL = func(url string) {
+ cr.gateway.Address = "tunnel " + url
+ cr.DispatcherArvClient.Update("containers", containerUUID,
+ arvadosclient.Dict{"container": arvadosclient.Dict{"gateway_address": cr.gateway.Address}}, nil)
+ }
+ }
err = cr.gateway.Start()
if err != nil {
log.Printf("error starting gateway server: %s", err)
"bytes"
"fmt"
"io"
+ "io/ioutil"
"net"
"net/http"
"os"
"strings"
"time"
+ "git.arvados.org/arvados.git/lib/diagnostics"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
"golang.org/x/net/context"
c.Check(s.stderr.String(), Equals, "")
}
+func (s *executorSuite) TestDiagnosticsImage(c *C) {
+ s.newExecutor(c)
+ imagefile := c.MkDir() + "/hello-world.tar"
+ err := ioutil.WriteFile(imagefile, diagnostics.HelloWorldDockerImage, 0777)
+ c.Assert(err, IsNil)
+ err = s.executor.LoadImage("", imagefile, arvados.Container{}, "", nil)
+ c.Assert(err, IsNil)
+
+ c.Logf("Using container runtime: %s", s.executor.Runtime())
+ s.spec.Image = "hello-world"
+ s.spec.Command = []string{"/hello"}
+ s.checkRun(c, 0)
+ c.Check(s.stdout.String(), Matches, `(?ms)\nHello from Docker!\n.*`)
+}
+
func (s *executorSuite) TestExitStatus(c *C) {
s.spec.Command = []string{"false"}
s.checkRun(c, 1)
c.Check(s.logFiles["crunch-run.txt"], Not(Matches), `(?ms).* at http://169\.254\..*`)
c.Check(s.logFiles["stderr.txt"], Matches, `(?ms).*ARVADOS_KEEP_SERVICES=http://[\d\.]{7,}:\d+\n.*`)
}
+}
+func (s *integrationSuite) TestRunTrivialContainerWithNoLocalKeepstore(c *C) {
// Check that (1) config is loaded from $ARVADOS_CONFIG when
// not provided on stdin and (2) if a local keepstore is not
// started, crunch-run.txt explains why not.
s.SetUpTest(c)
s.stdin.Reset()
s.testRunTrivialContainer(c)
+ c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*not starting a local keepstore process because KeepBuffers=0 in config\n.*`)
+
+ s.SetUpTest(c)
+ s.args = []string{"-config", c.MkDir() + "/config.yaml"}
+ s.stdin.Reset()
+ buf, err := ioutil.ReadFile(os.Getenv("ARVADOS_CONFIG"))
+ c.Assert(err, IsNil)
+ err = ioutil.WriteFile(s.args[1], bytes.Replace(buf, []byte("LocalKeepBlobBuffersPerVCPU: 0"), []byte("LocalKeepBlobBuffersPerVCPU: 1"), -1), 0666)
+ c.Assert(err, IsNil)
+ s.testRunTrivialContainer(c)
c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*not starting a local keepstore process because a volume \(zzzzz-nyw5e-00000000000000\d\) uses AccessViaHosts\n.*`)
// Check that config read errors are logged
s.SetUpTest(c)
s.args = []string{"-config", c.MkDir() + "/config-unreadable.yaml"}
s.stdin.Reset()
- err := ioutil.WriteFile(s.args[1], []byte{}, 0)
+ err = ioutil.WriteFile(s.args[1], []byte{}, 0)
c.Check(err, IsNil)
s.testRunTrivialContainer(c)
c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*could not load config file \Q`+s.args[1]+`\E:.* permission denied\n.*`)
package diagnostics
import (
+ "archive/tar"
"bytes"
"context"
+ _ "embed"
"flag"
"fmt"
"io"
f := flag.NewFlagSet(prog, flag.ContinueOnError)
f.StringVar(&diag.projectName, "project-name", "scratch area for diagnostics", "name of project to find/create in home project and use for temporary/test objects")
f.StringVar(&diag.logLevel, "log-level", "info", "logging level (debug, info, warning, error)")
- f.StringVar(&diag.dockerImage, "docker-image", "alpine:latest", "image to use when running a test container")
+ f.StringVar(&diag.dockerImage, "docker-image", "", "image to use when running a test container (default: use embedded hello-world image)")
f.BoolVar(&diag.checkInternal, "internal-client", false, "check that this host is considered an \"internal\" client")
f.BoolVar(&diag.checkExternal, "external-client", false, "check that this host is considered an \"external\" client")
f.IntVar(&diag.priority, "priority", 500, "priority for test container (1..1000, or 0 to skip)")
}
}
+// docker save hello-world > hello-world.tar
+//go:embed hello-world.tar
+var HelloWorldDockerImage []byte
+
type diagnoser struct {
stdout io.Writer
stderr io.Writer
}()
}
+ // Read hello-world.tar to find image ID, so we can upload it
+ // as "sha256:{...}.tar"
+ var imageSHA2 string
+ {
+ tr := tar.NewReader(bytes.NewReader(HelloWorldDockerImage))
+ for {
+ hdr, err := tr.Next()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ diag.errorf("internal error/bug: cannot read embedded docker image tar file: %s", err)
+ return
+ }
+ if s := strings.TrimSuffix(hdr.Name, ".json"); len(s) == 64 && s != hdr.Name {
+ imageSHA2 = s
+ }
+ }
+ if imageSHA2 == "" {
+ diag.errorf("internal error/bug: cannot find {sha256}.json file in embedded docker image tar file")
+ return
+ }
+ }
+ tarfilename := "sha256:" + imageSHA2 + ".tar"
+
diag.dotest(100, "uploading file via webdav", func() error {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(diag.timeout))
defer cancel()
if collection.UUID == "" {
return fmt.Errorf("skipping, no test collection")
}
- req, err := http.NewRequestWithContext(ctx, "PUT", cluster.Services.WebDAVDownload.ExternalURL.String()+"c="+collection.UUID+"/testfile", bytes.NewBufferString("testfiledata"))
+ req, err := http.NewRequestWithContext(ctx, "PUT", cluster.Services.WebDAVDownload.ExternalURL.String()+"c="+collection.UUID+"/"+tarfilename, bytes.NewReader(HelloWorldDockerImage))
if err != nil {
return fmt.Errorf("BUG? http.NewRequest: %s", err)
}
fileurl string
}{
{false, false, http.StatusNotFound, strings.Replace(davurl.String(), "*", "d41d8cd98f00b204e9800998ecf8427e-0", 1) + "foo"},
- {false, false, http.StatusNotFound, strings.Replace(davurl.String(), "*", "d41d8cd98f00b204e9800998ecf8427e-0", 1) + "testfile"},
+ {false, false, http.StatusNotFound, strings.Replace(davurl.String(), "*", "d41d8cd98f00b204e9800998ecf8427e-0", 1) + tarfilename},
{false, false, http.StatusNotFound, cluster.Services.WebDAVDownload.ExternalURL.String() + "c=d41d8cd98f00b204e9800998ecf8427e+0/_/foo"},
- {false, false, http.StatusNotFound, cluster.Services.WebDAVDownload.ExternalURL.String() + "c=d41d8cd98f00b204e9800998ecf8427e+0/_/testfile"},
- {true, true, http.StatusOK, strings.Replace(davurl.String(), "*", strings.Replace(collection.PortableDataHash, "+", "-", -1), 1) + "testfile"},
- {true, false, http.StatusOK, cluster.Services.WebDAVDownload.ExternalURL.String() + "c=" + collection.UUID + "/_/testfile"},
+ {false, false, http.StatusNotFound, cluster.Services.WebDAVDownload.ExternalURL.String() + "c=d41d8cd98f00b204e9800998ecf8427e+0/_/" + tarfilename},
+ {true, true, http.StatusOK, strings.Replace(davurl.String(), "*", strings.Replace(collection.PortableDataHash, "+", "-", -1), 1) + tarfilename},
+ {true, false, http.StatusOK, cluster.Services.WebDAVDownload.ExternalURL.String() + "c=" + collection.UUID + "/_/" + tarfilename},
} {
diag.dotest(120+i, fmt.Sprintf("downloading from webdav (%s)", trial.fileurl), func() error {
if trial.needWildcard && !davWildcard {
if resp.StatusCode != trial.status {
return fmt.Errorf("unexpected response status: %s", resp.Status)
}
- if trial.status == http.StatusOK && string(body) != "testfiledata" {
- return fmt.Errorf("unexpected response content: %q", body)
+ if trial.status == http.StatusOK && !bytes.Equal(body, HelloWorldDockerImage) {
+ excerpt := body
+ if len(excerpt) > 128 {
+ excerpt = append([]byte(nil), body[:128]...)
+ excerpt = append(excerpt, []byte("[...]")...)
+ }
+ return fmt.Errorf("unexpected response content: len %d, %q", len(body), excerpt)
}
return nil
})
return fmt.Errorf("skipping, no project to work in")
}
+ timestamp := time.Now().Format(time.RFC3339)
+ ctrCommand := []string{"echo", timestamp}
+ if diag.dockerImage == "" {
+ if collection.UUID == "" {
+ return fmt.Errorf("skipping, no test collection to use as docker image")
+ }
+ diag.dockerImage = collection.PortableDataHash
+ ctrCommand = []string{"/hello"}
+ }
+
var cr arvados.ContainerRequest
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(diag.timeout))
defer cancel()
- timestamp := time.Now().Format(time.RFC3339)
err := client.RequestAndDecodeContext(ctx, &cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{"container_request": map[string]interface{}{
"owner_uuid": project.UUID,
"name": fmt.Sprintf("diagnostics container request %s", timestamp),
"container_image": diag.dockerImage,
- "command": []string{"echo", timestamp},
+ "command": ctrCommand,
"use_existing": false,
"output_path": "/mnt/output",
"output_name": fmt.Sprintf("diagnostics output %s", timestamp),
"libattr1-dev",
"libcrypt-ssleay-perl",
"libfuse-dev",
+ "libgbm1", // cypress / workbench2 tests
"libgnutls28-dev",
"libjson-perl",
"libpam-dev",
fmt.Fprintln(stderr, "...looks good")
}
- if out, err := exec.CommandContext(ctx, "docker", "version").CombinedOutput(); err == nil && strings.Contains(string(out), "\nServer:\n") {
- fmt.Fprintln(stderr, "loading alpine docker image for diagnostics...")
- cmd := exec.CommandContext(ctx, "docker", "pull", "alpine")
- cmd.Stdout = stderr
- cmd.Stderr = stderr
- err = cmd.Run()
- if err != nil {
- err = fmt.Errorf("%v: %w", cmd.Args, err)
- return 1
- }
- cmd = exec.CommandContext(ctx, "arv", "sudo", "keep", "docker", "alpine")
- cmd.Stdout = stderr
- cmd.Stderr = stderr
- err = cmd.Run()
- if err != nil {
- err = fmt.Errorf("%v: %w", cmd.Args, err)
- return 1
- }
- fmt.Fprintln(stderr, "...done")
- } else {
- fmt.Fprintln(stderr, "docker is not installed -- skipping step of downloading 'alpine' image")
- }
-
fmt.Fprintf(stderr, `
Setup complete. Next steps:
* run 'arv sudo diagnostics'
import (
"context"
+ "crypto/hmac"
+ "crypto/sha256"
"errors"
"fmt"
"math"
var crArgs []string
crArgs = append(crArgs, crunchRunCommand...)
crArgs = append(crArgs, container.UUID)
- crScript := execScript(crArgs)
+
+ h := hmac.New(sha256.New, []byte(disp.Cluster.SystemRootToken))
+ fmt.Fprint(h, container.UUID)
+ authsecret := fmt.Sprintf("%x", h.Sum(nil))
+
+ crScript := execScript(crArgs, map[string]string{"GatewayAuthSecret": authsecret})
bsubArgs, err := disp.bsubArgs(container)
if err != nil {
}
}
-func execScript(args []string) []byte {
- s := "#!/bin/sh\nexec"
+func execScript(args []string, env map[string]string) []byte {
+ s := "#!/bin/sh\n"
+ for k, v := range env {
+ s += k + `='`
+ s += strings.Replace(v, `'`, `'\''`, -1)
+ s += `' `
+ }
+ s += `exec`
for _, w := range args {
s += ` '`
s += strings.Replace(w, `'`, `'\''`, -1)
# Load the new object
newobj = case global_opts[:format]
when 'json'
- Oj.load(newcontent)
+ Oj.safe_load(newcontent)
when 'yaml'
YAML.load(newcontent)
else
Called when there's a need to report errors, warnings or just
activity statuses, for example in the RuntimeStatusLoggingHandler.
"""
+
+ if kind not in ('error', 'warning'):
+ # Ignore any other status kind
+ return
+
with self.workflow_eval_lock:
current = None
try:
if current is None:
return
runtime_status = current.get('runtime_status', {})
- if kind in ('error', 'warning'):
- updatemessage = runtime_status.get(kind, "")
- if not updatemessage:
- updatemessage = message
-
- # Subsequent messages tacked on in detail
- updatedetail = runtime_status.get(kind+'Detail', "")
- maxlines = 40
- if updatedetail.count("\n") < maxlines:
- if updatedetail:
- updatedetail += "\n"
- updatedetail += message + "\n"
-
- if detail:
- updatedetail += detail + "\n"
-
- if updatedetail.count("\n") >= maxlines:
- updatedetail += "\nSome messages may have been omitted. Check the full log."
-
- runtime_status.update({
- kind: updatemessage,
- kind+'Detail': updatedetail,
- })
- else:
- # Ignore any other status kind
+
+ original_updatemessage = updatemessage = runtime_status.get(kind, "")
+ if not updatemessage:
+ updatemessage = message
+
+ # Subsequent messages tacked on in detail
+ original_updatedetail = updatedetail = runtime_status.get(kind+'Detail', "")
+ maxlines = 40
+ if updatedetail.count("\n") < maxlines:
+ if updatedetail:
+ updatedetail += "\n"
+ updatedetail += message + "\n"
+
+ if detail:
+ updatedetail += detail + "\n"
+
+ if updatedetail.count("\n") >= maxlines:
+ updatedetail += "\nSome messages may have been omitted. Check the full log."
+
+ if updatemessage == original_updatemessage and updatedetail == original_updatedetail:
+ # don't waste time doing an update if nothing changed
+ # (usually because we exceeded the max lines)
return
+
+ runtime_status.update({
+ kind: updatemessage,
+ kind+'Detail': updatedetail,
+ })
+
try:
self.api.containers().update(uuid=current['uuid'],
body={
self.project_uuid = runtimeContext.project_uuid
# Upload local file references in the job order.
- job_order = upload_job_order(self, "%s input" % runtimeContext.name,
- updated_tool, job_order, runtimeContext)
+ with Perf(metrics, "upload_job_order"):
+ job_order = upload_job_order(self, "%s input" % runtimeContext.name,
+ updated_tool, job_order, runtimeContext)
# the last clause means: if it is a command line tool, and we
# are going to wait for the result, and always_submit_runner
loadingContext = self.loadingContext.copy()
loadingContext.do_validate = False
+ loadingContext.disable_js_validation = True
if submitting:
loadingContext.do_update = False
# Document may have been auto-updated. Reload the original
# document with updating disabled because we want to
# submit the document with its original CWL version, not
# the auto-updated one.
- tool = load_tool(updated_tool.tool["id"], loadingContext)
+ with Perf(metrics, "load_tool original"):
+ tool = load_tool(updated_tool.tool["id"], loadingContext)
else:
tool = updated_tool
# Upload direct dependencies of workflow steps, get back mapping of files to keep references.
# Also uploads docker images.
- merged_map = upload_workflow_deps(self, tool, runtimeContext)
+ logger.info("Uploading workflow dependencies")
+ with Perf(metrics, "upload_workflow_deps"):
+ merged_map = upload_workflow_deps(self, tool, runtimeContext)
# Recreate process object (ArvadosWorkflow or
# ArvadosCommandTool) because tool document may have been
loadingContext.loader = tool.doc_loader
loadingContext.avsc_names = tool.doc_schema
loadingContext.metadata = tool.metadata
- tool = load_tool(tool.tool, loadingContext)
+ with Perf(metrics, "load_tool"):
+ tool = load_tool(tool.tool, loadingContext)
if runtimeContext.update_workflow or runtimeContext.create_workflow:
# Create a pipeline template or workflow record and exit.
import copy
from collections import namedtuple
from io import StringIO
-from typing import Mapping, Sequence
+from typing import (
+ Any,
+ Callable,
+ Dict,
+ Iterable,
+ Iterator,
+ List,
+ Mapping,
+ MutableMapping,
+ Sequence,
+ MutableSequence,
+ Optional,
+ Set,
+ Sized,
+ Tuple,
+ Type,
+ Union,
+ cast,
+)
+from cwltool.utils import (
+ CWLObjectType,
+ CWLOutputAtomType,
+ CWLOutputType,
+)
if os.name == "posix" and sys.version_info[0] < 3:
import subprocess32 as subprocess
from ._version import __version__
from . import done
from . context import ArvRuntimeContext
+from .perf import Perf
logger = logging.getLogger('arvados.cwl-runner')
+metrics = logging.getLogger('arvados.cwl-runner.metrics')
def trim_anonymous_location(obj):
"""Remove 'location' field from File and Directory literals.
if sfname is None:
continue
- p_location = primary["location"]
- if "/" in p_location:
- sfpath = (
- p_location[0 : p_location.rindex("/") + 1]
- + sfname
- )
+ if isinstance(sfname, str):
+ p_location = primary["location"]
+ if "/" in p_location:
+ sfpath = (
+ p_location[0 : p_location.rindex("/") + 1]
+ + sfname
+ )
required = builder.do_eval(required, context=primary)
- if fsaccess.exists(sfpath):
- if pattern is not None:
- found.append({"location": sfpath, "class": "File"})
- else:
- found.append(sf)
- elif required:
- raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
- "Required secondary file '%s' does not exist" % sfpath)
+ if isinstance(sfname, list) or isinstance(sfname, dict):
+ each = aslist(sfname)
+ for e in each:
+ if required and not fsaccess.exists(e.get("location")):
+ raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+ "Required secondary file '%s' does not exist" % e.get("location"))
+ found.extend(each)
+
+ if isinstance(sfname, str):
+ if fsaccess.exists(sfpath):
+ if pattern is not None:
+ found.append({"location": sfpath, "class": "File"})
+ else:
+ found.append(sf)
+ elif required:
+ raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+ "Required secondary file '%s' does not exist" % sfpath)
primary["secondaryFiles"] = cmap(found)
if discovered is not None:
def upload_dependencies(arvrunner, name, document_loader,
workflowobj, uri, loadref_run, runtimeContext,
- include_primary=True, discovered_secondaryfiles=None):
+ include_primary=True, discovered_secondaryfiles=None,
+ cache=None):
"""Upload the dependencies of the workflowobj document to Keep.
Returns a pathmapper object mapping local paths to keep references. Also
defrg, _ = urllib.parse.urldefrag(joined)
if defrg not in loaded:
loaded.add(defrg)
+ if cache is not None and defrg in cache:
+ return cache[defrg]
# Use fetch_text to get raw file (before preprocessing).
text = document_loader.fetch_text(defrg)
if isinstance(text, bytes):
else:
textIO = StringIO(text)
yamlloader = YAML(typ='safe', pure=True)
- return yamlloader.load(textIO)
+ result = yamlloader.load(textIO)
+ if cache is not None:
+ cache[defrg] = result
+ return result
else:
return {}
scanobj = workflowobj
if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
- # Need raw file content (before preprocessing) to ensure
- # that external references in $include and $mixin are captured.
- scanobj = loadref("", workflowobj["id"])
+ defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
+ if cache is not None and defrg not in cache:
+        # if we haven't seen this file before, we want the raw file
+        # content (before preprocessing) to ensure that external
+        # references like $include haven't already been inlined.
+ scanobj = loadref("", workflowobj["id"])
metadata = scanobj
- sc_result = scandeps(uri, scanobj,
- loadref_fields,
- set(("$include", "location")),
- loadref, urljoin=document_loader.fetcher.urljoin,
- nestdirs=False)
+ with Perf(metrics, "scandeps include, location"):
+ sc_result = scandeps(uri, scanobj,
+ loadref_fields,
+ set(("$include", "location")),
+ loadref, urljoin=document_loader.fetcher.urljoin,
+ nestdirs=False)
- optional_deps = scandeps(uri, scanobj,
- loadref_fields,
- set(("$schemas",)),
- loadref, urljoin=document_loader.fetcher.urljoin,
- nestdirs=False)
+ with Perf(metrics, "scandeps $schemas"):
+ optional_deps = scandeps(uri, scanobj,
+ loadref_fields,
+ set(("$schemas",)),
+ loadref, urljoin=document_loader.fetcher.urljoin,
+ nestdirs=False)
- sc_result.extend(optional_deps)
+ if sc_result is None:
+ sc_result = []
+
+ if optional_deps is None:
+ optional_deps = []
+
+ if optional_deps:
+ sc_result.extend(optional_deps)
sc = []
uuids = {}
sc.append(obj)
collect_uuids(obj)
- visit_class(workflowobj, ("File", "Directory"), collect_uuids)
- visit_class(sc_result, ("File", "Directory"), collect_uploads)
+ with Perf(metrics, "collect uuids"):
+ visit_class(workflowobj, ("File", "Directory"), collect_uuids)
+
+ with Perf(metrics, "collect uploads"):
+ visit_class(sc_result, ("File", "Directory"), collect_uploads)
# Resolve any collection uuids we found to portable data hashes
# and assign them to uuid_map
uuid_map = {}
fetch_uuids = list(uuids.keys())
- while fetch_uuids:
- # For a large number of fetch_uuids, API server may limit
- # response size, so keep fetching from API server has nothing
- # more to give us.
- lookups = arvrunner.api.collections().list(
- filters=[["uuid", "in", fetch_uuids]],
- count="none",
- select=["uuid", "portable_data_hash"]).execute(
- num_retries=arvrunner.num_retries)
+ with Perf(metrics, "fetch_uuids"):
+ while fetch_uuids:
+            # For a large number of fetch_uuids, API server may limit
+            # response size, so keep fetching until the API server has
+            # nothing more to give us.
+ lookups = arvrunner.api.collections().list(
+ filters=[["uuid", "in", fetch_uuids]],
+ count="none",
+ select=["uuid", "portable_data_hash"]).execute(
+ num_retries=arvrunner.num_retries)
- if not lookups["items"]:
- break
+ if not lookups["items"]:
+ break
- for l in lookups["items"]:
- uuid_map[l["uuid"]] = l["portable_data_hash"]
+ for l in lookups["items"]:
+ uuid_map[l["uuid"]] = l["portable_data_hash"]
- fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
+ fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
normalizeFilesDirs(sc)
- if include_primary and "id" in workflowobj:
- sc.append({"class": "File", "location": workflowobj["id"]})
+ if "id" in workflowobj:
+ defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
+ if include_primary:
+ # make sure it's included
+ sc.append({"class": "File", "location": defrg})
+ else:
+ # make sure it's excluded
+ sc = [d for d in sc if d.get("location") != defrg]
def visit_default(obj):
def defaults_are_optional(f):
else:
del discovered[d]
- mapper = ArvPathMapper(arvrunner, sc, "",
- "keep:%s",
- "keep:%s/%s",
- name=name,
- single_collection=True,
- optional_deps=optional_deps)
+ with Perf(metrics, "mapper"):
+ mapper = ArvPathMapper(arvrunner, sc, "",
+ "keep:%s",
+ "keep:%s/%s",
+ name=name,
+ single_collection=True,
+ optional_deps=optional_deps)
keeprefs = set()
def addkeepref(k):
p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
p[collectionUUID] = uuid
- visit_class(workflowobj, ("File", "Directory"), setloc)
- visit_class(discovered, ("File", "Directory"), setloc)
+ with Perf(metrics, "setloc"):
+ visit_class(workflowobj, ("File", "Directory"), setloc)
+ visit_class(discovered, ("File", "Directory"), setloc)
if discovered_secondaryfiles is not None:
for d in discovered:
def upload_workflow_deps(arvrunner, tool, runtimeContext):
# Ensure that Docker images needed by this workflow are available
- upload_docker(arvrunner, tool, runtimeContext)
+ with Perf(metrics, "upload_docker"):
+ upload_docker(arvrunner, tool, runtimeContext)
document_loader = tool.doc_loader
merged_map = {}
-
+ tool_dep_cache = {}
def upload_tool_deps(deptool):
if "id" in deptool:
discovered_secondaryfiles = {}
- pm = upload_dependencies(arvrunner,
- "%s dependencies" % (shortname(deptool["id"])),
- document_loader,
- deptool,
- deptool["id"],
- False,
- runtimeContext,
- include_primary=False,
- discovered_secondaryfiles=discovered_secondaryfiles)
+ with Perf(metrics, "upload_dependencies %s" % shortname(deptool["id"])):
+ pm = upload_dependencies(arvrunner,
+ "%s dependencies" % (shortname(deptool["id"])),
+ document_loader,
+ deptool,
+ deptool["id"],
+ False,
+ runtimeContext,
+ include_primary=False,
+ discovered_secondaryfiles=discovered_secondaryfiles,
+ cache=tool_dep_cache)
document_loader.idx[deptool["id"]] = deptool
toolmap = {}
for k,v in pm.items():
# file to determine what version of cwltool and schema-salad to
# build.
install_requires=[
- 'cwltool==3.1.20220224085855',
- 'schema-salad==8.2.20211116214159',
+ 'cwltool==3.1.20220623174452',
+ 'schema-salad==8.3.20220801194920',
'arvados-python-client{}'.format(pysdk_dep),
'setuptools',
'ciso8601 >= 2.0.0',
- 'networkx < 2.6'
+ 'networkx < 2.6',
+ 'msgpack==1.0.3'
],
data_files=[
('share/doc/arvados-cwl-runner', ['LICENSE-2.0.txt', 'README.rst']),
"encoding/json"
"io"
"net"
+ "net/http"
"github.com/sirupsen/logrus"
)
EndpointContainerDelete = APIEndpoint{"DELETE", "arvados/v1/containers/{uuid}", ""}
EndpointContainerLock = APIEndpoint{"POST", "arvados/v1/containers/{uuid}/lock", ""}
EndpointContainerUnlock = APIEndpoint{"POST", "arvados/v1/containers/{uuid}/unlock", ""}
- EndpointContainerSSH = APIEndpoint{"GET", "arvados/v1/connect/{uuid}/ssh", ""} // move to /containers after #17014 fixes routing
+ EndpointContainerSSH = APIEndpoint{"POST", "arvados/v1/connect/{uuid}/ssh", ""} // move to /containers after #17014 fixes routing
+ EndpointContainerGatewayTunnel = APIEndpoint{"POST", "arvados/v1/connect/{uuid}/gateway_tunnel", ""} // move to /containers after #17014 fixes routing
EndpointContainerRequestCreate = APIEndpoint{"POST", "arvados/v1/container_requests", "container_request"}
EndpointContainerRequestUpdate = APIEndpoint{"PATCH", "arvados/v1/container_requests/{uuid}", "container_request"}
EndpointContainerRequestGet = APIEndpoint{"GET", "arvados/v1/container_requests/{uuid}", ""}
UUID string `json:"uuid"`
DetachKeys string `json:"detach_keys"`
LoginUsername string `json:"login_username"`
+ NoForward bool `json:"no_forward"`
}
-type ContainerSSHConnection struct {
+type ConnectionResponse struct {
Conn net.Conn `json:"-"`
Bufrw *bufio.ReadWriter `json:"-"`
Logger logrus.FieldLogger `json:"-"`
+ Header http.Header `json:"-"`
+}
+
+type ContainerGatewayTunnelOptions struct {
+ UUID string `json:"uuid"`
+ AuthSecret string `json:"auth_secret"`
}
type GetOptions struct {
ContainerDelete(ctx context.Context, options DeleteOptions) (Container, error)
ContainerLock(ctx context.Context, options GetOptions) (Container, error)
ContainerUnlock(ctx context.Context, options GetOptions) (Container, error)
- ContainerSSH(ctx context.Context, options ContainerSSHOptions) (ContainerSSHConnection, error)
+ ContainerSSH(ctx context.Context, options ContainerSSHOptions) (ConnectionResponse, error)
+ ContainerGatewayTunnel(ctx context.Context, options ContainerGatewayTunnelOptions) (ConnectionResponse, error)
ContainerRequestCreate(ctx context.Context, options CreateOptions) (ContainerRequest, error)
ContainerRequestUpdate(ctx context.Context, options UpdateOptions) (ContainerRequest, error)
ContainerRequestGet(ctx context.Context, options GetOptions) (ContainerRequest, error)
RuntimeToken string `json:"runtime_token"`
AuthUUID string `json:"auth_uuid"`
Log string `json:"log"`
+ Cost float64 `json:"cost"`
+ SubrequestsCost float64 `json:"subrequests_cost"`
}
// ContainerRequest is an arvados#container_request resource.
ContainerCount int `json:"container_count"`
OutputStorageClasses []string `json:"output_storage_classes"`
OutputProperties map[string]interface{} `json:"output_properties"`
+ CumulativeCost float64 `json:"cumulative_cost"`
}
// Mount is special behavior to attach to a filesystem path or device.
"github.com/sirupsen/logrus"
)
-func (sshconn ContainerSSHConnection) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+func (cresp ConnectionResponse) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+ defer cresp.Conn.Close()
hj, ok := w.(http.Hijacker)
if !ok {
http.Error(w, "ResponseWriter does not support connection upgrade", http.StatusInternalServerError)
return
}
w.Header().Set("Connection", "upgrade")
- w.Header().Set("Upgrade", "ssh")
+ for k, v := range cresp.Header {
+ w.Header()[k] = v
+ }
w.WriteHeader(http.StatusSwitchingProtocols)
conn, bufrw, err := hj.Hijack()
if err != nil {
defer conn.Close()
var bytesIn, bytesOut int64
+ ctx, cancel := context.WithCancel(req.Context())
var wg sync.WaitGroup
- ctx, cancel := context.WithCancel(context.Background())
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
- n, err := io.CopyN(conn, sshconn.Bufrw, int64(sshconn.Bufrw.Reader.Buffered()))
+ n, err := io.CopyN(conn, cresp.Bufrw, int64(cresp.Bufrw.Reader.Buffered()))
bytesOut += n
if err == nil {
- n, err = io.Copy(conn, sshconn.Conn)
+ n, err = io.Copy(conn, cresp.Conn)
bytesOut += n
}
if err != nil {
- ctxlog.FromContext(req.Context()).WithError(err).Error("error copying downstream")
+ ctxlog.FromContext(ctx).WithError(err).Error("error copying downstream")
}
}()
wg.Add(1)
go func() {
defer wg.Done()
defer cancel()
- n, err := io.CopyN(sshconn.Conn, bufrw, int64(bufrw.Reader.Buffered()))
+ n, err := io.CopyN(cresp.Conn, bufrw, int64(bufrw.Reader.Buffered()))
bytesIn += n
if err == nil {
- n, err = io.Copy(sshconn.Conn, conn)
+ n, err = io.Copy(cresp.Conn, conn)
bytesIn += n
}
if err != nil {
- ctxlog.FromContext(req.Context()).WithError(err).Error("error copying upstream")
+ ctxlog.FromContext(ctx).WithError(err).Error("error copying upstream")
}
}()
<-ctx.Done()
- if sshconn.Logger != nil {
- go func() {
- wg.Wait()
- sshconn.Logger.WithFields(logrus.Fields{
+ go func() {
+ // Wait for both io.Copy goroutines to finish and increment
+ // their byte counters.
+ wg.Wait()
+ if cresp.Logger != nil {
+ cresp.Logger.WithFields(logrus.Fields{
"bytesIn": bytesIn,
"bytesOut": bytesOut,
}).Info("closed connection")
- }()
- }
+ }
+ }()
}
as.appendCall(ctx, as.ContainerUnlock, options)
return arvados.Container{}, as.Error
}
-func (as *APIStub) ContainerSSH(ctx context.Context, options arvados.ContainerSSHOptions) (arvados.ContainerSSHConnection, error) {
+func (as *APIStub) ContainerSSH(ctx context.Context, options arvados.ContainerSSHOptions) (arvados.ConnectionResponse, error) {
as.appendCall(ctx, as.ContainerSSH, options)
- return arvados.ContainerSSHConnection{}, as.Error
+ return arvados.ConnectionResponse{}, as.Error
+}
+func (as *APIStub) ContainerGatewayTunnel(ctx context.Context, options arvados.ContainerGatewayTunnelOptions) (arvados.ConnectionResponse, error) {
+ as.appendCall(ctx, as.ContainerGatewayTunnel, options)
+ return arvados.ConnectionResponse{}, as.Error
}
func (as *APIStub) ContainerRequestCreate(ctx context.Context, options arvados.CreateOptions) (arvados.ContainerRequest, error) {
as.appendCall(ctx, as.ContainerRequestCreate, options)
self.set_committed(False)
self.notify(DEL, self, pathcomponents[0], deleteditem)
else:
- item.remove(pathcomponents[1])
+ item.remove(pathcomponents[1], recursive=recursive)
def _clonefrom(self, source):
for k,v in listitems(source):
"JobsAPI": {
"GitInternalDir": os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'internal.git'),
},
+ "LocalKeepBlobBuffersPerVCPU": 0,
"SupportedDockerImageFormats": {"v1": {}},
"ShellAccess": {
"Admin": True,
with self.assertRaises(arvados.errors.ArgumentError):
c.remove("")
+ def test_remove_recursive(self):
+ c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:a/b/c/d/efg.txt 0:10:xyz.txt\n')
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:xyz.txt\n./a/b/c/d 781e5e245d69b566979b86e28d23f2c7+10 0:10:efg.txt\n", c.portable_manifest_text())
+ self.assertIn("a", c)
+ self.assertEqual(1, len(c["a"].keys()))
+ # cannot remove non-empty directory with default recursive=False
+ with self.assertRaises(OSError):
+ c.remove("a/b")
+ with self.assertRaises(OSError):
+ c.remove("a/b/c/d")
+ c.remove("a/b", recursive=True)
+ self.assertEqual(0, len(c["a"].keys()))
+ self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 0:10:xyz.txt\n./a d41d8cd98f00b204e9800998ecf8427e+0 0:0:\\056\n", c.portable_manifest_text())
+
def test_find(self):
c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:10:count2.txt\n')
self.assertIs(c.find("."), c)
included_by_uuid = {}
seen_last_class = false
+ error_by_class = {}
+ any_success = false
+
klasses.each do |klass|
# check if current klass is same as params['last_object_class']
seen_last_class = true if((params['count'].andand.==('none')) and
# Adjust the limit based on number of objects fetched so far
klass_limit = limit_all - all_objects.count
@limit = klass_limit
- apply_where_limit_order_params klass
+
+ begin
+ apply_where_limit_order_params klass
+ rescue ArgumentError => e
+ if e.inspect =~ /Invalid attribute '.+' for operator '.+' in filter/ or
+ e.inspect =~ /Invalid attribute '.+' for subproperty filter/
+ error_by_class[klass.name] = e
+ next
+ end
+ raise
+ else
+ any_success = true
+ end
# This actually fetches the objects
klass_object_list = object_list(model_class: klass)
end
end
+ # Only error out when every searchable object type errored out
+ if !any_success && error_by_class.size > 0
+ error_msg = error_by_class.collect do |klass, err|
+ "#{err} on object type #{klass}"
+ end.join("\n")
+ raise ArgumentError.new(error_msg)
+ end
+
if params["include"]
@extra_included = included_by_uuid.values
end
t.add :interactive_session_started
t.add :output_storage_classes
t.add :output_properties
+ t.add :cost
+ t.add :subrequests_cost
end
# Supported states for a container
def validate_change
permitted = [:state]
- progress_attrs = [:progress, :runtime_status, :log, :output, :output_properties, :exit_code]
final_attrs = [:finished_at]
+ progress_attrs = [:progress, :runtime_status, :subrequests_cost, :cost,
+ :log, :output, :output_properties, :exit_code]
if self.new_record?
permitted.push(:owner_uuid, :command, :container_image, :cwd,
permitted.push :priority
when Running
- permitted.push :priority, :output_properties, *progress_attrs
+ permitted.push :priority, :output_properties, :gateway_address, *progress_attrs
if self.state_changed?
- permitted.push :started_at, :gateway_address
+ permitted.push :started_at
end
if !self.interactive_session_started_was
permitted.push :interactive_session_started
when Running
permitted.push :finished_at, *progress_attrs
when Queued, Locked
- permitted.push :finished_at, :log, :runtime_status
+ permitted.push :finished_at, :log, :runtime_status, :cost
end
else
cr.with_lock do
leave_modified_by_user_alone do
# Use row locking because this increments container_count
+ cr.cumulative_cost += self.cost + self.subrequests_cost
cr.container_uuid = c.uuid
cr.save!
end
serialize :scheduling_parameters, Hash
after_find :fill_container_defaults_after_find
+ after_initialize { @state_was_when_initialized = self.state_was } # see finalize_if_needed
before_validation :fill_field_defaults, :if => :new_record?
before_validation :fill_container_defaults
validates :command, :container_image, :output_path, :cwd, :presence => true
t.add :use_existing
t.add :output_storage_classes
t.add :output_properties
+ t.add :cumulative_cost
end
# Supported states for a container request
def finalize!
container = Container.find_by_uuid(container_uuid)
if !container.nil?
+ # We don't want to add the container cost if the container was
+ # already finished when this CR was committed. But we are
+ # running in an after_save hook after a lock/reload, so
+ # state_was has already been updated to Committed regardless.
+ # Hence the need for @state_was_when_initialized.
+ if @state_was_when_initialized == Committed
+ # Add the final container cost to our cumulative cost (which
+ # may already be non-zero from previous attempts if
+ # container_count_max > 1).
+ self.cumulative_cost += container.cost + container.subrequests_cost
+ end
+
+ # Add our cumulative cost to the subrequests_cost of the
+ # requesting container, if any.
+ if self.requesting_container_uuid
+ Container.where(
+ uuid: self.requesting_container_uuid,
+ state: Container::Running,
+ ).each do |c|
+ c.subrequests_cost += self.cumulative_cost
+ c.save!
+ end
+ end
+
update_collections(container: container)
if container.state == Container::Complete
case self.state
when Committed
- permitted.push :priority, :container_count_max, :container_uuid
+ permitted.push :priority, :container_count_max, :container_uuid, :cumulative_cost
if self.priority.nil?
self.errors.add :priority, "cannot be nil"
when Final
if self.state_was == Committed
# "Cancel" means setting priority=0, state=Committed
- permitted.push :priority
+ permitted.push :priority, :cumulative_cost
if current_user.andand.is_admin
permitted.push :output_uuid, :log_uuid
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+class AddCostToContainers < ActiveRecord::Migration[5.2]
+ def change
+ add_column :containers, :cost, :float, null: false, default: 0
+ add_column :containers, :subrequests_cost, :float, null: false, default: 0
+ add_column :container_requests, :cumulative_cost, :float, null: false, default: 0
+ end
+end
secret_mounts jsonb DEFAULT '{}'::jsonb,
runtime_token text,
output_storage_classes jsonb DEFAULT '["default"]'::jsonb,
- output_properties jsonb DEFAULT '{}'::jsonb
+ output_properties jsonb DEFAULT '{}'::jsonb,
+ cumulative_cost double precision DEFAULT 0.0 NOT NULL
);
gateway_address character varying,
interactive_session_started boolean DEFAULT false NOT NULL,
output_storage_classes jsonb DEFAULT '["default"]'::jsonb,
- output_properties jsonb DEFAULT '{}'::jsonb
+ output_properties jsonb DEFAULT '{}'::jsonb,
+ cost double precision DEFAULT 0.0 NOT NULL,
+ subrequests_cost double precision DEFAULT 0.0 NOT NULL
);
('20220401153101'),
('20220505112900'),
('20220726034131');
-
+('20220804133317');
raise ArgumentError.new("Invalid operator for subproperty search '#{operator}'")
end
elsif operator == "exists"
- if col.type != :jsonb
+ if col.nil? or col.type != :jsonb
raise ArgumentError.new("Invalid attribute '#{attr}' for operator '#{operator}' in filter")
end
json_response['errors'].join(' '))
end
+ test "groups contents with properties filter succeeds on objects with properties field" do
+ @controller = Arvados::V1::GroupsController.new
+ authorize_with :admin
+ get :contents, params: {
+ filters: [
+ ['properties', 'exists', 'foo'],
+ ['uuid', 'is_a', ["arvados#group","arvados#collection","arvados#containerRequest"]],
+ ]
+ }
+ assert_response 200
+ assert json_response['items'].length == 0
+ end
+
+ # Tests bug #19297
+ test "groups contents with properties filter succeeds on some objects with properties field" do
+ @controller = Arvados::V1::GroupsController.new
+ authorize_with :admin
+ get :contents, params: {
+ filters: [
+ ['properties', 'exists', 'foo'],
+ ['uuid', 'is_a', ["arvados#group","arvados#workflow"]],
+ ]
+ }
+ assert_response 200
+ assert json_response['items'].length == 0
+ end
+
+ # Tests bug #19297
+ test "groups contents with properties filter fails on objects without properties field" do
+ @controller = Arvados::V1::GroupsController.new
+ authorize_with :admin
+ get :contents, params: {
+ filters: [
+ ['properties', 'exists', 'foo'],
+ ['uuid', 'is_a', ["arvados#workflow"]],
+ ]
+ }
+ assert_response 422
+ assert_match(/Invalid attribute 'properties' for operator 'exists'.*on object type Workflow/, json_response['errors'].join(' '))
+ end
+
+ test "groups contents without filters and limit=0, count=none" do
+ @controller = Arvados::V1::GroupsController.new
+ authorize_with :admin
+ get :contents, params: {
+ limit: 0,
+ count: 'none',
+ }
+ assert_response 200
+ assert json_response['items'].length == 0
+ end
+
test "replication_desired = 2" do
@controller = Arvados::V1::CollectionsController.new
authorize_with :admin
act_as_system_user do
Container.find_by_uuid(cr.container_uuid).
- update_attributes!(state: Container::Cancelled)
+ update_attributes!(state: Container::Cancelled, cost: 1.25)
end
cr.reload
assert_equal "Final", cr.state
+ assert_equal 1.25, cr.cumulative_cost
assert_equal users(:active).uuid, cr.modified_by_user_uuid
end
log_pdh = 'fa7aeb5140e2848d39b416daeef4ffc5+45'
act_as_system_user do
c.update_attributes!(state: Container::Complete,
+ cost: 1.25,
output: output_pdh,
log: log_pdh)
end
cr.reload
assert_equal "Final", cr.state
+ assert_equal 1.25, cr.cumulative_cost
assert_equal users(:active).uuid, cr.modified_by_user_uuid
assert_not_nil cr.output_uuid
prev_container_uuid = cr.container_uuid
act_as_system_user do
+ c.update_attributes!(cost: 0.5, subrequests_cost: 1.25)
c.update_attributes!(state: Container::Cancelled)
end
c = act_as_system_user do
c = Container.find_by_uuid(cr.container_uuid)
+ c.update_attributes!(state: Container::Locked)
+ c.update_attributes!(state: Container::Running)
+ c.update_attributes!(cost: 0.125)
c.update_attributes!(state: Container::Cancelled)
c
end
assert_equal "Final", cr.state
assert_equal prev_container_uuid, cr.container_uuid
assert_not_equal cr2.container_uuid, cr.container_uuid
+ assert_equal 1.875, cr.cumulative_cost
end
test "Retry on container cancelled with runtime_token" do
end
end
+ test "Cumulative cost includes retried attempts but not reused containers" do
+ set_user_from_auth :active
+ cr = create_minimal_req!(priority: 5, state: "Committed", container_count_max: 3)
+ c = Container.find_by_uuid cr.container_uuid
+ act_as_system_user do
+ c.update_attributes!(state: Container::Locked)
+ c.update_attributes!(state: Container::Running)
+ c.update_attributes!(state: Container::Cancelled, cost: 3)
+ end
+ cr.reload
+ assert_equal 3, cr.cumulative_cost
+
+ c = Container.find_by_uuid cr.container_uuid
+ lock_and_run c
+ c.reload
+ assert_equal 0, c.subrequests_cost
+
+ # cr2 is a child/subrequest
+ cr2 = with_container_auth(c) do
+ create_minimal_req!(priority: 10, state: "Committed", container_count_max: 1, command: ["echo", "foo2"])
+ end
+ assert_equal c.uuid, cr2.requesting_container_uuid
+ c2 = Container.find_by_uuid cr2.container_uuid
+ act_as_system_user do
+ c2.update_attributes!(state: Container::Locked)
+ c2.update_attributes!(state: Container::Running)
+ logc = Collection.new(owner_uuid: system_user_uuid,
+ manifest_text: ". ef772b2f28e2c8ca84de45466ed19ee9+7815 0:0:arv-mount.txt\n")
+ logc.save!
+ c2.update_attributes!(state: Container::Complete,
+ exit_code: 0,
+ output: '1f4b0bc7583c2a7f9102c395f4ffc5e3+45',
+ log: logc.portable_data_hash,
+ cost: 7)
+ end
+ c.reload
+ assert_equal 7, c.subrequests_cost
+
+ # cr3 is an identical child/subrequest, will reuse c2
+ cr3 = with_container_auth(c) do
+ create_minimal_req!(priority: 10, state: "Committed", container_count_max: 1, command: ["echo", "foo2"])
+ end
+ assert_equal c.uuid, cr3.requesting_container_uuid
+ c3 = Container.find_by_uuid cr3.container_uuid
+ assert_equal c2.uuid, c3.uuid
+ assert_equal Container::Complete, c3.state
+ c.reload
+ assert_equal 7, c.subrequests_cost
+
+ act_as_system_user do
+ c.update_attributes!(state: Container::Complete, exit_code: 0, cost: 9)
+ end
+
+ c.reload
+ assert_equal 7, c.subrequests_cost
+ cr.reload
+ assert_equal 3+7+9, cr.cumulative_cost
+ end
+
end
Thread.current[:user] = auth.user
end
+ assert c.update_attributes(gateway_address: "127.0.0.1:9")
assert c.update_attributes(output: collections(:collection_owned_by_active).portable_data_hash)
assert c.update_attributes(runtime_status: {'warning' => 'something happened'})
assert c.update_attributes(progress: 0.5)
import (
"context"
+ "crypto/hmac"
+ "crypto/sha256"
"fmt"
"log"
"math"
crArgs := append([]string(nil), crunchRunCommand...)
crArgs = append(crArgs, "--runtime-engine="+disp.cluster.Containers.RuntimeEngine)
crArgs = append(crArgs, container.UUID)
- crScript := strings.NewReader(execScript(crArgs))
+
+ h := hmac.New(sha256.New, []byte(disp.cluster.SystemRootToken))
+ fmt.Fprint(h, container.UUID)
+ authsecret := fmt.Sprintf("%x", h.Sum(nil))
+
+ crScript := strings.NewReader(execScript(crArgs, map[string]string{"GatewayAuthSecret": authsecret}))
sbArgs, err := disp.sbatchArgs(container)
if err != nil {
"strings"
)
-func execScript(args []string) string {
- s := "#!/bin/sh\nexec"
+func execScript(args []string, env map[string]string) string {
+ s := "#!/bin/sh\n"
+ for k, v := range env {
+ s += k + `='`
+ s += strings.Replace(v, `'`, `'\''`, -1)
+ s += `' `
+ }
+ s += `exec`
for _, w := range args {
s += ` '`
s += strings.Replace(w, `'`, `'\''`, -1)
{[]string{`foo"`, "'waz 'qux\n"}, `exec 'foo"' ''\''waz '\''qux` + "\n" + `'`},
} {
c.Logf("%+v -> %+v", test.args, test.script)
- c.Check(execScript(test.args), Equals, "#!/bin/sh\n"+test.script+"\n")
+ c.Check(execScript(test.args, nil), Equals, "#!/bin/sh\n"+test.script+"\n")
}
+ c.Check(execScript([]string{"sh", "-c", "echo $foo"}, map[string]string{"foo": "b'ar"}), Equals, "#!/bin/sh\nfoo='b'\\''ar' exec 'sh' '-c' 'echo $foo'\n")
}
for _, c := range classes {
perclass[c] = 0
}
- for _, r := range bsm.get(blkid).Replicas {
+ bs, ok := bsm.entries[blkid]
+ if !ok {
+ return 0
+ }
+ for _, r := range bs.Replicas {
total += r.KeepMount.Replication
mntclasses := r.KeepMount.StorageClasses
if len(mntclasses) == 0 {
package keepbalance
import (
+ "sync"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(40), knownBlkid(41)}, nil)
c.Check(n, check.Equals, 4)
}
+
+func (s *confirmedReplicationSuite) TestConcurrency(c *check.C) {
+ var wg sync.WaitGroup
+ for i := 1000; i < 1256; i++ {
+ i := i
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ n := s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(i), knownBlkid(i)}, []string{"default"})
+ c.Check(n, check.Equals, 0)
+ }()
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ n := s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(10)}, []string{"default"})
+ c.Check(n, check.Equals, 1)
+ n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(20)}, []string{"default"})
+ c.Check(n, check.Equals, 2)
+ }()
+ }
+ wg.Wait()
+}
// Use about 1 goroutine per 2 CPUs. Based on experiments with
// a 2-core host, using more concurrent database
// calls/transactions makes this process slower, not faster.
- for i := 0; i < runtime.NumCPU()+1/2; i++ {
+ for i := 0; i < (runtime.NumCPU()+1)/2; i++ {
wg.Add(1)
goSendErr(errs, func() error {
defer wg.Done()
collection, filepath = h.determineCollection(fs, filepath)
}
if collection != nil {
- log = log.WithField("collection_uuid", collection.UUID).
- WithField("collection_file_path", filepath)
- props["collection_uuid"] = collection.UUID
+ log = log.WithField("collection_file_path", filepath)
props["collection_file_path"] = filepath
- // h.determineCollection populates the collection_uuid prop with the PDH, if
- // this collection is being accessed via PDH. In that case, blank the
- // collection_uuid field so that consumers of the log entries can rely on it
- // being a UUID, or blank. The PDH remains available via the
- // portable_data_hash property.
- if props["collection_uuid"] == collection.PortableDataHash {
- props["collection_uuid"] = ""
+ // h.determineCollection populates the collection_uuid
+ // prop with the PDH, if this collection is being
+ // accessed via PDH. For logging, we use a different
+ // field depending on whether it's a UUID or PDH.
+ if len(collection.UUID) > 32 {
+ log = log.WithField("portable_data_hash", collection.UUID)
+ props["portable_data_hash"] = collection.UUID
+ } else {
+ log = log.WithField("collection_uuid", collection.UUID)
+ props["collection_uuid"] = collection.UUID
}
}
if r.Method == "PUT" || r.Method == "POST" {
}
func (h *handler) determineCollection(fs arvados.CustomFileSystem, path string) (*arvados.Collection, string) {
- segments := strings.Split(path, "/")
- var i int
- for i = 0; i < len(segments); i++ {
- dir := append([]string{}, segments[0:i]...)
- dir = append(dir, ".arvados#collection")
- f, err := fs.OpenFile(strings.Join(dir, "/"), os.O_RDONLY, 0)
- if f != nil {
- defer f.Close()
- }
+ target := strings.TrimSuffix(path, "/")
+ for {
+ fi, err := fs.Stat(target)
if err != nil {
- if !os.IsNotExist(err) {
+ return nil, ""
+ }
+ switch src := fi.Sys().(type) {
+ case *arvados.Collection:
+ return src, strings.TrimPrefix(path[len(target):], "/")
+ case *arvados.Group:
+ return nil, ""
+ default:
+ if _, ok := src.(error); ok {
return nil, ""
}
- continue
}
- // err is nil so we found it.
- decoder := json.NewDecoder(f)
- var collection arvados.Collection
- err = decoder.Decode(&collection)
- if err != nil {
+ // Try parent
+ cut := strings.LastIndexByte(target, '/')
+ if cut < 0 {
return nil, ""
}
- return &collection, strings.Join(segments[i:], "/")
+ target = target[:cut]
}
- return nil, ""
}
# installer.sh will log in to each of these nodes and then provision
# it for the specified roles.
NODES=(
- [localhost]=api,controller,websocket,dispatcher,keepbalance,keepstore,keepproxy,keepweb,workbench,workbench2,webshell
+ [localhost]=api,controller,websocket,dispatcher,keepbalance,keepstore,keepproxy,keepweb,workbench,workbench2,webshell,shell
)
# External ports used by the Arvados services
# installer.sh will log in to each of these nodes and then provision
# it for the specified roles.
NODES=(
- [localhost]=api,controller,websocket,dispatcher,keepbalance,keepstore,keepproxy,keepweb,workbench,workbench2,webshell
+ [localhost]=api,controller,websocket,dispatcher,keepbalance,keepstore,keepproxy,keepweb,workbench,workbench2,webshell,shell
)
# Set this value when installing a cluster in a single host with a single
echo "extra_custom_certs_dir: /srv/salt/certs" > ${P_DIR}/extra_custom_certs.sls
echo "extra_custom_certs:" >> ${P_DIR}/extra_custom_certs.sls
- for c in controller websocket workbench workbench2 webshell keepweb keepproxy; do
+ for c in controller websocket workbench workbench2 webshell keepweb keepproxy shell; do
# Are we in a single-host-single-hostname env?
if [ "${USE_SINGLE_HOSTNAME}" = "yes" ]; then
# Are we in a single-host-single-hostname env?