X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/195c7f914657e46ce26900311b0ac74df99d8be2..ba55d062da8dfb89a37afc2f13f2e689d0094829:/daemons/ganeti-noded?ds=sidebyside diff --git a/daemons/ganeti-noded b/daemons/ganeti-noded index d03b49d..1367f22 100755 --- a/daemons/ganeti-noded +++ b/daemons/ganeti-noded @@ -27,87 +27,85 @@ import os import sys import traceback -import BaseHTTPServer -import simplejson +import SocketServer import errno +import logging +import signal from optparse import OptionParser - from ganeti import backend -from ganeti import logger from ganeti import constants from ganeti import objects from ganeti import errors -from ganeti import ssconf +from ganeti import jstore +from ganeti import daemon +from ganeti import http from ganeti import utils +import ganeti.http.server + + +queue_lock = None + + +def _RequireJobQueueLock(fn): + """Decorator for job queue manipulating functions. + + """ + QUEUE_LOCK_TIMEOUT = 10 + + def wrapper(*args, **kwargs): + # Locking in exclusive, blocking mode because there could be several + # children running at the same time. Waiting up to 10 seconds. + queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT) + try: + return fn(*args, **kwargs) + finally: + queue_lock.Unlock() + + return wrapper + -class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): +class NodeHttpServer(http.server.HttpServer): """The server implementation. This class holds all methods exposed over the RPC interface. """ - def do_PUT(self): - """Handle a post request. + def __init__(self, *args, **kwargs): + http.server.HttpServer.__init__(self, *args, **kwargs) + self.noded_pid = os.getpid() + + def HandleRequest(self, req): + """Handle a request. """ - path = self.path + if req.request_method.upper() != http.HTTP_PUT: + raise http.HttpBadRequest() + + path = req.request_path if path.startswith("/"): path = path[1:] - mname = "perspective_%s" % path - if not hasattr(self, mname): - self.send_error(404) - return False - method = getattr(self, mname) - try: - body_length = int(self.headers.get('Content-Length', '0')) - except ValueError: - self.send_error(400, 'No Content-Length header or invalid format') - return False + method = getattr(self, "perspective_%s" % path, None) + if method is None: + raise http.HttpNotFound() try: - body = self.rfile.read(body_length) - except socket.error, err: - logger.Error("Socket error while reading: %s" % str(err)) - return - try: - params = simplejson.loads(body) - result = method(params) - payload = simplejson.dumps(result) + try: + return method(req.request_body) + except backend.RPCFail, err: + # our custom failure exception; str(err) works fine if the + # exception was constructed with a single argument, and in + # this case, err.message == err.args[0] == str(err) + return (False, str(err)) + except: + logging.exception("Error in RPC call") + raise except errors.QuitGanetiException, err: - global _EXIT_GANETI_NODED - _EXIT_GANETI_NODED = True - if isinstance(err.args, tuple) and len(err.args) == 2: - if err.args[0]: - self.send_error(500, "Error: %s" % str(err[1])) - else: - payload = simplejson.dumps(err.args[1]) - else: - self.log_message('QuitGanetiException Usage Error') - self.send_error(500, "Error: %s" % str(err)) - except Exception, err: - self.send_error(500, "Error: %s" % str(err)) - return False - self.send_response(200) - self.send_header('Content-Length', str(len(payload))) - self.end_headers() - self.wfile.write(payload) - return True - - def log_message(self, format, *args): - """Log a request to the log. - - This is the same as the parent, we just log somewhere else. - - """ - msg = ("%s - - [%s] %s" % - (self.address_string(), - self.log_date_time_string(), - format % args)) - logger.Debug(msg) + # Tell parent to quit + os.kill(self.noded_pid, signal.SIGTERM) # the new block devices -------------------------- @@ -120,7 +118,7 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): bdev = objects.Disk.FromDict(bdev_s) if bdev is None: raise ValueError("can't unserialize data!") - return backend.CreateBlockDevice(bdev, size, owner, on_primary, info) + return backend.BlockdevCreate(bdev, size, owner, on_primary, info) @staticmethod def perspective_blockdev_remove(params): @@ -129,7 +127,7 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): """ bdev_s = params[0] bdev = objects.Disk.FromDict(bdev_s) - return backend.RemoveBlockDevice(bdev) + return backend.BlockdevRemove(bdev) @staticmethod def perspective_blockdev_rename(params): @@ -137,7 +135,7 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): """ devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params] - return backend.RenameBlockDevices(devlist) + return backend.BlockdevRename(devlist) @staticmethod def perspective_blockdev_assemble(params): @@ -148,7 +146,7 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): bdev = objects.Disk.FromDict(bdev_s) if bdev is None: raise ValueError("can't unserialize data!") - return backend.AssembleBlockDevice(bdev, owner, on_primary) + return backend.BlockdevAssemble(bdev, owner, on_primary) @staticmethod def perspective_blockdev_shutdown(params): @@ -159,7 +157,7 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): bdev = objects.Disk.FromDict(bdev_s) if bdev is None: raise ValueError("can't unserialize data!") - return backend.ShutdownBlockDevice(bdev) + return backend.BlockdevShutdown(bdev) @staticmethod def perspective_blockdev_addchildren(params): @@ -174,7 +172,7 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s] if bdev is None or ndevs.count(None) > 0: raise ValueError("can't unserialize data!") - return backend.MirrorAddChildren(bdev, ndevs) + return backend.BlockdevAddchildren(bdev, ndevs) @staticmethod def perspective_blockdev_removechildren(params): @@ -189,7 +187,7 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s] if bdev is None or ndevs.count(None) > 0: raise ValueError("can't unserialize data!") - return backend.MirrorRemoveChildren(bdev, ndevs) + return backend.BlockdevRemovechildren(bdev, ndevs) @staticmethod def perspective_blockdev_getmirrorstatus(params): @@ -198,7 +196,7 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): """ disks = [objects.Disk.FromDict(dsk_s) for dsk_s in params] - return backend.GetMirrorStatus(disks) + return backend.BlockdevGetmirrorstatus(disks) @staticmethod def perspective_blockdev_find(params): @@ -208,7 +206,7 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): """ disk = objects.Disk.FromDict(params[0]) - return backend.FindBlockDevice(disk) + return backend.BlockdevFind(disk) @staticmethod def perspective_blockdev_snapshot(params): @@ -220,7 +218,7 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): """ cfbd = objects.Disk.FromDict(params[0]) - return backend.SnapshotBlockDevice(cfbd) + return backend.BlockdevSnapshot(cfbd) @staticmethod def perspective_blockdev_grow(params): @@ -229,15 +227,54 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): """ cfbd = objects.Disk.FromDict(params[0]) amount = params[1] - return backend.GrowBlockDevice(cfbd, amount) + return backend.BlockdevGrow(cfbd, amount) @staticmethod def perspective_blockdev_close(params): """Closes the given block devices. """ - disks = [objects.Disk.FromDict(cf) for cf in params] - return backend.CloseBlockDevices(disks) + disks = [objects.Disk.FromDict(cf) for cf in params[1]] + return backend.BlockdevClose(params[0], disks) + + # blockdev/drbd specific methods ---------- + + @staticmethod + def perspective_drbd_disconnect_net(params): + """Disconnects the network connection of drbd disks. + + Note that this is only valid for drbd disks, so the members of the + disk list must all be drbd devices. + + """ + nodes_ip, disks = params + disks = [objects.Disk.FromDict(cf) for cf in disks] + return backend.DrbdDisconnectNet(nodes_ip, disks) + + @staticmethod + def perspective_drbd_attach_net(params): + """Attaches the network connection of drbd disks. + + Note that this is only valid for drbd disks, so the members of the + disk list must all be drbd devices. + + """ + nodes_ip, disks, instance_name, multimaster = params + disks = [objects.Disk.FromDict(cf) for cf in disks] + return backend.DrbdAttachNet(nodes_ip, disks, + instance_name, multimaster) + + @staticmethod + def perspective_drbd_wait_sync(params): + """Wait until DRBD disks are synched. + + Note that this is only valid for drbd disks, so the members of the + disk list must all be drbd devices. + + """ + nodes_ip, disks = params + disks = [objects.Disk.FromDict(cf) for cf in disks] + return backend.DrbdWaitSync(nodes_ip, disks) # export/import -------------------------- @@ -249,7 +286,10 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): disk = objects.Disk.FromDict(params[0]) dest_node = params[1] instance = objects.Instance.FromDict(params[2]) - return backend.ExportSnapshot(disk, dest_node, instance) + cluster_name = params[3] + dev_idx = params[4] + return backend.ExportSnapshot(disk, dest_node, instance, + cluster_name, dev_idx) @staticmethod def perspective_finalize_export(params): @@ -328,28 +368,29 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): """Install an OS on a given instance. """ - inst_s, os_disk, swap_disk = params + inst_s = params[0] inst = objects.Instance.FromDict(inst_s) - return backend.AddOSToInstance(inst, os_disk, swap_disk) + reinstall = params[1] + return backend.InstanceOsAdd(inst, reinstall) @staticmethod def perspective_instance_run_rename(params): """Runs the OS rename script for an instance. """ - inst_s, old_name, os_disk, swap_disk = params + inst_s, old_name = params inst = objects.Instance.FromDict(inst_s) - return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk) + return backend.RunRenameInstance(inst, old_name) @staticmethod def perspective_instance_os_import(params): """Run the import function of an OS onto a given instance. """ - inst_s, os_disk, swap_disk, src_node, src_image = params + inst_s, src_node, src_images, cluster_name = params inst = objects.Instance.FromDict(inst_s) - return backend.ImportOSIntoInstance(inst, os_disk, swap_disk, - src_node, src_image) + return backend.ImportOSIntoInstance(inst, src_node, src_images, + cluster_name) @staticmethod def perspective_instance_shutdown(params): @@ -357,7 +398,7 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): """ instance = objects.Instance.FromDict(params[0]) - return backend.ShutdownInstance(instance) + return backend.InstanceShutdown(instance) @staticmethod def perspective_instance_start(params): @@ -365,8 +406,33 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): """ instance = objects.Instance.FromDict(params[0]) - extra_args = params[1] - return backend.StartInstance(instance, extra_args) + return backend.StartInstance(instance) + + @staticmethod + def perspective_migration_info(params): + """Gather information about an instance to be migrated. + + """ + instance = objects.Instance.FromDict(params[0]) + return backend.MigrationInfo(instance) + + @staticmethod + def perspective_accept_instance(params): + """Prepare the node to accept an instance. + + """ + instance, info, target = params + instance = objects.Instance.FromDict(instance) + return backend.AcceptInstance(instance, info, target) + + @staticmethod + def perspective_finalize_migration(params): + """Finalize the instance migration. + + """ + instance, info, success = params + instance = objects.Instance.FromDict(instance) + return backend.FinalizeMigration(instance, info, success) @staticmethod def perspective_instance_migrate(params): @@ -374,6 +440,7 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): """ instance, target, live = params + instance = objects.Instance.FromDict(instance) return backend.MigrateInstance(instance, target, live) @staticmethod @@ -383,29 +450,36 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): """ instance = objects.Instance.FromDict(params[0]) reboot_type = params[1] - extra_args = params[2] - return backend.RebootInstance(instance, reboot_type, extra_args) + return backend.InstanceReboot(instance, reboot_type) @staticmethod def perspective_instance_info(params): """Query instance information. """ - return backend.GetInstanceInfo(params[0]) + return backend.GetInstanceInfo(params[0], params[1]) + + @staticmethod + def perspective_instance_migratable(params): + """Query whether the specified instance can be migrated. + + """ + instance = objects.Instance.FromDict(params[0]) + return backend.GetInstanceMigratable(instance) @staticmethod def perspective_all_instances_info(params): """Query information about all instances. """ - return backend.GetAllInstancesInfo() + return backend.GetAllInstancesInfo(params[0]) @staticmethod def perspective_instance_list(params): """Query the list of running instances. """ - return backend.GetInstanceList() + return backend.GetInstanceList(params[0]) # node -------------------------- @@ -418,12 +492,19 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): live_port_needed=params[4], source=params[0]) @staticmethod + def perspective_node_has_ip_address(params): + """Checks if a node has the given ip address. + + """ + return utils.OwnIpAddress(params[0]) + + @staticmethod def perspective_node_info(params): """Query node information. """ - vgname = params[0] - return backend.GetNodeInfo(vgname) + vgname, hypervisor_type = params + return backend.GetNodeInfo(vgname, hypervisor_type) @staticmethod def perspective_node_add(params): @@ -438,21 +519,21 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): """Run a verify sequence on this node. """ - return backend.VerifyNode(params[0]) + return backend.VerifyNode(params[0], params[1]) @staticmethod def perspective_node_start_master(params): """Promote this node to master status. """ - return backend.StartMaster() + return backend.StartMaster(params[0]) @staticmethod def perspective_node_stop_master(params): """Demote this node from master status. """ - return backend.StopMaster() + return backend.StopMaster(params[0]) @staticmethod def perspective_node_leave_cluster(params): @@ -468,6 +549,23 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): """ return backend.NodeVolumes() + @staticmethod + def perspective_node_demote_from_mc(params): + """Demote a node from the master candidate role. + + """ + return backend.DemoteFromMC() + + + @staticmethod + def perspective_node_powercycle(params): + """Tries to powercycle the nod. + + """ + hypervisor_type = params[0] + return backend.PowercycleNode(hypervisor_type) + + # cluster -------------------------- @staticmethod @@ -487,6 +585,20 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): """ return backend.UploadFile(*params) + @staticmethod + def perspective_master_info(params): + """Query master information. + + """ + return backend.GetMasterInfo() + + @staticmethod + def perspective_write_ssconf_files(params): + """Write ssconf files. + + """ + (values,) = params + return backend.WriteSsconfFiles(values) # os ----------------------- @@ -495,7 +607,7 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): """Query detailed information about existing OSes. """ - return [os.ToDict() for os in backend.DiagnoseOS()] + return [os_obj.ToDict() for os_obj in backend.DiagnoseOS()] @staticmethod def perspective_os_get(params): @@ -541,6 +653,8 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): duration = params[0] return utils.TestDelay(duration) + # file storage --------------- + @staticmethod def perspective_file_storage_dir_create(params): """Create the file storage directory. @@ -567,16 +681,62 @@ class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): return backend.RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir) + # jobs ------------------------ + + @staticmethod + @_RequireJobQueueLock + def perspective_jobqueue_update(params): + """Update job queue. + + """ + (file_name, content) = params + return backend.JobQueueUpdate(file_name, content) + + @staticmethod + @_RequireJobQueueLock + def perspective_jobqueue_purge(params): + """Purge job queue. + + """ + return backend.JobQueuePurge() + + @staticmethod + @_RequireJobQueueLock + def perspective_jobqueue_rename(params): + """Rename a job queue file. + + """ + # TODO: What if a file fails to rename? + return [backend.JobQueueRename(old, new) for old, new in params] + + @staticmethod + def perspective_jobqueue_set_drain(params): + """Set/unset the queue drain flag. + + """ + drain_flag = params[0] + return backend.JobQueueSetDrainFlag(drain_flag) + + + # hypervisor --------------- + + @staticmethod + def perspective_hypervisor_validate_params(params): + """Validate the hypervisor parameters. + + """ + (hvname, hvparams) = params + return backend.ValidateHVParams(hvname, hvparams) + def ParseOptions(): """Parse the command line options. - Returns: - (options, args) as from OptionParser.parse_args() + @return: (options, args) as from OptionParser.parse_args() """ parser = OptionParser(description="Ganeti node daemon", - usage="%prog [-f] [-d]", + usage="%prog [-f] [-d] [-b ADDRESS]", version="%%prog (ganeti) %s" % constants.RELEASE_VERSION) @@ -586,6 +746,10 @@ def ParseOptions(): parser.add_option("-d", "--debug", dest="debug", help="Enable some debug messages", default=False, action="store_true") + parser.add_option("-b", "--bind", dest="bind_address", + help="Bind address", + default="", metavar="ADDRESS") + options, args = parser.parse_args() return options, args @@ -594,48 +758,57 @@ def main(): """Main function for the node daemon. """ + global queue_lock + options, args = ParseOptions() utils.debug = options.debug + + if options.fork: + utils.CloseFDs() + for fname in (constants.SSL_CERT_FILE,): if not os.path.isfile(fname): print "config %s not there, will not run." % fname sys.exit(5) try: - ss = ssconf.SimpleStore() - port = ss.GetNodeDaemonPort() - pwdata = ss.GetNodeDaemonPassword() + port = utils.GetNodeDaemonPort() except errors.ConfigurationError, err: print "Cluster configuration incomplete: '%s'" % str(err) sys.exit(5) - # create the various SUB_RUN_DIRS, if not existing, so that we handle the - # situation where RUN_DIR is tmpfs - for dir_name in constants.SUB_RUN_DIRS: - if not os.path.exists(dir_name): - try: - os.mkdir(dir_name, 0755) - except EnvironmentError, err: - if err.errno != errno.EEXIST: - print ("Node setup wrong, cannot create directory %s: %s" % - (dir_name, err)) - sys.exit(5) - if not os.path.isdir(dir_name): - print ("Node setup wrong, %s is not a directory" % dir_name) - sys.exit(5) + dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS] + dirs.append((constants.LOG_OS_DIR, 0750)) + dirs.append((constants.LOCK_DIR, 1777)) + utils.EnsureDirs(dirs) # become a daemon if options.fork: utils.Daemonize(logfile=constants.LOG_NODESERVER) - logger.SetupLogging(program="ganeti-noded", debug=options.debug) + utils.WritePidFile(constants.NODED_PID) + try: + utils.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug, + stderr_logging=not options.fork) + logging.info("ganeti node daemon startup") + + # Read SSL certificate + ssl_params = http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE, + ssl_cert_path=constants.SSL_CERT_FILE) - global _EXIT_GANETI_NODED - _EXIT_GANETI_NODED = False + # Prepare job queue + queue_lock = jstore.InitAndVerifyQueue(must_lock=False) - httpd = BaseHTTPServer.HTTPServer(('', port), ServerObject) - while (not _EXIT_GANETI_NODED): - httpd.handle_request() + mainloop = daemon.Mainloop() + server = NodeHttpServer(mainloop, options.bind_address, port, + ssl_params=ssl_params, ssl_verify_peer=True) + server.Start() + try: + mainloop.Run() + finally: + server.Stop() + finally: + utils.RemovePidFile(constants.NODED_PID) if __name__ == '__main__':