from optparse import OptionParser
from ganeti import backend
-from ganeti import logger
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
-from ganeti import ssconf
+from ganeti import daemon
from ganeti import http
from ganeti import utils
"""Decorator for job queue manipulating functions.
"""
+ QUEUE_LOCK_TIMEOUT = 10
+
def wrapper(*args, **kwargs):
# Locking in exclusive, blocking mode because there could be several
# children running at the same time. Waiting up to 10 seconds.
- queue_lock.Exclusive(blocking=True, timeout=10)
+ queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
try:
return fn(*args, **kwargs)
finally:
queue_lock.Unlock()
+
return wrapper
-class NodeDaemonRequestHandler(http.HTTPRequestHandler):
+class NodeHttpServer(http.HttpServer):
"""The server implementation.
This class holds all methods exposed over the RPC interface.
"""
- def HandleRequest(self):
+ def __init__(self, *args, **kwargs):
+ http.HttpServer.__init__(self, *args, **kwargs)
+ self.noded_pid = os.getpid()
+
+ def HandleRequest(self, req):
"""Handle a request.
"""
- if self.command.upper() != "PUT":
+ if req.request_method.upper() != "PUT":
raise http.HTTPBadRequest()
- path = self.path
+ path = req.request_path
if path.startswith("/"):
path = path[1:]
method = getattr(self, "perspective_%s" % path, None)
if method is None:
- raise httperror.HTTPNotFound()
+ raise http.HTTPNotFound()
try:
try:
- return method(self.post_data)
+ return method(req.request_post_data)
except:
logging.exception("Error in RPC call")
raise
except errors.QuitGanetiException, err:
# Tell parent to quit
- os.kill(self.server.noded_pid, signal.SIGTERM)
+ os.kill(self.noded_pid, signal.SIGTERM)
# the new block devices --------------------------
disk = objects.Disk.FromDict(params[0])
dest_node = params[1]
instance = objects.Instance.FromDict(params[2])
- return backend.ExportSnapshot(disk, dest_node, instance)
+ cluster_name = params[3]
+ return backend.ExportSnapshot(disk, dest_node, instance, cluster_name)
@staticmethod
def perspective_finalize_export(params):
"""Install an OS on a given instance.
"""
- inst_s, os_disk, swap_disk = params
+ inst_s = params[0]
inst = objects.Instance.FromDict(inst_s)
- return backend.AddOSToInstance(inst, os_disk, swap_disk)
+ return backend.AddOSToInstance(inst)
@staticmethod
def perspective_instance_run_rename(params):
"""Runs the OS rename script for an instance.
"""
- inst_s, old_name, os_disk, swap_disk = params
+ inst_s, old_name = params
inst = objects.Instance.FromDict(inst_s)
- return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
+ return backend.RunRenameInstance(inst, old_name)
@staticmethod
def perspective_instance_os_import(params):
"""Run the import function of an OS onto a given instance.
"""
- inst_s, os_disk, swap_disk, src_node, src_image = params
+ inst_s, os_disk, swap_disk, src_node, src_image, cluster_name = params
inst = objects.Instance.FromDict(inst_s)
return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
- src_node, src_image)
+ src_node, src_image, cluster_name)
@staticmethod
def perspective_instance_shutdown(params):
"""
instance, target, live = params
+ instance = objects.Instance.FromDict(instance)
return backend.MigrateInstance(instance, target, live)
@staticmethod
"""Query instance information.
"""
- return backend.GetInstanceInfo(params[0])
+ return backend.GetInstanceInfo(params[0], params[1])
@staticmethod
def perspective_all_instances_info(params):
"""Query information about all instances.
"""
- return backend.GetAllInstancesInfo()
+ return backend.GetAllInstancesInfo(params[0])
@staticmethod
def perspective_instance_list(params):
"""Query the list of running instances.
"""
- return backend.GetInstanceList()
+ return backend.GetInstanceList(params[0])
# node --------------------------
live_port_needed=params[4], source=params[0])
@staticmethod
+ def perspective_node_has_ip_address(params):
+ """Checks if a node has the given ip address.
+
+ """
+ return utils.OwnIpAddress(params[0])
+
+ @staticmethod
def perspective_node_info(params):
"""Query node information.
"""
- vgname = params[0]
- return backend.GetNodeInfo(vgname)
+ vgname, hypervisor_type = params
+ return backend.GetNodeInfo(vgname, hypervisor_type)
@staticmethod
def perspective_node_add(params):
"""Run a verify sequence on this node.
"""
- return backend.VerifyNode(params[0])
+ return backend.VerifyNode(params[0], params[1])
@staticmethod
def perspective_node_start_master(params):
"""
return backend.UploadFile(*params)
+ @staticmethod
+ def perspective_master_info(params):
+ """Query master information.
+
+ """
+ return backend.GetMasterInfo()
# os -----------------------
duration = params[0]
return utils.TestDelay(duration)
+ # file storage ---------------
+
@staticmethod
def perspective_file_storage_dir_create(params):
"""Create the file storage directory.
return backend.RenameFileStorageDir(old_file_storage_dir,
new_file_storage_dir)
+ # jobs ------------------------
+
@staticmethod
@_RequireJobQueueLock
def perspective_jobqueue_update(params):
return backend.JobQueueRename(old, new)
+ @staticmethod
+ def perspective_jobqueue_set_drain(params):
+ """Set/unset the queue drain flag.
-class NodeDaemonHttpServer(http.HTTPServer):
- def __init__(self, server_address):
- http.HTTPServer.__init__(self, server_address, NodeDaemonRequestHandler)
- self.noded_pid = os.getpid()
-
- def serve_forever(self):
- """Handle requests until told to quit."""
- sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
- try:
- while not sighandler.called:
- self.handle_request()
- # TODO: There could be children running at this point
- finally:
- sighandler.Reset()
+ """
+ drain_flag = params[0]
+ return backend.JobQueueSetDrainFlag(drain_flag)
-class ForkingHTTPServer(SocketServer.ForkingMixIn, NodeDaemonHttpServer):
- """Forking HTTP Server.
+ # hypervisor ---------------
- This inherits from ForkingMixIn and HTTPServer in order to fork for each
- request we handle. This allows more requests to be handled concurrently.
+ @staticmethod
+ def perspective_hypervisor_validate_params(params):
+ """Validate the hypervisor parameters.
- """
+ """
+ (hvname, hvparams) = params
+ return backend.ValidateHVParams(hvname, hvparams)
def ParseOptions():
sys.exit(5)
try:
- ss = ssconf.SimpleStore()
- port = ss.GetNodeDaemonPort()
- pwdata = ss.GetNodeDaemonPassword()
+ port = utils.GetNodeDaemonPort()
+ pwdata = utils.GetNodeDaemonPassword()
except errors.ConfigurationError, err:
print "Cluster configuration incomplete: '%s'" % str(err)
sys.exit(5)
utils.Daemonize(logfile=constants.LOG_NODESERVER)
utils.WritePidFile(constants.NODED_PID)
+ try:
+ utils.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
+ stderr_logging=not options.fork)
+ logging.info("ganeti node daemon startup")
- logger.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
- stderr_logging=not options.fork)
- logging.info("ganeti node daemon startup")
-
- # Prepare job queue
- queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
-
- if options.fork:
- server = ForkingHTTPServer(('', port))
- else:
- server = NodeDaemonHttpServer(('', port))
+ # Prepare job queue
+ queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
- try:
- server.serve_forever()
+ mainloop = daemon.Mainloop()
+ server = NodeHttpServer(mainloop, ("", port))
+ server.Start()
+ try:
+ mainloop.Run()
+ finally:
+ server.Stop()
finally:
utils.RemovePidFile(constants.NODED_PID)