4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti node daemon"""
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
35 from optparse import OptionParser
37 from ganeti import backend
38 from ganeti import logger
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import errors
42 from ganeti import ssconf
43 from ganeti import http
44 from ganeti import utils
47 class NodeDaemonRequestHandler(http.HTTPRequestHandler):
48 """The server implementation.
50 This class holds all methods exposed over the RPC interface.
53 def HandleRequest(self):
57 if self.command.upper() != "PUT":
58 raise http.HTTPBadRequest()
61 if path.startswith("/"):
64 method = getattr(self, "perspective_%s" % path, None)
66 raise httperror.HTTPNotFound()
70 return method(self.post_data)
72 logging.exception("Error in RPC call")
74 except errors.QuitGanetiException, err:
76 os.kill(self.server.noded_pid, signal.SIGTERM)
78 # the new block devices --------------------------
81 def perspective_blockdev_create(params):
82 """Create a block device.
85 bdev_s, size, owner, on_primary, info = params
86 bdev = objects.Disk.FromDict(bdev_s)
88 raise ValueError("can't unserialize data!")
89 return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
92 def perspective_blockdev_remove(params):
93 """Remove a block device.
97 bdev = objects.Disk.FromDict(bdev_s)
98 return backend.RemoveBlockDevice(bdev)
101 def perspective_blockdev_rename(params):
102 """Remove a block device.
105 devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
106 return backend.RenameBlockDevices(devlist)
109 def perspective_blockdev_assemble(params):
110 """Assemble a block device.
113 bdev_s, owner, on_primary = params
114 bdev = objects.Disk.FromDict(bdev_s)
116 raise ValueError("can't unserialize data!")
117 return backend.AssembleBlockDevice(bdev, owner, on_primary)
120 def perspective_blockdev_shutdown(params):
121 """Shutdown a block device.
125 bdev = objects.Disk.FromDict(bdev_s)
127 raise ValueError("can't unserialize data!")
128 return backend.ShutdownBlockDevice(bdev)
131 def perspective_blockdev_addchildren(params):
132 """Add a child to a mirror device.
134 Note: this is only valid for mirror devices. It's the caller's duty
135 to send a correct disk, otherwise we raise an error.
138 bdev_s, ndev_s = params
139 bdev = objects.Disk.FromDict(bdev_s)
140 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
141 if bdev is None or ndevs.count(None) > 0:
142 raise ValueError("can't unserialize data!")
143 return backend.MirrorAddChildren(bdev, ndevs)
146 def perspective_blockdev_removechildren(params):
147 """Remove a child from a mirror device.
149 This is only valid for mirror devices, of course. It's the callers
150 duty to send a correct disk, otherwise we raise an error.
153 bdev_s, ndev_s = params
154 bdev = objects.Disk.FromDict(bdev_s)
155 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
156 if bdev is None or ndevs.count(None) > 0:
157 raise ValueError("can't unserialize data!")
158 return backend.MirrorRemoveChildren(bdev, ndevs)
161 def perspective_blockdev_getmirrorstatus(params):
162 """Return the mirror status for a list of disks.
165 disks = [objects.Disk.FromDict(dsk_s)
167 return backend.GetMirrorStatus(disks)
170 def perspective_blockdev_find(params):
171 """Expose the FindBlockDevice functionality for a disk.
173 This will try to find but not activate a disk.
176 disk = objects.Disk.FromDict(params[0])
177 return backend.FindBlockDevice(disk)
180 def perspective_blockdev_snapshot(params):
181 """Create a snapshot device.
183 Note that this is only valid for LVM disks, if we get passed
184 something else we raise an exception. The snapshot device can be
185 remove by calling the generic block device remove call.
188 cfbd = objects.Disk.FromDict(params[0])
189 return backend.SnapshotBlockDevice(cfbd)
192 def perspective_blockdev_grow(params):
193 """Grow a stack of devices.
196 cfbd = objects.Disk.FromDict(params[0])
198 return backend.GrowBlockDevice(cfbd, amount)
201 def perspective_blockdev_close(params):
202 """Closes the given block devices.
205 disks = [objects.Disk.FromDict(cf) for cf in params]
206 return backend.CloseBlockDevices(disks)
208 # export/import --------------------------
211 def perspective_snapshot_export(params):
212 """Export a given snapshot.
215 disk = objects.Disk.FromDict(params[0])
216 dest_node = params[1]
217 instance = objects.Instance.FromDict(params[2])
218 return backend.ExportSnapshot(disk, dest_node, instance)
221 def perspective_finalize_export(params):
222 """Expose the finalize export functionality.
225 instance = objects.Instance.FromDict(params[0])
226 snap_disks = [objects.Disk.FromDict(str_data)
227 for str_data in params[1]]
228 return backend.FinalizeExport(instance, snap_disks)
231 def perspective_export_info(params):
232 """Query information about an existing export on this node.
234 The given path may not contain an export, in which case we return
239 einfo = backend.ExportInfo(path)
245 def perspective_export_list(params):
246 """List the available exports on this node.
248 Note that as opposed to export_info, which may query data about an
249 export in any path, this only queries the standard Ganeti path
250 (constants.EXPORT_DIR).
253 return backend.ListExports()
256 def perspective_export_remove(params):
261 return backend.RemoveExport(export)
263 # volume --------------------------
266 def perspective_volume_list(params):
267 """Query the list of logical volumes in a given volume group.
271 return backend.GetVolumeList(vgname)
274 def perspective_vg_list(params):
275 """Query the list of volume groups.
278 return backend.ListVolumeGroups()
280 # bridge --------------------------
283 def perspective_bridges_exist(params):
284 """Check if all bridges given exist on this node.
287 bridges_list = params[0]
288 return backend.BridgesExist(bridges_list)
290 # instance --------------------------
293 def perspective_instance_os_add(params):
294 """Install an OS on a given instance.
297 inst_s, os_disk, swap_disk = params
298 inst = objects.Instance.FromDict(inst_s)
299 return backend.AddOSToInstance(inst, os_disk, swap_disk)
302 def perspective_instance_run_rename(params):
303 """Runs the OS rename script for an instance.
306 inst_s, old_name, os_disk, swap_disk = params
307 inst = objects.Instance.FromDict(inst_s)
308 return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
311 def perspective_instance_os_import(params):
312 """Run the import function of an OS onto a given instance.
315 inst_s, os_disk, swap_disk, src_node, src_image = params
316 inst = objects.Instance.FromDict(inst_s)
317 return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
321 def perspective_instance_shutdown(params):
322 """Shutdown an instance.
325 instance = objects.Instance.FromDict(params[0])
326 return backend.ShutdownInstance(instance)
329 def perspective_instance_start(params):
330 """Start an instance.
333 instance = objects.Instance.FromDict(params[0])
334 extra_args = params[1]
335 return backend.StartInstance(instance, extra_args)
338 def perspective_instance_migrate(params):
339 """Migrates an instance.
342 instance, target, live = params
343 return backend.MigrateInstance(instance, target, live)
346 def perspective_instance_reboot(params):
347 """Reboot an instance.
350 instance = objects.Instance.FromDict(params[0])
351 reboot_type = params[1]
352 extra_args = params[2]
353 return backend.RebootInstance(instance, reboot_type, extra_args)
356 def perspective_instance_info(params):
357 """Query instance information.
360 return backend.GetInstanceInfo(params[0])
363 def perspective_all_instances_info(params):
364 """Query information about all instances.
367 return backend.GetAllInstancesInfo()
370 def perspective_instance_list(params):
371 """Query the list of running instances.
374 return backend.GetInstanceList()
376 # node --------------------------
379 def perspective_node_tcp_ping(params):
380 """Do a TcpPing on the remote node.
383 return utils.TcpPing(params[1], params[2], timeout=params[3],
384 live_port_needed=params[4], source=params[0])
387 def perspective_node_info(params):
388 """Query node information.
392 return backend.GetNodeInfo(vgname)
395 def perspective_node_add(params):
396 """Complete the registration of this node in the cluster.
399 return backend.AddNode(params[0], params[1], params[2],
400 params[3], params[4], params[5])
403 def perspective_node_verify(params):
404 """Run a verify sequence on this node.
407 return backend.VerifyNode(params[0])
410 def perspective_node_start_master(params):
411 """Promote this node to master status.
414 return backend.StartMaster(params[0])
417 def perspective_node_stop_master(params):
418 """Demote this node from master status.
421 return backend.StopMaster(params[0])
424 def perspective_node_leave_cluster(params):
425 """Cleanup after leaving a cluster.
428 return backend.LeaveCluster()
431 def perspective_node_volumes(params):
432 """Query the list of all logical volume groups.
435 return backend.NodeVolumes()
437 # cluster --------------------------
440 def perspective_version(params):
441 """Query version information.
444 return constants.PROTOCOL_VERSION
447 def perspective_upload_file(params):
450 Note that the backend implementation imposes strict rules on which
454 return backend.UploadFile(*params)
457 # os -----------------------
460 def perspective_os_diagnose(params):
461 """Query detailed information about existing OSes.
464 return [os.ToDict() for os in backend.DiagnoseOS()]
467 def perspective_os_get(params):
468 """Query information about a given OS.
473 os_obj = backend.OSFromDisk(name)
474 except errors.InvalidOS, err:
475 os_obj = objects.OS.FromInvalidOS(err)
476 return os_obj.ToDict()
478 # hooks -----------------------
481 def perspective_hooks_runner(params):
485 hpath, phase, env = params
486 hr = backend.HooksRunner()
487 return hr.RunHooks(hpath, phase, env)
489 # iallocator -----------------
492 def perspective_iallocator_runner(params):
493 """Run an iallocator script.
497 iar = backend.IAllocatorRunner()
498 return iar.Run(name, idata)
500 # test -----------------------
503 def perspective_test_delay(params):
508 return utils.TestDelay(duration)
511 def perspective_file_storage_dir_create(params):
512 """Create the file storage directory.
515 file_storage_dir = params[0]
516 return backend.CreateFileStorageDir(file_storage_dir)
519 def perspective_file_storage_dir_remove(params):
520 """Remove the file storage directory.
523 file_storage_dir = params[0]
524 return backend.RemoveFileStorageDir(file_storage_dir)
527 def perspective_file_storage_dir_rename(params):
528 """Rename the file storage directory.
531 old_file_storage_dir = params[0]
532 new_file_storage_dir = params[1]
533 return backend.RenameFileStorageDir(old_file_storage_dir,
534 new_file_storage_dir)
537 def perspective_jobqueue_update(params):
541 (file_name, content) = params
542 return backend.JobQueueUpdate(file_name, content)
545 def perspective_jobqueue_purge(params):
549 return backend.JobQueuePurge()
552 class NodeDaemonHttpServer(http.HTTPServer):
553 def __init__(self, server_address):
554 http.HTTPServer.__init__(self, server_address, NodeDaemonRequestHandler)
555 self.noded_pid = os.getpid()
557 def serve_forever(self):
558 """Handle requests until told to quit."""
559 sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
561 while not sighandler.called:
562 self.handle_request()
563 # TODO: There could be children running at this point
568 class ForkingHTTPServer(SocketServer.ForkingMixIn, NodeDaemonHttpServer):
569 """Forking HTTP Server.
571 This inherits from ForkingMixIn and HTTPServer in order to fork for each
572 request we handle. This allows more requests to be handled concurrently.
578 """Parse the command line options.
581 (options, args) as from OptionParser.parse_args()
584 parser = OptionParser(description="Ganeti node daemon",
585 usage="%prog [-f] [-d]",
586 version="%%prog (ganeti) %s" %
587 constants.RELEASE_VERSION)
589 parser.add_option("-f", "--foreground", dest="fork",
590 help="Don't detach from the current terminal",
591 default=True, action="store_false")
592 parser.add_option("-d", "--debug", dest="debug",
593 help="Enable some debug messages",
594 default=False, action="store_true")
595 options, args = parser.parse_args()
600 """Main function for the node daemon.
603 options, args = ParseOptions()
604 utils.debug = options.debug
605 for fname in (constants.SSL_CERT_FILE,):
606 if not os.path.isfile(fname):
607 print "config %s not there, will not run." % fname
611 ss = ssconf.SimpleStore()
612 port = ss.GetNodeDaemonPort()
613 pwdata = ss.GetNodeDaemonPassword()
614 except errors.ConfigurationError, err:
615 print "Cluster configuration incomplete: '%s'" % str(err)
618 # create the various SUB_RUN_DIRS, if not existing, so that we handle the
619 # situation where RUN_DIR is tmpfs
620 for dir_name in constants.SUB_RUN_DIRS:
621 if not os.path.exists(dir_name):
623 os.mkdir(dir_name, 0755)
624 except EnvironmentError, err:
625 if err.errno != errno.EEXIST:
626 print ("Node setup wrong, cannot create directory %s: %s" %
629 if not os.path.isdir(dir_name):
630 print ("Node setup wrong, %s is not a directory" % dir_name)
635 utils.Daemonize(logfile=constants.LOG_NODESERVER)
637 utils.WritePidFile(constants.NODED_PID)
639 logger.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
640 stderr_logging=not options.fork)
641 logging.info("ganeti node daemon startup")
644 server = ForkingHTTPServer(('', port))
646 server = NodeDaemonHttpServer(('', port))
649 server.serve_forever()
651 utils.RemovePidFile(constants.NODED_PID)
654 if __name__ == '__main__':