+// checkProperties validates a set of properties against the vocabulary
+// returned by conn.VocabularyGet.
+//
+// properties may be nil (nothing to check), a string holding a JSON
+// document that decodes into a map, or an already-decoded
+// map[string]interface{}. Any other type is rejected with an error.
+// Errors from JSON decoding or from obtaining the vocabulary are
+// returned unchanged; a failed vocabulary check is returned through
+// httpErrorf with http.StatusBadRequest.
+func (conn *Conn) checkProperties(ctx context.Context, properties interface{}) error {
+	if properties == nil {
+		return nil
+	}
+	var props map[string]interface{}
+	switch p := properties.(type) {
+	case string:
+		if err := json.Unmarshal([]byte(p), &props); err != nil {
+			return err
+		}
+	case map[string]interface{}:
+		props = p
+	default:
+		return fmt.Errorf("unexpected properties type %T", p)
+	}
+	voc, err := conn.VocabularyGet(ctx)
+	if err != nil {
+		return err
+	}
+	if err := voc.Check(props); err != nil {
+		// Reuse the error already in hand instead of running the
+		// check a second time, and pass the message as an argument
+		// rather than as the format string: property keys and values
+		// are caller-supplied and may contain '%'.
+		// NOTE(review): assumes httpErrorf takes printf-style
+		// variadic arguments - confirm against its definition.
+		return httpErrorf(http.StatusBadRequest, "%s", err.Error())
+	}
+	return nil
+}
+
+// maybeRefreshVocabularyCache reloads the vocabulary file when its
+// modification time differs from the one recorded at the last
+// successful reload.
+//
+// Disk access is throttled: a call made less than one second after the
+// previous check returns nil without touching the disk or
+// conn.lastVocabularyError. Every non-throttled call records its
+// outcome in conn.lastVocabularyError: the stat or load error on
+// failure, nil once the file on disk is known to match what was loaded.
+func (conn *Conn) maybeRefreshVocabularyCache() error {
+	now := time.Now()
+	if conn.lastVocabularyRefreshCheck.Add(time.Second).After(now) {
+		// Throttle the access to disk to at most once per second.
+		return nil
+	}
+	conn.lastVocabularyRefreshCheck = now
+	path := conn.cluster.API.VocabularyPath
+	fi, err := os.Stat(path)
+	if err != nil {
+		err = fmt.Errorf("couldn't stat vocabulary file %q: %w", path, err)
+		conn.lastVocabularyError = err
+		return err
+	}
+	if fi.ModTime().Equal(conn.vocabularyFileModTime) {
+		// The file is the version that was last loaded successfully
+		// (vocabularyFileModTime is only updated after a good load),
+		// so any recorded error is a leftover from a transient stat
+		// failure and no longer applies.
+		conn.lastVocabularyError = nil
+		return nil
+	}
+	// Reload on any change of modification time, not only a newer one,
+	// so that a file restored from a backup with an older timestamp is
+	// picked up too.
+	if err := conn.loadVocabularyFile(); err != nil {
+		// vocabularyFileModTime is left untouched, so the load is
+		// retried on the next non-throttled call.
+		conn.lastVocabularyError = err
+		return err
+	}
+	conn.vocabularyFileModTime = fi.ModTime()
+	conn.lastVocabularyError = nil
+	return nil
+}
+
+// loadVocabularyFile reads the file at conn.cluster.API.VocabularyPath,
+// builds a vocabulary from it with arvados.NewVocabulary (passing the
+// keys of conn.cluster.Collections.ManagedProperties), validates it and,
+// only if every step succeeds, stores it in conn.vocabularyCache.
+//
+// On any failure the previously cached vocabulary is left in place and
+// an error wrapping the underlying cause is returned.
+func (conn *Conn) loadVocabularyFile() error {
+	path := conn.cluster.API.VocabularyPath
+	data, err := os.ReadFile(path)
+	if err != nil {
+		return fmt.Errorf("couldn't read the vocabulary file: %w", err)
+	}
+	managed := conn.cluster.Collections.ManagedProperties
+	managedKeys := make([]string, 0, len(managed))
+	for key := range managed {
+		managedKeys = append(managedKeys, key)
+	}
+	voc, err := arvados.NewVocabulary(data, managedKeys)
+	if err != nil {
+		return fmt.Errorf("while loading vocabulary file %q: %w", path, err)
+	}
+	if err := voc.Validate(); err != nil {
+		return fmt.Errorf("while validating vocabulary file %q: %w", path, err)
+	}
+	conn.vocabularyCache = voc
+	return nil
+}
+
+// LastVocabularyError returns the last error encountered while loading the
+// vocabulary file, after giving the cache a chance to refresh.
+//
+// It returns nil when no vocabulary file is configured
+// (conn.cluster.API.VocabularyPath is empty). VocabularyGet treats that
+// configuration as valid, so it must not be reported as a failure to
+// stat an empty path.
+func (conn *Conn) LastVocabularyError() error {
+	if conn.cluster.API.VocabularyPath == "" {
+		return nil
+	}
+	// The returned error is deliberately dropped: the refresh records
+	// its own failures in lastVocabularyError, and a throttled refresh
+	// returns nil even though an earlier error may still stand.
+	_ = conn.maybeRefreshVocabularyCache()
+	return conn.lastVocabularyError
+}
+
+// VocabularyGet refreshes the vocabulary cache if necessary and returns it.
+//
+// When no vocabulary file is configured (VocabularyPath is empty) it
+// returns a vocabulary with an empty Tags map and no error. If nothing
+// is cached yet and the initial load fails, that error is logged and
+// returned. A failed refresh of an existing cache is logged and
+// ignored: the previously cached vocabulary is returned.
+func (conn *Conn) VocabularyGet(ctx context.Context) (arvados.Vocabulary, error) {
+	if conn.cluster.API.VocabularyPath == "" {
+		return arvados.Vocabulary{
+			Tags: map[string]arvados.VocabularyTag{},
+		}, nil
+	}
+	logger := ctxlog.FromContext(ctx)
+	if conn.vocabularyCache == nil {
+		// Initial load of vocabulary file.
+		if err := conn.loadVocabularyFile(); err != nil {
+			logger.WithError(err).Error("error loading vocabulary file")
+			return arvados.Vocabulary{}, err
+		}
+	}
+	// Remember which vocabulary is cached so a real reload can be told
+	// apart from a throttled or no-op refresh, both of which also
+	// return nil. Presumably every successful load stores a freshly
+	// allocated vocabulary, so a changed pointer means a reload.
+	cached := conn.vocabularyCache
+	if err := conn.maybeRefreshVocabularyCache(); err != nil {
+		logger.WithError(err).Error("error reloading vocabulary file - ignoring")
+	} else if conn.vocabularyCache != cached {
+		logger.Info("vocabulary file reloaded successfully")
+	}
+	return *conn.vocabularyCache, nil
+}
+