X-Git-Url: https://code.grnet.gr/git/ganeti-local/blobdiff_plain/098c0958281c48228fe57eb7832153dafefbd07c..9ae49f2753cdd158edb910029f7d08c257d32ad0:/daemons/ganeti-noded diff --git a/daemons/ganeti-noded b/daemons/ganeti-noded index f630005..5388660 100755 --- a/daemons/ganeti-noded +++ b/daemons/ganeti-noded @@ -21,10 +21,15 @@ """Ganeti node daemon""" +# functions in this module need to have a given name structure, so: +# pylint: disable-msg=C0103 + import os import sys -import resource import traceback +import BaseHTTPServer +import simplejson +import errno from optparse import OptionParser @@ -35,261 +40,532 @@ from ganeti import constants from ganeti import objects from ganeti import errors from ganeti import ssconf +from ganeti import utils -from twisted.spread import pb -from twisted.internet import reactor -from twisted.cred import checkers, portal -from OpenSSL import SSL - - -class ServerContextFactory: - def getContext(self): - ctx = SSL.Context(SSL.TLSv1_METHOD) - ctx.use_certificate_file(constants.SSL_CERT_FILE) - ctx.use_privatekey_file(constants.SSL_CERT_FILE) - return ctx - -class ServerObject(pb.Avatar): - def __init__(self, name): - self.name = name - - def perspectiveMessageReceived(self, broker, message, args, kw): - """This method is called when a network message is received. - I will call:: +class ServerObject(BaseHTTPServer.BaseHTTPRequestHandler): + """The server implementation. - | self.perspective_%(message)s(*broker.unserialize(args), - | **broker.unserialize(kw)) + This class holds all methods exposed over the RPC interface. - to handle the method; subclasses of Avatar are expected to - implement methods of this naming convention. + """ + def do_PUT(self): + """Handle a post request. """ - args = broker.unserialize(args, self) - kw = broker.unserialize(kw, self) - method = getattr(self, "perspective_%s" % message) - tb = None - state = None + path = self.path + if path.startswith("/"): + path = path[1:] + mname = "perspective_%s" % path + if not hasattr(self, mname): + self.send_error(404) + return False + + method = getattr(self, mname) try: - state = method(*args, **kw) - except: - tb = traceback.format_exc() + body_length = int(self.headers.get('Content-Length', '0')) + except ValueError: + self.send_error(400, 'No Content-Length header or invalid format') + return False - return broker.serialize((tb, state), self, method, args, kw) + try: + body = self.rfile.read(body_length) + except socket.error, err: + logger.Error("Socket error while reading: %s" % str(err)) + return + try: + params = simplejson.loads(body) + result = method(params) + payload = simplejson.dumps(result) + except errors.QuitGanetiException, err: + global _EXIT_GANETI_NODED + _EXIT_GANETI_NODED = True + if isinstance(err, tuple) and len(err) == 2: + if err[0]: + self.send_error(500, "Error: %s" % str(err[1])) + else: + payload = simplejson.dumps(err[1]) + else: + self.log_message('GanetiQuitException Usage Error') + self.send_error(500, "Error: %s" % str(err)) + except Exception, err: + self.send_error(500, "Error: %s" % str(err)) + return False + self.send_response(200) + self.send_header('Content-Length', str(len(payload))) + self.end_headers() + self.wfile.write(payload) + return True + + def log_message(self, format, *args): + """Log a request to the log. + + This is the same as the parent, we just log somewhere else. + + """ + msg = ("%s - - [%s] %s" % + (self.address_string(), + self.log_date_time_string(), + format % args)) + logger.Debug(msg) # the new block devices -------------------------- - def perspective_blockdev_create(self, params): - bdev_s, size, on_primary = params - bdev = objects.ConfigObject.Loads(bdev_s) + @staticmethod + def perspective_blockdev_create(params): + """Create a block device. + + """ + bdev_s, size, owner, on_primary, info = params + bdev = objects.Disk.FromDict(bdev_s) if bdev is None: raise ValueError("can't unserialize data!") - return backend.CreateBlockDevice(bdev, size, on_primary) + return backend.CreateBlockDevice(bdev, size, owner, on_primary, info) + @staticmethod + def perspective_blockdev_remove(params): + """Remove a block device. - def perspective_blockdev_remove(self, params): + """ bdev_s = params[0] - bdev = objects.ConfigObject.Loads(bdev_s) + bdev = objects.Disk.FromDict(bdev_s) return backend.RemoveBlockDevice(bdev) + @staticmethod + def perspective_blockdev_rename(params): + """Remove a block device. + + """ + devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params] + return backend.RenameBlockDevices(devlist) + + @staticmethod + def perspective_blockdev_assemble(params): + """Assemble a block device. - def perspective_blockdev_assemble(self, params): - bdev_s, on_primary = params - bdev = objects.ConfigObject.Loads(bdev_s) + """ + bdev_s, owner, on_primary = params + bdev = objects.Disk.FromDict(bdev_s) if bdev is None: raise ValueError("can't unserialize data!") - return backend.AssembleBlockDevice(bdev, on_primary) + return backend.AssembleBlockDevice(bdev, owner, on_primary) + @staticmethod + def perspective_blockdev_shutdown(params): + """Shutdown a block device. - def perspective_blockdev_shutdown(self, params): + """ bdev_s = params[0] - bdev = objects.ConfigObject.Loads(bdev_s) + bdev = objects.Disk.FromDict(bdev_s) if bdev is None: raise ValueError("can't unserialize data!") return backend.ShutdownBlockDevice(bdev) + @staticmethod + def perspective_blockdev_addchildren(params): + """Add a child to a mirror device. + + Note: this is only valid for mirror devices. It's the caller's duty + to send a correct disk, otherwise we raise an error. - def perspective_blockdev_addchild(self, params): + """ bdev_s, ndev_s = params - bdev = objects.ConfigObject.Loads(bdev_s) - ndev = objects.ConfigObject.Loads(ndev_s) - if bdev is None or ndev is None: + bdev = objects.Disk.FromDict(bdev_s) + ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s] + if bdev is None or ndevs.count(None) > 0: raise ValueError("can't unserialize data!") - return backend.MirrorAddChild(bdev, ndev) + return backend.MirrorAddChildren(bdev, ndevs) + + @staticmethod + def perspective_blockdev_removechildren(params): + """Remove a child from a mirror device. + This is only valid for mirror devices, of course. It's the callers + duty to send a correct disk, otherwise we raise an error. - def perspective_blockdev_removechild(self, params): + """ bdev_s, ndev_s = params - bdev = objects.ConfigObject.Loads(bdev_s) - ndev = objects.ConfigObject.Loads(ndev_s) - if bdev is None or ndev is None: + bdev = objects.Disk.FromDict(bdev_s) + ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s] + if bdev is None or ndevs.count(None) > 0: raise ValueError("can't unserialize data!") - return backend.MirrorRemoveChild(bdev, ndev) + return backend.MirrorRemoveChildren(bdev, ndevs) + + @staticmethod + def perspective_blockdev_getmirrorstatus(params): + """Return the mirror status for a list of disks. - def perspective_blockdev_getmirrorstatus(self, params): - disks = [objects.ConfigObject.Loads(dsk_s) + """ + disks = [objects.Disk.FromDict(dsk_s) for dsk_s in params] return backend.GetMirrorStatus(disks) - def perspective_blockdev_find(self, params): - disk = objects.ConfigObject.Loads(params[0]) + @staticmethod + def perspective_blockdev_find(params): + """Expose the FindBlockDevice functionality for a disk. + + This will try to find but not activate a disk. + + """ + disk = objects.Disk.FromDict(params[0]) return backend.FindBlockDevice(disk) - def perspective_blockdev_snapshot(self, params): - cfbd = objects.ConfigObject.Loads(params[0]) + @staticmethod + def perspective_blockdev_snapshot(params): + """Create a snapshot device. + + Note that this is only valid for LVM disks, if we get passed + something else we raise an exception. The snapshot device can be + remove by calling the generic block device remove call. + + """ + cfbd = objects.Disk.FromDict(params[0]) return backend.SnapshotBlockDevice(cfbd) + @staticmethod + def perspective_blockdev_grow(params): + """Grow a stack of devices. + + """ + cfbd = objects.Disk.FromDict(params[0]) + amount = params[1] + return backend.GrowBlockDevice(cfbd, amount) + + @staticmethod + def perspective_blockdev_close(params): + """Closes the given block devices. + + """ + disks = [objects.Disk.FromDict(cf) for cf in params] + return backend.CloseBlockDevices(disks) + # export/import -------------------------- - def perspective_snapshot_export(self, params): - disk = objects.ConfigObject.Loads(params[0]) + @staticmethod + def perspective_snapshot_export(params): + """Export a given snapshot. + + """ + disk = objects.Disk.FromDict(params[0]) dest_node = params[1] - instance = objects.ConfigObject.Loads(params[2]) - return backend.ExportSnapshot(disk,dest_node,instance) + instance = objects.Instance.FromDict(params[2]) + return backend.ExportSnapshot(disk, dest_node, instance) - def perspective_finalize_export(self, params): - instance = objects.ConfigObject.Loads(params[0]) - snap_disks = [objects.ConfigObject.Loads(str_data) + @staticmethod + def perspective_finalize_export(params): + """Expose the finalize export functionality. + + """ + instance = objects.Instance.FromDict(params[0]) + snap_disks = [objects.Disk.FromDict(str_data) for str_data in params[1]] return backend.FinalizeExport(instance, snap_disks) - def perspective_export_info(self, params): - dir = params[0] - einfo = backend.ExportInfo(dir) + @staticmethod + def perspective_export_info(params): + """Query information about an existing export on this node. + + The given path may not contain an export, in which case we return + None. + + """ + path = params[0] + einfo = backend.ExportInfo(path) if einfo is None: return einfo return einfo.Dumps() - def perspective_export_list(self, params): + @staticmethod + def perspective_export_list(params): + """List the available exports on this node. + + Note that as opposed to export_info, which may query data about an + export in any path, this only queries the standard Ganeti path + (constants.EXPORT_DIR). + + """ return backend.ListExports() - def perspective_export_remove(self, params): + @staticmethod + def perspective_export_remove(params): + """Remove an export. + + """ export = params[0] return backend.RemoveExport(export) # volume -------------------------- - def perspective_volume_list(self, params): + @staticmethod + def perspective_volume_list(params): + """Query the list of logical volumes in a given volume group. + + """ vgname = params[0] return backend.GetVolumeList(vgname) - def perspective_vg_list(self, params): + @staticmethod + def perspective_vg_list(params): + """Query the list of volume groups. + + """ return backend.ListVolumeGroups() # bridge -------------------------- - def perspective_bridges_exist(self, params): + @staticmethod + def perspective_bridges_exist(params): + """Check if all bridges given exist on this node. + + """ bridges_list = params[0] return backend.BridgesExist(bridges_list) # instance -------------------------- - def perspective_instance_os_add(self, params): + @staticmethod + def perspective_instance_os_add(params): + """Install an OS on a given instance. + + """ inst_s, os_disk, swap_disk = params - inst = objects.ConfigObject.Loads(inst_s) + inst = objects.Instance.FromDict(inst_s) return backend.AddOSToInstance(inst, os_disk, swap_disk) - def perspective_instance_os_import(self, params): + @staticmethod + def perspective_instance_run_rename(params): + """Runs the OS rename script for an instance. + + """ + inst_s, old_name, os_disk, swap_disk = params + inst = objects.Instance.FromDict(inst_s) + return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk) + + @staticmethod + def perspective_instance_os_import(params): + """Run the import function of an OS onto a given instance. + + """ inst_s, os_disk, swap_disk, src_node, src_image = params - inst = objects.ConfigObject.Loads(inst_s) + inst = objects.Instance.FromDict(inst_s) return backend.ImportOSIntoInstance(inst, os_disk, swap_disk, src_node, src_image) - def perspective_instance_shutdown(self, params): - instance = objects.ConfigObject.Loads(params[0]) + @staticmethod + def perspective_instance_shutdown(params): + """Shutdown an instance. + + """ + instance = objects.Instance.FromDict(params[0]) return backend.ShutdownInstance(instance) - def perspective_instance_start(self, params): - instance = objects.ConfigObject.Loads(params[0]) + @staticmethod + def perspective_instance_start(params): + """Start an instance. + + """ + instance = objects.Instance.FromDict(params[0]) extra_args = params[1] return backend.StartInstance(instance, extra_args) - def perspective_instance_info(self, params): + @staticmethod + def perspective_instance_migrate(params): + """Migrates an instance. + + """ + instance, target, live = params + return backend.MigrateInstance(instance, target, live) + + @staticmethod + def perspective_instance_reboot(params): + """Reboot an instance. + + """ + instance = objects.Instance.FromDict(params[0]) + reboot_type = params[1] + extra_args = params[2] + return backend.RebootInstance(instance, reboot_type, extra_args) + + @staticmethod + def perspective_instance_info(params): + """Query instance information. + + """ return backend.GetInstanceInfo(params[0]) - def perspective_all_instances_info(self, params): + @staticmethod + def perspective_all_instances_info(params): + """Query information about all instances. + + """ return backend.GetAllInstancesInfo() - def perspective_instance_list(self, params): + @staticmethod + def perspective_instance_list(params): + """Query the list of running instances. + + """ return backend.GetInstanceList() # node -------------------------- - def perspective_node_info(self, params): + @staticmethod + def perspective_node_tcp_ping(params): + """Do a TcpPing on the remote node. + + """ + return utils.TcpPing(params[1], params[2], timeout=params[3], + live_port_needed=params[4], source=params[0]) + + @staticmethod + def perspective_node_info(params): + """Query node information. + + """ vgname = params[0] return backend.GetNodeInfo(vgname) - def perspective_node_add(self, params): + @staticmethod + def perspective_node_add(params): + """Complete the registration of this node in the cluster. + + """ return backend.AddNode(params[0], params[1], params[2], params[3], params[4], params[5]) - def perspective_node_verify(self, params): + @staticmethod + def perspective_node_verify(params): + """Run a verify sequence on this node. + + """ return backend.VerifyNode(params[0]) - def perspective_node_start_master(self, params): + @staticmethod + def perspective_node_start_master(params): + """Promote this node to master status. + + """ return backend.StartMaster() - def perspective_node_stop_master(self, params): + @staticmethod + def perspective_node_stop_master(params): + """Demote this node from master status. + + """ return backend.StopMaster() - def perspective_node_leave_cluster(self, params): + @staticmethod + def perspective_node_leave_cluster(params): + """Cleanup after leaving a cluster. + + """ return backend.LeaveCluster() - def perspective_node_volumes(self, params): + @staticmethod + def perspective_node_volumes(params): + """Query the list of all logical volume groups. + + """ return backend.NodeVolumes() # cluster -------------------------- - def perspective_version(self, params): + @staticmethod + def perspective_version(params): + """Query version information. + + """ return constants.PROTOCOL_VERSION - def perspective_upload_file(self, params): + @staticmethod + def perspective_upload_file(params): + """Upload a file. + + Note that the backend implementation imposes strict rules on which + files are accepted. + + """ return backend.UploadFile(*params) # os ----------------------- - def perspective_os_diagnose(self, params): - os_list = backend.DiagnoseOS() - if not os_list: - # this catches also return values of 'False', - # for which we can't iterate over - return os_list - result = [] - for data in os_list: - if isinstance(data, objects.OS): - result.append(data.Dumps()) - elif isinstance(data, errors.InvalidOS): - result.append(data.args) - else: - raise errors.ProgrammerError, ("Invalid result from backend.DiagnoseOS" - " (class %s, %s)" % - (str(data.__class__), data)) + @staticmethod + def perspective_os_diagnose(params): + """Query detailed information about existing OSes. + + """ + return [os.ToDict() for os in backend.DiagnoseOS()] - return result + @staticmethod + def perspective_os_get(params): + """Query information about a given OS. - def perspective_os_get(self, params): + """ name = params[0] try: - os = backend.OSFromDisk(name).Dumps() + os_obj = backend.OSFromDisk(name) except errors.InvalidOS, err: - os = err.args - return os + os_obj = objects.OS.FromInvalidOS(err) + return os_obj.ToDict() # hooks ----------------------- - def perspective_hooks_runner(self, params): + @staticmethod + def perspective_hooks_runner(params): + """Run hook scripts. + + """ hpath, phase, env = params hr = backend.HooksRunner() return hr.RunHooks(hpath, phase, env) + # iallocator ----------------- + + @staticmethod + def perspective_iallocator_runner(params): + """Run an iallocator script. + + """ + name, idata = params + iar = backend.IAllocatorRunner() + return iar.Run(name, idata) -class MyRealm: - __implements__ = portal.IRealm - def requestAvatar(self, avatarId, mind, *interfaces): - if pb.IPerspective not in interfaces: - raise NotImplementedError - return pb.IPerspective, ServerObject(avatarId), lambda:None + # test ----------------------- + + @staticmethod + def perspective_test_delay(params): + """Run test delay. + + """ + duration = params[0] + return utils.TestDelay(duration) + + @staticmethod + def perspective_file_storage_dir_create(params): + """Create the file storage directory. + + """ + file_storage_dir = params[0] + return backend.CreateFileStorageDir(file_storage_dir) + + @staticmethod + def perspective_file_storage_dir_remove(params): + """Remove the file storage directory. + + """ + file_storage_dir = params[0] + return backend.RemoveFileStorageDir(file_storage_dir) + + @staticmethod + def perspective_file_storage_dir_rename(params): + """Rename the file storage directory. + + """ + old_file_storage_dir = params[0] + new_file_storage_dir = params[1] + return backend.RenameFileStorageDir(old_file_storage_dir, + new_file_storage_dir) def ParseOptions(): @@ -315,7 +591,11 @@ def ParseOptions(): def main(): + """Main function for the node daemon. + + """ options, args = ParseOptions() + utils.debug = options.debug for fname in (constants.SSL_CERT_FILE,): if not os.path.isfile(fname): print "config %s not there, will not run." % fname @@ -329,74 +609,34 @@ def main(): print "Cluster configuration incomplete: '%s'" % str(err) sys.exit(5) + # create /var/run/ganeti if not existing, in order to take care of + # tmpfs /var/run + if not os.path.exists(constants.BDEV_CACHE_DIR): + try: + os.mkdir(constants.BDEV_CACHE_DIR, 0755) + except EnvironmentError, err: + if err.errno != errno.EEXIST: + print ("Node setup wrong, cannot create directory %s: %s" % + (constants.BDEV_CACHE_DIR, err)) + sys.exit(5) + if not os.path.isdir(constants.BDEV_CACHE_DIR): + print ("Node setup wrong, %s is not a directory" % + constants.BDEV_CACHE_DIR) + sys.exit(5) + # become a daemon if options.fork: - createDaemon() - - logger.SetupLogging(twisted_workaround=True, debug=options.debug, - program="ganeti-noded") + utils.Daemonize(logfile=constants.LOG_NODESERVER) - p = portal.Portal(MyRealm()) - p.registerChecker( - checkers.InMemoryUsernamePasswordDatabaseDontUse(master_node=pwdata)) - reactor.listenSSL(port, pb.PBServerFactory(p), ServerContextFactory()) - reactor.run() + logger.SetupLogging(program="ganeti-noded", debug=options.debug) + global _EXIT_GANETI_NODED + _EXIT_GANETI_NODED = False -def createDaemon(): - """Detach a process from the controlling terminal and run it in the - background as a daemon. - - """ - UMASK = 077 - WORKDIR = "/" - # Default maximum for the number of available file descriptors. - if 'SC_OPEN_MAX' in os.sysconf_names: - try: - MAXFD = os.sysconf('SC_OPEN_MAX') - if MAXFD < 0: - MAXFD = 1024 - except OSError: - MAXFD = 1024 - else: - MAXFD = 1024 - # The standard I/O file descriptors are redirected to /dev/null by default. - #REDIRECT_TO = getattr(os, "devnull", "/dev/null") - REDIRECT_TO = constants.LOG_NODESERVER - try: - pid = os.fork() - except OSError, e: - raise Exception, "%s [%d]" % (e.strerror, e.errno) - if (pid == 0): # The first child. - os.setsid() - try: - pid = os.fork() # Fork a second child. - except OSError, e: - raise Exception, "%s [%d]" % (e.strerror, e.errno) - if (pid == 0): # The second child. - os.chdir(WORKDIR) - os.umask(UMASK) - else: - # exit() or _exit()? See below. - os._exit(0) # Exit parent (the first child) of the second child. - else: - os._exit(0) # Exit parent of the first child. - maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1] - if (maxfd == resource.RLIM_INFINITY): - maxfd = MAXFD - - # Iterate through and close all file descriptors. - for fd in range(0, maxfd): - try: - os.close(fd) - except OSError: # ERROR, fd wasn't open to begin with (ignored) - pass - os.open(REDIRECT_TO, os.O_RDWR|os.O_CREAT|os.O_APPEND) # standard input (0) - # Duplicate standard input to standard output and standard error. - os.dup2(0, 1) # standard output (1) - os.dup2(0, 2) # standard error (2) - return(0) + httpd = BaseHTTPServer.HTTPServer(('', port), ServerObject) + while (not _EXIT_GANETI_NODED): + httpd.handle_request() -if __name__=='__main__': +if __name__ == '__main__': main()