#!/usr/bin/python # # Copyright (C) 2006, 2007 Google Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA # 02110-1301, USA. """Ganeti node daemon""" import os import sys import resource import traceback from optparse import OptionParser from ganeti import backend from ganeti import logger from ganeti import constants from ganeti import objects from ganeti import errors from ganeti import ssconf from twisted.spread import pb from twisted.internet import reactor from twisted.cred import checkers, portal from OpenSSL import SSL class ServerContextFactory: def getContext(self): ctx = SSL.Context(SSL.TLSv1_METHOD) ctx.use_certificate_file(constants.SSL_CERT_FILE) ctx.use_privatekey_file(constants.SSL_CERT_FILE) return ctx class ServerObject(pb.Avatar): def __init__(self, name): self.name = name def perspectiveMessageReceived(self, broker, message, args, kw): """This method is called when a network message is received. I will call:: | self.perspective_%(message)s(*broker.unserialize(args), | **broker.unserialize(kw)) to handle the method; subclasses of Avatar are expected to implement methods of this naming convention. """ args = broker.unserialize(args, self) kw = broker.unserialize(kw, self) method = getattr(self, "perspective_%s" % message) tb = None state = None try: state = method(*args, **kw) except: tb = traceback.format_exc() return broker.serialize((tb, state), self, method, args, kw) # the new block devices -------------------------- def perspective_blockdev_create(self,params): bdev_s, size, on_primary = params bdev = objects.ConfigObject.Loads(bdev_s) if bdev is None: raise ValueError("can't unserialize data!") return backend.CreateBlockDevice(bdev, size, on_primary) def perspective_blockdev_remove(self,params): bdev_s = params[0] bdev = objects.ConfigObject.Loads(bdev_s) return backend.RemoveBlockDevice(bdev) def perspective_blockdev_assemble(self,params): bdev_s, on_primary = params bdev = objects.ConfigObject.Loads(bdev_s) if bdev is None: raise ValueError("can't unserialize data!") return backend.AssembleBlockDevice(bdev, on_primary) def perspective_blockdev_shutdown(self,params): bdev_s = params[0] bdev = objects.ConfigObject.Loads(bdev_s) if bdev is None: raise ValueError("can't unserialize data!") return backend.ShutdownBlockDevice(bdev) def perspective_blockdev_addchild(self,params): bdev_s, ndev_s = params bdev = objects.ConfigObject.Loads(bdev_s) ndev = objects.ConfigObject.Loads(ndev_s) if bdev is None or ndev is None: raise ValueError("can't unserialize data!") return backend.MirrorAddChild(bdev, ndev) def perspective_blockdev_removechild(self,params): bdev_s, ndev_s = params bdev = objects.ConfigObject.Loads(bdev_s) ndev = objects.ConfigObject.Loads(ndev_s) if bdev is None or ndev is None: raise ValueError("can't unserialize data!") return backend.MirrorRemoveChild(bdev, ndev) def perspective_blockdev_getmirrorstatus(self, params): disks = [objects.ConfigObject.Loads(dsk_s) for dsk_s in params] return backend.GetMirrorStatus(disks) def perspective_blockdev_find(self, params): disk = objects.ConfigObject.Loads(params[0]) return backend.FindBlockDevice(disk) def perspective_blockdev_snapshot(self,params): cfbd = objects.ConfigObject.Loads(params[0]) return backend.SnapshotBlockDevice(cfbd) # export/import -------------------------- def perspective_snapshot_export(self,params): disk = objects.ConfigObject.Loads(params[0]) dest_node = params[1] instance = objects.ConfigObject.Loads(params[2]) return backend.ExportSnapshot(disk,dest_node,instance) def perspective_finalize_export(self,params): instance = objects.ConfigObject.Loads(params[0]) snap_disks = [objects.ConfigObject.Loads(str_data) for str_data in params[1]] return backend.FinalizeExport(instance, snap_disks) def perspective_export_info(self,params): dir = params[0] einfo = backend.ExportInfo(dir) if einfo is None: return einfo return einfo.Dumps() def perspective_export_list(self, params): return backend.ListExports() def perspective_export_remove(self, params): export = params[0] return backend.RemoveExport(export) # volume -------------------------- def perspective_volume_list(self,params): vgname = params[0] return backend.GetVolumeList(vgname) def perspective_vg_list(self,params): return backend.ListVolumeGroups() # bridge -------------------------- def perspective_bridges_exist(self,params): bridges_list = params[0] return backend.BridgesExist(bridges_list) # instance -------------------------- def perspective_instance_os_add(self,params): inst_s, os_disk, swap_disk = params inst = objects.ConfigObject.Loads(inst_s) return backend.AddOSToInstance(inst, os_disk, swap_disk) def perspective_instance_os_import(self, params): inst_s, os_disk, swap_disk, src_node, src_image = params inst = objects.ConfigObject.Loads(inst_s) return backend.ImportOSIntoInstance(inst, os_disk, swap_disk, src_node, src_image) def perspective_instance_shutdown(self,params): instance = objects.ConfigObject.Loads(params[0]) return backend.ShutdownInstance(instance) def perspective_instance_start(self,params): instance = objects.ConfigObject.Loads(params[0]) extra_args = params[1] return backend.StartInstance(instance, extra_args) def perspective_instance_info(self,params): return backend.GetInstanceInfo(params[0]) def perspective_all_instances_info(self,params): return backend.GetAllInstancesInfo() def perspective_instance_list(self,params): return backend.GetInstanceList() # node -------------------------- def perspective_node_info(self,params): vgname = params[0] return backend.GetNodeInfo(vgname) def perspective_node_add(self,params): return backend.AddNode(params[0], params[1], params[2], params[3], params[4], params[5]) def perspective_node_verify(self,params): return backend.VerifyNode(params[0]) def perspective_node_start_master(self, params): return backend.StartMaster() def perspective_node_stop_master(self, params): return backend.StopMaster() def perspective_node_leave_cluster(self, params): return backend.LeaveCluster() # cluster -------------------------- def perspective_version(self,params): return constants.PROTOCOL_VERSION def perspective_configfile_list(self,params): return backend.ListConfigFiles() def perspective_upload_file(self,params): return backend.UploadFile(*params) # os ----------------------- def perspective_os_diagnose(self, params): os_list = backend.DiagnoseOS() if not os_list: # this catches also return values of 'False', # for which we can't iterate over return os_list result = [] for data in os_list: if isinstance(data, objects.OS): result.append(data.Dumps()) elif isinstance(data, errors.InvalidOS): result.append(data.args) else: raise errors.ProgrammerError, ("Invalid result from backend.DiagnoseOS" " (class %s, %s)" % (str(data.__class__), data)) return result def perspective_os_get(self, params): name = params[0] try: os = backend.OSFromDisk(name).Dumps() except errors.InvalidOS, err: os = err.args return os # hooks ----------------------- def perspective_hooks_runner(self, params): hpath, phase, env = params hr = backend.HooksRunner() return hr.RunHooks(hpath, phase, env) class MyRealm: __implements__ = portal.IRealm def requestAvatar(self, avatarId, mind, *interfaces): if pb.IPerspective not in interfaces: raise NotImplementedError return pb.IPerspective, ServerObject(avatarId), lambda:None def ParseOptions(): """Parse the command line options. Returns: (options, args) as from OptionParser.parse_args() """ parser = OptionParser(description="Ganeti node daemon", usage="%prog [-f] [-d]", version="%%prog (ganeti) %s" % constants.RELEASE_VERSION) parser.add_option("-f", "--foreground", dest="fork", help="Don't detach from the current terminal", default=True, action="store_false") parser.add_option("-d", "--debug", dest="debug", help="Enable some debug messages", default=False, action="store_true") options, args = parser.parse_args() return options, args def main(): options, args = ParseOptions() for fname in (constants.SSL_CERT_FILE,): if not os.path.isfile(fname): print "config %s not there, will not run." % fname sys.exit(5) try: ss = ssconf.SimpleStore() port = ss.GetNodeDaemonPort() pwdata = ss.GetNodeDaemonPassword() except errors.ConfigurationError, err: print "Cluster configuration incomplete: '%s'" % str(err) sys.exit(5) # become a daemon if options.fork: createDaemon() logger.SetupLogging(twisted_workaround=True, debug=options.debug, program="ganeti-noded") p = portal.Portal(MyRealm()) p.registerChecker( checkers.InMemoryUsernamePasswordDatabaseDontUse(master_node=pwdata)) reactor.listenSSL(port, pb.PBServerFactory(p), ServerContextFactory()) reactor.run() def createDaemon(): """Detach a process from the controlling terminal and run it in the background as a daemon. """ UMASK = 077 WORKDIR = "/" # Default maximum for the number of available file descriptors. if 'SC_OPEN_MAX' in os.sysconf_names: try: MAXFD = os.sysconf('SC_OPEN_MAX') if MAXFD < 0: MAXFD = 1024 except OSError: MAXFD = 1024 else: MAXFD = 1024 # The standard I/O file descriptors are redirected to /dev/null by default. #REDIRECT_TO = getattr(os, "devnull", "/dev/null") REDIRECT_TO = constants.LOG_NODESERVER try: pid = os.fork() except OSError, e: raise Exception, "%s [%d]" % (e.strerror, e.errno) if (pid == 0): # The first child. os.setsid() try: pid = os.fork() # Fork a second child. except OSError, e: raise Exception, "%s [%d]" % (e.strerror, e.errno) if (pid == 0): # The second child. os.chdir(WORKDIR) os.umask(UMASK) else: # exit() or _exit()? See below. os._exit(0) # Exit parent (the first child) of the second child. else: os._exit(0) # Exit parent of the first child. maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1] if (maxfd == resource.RLIM_INFINITY): maxfd = MAXFD # Iterate through and close all file descriptors. for fd in range(0, maxfd): try: os.close(fd) except OSError: # ERROR, fd wasn't open to begin with (ignored) pass os.open(REDIRECT_TO, os.O_RDWR|os.O_CREAT|os.O_APPEND) # standard input (0) # Duplicate standard input to standard output and standard error. os.dup2(0, 1) # standard output (1) os.dup2(0, 2) # standard error (2) return(0) if __name__=='__main__': main()