return ieioargs
+class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
+ """Custom Request Executor class that ensures NodeHttpServer children are
+ locked in ram.
+
+ """
+ def __init__(self, *args, **kwargs):
+ utils.Mlockall()
+
+ http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
+
+
class NodeHttpServer(http.server.HttpServer):
"""The server implementation.
disks = [objects.Disk.FromDict(cf) for cf in disks]
return backend.DrbdWaitSync(nodes_ip, disks)
- # export/import --------------------------
-
@staticmethod
- def perspective_snapshot_export(params):
- """Export a given snapshot.
+ def perspective_drbd_helper(params):
+ """Query drbd helper.
"""
- disk = objects.Disk.FromDict(params[0])
- dest_node = params[1]
- instance = objects.Instance.FromDict(params[2])
- cluster_name = params[3]
- dev_idx = params[4]
- debug = params[5]
- return backend.ExportSnapshot(disk, dest_node, instance,
- cluster_name, dev_idx, debug)
+ return backend.GetDrbdUsermodeHelper()
+
+ # export/import --------------------------
@staticmethod
def perspective_finalize_export(params):
return backend.RunRenameInstance(inst, old_name, debug)
@staticmethod
- def perspective_instance_os_import(params):
- """Run the import function of an OS onto a given instance.
-
- """
- inst_s, src_node, src_images, cluster_name, debug = params
- inst = objects.Instance.FromDict(inst_s)
- return backend.ImportOSIntoInstance(inst, src_node, src_images,
- cluster_name, debug)
-
- @staticmethod
def perspective_instance_shutdown(params):
"""Shutdown an instance.
os_obj = backend.OSFromDisk(name)
return os_obj.ToDict()
+ @staticmethod
+ def perspective_os_validate(params):
+ """Run a given OS' validation routine.
+
+ """
+ required, name, checks, params = params
+ return backend.ValidateOS(required, name, checks, params)
+
# hooks -----------------------
@staticmethod
# TODO: What if a file fails to rename?
return [backend.JobQueueRename(old, new) for old, new in params]
- @staticmethod
- def perspective_jobqueue_set_drain(params):
- """Set/unset the queue drain flag.
-
- """
- drain_flag = params[0]
- return backend.JobQueueSetDrainFlag(drain_flag)
-
-
# hypervisor ---------------
@staticmethod
# Crypto
@staticmethod
- def perspective_create_x509_certificate(params):
+ def perspective_x509_cert_create(params):
"""Creates a new X509 certificate for SSL/TLS.
"""
return backend.CreateX509Certificate(validity)
@staticmethod
- def perspective_remove_x509_certificate(params):
+ def perspective_x509_cert_remove(params):
"""Removes a X509 certificate.
"""
# Import and export
@staticmethod
- def perspective_start_import_listener(params):
+ def perspective_import_start(params):
"""Starts an import daemon.
"""
- (x509_key_name, source_x509_ca, instance, dest, dest_args) = params
- return backend.StartImportExportDaemon(constants.IEM_IMPORT,
- x509_key_name, source_x509_ca,
+ (opts_s, instance, dest, dest_args) = params
+
+ opts = objects.ImportExportOptions.FromDict(opts_s)
+
+ return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
None, None,
objects.Instance.FromDict(instance),
dest,
_DecodeImportExportIO(dest,
dest_args))
+
@staticmethod
- def perspective_start_export(params):
+ def perspective_export_start(params):
"""Starts an export daemon.
"""
- (x509_key_name, dest_x509_ca, host, port, instance,
- source, source_args) = params
- return backend.StartImportExportDaemon(constants.IEM_EXPORT,
- x509_key_name, dest_x509_ca,
+ (opts_s, host, port, instance, source, source_args) = params
+
+ opts = objects.ImportExportOptions.FromDict(opts_s)
+
+ return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
host, port,
objects.Instance.FromDict(instance),
source,
source_args))
@staticmethod
- def perspective_get_import_export_status(params):
+ def perspective_impexp_status(params):
"""Retrieves the status of an import or export daemon.
"""
return backend.GetImportExportStatus(params[0])
@staticmethod
- def perspective_cleanup_import_export(params):
+ def perspective_impexp_abort(params):
+ """Aborts an import or export.
+
+ """
+ return backend.AbortImportExport(params[0])
+
+ @staticmethod
+ def perspective_impexp_cleanup(params):
"""Cleans up after an import or export.
"""
"""Main node daemon function, executed with the PID file held.
"""
+ if options.mlock:
+ request_executor_class = MlockallRequestExecutor
+ try:
+ utils.Mlockall()
+ except errors.NoCtypesError:
+ logging.warning("Cannot set memory lock, ctypes module not found")
+ request_executor_class = http.server.HttpServerRequestExecutor
+ else:
+ request_executor_class = http.server.HttpServerRequestExecutor
+
# Read SSL certificate
if options.ssl:
ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
mainloop = daemon.Mainloop()
server = NodeHttpServer(mainloop, options.bind_address, options.port,
- ssl_params=ssl_params, ssl_verify_peer=True)
+ ssl_params=ssl_params, ssl_verify_peer=True,
+ request_executor_class=request_executor_class)
server.Start()
try:
mainloop.Run()
usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
version="%%prog (ganeti) %s" %
constants.RELEASE_VERSION)
+ parser.add_option("--no-mlock", dest="mlock",
+ help="Do not mlock the node memory in ram",
+ default=True, action="store_false")
+
dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
dirs.append((constants.LOG_OS_DIR, 0750))
dirs.append((constants.LOCK_DIR, 1777))
dirs.append((constants.IMPORT_EXPORT_DIR, constants.IMPORT_EXPORT_DIR_MODE))
daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded,
default_ssl_cert=constants.NODED_CERT_FILE,
- default_ssl_key=constants.NODED_CERT_FILE)
+ default_ssl_key=constants.NODED_CERT_FILE,
+ console_logging=True)
if __name__ == '__main__':