Merge branch 'devel-2.1'
[ganeti-local] / daemons / ganeti-noded
index 7a5ab7a..732d681 100755 (executable)
@@ -108,6 +108,17 @@ def _DecodeImportExportIO(ieio, ieioargs):
   return ieioargs
 
 
+class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
+  """Custom Request Executor class that ensures NodeHttpServer children are
+  locked in ram.
+
+  """
+  def __init__(self, *args, **kwargs):
+    utils.Mlockall()
+
+    http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
+
+
 class NodeHttpServer(http.server.HttpServer):
   """The server implementation.
 
@@ -350,21 +361,14 @@ class NodeHttpServer(http.server.HttpServer):
     disks = [objects.Disk.FromDict(cf) for cf in disks]
     return backend.DrbdWaitSync(nodes_ip, disks)
 
-  # export/import  --------------------------
-
   @staticmethod
-  def perspective_snapshot_export(params):
-    """Export a given snapshot.
+  def perspective_drbd_helper(params):
+    """Query drbd helper.
 
     """
-    disk = objects.Disk.FromDict(params[0])
-    dest_node = params[1]
-    instance = objects.Instance.FromDict(params[2])
-    cluster_name = params[3]
-    dev_idx = params[4]
-    debug = params[5]
-    return backend.ExportSnapshot(disk, dest_node, instance,
-                                  cluster_name, dev_idx, debug)
+    return backend.GetDrbdUsermodeHelper()
+
+  # export/import  --------------------------
 
   @staticmethod
   def perspective_finalize_export(params):
@@ -488,16 +492,6 @@ class NodeHttpServer(http.server.HttpServer):
     return backend.RunRenameInstance(inst, old_name, debug)
 
   @staticmethod
-  def perspective_instance_os_import(params):
-    """Run the import function of an OS onto a given instance.
-
-    """
-    inst_s, src_node, src_images, cluster_name, debug = params
-    inst = objects.Instance.FromDict(inst_s)
-    return backend.ImportOSIntoInstance(inst, src_node, src_images,
-                                        cluster_name, debug)
-
-  @staticmethod
   def perspective_instance_shutdown(params):
     """Shutdown an instance.
 
@@ -725,6 +719,14 @@ class NodeHttpServer(http.server.HttpServer):
     os_obj = backend.OSFromDisk(name)
     return os_obj.ToDict()
 
+  @staticmethod
+  def perspective_os_validate(params):
+    """Run a given OS' validation routine.
+
+    """
+    required, name, checks, params = params
+    return backend.ValidateOS(required, name, checks, params)
+
   # hooks -----------------------
 
   @staticmethod
@@ -816,15 +818,6 @@ class NodeHttpServer(http.server.HttpServer):
     # TODO: What if a file fails to rename?
     return [backend.JobQueueRename(old, new) for old, new in params]
 
-  @staticmethod
-  def perspective_jobqueue_set_drain(params):
-    """Set/unset the queue drain flag.
-
-    """
-    drain_flag = params[0]
-    return backend.JobQueueSetDrainFlag(drain_flag)
-
-
   # hypervisor ---------------
 
   @staticmethod
@@ -838,7 +831,7 @@ class NodeHttpServer(http.server.HttpServer):
   # Crypto
 
   @staticmethod
-  def perspective_create_x509_certificate(params):
+  def perspective_x509_cert_create(params):
     """Creates a new X509 certificate for SSL/TLS.
 
     """
@@ -846,7 +839,7 @@ class NodeHttpServer(http.server.HttpServer):
     return backend.CreateX509Certificate(validity)
 
   @staticmethod
-  def perspective_remove_x509_certificate(params):
+  def perspective_x509_cert_remove(params):
     """Removes a X509 certificate.
 
     """
@@ -856,27 +849,31 @@ class NodeHttpServer(http.server.HttpServer):
   # Import and export
 
   @staticmethod
-  def perspective_start_import_listener(params):
+  def perspective_import_start(params):
     """Starts an import daemon.
 
     """
-    (x509_key_name, source_x509_ca, instance, dest, dest_args) = params
-    return backend.StartImportExportDaemon(constants.IEM_IMPORT,
-                                           x509_key_name, source_x509_ca,
+    (opts_s, instance, dest, dest_args) = params
+
+    opts = objects.ImportExportOptions.FromDict(opts_s)
+
+    return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
                                            None, None,
                                            objects.Instance.FromDict(instance),
                                            dest,
                                            _DecodeImportExportIO(dest,
                                                                  dest_args))
+
   @staticmethod
-  def perspective_start_export(params):
+  def perspective_export_start(params):
     """Starts an export daemon.
 
     """
-    (x509_key_name, dest_x509_ca, host, port, instance,
-     source, source_args) = params
-    return backend.StartImportExportDaemon(constants.IEM_EXPORT,
-                                           x509_key_name, dest_x509_ca,
+    (opts_s, host, port, instance, source, source_args) = params
+
+    opts = objects.ImportExportOptions.FromDict(opts_s)
+
+    return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
                                            host, port,
                                            objects.Instance.FromDict(instance),
                                            source,
@@ -884,14 +881,21 @@ class NodeHttpServer(http.server.HttpServer):
                                                                  source_args))
 
   @staticmethod
-  def perspective_get_import_export_status(params):
+  def perspective_impexp_status(params):
     """Retrieves the status of an import or export daemon.
 
     """
     return backend.GetImportExportStatus(params[0])
 
   @staticmethod
-  def perspective_cleanup_import_export(params):
+  def perspective_impexp_abort(params):
+    """Aborts an import or export.
+
+    """
+    return backend.AbortImportExport(params[0])
+
+  @staticmethod
+  def perspective_impexp_cleanup(params):
     """Cleans up after an import or export.
 
     """
@@ -912,6 +916,16 @@ def ExecNoded(options, _):
   """Main node daemon function, executed with the PID file held.
 
   """
+  if options.mlock:
+    request_executor_class = MlockallRequestExecutor
+    try:
+      utils.Mlockall()
+    except errors.NoCtypesError:
+      logging.warning("Cannot set memory lock, ctypes module not found")
+      request_executor_class = http.server.HttpServerRequestExecutor
+  else:
+    request_executor_class = http.server.HttpServerRequestExecutor
+
   # Read SSL certificate
   if options.ssl:
     ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
@@ -928,7 +942,8 @@ def ExecNoded(options, _):
 
   mainloop = daemon.Mainloop()
   server = NodeHttpServer(mainloop, options.bind_address, options.port,
-                          ssl_params=ssl_params, ssl_verify_peer=True)
+                          ssl_params=ssl_params, ssl_verify_peer=True,
+                          request_executor_class=request_executor_class)
   server.Start()
   try:
     mainloop.Run()
@@ -944,6 +959,10 @@ def main():
                         usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
                         version="%%prog (ganeti) %s" %
                         constants.RELEASE_VERSION)
+  parser.add_option("--no-mlock", dest="mlock",
+                    help="Do not mlock the node memory in ram",
+                    default=True, action="store_false")
+
   dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
   dirs.append((constants.LOG_OS_DIR, 0750))
   dirs.append((constants.LOCK_DIR, 1777))
@@ -951,7 +970,8 @@ def main():
   dirs.append((constants.IMPORT_EXPORT_DIR, constants.IMPORT_EXPORT_DIR_MODE))
   daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded,
                      default_ssl_cert=constants.NODED_CERT_FILE,
-                     default_ssl_key=constants.NODED_CERT_FILE)
+                     default_ssl_key=constants.NODED_CERT_FILE,
+                     console_logging=True)
 
 
 if __name__ == '__main__':