Added ability to disable server by specifying port 0. Started keeping api response...
author Misha Zatsman <misha@curoverse.com>
Tue, 8 Apr 2014 17:30:27 +0000 (17:30 +0000)
committer Misha Zatsman <misha@curoverse.com>
Tue, 8 Apr 2014 17:30:27 +0000 (17:30 +0000)
services/datamanager/datamanager.py

index ae9fd0bea1c5d2581c56304c659976386f2c5e98..a5931b6838d204dbbca2eb24582aef213accf919 100755 (executable)
@@ -28,7 +28,13 @@ def fileSizeFormat(value):
 def byteSizeFromValidUuid(valid_uuid):
   return int(valid_uuid.split('+')[1])
 
+class maxdict(dict):
+  """A dictionary that holds the largest value entered for each key."""
+  def addValue(self, key, value):
+    dict.__setitem__(self, key, max(dict.get(self, key), value))
+
 class CollectionInfo:
+  DEFAULT_PERSISTER_REPLICATION_LEVEL=2
   all_by_uuid = {}
 
   def __init__(self, uuid):
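
For readers skimming the diff: the new maxdict only ever raises the value stored for a key, so a user's requested replication level can never be lowered by a later, smaller request. A standalone sketch of its behavior (Python 2, as in the rest of the file; it relies on None comparing less than any integer, which covers the first insert for a key):

class maxdict(dict):
  """A dictionary that holds the largest value entered for each key."""
  def addValue(self, key, value):
    # dict.get returns None for a missing key; under Python 2 ordering,
    # None < any int, so the first value stored for a key wins by default.
    dict.__setitem__(self, key, max(dict.get(self, key), value))

d = maxdict()
d.addValue('user-a', 2)
d.addValue('user-a', 5)
d.addValue('user-a', 3)
assert d['user-a'] == 5  # only the largest value seen is kept
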
@@ -38,6 +44,11 @@ class CollectionInfo:
     self.block_uuids = set()  # uuids of keep blocks in this collection
     self.reader_uuids = set()  # uuids of users who can read this collection
     self.persister_uuids = set()  # uuids of users who want this collection saved
+    # map from user uuid to replication level they desire
+    self.persister_replication = maxdict()
+
+    # The whole api response in case we need anything else later.
+    self.api_response = []
     CollectionInfo.all_by_uuid[uuid] = self
 
   def byteSize(self):
@@ -47,12 +58,12 @@ class CollectionInfo:
     return ('CollectionInfo uuid: %s\n'
             '               %d block(s) containing %s\n'
             '               reader_uuids: %s\n'
-            '               persister_uuids: %s' %
+            '               persister_replication: %s' %
             (self.uuid,
              len(self.block_uuids),
              fileSizeFormat(self.byteSize()),
              pprint.pformat(self.reader_uuids, indent = 15),
-             pprint.pformat(self.persister_uuids, indent = 15)))
+             pprint.pformat(self.persister_replication, indent = 15)))
 
   @staticmethod
   def get(uuid):
@@ -98,6 +109,7 @@ def readCollections(collection_uuids):
     collection_block_uuids = set()
     collection_response = arv.collections().get(uuid=collection_uuid).execute()
     collection_info = CollectionInfo.get(collection_uuid)
+    collection_info.api_response = collection_response
     manifest_lines = collection_response['manifest_text'].split('\n')
 
     if args.verbose:
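
Each CollectionInfo now also carries the raw collections API response, so later reporting code can read extra fields without another API call. A hedged sketch of how that might be used downstream; 'modified_at' is an illustrative field name, and api_response is assumed to have been filled in by readCollections() above (it defaults to [] until then):

info = CollectionInfo.get(collection_uuid)
# Illustrative only: pull an extra field out of the stored response.
print 'Collection %s last modified at %s' % (
  info.uuid, info.api_response.get('modified_at'))
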
@@ -138,6 +150,12 @@ def readLinks():
       if link['link_class'] == 'permission':
         collection_info.reader_uuids.add(link['tail_uuid'])
       elif link['link_class'] == 'resources':
+        replication_level = link['properties'].get(
+          'replication',
+          CollectionInfo.DEFAULT_PERSISTER_REPLICATION_LEVEL)
+        collection_info.persister_replication.addValue(
+          link['tail_uuid'],
+          replication_level)
         collection_info.persister_uuids.add(link['tail_uuid'])
 
   print 'Found the following link classes:'
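
To make the new readLinks() logic concrete, here is a small sketch of the replication lookup against a fabricated 'resources' link (the link dict and uuid are made up; collection_info is assumed to have been obtained via CollectionInfo.get as in the loop above):

link = {'link_class': 'resources',
        'tail_uuid': 'fake-user-uuid',
        'properties': {'replication': 3}}

replication_level = link['properties'].get(
  'replication',
  CollectionInfo.DEFAULT_PERSISTER_REPLICATION_LEVEL)  # falls back to 2
collection_info.persister_replication.addValue(
  link['tail_uuid'], replication_level)
# A link with no 'replication' property records the default level of 2;
# a second link for the same user only raises, never lowers, the level.
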
@@ -146,7 +164,7 @@ def readLinks():
 def reportMostPopularCollections():
   most_popular_collections = sorted(
     CollectionInfo.all_by_uuid.values(),
-    key=lambda info: len(info.reader_uuids) + 10 * len(info.persister_uuids),
+    key=lambda info: len(info.reader_uuids) + 10 * len(info.persister_replication),
     reverse=True)[:10]
 
   print 'Most popular Collections:'
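
The popularity ranking is effectively unchanged by this hunk: persister_replication holds one key per persisting user, so its len() equals the old len(persister_uuids), and distinct persisters still count ten times as much as readers. For example, a collection with 4 readers and 2 persisters scores 4 + 10*2 = 24 and outranks one with 12 readers and 1 persister (12 + 10*1 = 22).
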
@@ -257,7 +275,7 @@ parser.add_argument('-p',
                     '--port',
                     type=int,
                     default=9090,
-                    help=('The port number to serve on.'))
+                    help=('The port number to serve on. 0 means no server.'))
 parser.add_argument('-v',
                     '--verbose',
                     help='increase output verbosity',
@@ -453,9 +471,32 @@ class DataManagerHandler(BaseHTTPRequestHandler):
                        fileSizeFormat(collection.byteSize()))
       self.wfile.write('<P>Readers: %s\n' %
                        ', '.join(map(self.userLink, collection.reader_uuids)))
-      self.wfile.write('<P>Persisters: %s\n' %
-                       ', '.join(map(self.userLink,
-                                     collection.persister_uuids)))
+
+      if len(collection.persister_replication) == 0:
+        self.wfile.write('<P>No persisters\n')
+      else:
+        replication_to_users = defaultdict(set)
+        for user,replication in collection.persister_replication.items():
+          replication_to_users[replication].add(user)
+        replication_levels = sorted(replication_to_users.keys())
+
+        self.wfile.write('<P>%d persisters in %d replication levels maxing '
+                         'out at %dx replication:\n' %
+                         (len(collection.persister_replication),
+                          len(replication_levels),
+                          replication_levels[-1]))
+
+        # TODO(misha): This code is used twice, let's move it to a method.
+        self.wfile.write('<TABLE><TR><TH>%s</TR>\n' %
+                         '<TH>'.join(['Replication Level ' + str(x)
+                                      for x in replication_levels]))
+        self.wfile.write('<TR>\n')
+        for replication_level in replication_levels:
+          users = replication_to_users[replication_level]
+          self.wfile.write('<TD valign="top">%s\n' % '<BR>\n'.join(
+              map(self.userLink, users)))
+        self.wfile.write('</TR></TABLE>\n')
+
       replication_to_blocks = defaultdict(set)
       for block in collection.block_uuids:
         replication_to_blocks[block_to_replication[block]].add(block)
@@ -463,7 +504,8 @@ class DataManagerHandler(BaseHTTPRequestHandler):
       self.wfile.write('<P>%d blocks in %d replication level(s):\n' %
                        (len(collection.block_uuids), len(replication_levels)))
       self.wfile.write('<TABLE><TR><TH>%s</TR>\n' %
-                       '<TH>'.join(['Replication Level ' + str(x) for x in replication_levels]))
+                       '<TH>'.join(['Replication Level ' + str(x)
+                                    for x in replication_levels]))
       self.wfile.write('<TR>\n')
       for replication_level in replication_levels:
         blocks = replication_to_blocks[replication_level]
@@ -497,8 +539,11 @@ class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
 
 #if __name__ == '__main__':
 
-loader = threading.Thread(target = loadAllData, name = 'loader')
-loader.start()
+if args.port == 0:
+  loadAllData()
+else:
+  loader = threading.Thread(target = loadAllData, name = 'loader')
+  loader.start()
 
-server = ThreadedHTTPServer(('localhost', args.port), DataManagerHandler)
-server.serve_forever()
+  server = ThreadedHTTPServer(('localhost', args.port), DataManagerHandler)
+  server.serve_forever()
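
This last hunk is what implements the commit subject: with --port 0 the data is loaded synchronously and no HTTP server is started, which makes the script usable as a one-shot batch run (e.g. python datamanager.py --port 0); with any other port the loader runs in a background thread so the server can begin answering requests immediately. A condensed view of the resulting control flow, using the module's own names:

if args.port == 0:
  # Batch mode: load everything in the main thread, then exit.
  loadAllData()
else:
  # Serve mode: load in the background while the HTTP server starts up.
  loader = threading.Thread(target=loadAllData, name='loader')
  loader.start()
  server = ThreadedHTTPServer(('localhost', args.port), DataManagerHandler)
  server.serve_forever()
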