X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/19205c393b6a0565313831224a64a56563b21b42..4ca693ca220138a4bcc47b19b9766c9e3a4c6c1f:/daemons/ganeti-noded?ds=sidebyside diff --git a/daemons/ganeti-noded b/daemons/ganeti-noded index 959dcf7..ef78513 100755 --- a/daemons/ganeti-noded +++ b/daemons/ganeti-noded @@ -21,14 +21,16 @@ """Ganeti node daemon""" -# functions in this module need to have a given name structure, so: -# pylint: disable-msg=C0103 +# pylint: disable-msg=C0103,W0142 + +# C0103: Functions in this module need to have a given name structure, +# and the name of the daemon doesn't match + +# W0142: Used * or ** magic, since we do use it extensively in this +# module import os import sys -import traceback -import SocketServer -import errno import logging import signal @@ -42,13 +44,34 @@ from ganeti import jstore from ganeti import daemon from ganeti import http from ganeti import utils +from ganeti import storage +from ganeti import serializer -import ganeti.http.server +import ganeti.http.server # pylint: disable-msg=W0611 queue_lock = None +def _PrepareQueueLock(): + """Try to prepare the queue lock. + + @return: None for success, otherwise an exception object + + """ + global queue_lock # pylint: disable-msg=W0603 + + if queue_lock is not None: + return None + + # Prepare job queue + try: + queue_lock = jstore.InitAndVerifyQueue(must_lock=False) + return None + except EnvironmentError, err: + return err + + def _RequireJobQueueLock(fn): """Decorator for job queue manipulating functions. @@ -58,6 +81,9 @@ def _RequireJobQueueLock(fn): def wrapper(*args, **kwargs): # Locking in exclusive, blocking mode because there could be several # children running at the same time. Waiting up to 10 seconds. + if _PrepareQueueLock() is not None: + raise errors.JobQueueError("Job queue failed initialization," + " cannot update jobs") queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT) try: return fn(*args, **kwargs) @@ -67,12 +93,30 @@ def _RequireJobQueueLock(fn): return wrapper +def _DecodeImportExportIO(ieio, ieioargs): + """Decodes import/export I/O information. + + """ + if ieio == constants.IEIO_RAW_DISK: + assert len(ieioargs) == 1 + return (objects.Disk.FromDict(ieioargs[0]), ) + + if ieio == constants.IEIO_SCRIPT: + assert len(ieioargs) == 2 + return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1]) + + return ieioargs + + class NodeHttpServer(http.server.HttpServer): """The server implementation. This class holds all methods exposed over the RPC interface. """ + # too many public methods, and unused args - all methods get params + # due to the API + # pylint: disable-msg=R0904,W0613 def __init__(self, *args, **kwargs): http.server.HttpServer.__init__(self, *args, **kwargs) self.noded_pid = os.getpid() @@ -93,14 +137,26 @@ class NodeHttpServer(http.server.HttpServer): raise http.HttpNotFound() try: - try: - return method(req.request_body) - except: - logging.exception("Error in RPC call") - raise + result = (True, method(serializer.LoadJson(req.request_body))) + + except backend.RPCFail, err: + # our custom failure exception; str(err) works fine if the + # exception was constructed with a single argument, and in + # this case, err.message == err.args[0] == str(err) + result = (False, str(err)) except errors.QuitGanetiException, err: # Tell parent to quit + logging.info("Shutting down the node daemon, arguments: %s", + str(err.args)) os.kill(self.noded_pid, signal.SIGTERM) + # And return the error's arguments, which must be already in + # correct tuple format + result = err.args + except Exception, err: + logging.exception("Error in RPC call") + result = (False, "Error while executing backend function: %s" % str(err)) + + return serializer.DumpJson(result, indent=False) # the new block devices -------------------------- @@ -113,7 +169,7 @@ class NodeHttpServer(http.server.HttpServer): bdev = objects.Disk.FromDict(bdev_s) if bdev is None: raise ValueError("can't unserialize data!") - return backend.CreateBlockDevice(bdev, size, owner, on_primary, info) + return backend.BlockdevCreate(bdev, size, owner, on_primary, info) @staticmethod def perspective_blockdev_remove(params): @@ -122,7 +178,7 @@ class NodeHttpServer(http.server.HttpServer): """ bdev_s = params[0] bdev = objects.Disk.FromDict(bdev_s) - return backend.RemoveBlockDevice(bdev) + return backend.BlockdevRemove(bdev) @staticmethod def perspective_blockdev_rename(params): @@ -130,7 +186,7 @@ class NodeHttpServer(http.server.HttpServer): """ devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params] - return backend.RenameBlockDevices(devlist) + return backend.BlockdevRename(devlist) @staticmethod def perspective_blockdev_assemble(params): @@ -141,7 +197,7 @@ class NodeHttpServer(http.server.HttpServer): bdev = objects.Disk.FromDict(bdev_s) if bdev is None: raise ValueError("can't unserialize data!") - return backend.AssembleBlockDevice(bdev, owner, on_primary) + return backend.BlockdevAssemble(bdev, owner, on_primary) @staticmethod def perspective_blockdev_shutdown(params): @@ -152,7 +208,7 @@ class NodeHttpServer(http.server.HttpServer): bdev = objects.Disk.FromDict(bdev_s) if bdev is None: raise ValueError("can't unserialize data!") - return backend.ShutdownBlockDevice(bdev) + return backend.BlockdevShutdown(bdev) @staticmethod def perspective_blockdev_addchildren(params): @@ -167,7 +223,7 @@ class NodeHttpServer(http.server.HttpServer): ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s] if bdev is None or ndevs.count(None) > 0: raise ValueError("can't unserialize data!") - return backend.MirrorAddChildren(bdev, ndevs) + return backend.BlockdevAddchildren(bdev, ndevs) @staticmethod def perspective_blockdev_removechildren(params): @@ -182,7 +238,7 @@ class NodeHttpServer(http.server.HttpServer): ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s] if bdev is None or ndevs.count(None) > 0: raise ValueError("can't unserialize data!") - return backend.MirrorRemoveChildren(bdev, ndevs) + return backend.BlockdevRemovechildren(bdev, ndevs) @staticmethod def perspective_blockdev_getmirrorstatus(params): @@ -190,8 +246,9 @@ class NodeHttpServer(http.server.HttpServer): """ disks = [objects.Disk.FromDict(dsk_s) - for dsk_s in params] - return backend.GetMirrorStatus(disks) + for dsk_s in params] + return [status.ToDict() + for status in backend.BlockdevGetmirrorstatus(disks)] @staticmethod def perspective_blockdev_find(params): @@ -201,7 +258,12 @@ class NodeHttpServer(http.server.HttpServer): """ disk = objects.Disk.FromDict(params[0]) - return backend.FindBlockDevice(disk) + + result = backend.BlockdevFind(disk) + if result is None: + return None + + return result.ToDict() @staticmethod def perspective_blockdev_snapshot(params): @@ -213,7 +275,7 @@ class NodeHttpServer(http.server.HttpServer): """ cfbd = objects.Disk.FromDict(params[0]) - return backend.SnapshotBlockDevice(cfbd) + return backend.BlockdevSnapshot(cfbd) @staticmethod def perspective_blockdev_grow(params): @@ -222,30 +284,73 @@ class NodeHttpServer(http.server.HttpServer): """ cfbd = objects.Disk.FromDict(params[0]) amount = params[1] - return backend.GrowBlockDevice(cfbd, amount) + return backend.BlockdevGrow(cfbd, amount) @staticmethod def perspective_blockdev_close(params): """Closes the given block devices. """ - disks = [objects.Disk.FromDict(cf) for cf in params] - return backend.CloseBlockDevices(disks) + disks = [objects.Disk.FromDict(cf) for cf in params[1]] + return backend.BlockdevClose(params[0], disks) - # export/import -------------------------- + @staticmethod + def perspective_blockdev_getsize(params): + """Compute the sizes of the given block devices. + + """ + disks = [objects.Disk.FromDict(cf) for cf in params[0]] + return backend.BlockdevGetsize(disks) @staticmethod - def perspective_snapshot_export(params): - """Export a given snapshot. + def perspective_blockdev_export(params): + """Compute the sizes of the given block devices. """ disk = objects.Disk.FromDict(params[0]) - dest_node = params[1] - instance = objects.Instance.FromDict(params[2]) - cluster_name = params[3] - dev_idx = params[4] - return backend.ExportSnapshot(disk, dest_node, instance, - cluster_name, dev_idx) + dest_node, dest_path, cluster_name = params[1:] + return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name) + + # blockdev/drbd specific methods ---------- + + @staticmethod + def perspective_drbd_disconnect_net(params): + """Disconnects the network connection of drbd disks. + + Note that this is only valid for drbd disks, so the members of the + disk list must all be drbd devices. + + """ + nodes_ip, disks = params + disks = [objects.Disk.FromDict(cf) for cf in disks] + return backend.DrbdDisconnectNet(nodes_ip, disks) + + @staticmethod + def perspective_drbd_attach_net(params): + """Attaches the network connection of drbd disks. + + Note that this is only valid for drbd disks, so the members of the + disk list must all be drbd devices. + + """ + nodes_ip, disks, instance_name, multimaster = params + disks = [objects.Disk.FromDict(cf) for cf in disks] + return backend.DrbdAttachNet(nodes_ip, disks, + instance_name, multimaster) + + @staticmethod + def perspective_drbd_wait_sync(params): + """Wait until DRBD disks are synched. + + Note that this is only valid for drbd disks, so the members of the + disk list must all be drbd devices. + + """ + nodes_ip, disks = params + disks = [objects.Disk.FromDict(cf) for cf in disks] + return backend.DrbdWaitSync(nodes_ip, disks) + + # export/import -------------------------- @staticmethod def perspective_finalize_export(params): @@ -253,8 +358,14 @@ class NodeHttpServer(http.server.HttpServer): """ instance = objects.Instance.FromDict(params[0]) - snap_disks = [objects.Disk.FromDict(str_data) - for str_data in params[1]] + + snap_disks = [] + for disk in params[1]: + if isinstance(disk, bool): + snap_disks.append(disk) + else: + snap_disks.append(objects.Disk.FromDict(disk)) + return backend.FinalizeExport(instance, snap_disks) @staticmethod @@ -266,10 +377,7 @@ class NodeHttpServer(http.server.HttpServer): """ path = params[0] - einfo = backend.ExportInfo(path) - if einfo is None: - return einfo - return einfo.Dumps() + return backend.ExportInfo(path) @staticmethod def perspective_export_list(params): @@ -293,7 +401,7 @@ class NodeHttpServer(http.server.HttpServer): # volume -------------------------- @staticmethod - def perspective_volume_list(params): + def perspective_lv_list(params): """Query the list of logical volumes in a given volume group. """ @@ -307,6 +415,32 @@ class NodeHttpServer(http.server.HttpServer): """ return backend.ListVolumeGroups() + # Storage -------------------------- + + @staticmethod + def perspective_storage_list(params): + """Get list of storage units. + + """ + (su_name, su_args, name, fields) = params + return storage.GetStorage(su_name, *su_args).List(name, fields) + + @staticmethod + def perspective_storage_modify(params): + """Modify a storage unit. + + """ + (su_name, su_args, name, changes) = params + return storage.GetStorage(su_name, *su_args).Modify(name, changes) + + @staticmethod + def perspective_storage_execute(params): + """Execute an operation on a storage unit. + + """ + (su_name, su_args, name, op) = params + return storage.GetStorage(su_name, *su_args).Execute(name, op) + # bridge -------------------------- @staticmethod @@ -326,26 +460,18 @@ class NodeHttpServer(http.server.HttpServer): """ inst_s = params[0] inst = objects.Instance.FromDict(inst_s) - return backend.AddOSToInstance(inst) + reinstall = params[1] + debug = params[2] + return backend.InstanceOsAdd(inst, reinstall, debug) @staticmethod def perspective_instance_run_rename(params): """Runs the OS rename script for an instance. """ - inst_s, old_name = params + inst_s, old_name, debug = params inst = objects.Instance.FromDict(inst_s) - return backend.RunRenameInstance(inst, old_name) - - @staticmethod - def perspective_instance_os_import(params): - """Run the import function of an OS onto a given instance. - - """ - inst_s, src_node, src_images, cluster_name = params - inst = objects.Instance.FromDict(inst_s) - return backend.ImportOSIntoInstance(inst, src_node, src_images, - cluster_name) + return backend.RunRenameInstance(inst, old_name, debug) @staticmethod def perspective_instance_shutdown(params): @@ -353,7 +479,8 @@ class NodeHttpServer(http.server.HttpServer): """ instance = objects.Instance.FromDict(params[0]) - return backend.ShutdownInstance(instance) + timeout = params[1] + return backend.InstanceShutdown(instance, timeout) @staticmethod def perspective_instance_start(params): @@ -361,8 +488,33 @@ class NodeHttpServer(http.server.HttpServer): """ instance = objects.Instance.FromDict(params[0]) - extra_args = params[1] - return backend.StartInstance(instance, extra_args) + return backend.StartInstance(instance) + + @staticmethod + def perspective_migration_info(params): + """Gather information about an instance to be migrated. + + """ + instance = objects.Instance.FromDict(params[0]) + return backend.MigrationInfo(instance) + + @staticmethod + def perspective_accept_instance(params): + """Prepare the node to accept an instance. + + """ + instance, info, target = params + instance = objects.Instance.FromDict(instance) + return backend.AcceptInstance(instance, info, target) + + @staticmethod + def perspective_finalize_migration(params): + """Finalize the instance migration. + + """ + instance, info, success = params + instance = objects.Instance.FromDict(instance) + return backend.FinalizeMigration(instance, info, success) @staticmethod def perspective_instance_migrate(params): @@ -380,8 +532,8 @@ class NodeHttpServer(http.server.HttpServer): """ instance = objects.Instance.FromDict(params[0]) reboot_type = params[1] - extra_args = params[2] - return backend.RebootInstance(instance, reboot_type, extra_args) + shutdown_timeout = params[2] + return backend.InstanceReboot(instance, reboot_type, shutdown_timeout) @staticmethod def perspective_instance_info(params): @@ -391,6 +543,14 @@ class NodeHttpServer(http.server.HttpServer): return backend.GetInstanceInfo(params[0], params[1]) @staticmethod + def perspective_instance_migratable(params): + """Query whether the specified instance can be migrated. + + """ + instance = objects.Instance.FromDict(params[0]) + return backend.GetInstanceMigratable(instance) + + @staticmethod def perspective_all_instances_info(params): """Query information about all instances. @@ -449,7 +609,7 @@ class NodeHttpServer(http.server.HttpServer): """Promote this node to master status. """ - return backend.StartMaster(params[0]) + return backend.StartMaster(params[0], params[1]) @staticmethod def perspective_node_stop_master(params): @@ -463,7 +623,7 @@ class NodeHttpServer(http.server.HttpServer): """Cleanup after leaving a cluster. """ - return backend.LeaveCluster() + return backend.LeaveCluster(params[0]) @staticmethod def perspective_node_volumes(params): @@ -472,6 +632,23 @@ class NodeHttpServer(http.server.HttpServer): """ return backend.NodeVolumes() + @staticmethod + def perspective_node_demote_from_mc(params): + """Demote a node from the master candidate role. + + """ + return backend.DemoteFromMC() + + + @staticmethod + def perspective_node_powercycle(params): + """Tries to powercycle the nod. + + """ + hypervisor_type = params[0] + return backend.PowercycleNode(hypervisor_type) + + # cluster -------------------------- @staticmethod @@ -513,7 +690,7 @@ class NodeHttpServer(http.server.HttpServer): """Query detailed information about existing OSes. """ - return [os.ToDict() for os in backend.DiagnoseOS()] + return backend.DiagnoseOS() @staticmethod def perspective_os_get(params): @@ -521,10 +698,7 @@ class NodeHttpServer(http.server.HttpServer): """ name = params[0] - try: - os_obj = backend.OSFromDisk(name) - except errors.InvalidOS, err: - os_obj = objects.OS.FromInvalidOS(err) + os_obj = backend.OSFromDisk(name) return os_obj.ToDict() # hooks ----------------------- @@ -557,7 +731,10 @@ class NodeHttpServer(http.server.HttpServer): """ duration = params[0] - return utils.TestDelay(duration) + status, rval = utils.TestDelay(duration) + if not status: + raise backend.RPCFail(rval) + return rval # file storage --------------- @@ -612,9 +789,8 @@ class NodeHttpServer(http.server.HttpServer): """Rename a job queue file. """ - (old, new) = params - - return backend.JobQueueRename(old, new) + # TODO: What if a file fails to rename? + return [backend.JobQueueRename(old, new) for old, new in params] @staticmethod def perspective_jobqueue_set_drain(params): @@ -635,101 +811,130 @@ class NodeHttpServer(http.server.HttpServer): (hvname, hvparams) = params return backend.ValidateHVParams(hvname, hvparams) + # Crypto -def ParseOptions(): - """Parse the command line options. + @staticmethod + def perspective_x509_cert_create(params): + """Creates a new X509 certificate for SSL/TLS. - Returns: - (options, args) as from OptionParser.parse_args() + """ + (validity, ) = params + return backend.CreateX509Certificate(validity) - """ - parser = OptionParser(description="Ganeti node daemon", - usage="%prog [-f] [-d]", - version="%%prog (ganeti) %s" % - constants.RELEASE_VERSION) + @staticmethod + def perspective_x509_cert_remove(params): + """Removes a X509 certificate. - parser.add_option("-f", "--foreground", dest="fork", - help="Don't detach from the current terminal", - default=True, action="store_false") - parser.add_option("-d", "--debug", dest="debug", - help="Enable some debug messages", - default=False, action="store_true") - options, args = parser.parse_args() - return options, args + """ + (name, ) = params + return backend.RemoveX509Certificate(name) + # Import and export -def EnsureRuntimeEnvironment(): - """Ensure our run-time environment is complete. + @staticmethod + def perspective_import_start(params): + """Starts an import daemon. - Currently this creates directories which could be missing, either - due to directories being on a tmpfs mount, or due to incomplete - packaging. + """ + (x509_key_name, source_x509_ca, instance, dest, dest_args) = params + return backend.StartImportExportDaemon(constants.IEM_IMPORT, + x509_key_name, source_x509_ca, + None, None, + objects.Instance.FromDict(instance), + dest, + _DecodeImportExportIO(dest, + dest_args)) + @staticmethod + def perspective_export_start(params): + """Starts an export daemon. - """ - dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS] - dirs.append((constants.LOG_OS_DIR, 0750)) - for dir_name, dir_mode in dirs: - if not os.path.exists(dir_name): - try: - os.mkdir(dir_name, dir_mode) - except EnvironmentError, err: - if err.errno != errno.EEXIST: - print ("Node setup wrong, cannot create directory '%s': %s" % - (dir_name, err)) - sys.exit(5) - if not os.path.isdir(dir_name): - print ("Node setup wrong, '%s' is not a directory" % dir_name) - sys.exit(5) + """ + (x509_key_name, dest_x509_ca, host, port, instance, + source, source_args) = params + return backend.StartImportExportDaemon(constants.IEM_EXPORT, + x509_key_name, dest_x509_ca, + host, port, + objects.Instance.FromDict(instance), + source, + _DecodeImportExportIO(source, + source_args)) + @staticmethod + def perspective_impexp_status(params): + """Retrieves the status of an import or export daemon. -def main(): - """Main function for the node daemon. + """ + return backend.GetImportExportStatus(params[0]) - """ - global queue_lock + @staticmethod + def perspective_impexp_abort(params): + """Aborts an import or export. - options, args = ParseOptions() - utils.debug = options.debug - for fname in (constants.SSL_CERT_FILE,): - if not os.path.isfile(fname): - print "config %s not there, will not run." % fname - sys.exit(5) + """ + return backend.AbortImportExport(params[0]) - try: - port = utils.GetNodeDaemonPort() - except errors.ConfigurationError, err: - print "Cluster configuration incomplete: '%s'" % str(err) - sys.exit(5) + @staticmethod + def perspective_impexp_cleanup(params): + """Cleans up after an import or export. + + """ + return backend.CleanupImportExport(params[0]) - EnsureRuntimeEnvironment() - # become a daemon - if options.fork: - utils.Daemonize(logfile=constants.LOG_NODESERVER) +def CheckNoded(_, args): + """Initial checks whether to run or exit with a failure. - utils.WritePidFile(constants.NODED_PID) - try: - utils.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug, - stderr_logging=not options.fork) - logging.info("ganeti node daemon startup") + """ + if args: # noded doesn't take any arguments + print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" % + sys.argv[0]) + sys.exit(constants.EXIT_FAILURE) - # Read SSL certificate - ssl_params = http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE, - ssl_cert_path=constants.SSL_CERT_FILE) - # Prepare job queue - queue_lock = jstore.InitAndVerifyQueue(must_lock=False) +def ExecNoded(options, _): + """Main node daemon function, executed with the PID file held. - mainloop = daemon.Mainloop() - server = NodeHttpServer(mainloop, "", port, - ssl_params=ssl_params, ssl_verify_peer=True) - server.Start() - try: - mainloop.Run() - finally: - server.Stop() + """ + # Read SSL certificate + if options.ssl: + ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key, + ssl_cert_path=options.ssl_cert) + else: + ssl_params = None + + err = _PrepareQueueLock() + if err is not None: + # this might be some kind of file-system/permission error; while + # this breaks the job queue functionality, we shouldn't prevent + # startup of the whole node daemon because of this + logging.critical("Can't init/verify the queue, proceeding anyway: %s", err) + + mainloop = daemon.Mainloop() + server = NodeHttpServer(mainloop, options.bind_address, options.port, + ssl_params=ssl_params, ssl_verify_peer=True) + server.Start() + try: + mainloop.Run() finally: - utils.RemovePidFile(constants.NODED_PID) + server.Stop() + + +def main(): + """Main function for the node daemon. + + """ + parser = OptionParser(description="Ganeti node daemon", + usage="%prog [-f] [-d] [-p port] [-b ADDRESS]", + version="%%prog (ganeti) %s" % + constants.RELEASE_VERSION) + dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS] + dirs.append((constants.LOG_OS_DIR, 0750)) + dirs.append((constants.LOCK_DIR, 1777)) + dirs.append((constants.CRYPTO_KEYS_DIR, constants.CRYPTO_KEYS_DIR_MODE)) + dirs.append((constants.IMPORT_EXPORT_DIR, constants.IMPORT_EXPORT_DIR_MODE)) + daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded, + default_ssl_cert=constants.NODED_CERT_FILE, + default_ssl_key=constants.NODED_CERT_FILE) if __name__ == '__main__':