+// remoteClient returns a keepclient.KeepClient for the given remote
+// cluster. Clients are cached per remote; each call returns a copy
+// configured with the caller's token, salted for the remote cluster.
+func (ks *keepstore) remoteClient(remoteID string, remoteCluster arvados.RemoteCluster, token string) (*keepclient.KeepClient, error) {
+ ks.remoteClientsMtx.Lock()
+ kc, ok := ks.remoteClients[remoteID]
+ ks.remoteClientsMtx.Unlock()
+ if !ok {
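+ // No cached client for this remote yet: build one and cache it.
+ // The placeholder token is overridden with the caller's salted
+ // token in the per-call copy below.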
+ c := &arvados.Client{
+ APIHost: remoteCluster.Host,
+ AuthToken: "xxx",
+ Insecure: remoteCluster.Insecure,
+ }
+ ac, err := arvadosclient.New(c)
+ if err != nil {
+ return nil, err
+ }
+ kc, err = keepclient.MakeKeepClient(ac)
+ if err != nil {
+ return nil, err
+ }
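+ // Disable keepclient's local disk cache for data fetched from
+ // the remote cluster.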
+ kc.DiskCacheSize = keepclient.DiskCacheDisabled
+
+ ks.remoteClientsMtx.Lock()
+ ks.remoteClients[remoteID] = kc
+ ks.remoteClientsMtx.Unlock()
+ }
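+ // Return a copy of the cached client, using the caller's token
+ // salted for the remote cluster.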
+ accopy := *kc.Arvados
+ accopy.ApiToken = token
+ kccopy := kc.Clone()
+ kccopy.Arvados = &accopy
+ token, err := auth.SaltToken(token, remoteID)
+ if err != nil {
+ return nil, err
+ }
+ kccopy.Arvados.ApiToken = token
+ return kccopy, nil
+}
+
+// BlockWrite writes a block to one or more volumes, aiming to satisfy
+// the requested storage classes and replication, and returns a
+// response containing a signed locator and the replication achieved.
+func (ks *keepstore) BlockWrite(ctx context.Context, opts arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
+ var resp arvados.BlockWriteResponse
+ var hash string
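+ // If the caller supplied a Reader instead of an in-memory data
+ // slice, copy the data into a pooled buffer, computing its MD5
+ // hash along the way.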
+ if opts.Data == nil {
+ buf, err := ks.bufferPool.GetContext(ctx)
+ if err != nil {
+ return resp, err
+ }
+ defer ks.bufferPool.Put(buf)
+ w := bytes.NewBuffer(buf[:0])
+ h := md5.New()
+ limitedReader := &io.LimitedReader{R: opts.Reader, N: BlockSize}
+ n, err := io.Copy(io.MultiWriter(w, h), limitedReader)
+ if err != nil {
+ return resp, err
+ }
+ if limitedReader.N == 0 {
+ // Data size is either exactly BlockSize, or too big.
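+ // Reading one more byte distinguishes the two cases: a
+ // successful read means the request is too big; EOF means the
+ // data is exactly BlockSize.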
+ n, err := opts.Reader.Read(make([]byte, 1))
+ if n > 0 {
+ return resp, httpserver.ErrorWithStatus(err, http.StatusRequestEntityTooLarge)
+ }
+ if err != io.EOF {
+ return resp, err
+ }
+ }
+ opts.Data = buf[:n]
+ if opts.DataSize != 0 && int(n) != opts.DataSize {
+ return resp, httpserver.ErrorWithStatus(fmt.Errorf("content length %d did not match specified data size %d", n, opts.DataSize), http.StatusBadRequest)
+ }
+ hash = fmt.Sprintf("%x", h.Sum(nil))
+ } else {
+ hash = fmt.Sprintf("%x", md5.Sum(opts.Data))
+ }
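+ // If the caller supplied a locator, its hash must match the MD5
+ // we just computed.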
+ if opts.Hash != "" && !strings.HasPrefix(opts.Hash, hash) {
+ return resp, httpserver.ErrorWithStatus(fmt.Errorf("content hash %s did not match specified locator %s", hash, opts.Hash), http.StatusBadRequest)
+ }
+ rvzmounts := ks.rendezvous(hash, ks.mountsW)
+ result := newPutProgress(opts.StorageClasses)
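+ // Check whether the block is already stored on mounts we would
+ // otherwise write to; existing copies count toward the requested
+ // replication after a successful BlockTouch.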
+ for _, mnt := range rvzmounts {
+ if !result.Want(mnt) {
+ continue
+ }
+ cmp := &checkEqual{Expect: opts.Data}
+ if err := mnt.BlockRead(ctx, hash, cmp); err == nil {
+ if !cmp.Equal() {
+ return resp, errCollision
+ }
+ err := mnt.BlockTouch(hash)
+ if err == nil {
+ result.Add(mnt)
+ }
+ }
+ }
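+ // allFull stays true only if every write failure was errFull,
+ // i.e., all volumes reported no space.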
+ var allFull atomic.Bool
+ allFull.Store(true)
+ // pending tracks what result will be if all outstanding
+ // writes succeed.
+ pending := result.Copy()
+ cond := sync.NewCond(new(sync.Mutex))
+ cond.L.Lock()
+ var wg sync.WaitGroup
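+ // Start concurrent writes to the remaining mounts in rendezvous
+ // order. A new write is only started if the mount would still be
+ // needed even if all in-flight writes succeed.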
+nextmnt:
+ for _, mnt := range rvzmounts {
+ for {
+ if result.Done() || ctx.Err() != nil {
+ break nextmnt
+ }
+ if !result.Want(mnt) {
+ continue nextmnt
+ }
+ if pending.Want(mnt) {
+ break
+ }
+ // This mount might not be needed, depending
+ // on the outcome of pending writes. Wait for
+ // a pending write to finish, then check
+ // again.
+ cond.Wait()
+ }
+ mnt := mnt // shadow the loop variable so the goroutine below captures this iteration's mount
+ logger := ks.logger.WithField("mount", mnt.UUID)
+ pending.Add(mnt)
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ logger.Debug("start write")
+ err := mnt.BlockWrite(ctx, hash, opts.Data)
+ cond.L.Lock()
+ defer cond.L.Unlock()
+ defer cond.Broadcast()
+ if err != nil {
+ logger.Debug("write failed")
+ pending.Sub(mnt)
+ if err != errFull {
+ allFull.Store(false)
+ }
+ } else {
+ result.Add(mnt)
+ pending.Sub(mnt)
+ }
+ }()
+ }
+ cond.L.Unlock()
+ wg.Wait()
+ if ctx.Err() != nil {
+ return resp, ctx.Err()
+ }
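+ // Report success if the requested storage classes are fully
+ // satisfied, or if at least one replica was written.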
+ if result.Done() || result.totalReplication > 0 {
+ resp = arvados.BlockWriteResponse{
+ Locator: ks.signLocator(ctxToken(ctx), fmt.Sprintf("%s+%d", hash, len(opts.Data))),
+ Replicas: result.totalReplication,
+ StorageClasses: result.classDone,
+ }
+ return resp, nil
+ }
+ if allFull.Load() {
+ return resp, errFull
+ }
+ return resp, errVolumeUnavailable
+}
+
+// rendezvous sorts the given mounts by descending priority, then by
+// rendezvous order for the given locator.
+func (*keepstore) rendezvous(locator string, mnts []*mount) []*mount {
+ hash := locator
+ if len(hash) > 32 {
+ hash = hash[:32]
+ }
+ // copy the provided []*mount before doing an in-place sort
+ mnts = append([]*mount(nil), mnts...)
+ weight := make(map[*mount]string)
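+ // Weight each mount by the MD5 of the block hash concatenated
+ // with the mount UUID (minus its cluster/type prefix), so the
+ // resulting order is deterministic for a given block.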
+ for _, mnt := range mnts {
+ uuidpart := mnt.UUID
+ if len(uuidpart) == 27 {
+ // strip zzzzz-yyyyy- prefixes
+ uuidpart = uuidpart[12:]
+ }
+ weight[mnt] = fmt.Sprintf("%x", md5.Sum([]byte(hash+uuidpart)))
+ }
+ sort.Slice(mnts, func(i, j int) bool {
+ if p := mnts[i].priority - mnts[j].priority; p != 0 {
+ return p > 0
+ }
+ return weight[mnts[i]] < weight[mnts[j]]
+ })
+ return mnts
+}
+
+// checkEqual reports whether the data written to it (via io.WriterAt
+// interface) is equal to the expected data.