4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti node daemon"""
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
34 from optparse import OptionParser
37 from ganeti import backend
38 from ganeti import logger
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import errors
42 from ganeti import ssconf
43 from ganeti import http
44 from ganeti import utils
# Module-level shutdown flag: HandleRequest sets it to True when a handler
# raises errors.QuitGanetiException, and the serving loop in main() keeps
# calling handle_request() only while it is False.
46 _EXIT_GANETI_NODED = False
49 class NodeDaemonRequestHandler(http.HTTPRequestHandler):
50 """The server implementation.
52 This class holds all methods exposed over the RPC interface.
# Dispatch one request to the perspective_<path> method of this handler
# and return that method's result.
55 def HandleRequest(self):
59 global _EXIT_GANETI_NODED
# Only PUT requests are accepted; anything else is a bad request.
61 if self.command.upper() != "PUT":
62 raise http.HTTPBadRequest()
65 if path.startswith("/"):
# Handlers are looked up by name; getattr yields None for unknown paths.
68 method = getattr(self, "perspective_%s" % path, None)
# NOTE(review): `httperror` is not among the names imported by this file
# (only `http` is, and HTTPBadRequest above comes from it), so this line
# would raise NameError instead of the intended error. It most likely
# should read http.HTTPNotFound() -- confirm against the http module.
70 raise httperror.HTTPNotFound()
# The handler receives the request's post data as its only argument.
73 return method(self.post_data)
74 except errors.QuitGanetiException, err:
# Ask the serving loop in main() to stop; see the FIXME there about this
# flag not propagating when the server forks per request.
75 _EXIT_GANETI_NODED = True
77 # the new block devices --------------------------
def perspective_blockdev_create(params):
  """Create a block device.

  params must unpack into exactly five items: the serialized disk, the
  size, the owner, the on_primary flag and the info value. The disk is
  rebuilt with objects.Disk.FromDict and passed, together with the
  other four items, to backend.CreateBlockDevice, whose result is
  returned. ValueError is raised if the disk deserializes to None.

  """
  disk_data, size, owner, on_primary, info = params
  disk = objects.Disk.FromDict(disk_data)
  if disk is None:
    raise ValueError("can't unserialize data!")
  return backend.CreateBlockDevice(disk, size, owner, on_primary, info)
def perspective_blockdev_remove(params):
  """Remove a block device.

  The first element of params is the serialized disk; it is rebuilt
  with objects.Disk.FromDict and passed to backend.RemoveBlockDevice,
  whose result is returned.

  """
  disk = objects.Disk.FromDict(params[0])
  return backend.RemoveBlockDevice(disk)
def perspective_blockdev_rename(params):
  """Rename a list of block devices.

  params is an iterable of (serialized disk, uid) pairs. Every disk is
  rebuilt with objects.Disk.FromDict and the resulting list of
  (disk, uid) tuples is passed to backend.RenameBlockDevices, whose
  result is returned.

  """
  devlist = [(objects.Disk.FromDict(disk_data), uid)
             for disk_data, uid in params]
  return backend.RenameBlockDevices(devlist)
def perspective_blockdev_assemble(params):
  """Assemble a block device.

  params must unpack into exactly three items: the serialized disk,
  the owner and the on_primary flag. The rebuilt disk and the other
  two items go to backend.AssembleBlockDevice, whose result is
  returned. ValueError is raised if the disk deserializes to None.

  """
  disk_data, owner, on_primary = params
  disk = objects.Disk.FromDict(disk_data)
  if disk is None:
    raise ValueError("can't unserialize data!")
  return backend.AssembleBlockDevice(disk, owner, on_primary)
def perspective_blockdev_shutdown(params):
  """Shutdown a block device.

  The first element of params is the serialized disk. ValueError is
  raised if it deserializes to None; otherwise the result of
  backend.ShutdownBlockDevice for that disk is returned.

  """
  disk = objects.Disk.FromDict(params[0])
  if disk is not None:
    return backend.ShutdownBlockDevice(disk)
  raise ValueError("can't unserialize data!")
def perspective_blockdev_addchildren(params):
  """Add children to a mirror device.

  Note: this is only valid for mirror devices. It's the caller's duty
  to send a correct disk, otherwise we raise an error.

  params must unpack into the serialized parent disk and a list of
  serialized child disks. ValueError is raised if the parent or any
  child deserializes to None.

  """
  parent_data, children_data = params
  parent = objects.Disk.FromDict(parent_data)
  children = []
  for child_data in children_data:
    children.append(objects.Disk.FromDict(child_data))
  if parent is None or children.count(None) > 0:
    raise ValueError("can't unserialize data!")
  return backend.MirrorAddChildren(parent, children)
def perspective_blockdev_removechildren(params):
  """Remove children from a mirror device.

  This is only valid for mirror devices, of course. It's the caller's
  duty to send a correct disk, otherwise we raise an error.

  params must unpack into the serialized parent disk and a list of
  serialized child disks. ValueError is raised if the parent or any
  child deserializes to None.

  """
  parent_data, children_data = params
  parent = objects.Disk.FromDict(parent_data)
  children = [objects.Disk.FromDict(item) for item in children_data]
  # Guard-clause form: proceed only when everything deserialized.
  if parent is not None and not children.count(None):
    return backend.MirrorRemoveChildren(parent, children)
  raise ValueError("can't unserialize data!")
def perspective_blockdev_getmirrorstatus(params):
  """Return the mirror status for a list of disks.

  Every element of params is a serialized disk; the rebuilt disks are
  passed as one list to backend.GetMirrorStatus.

  """
  disks = []
  for disk_data in params:
    disks.append(objects.Disk.FromDict(disk_data))
  return backend.GetMirrorStatus(disks)
def perspective_blockdev_find(params):
  """Expose the FindBlockDevice functionality for a disk.

  This will try to find but not activate a disk. The serialized disk
  is taken from the first element of params.

  """
  wanted = objects.Disk.FromDict(params[0])
  found = backend.FindBlockDevice(wanted)
  return found
def perspective_blockdev_snapshot(params):
  """Create a snapshot device.

  Note that this is only valid for LVM disks, if we get passed
  something else we raise an exception. The snapshot device can be
  removed by calling the generic block device remove call.

  The serialized disk is taken from the first element of params.

  """
  disk = objects.Disk.FromDict(params[0])
  snapshot = backend.SnapshotBlockDevice(disk)
  return snapshot
def perspective_blockdev_grow(params):
  """Grow a stack of devices.

  params[0] is the serialized disk and params[1] the amount; both are
  forwarded (the disk after deserialization) to
  backend.GrowBlockDevice.

  """
  disk = objects.Disk.FromDict(params[0])
  grow_by = params[1]
  return backend.GrowBlockDevice(disk, grow_by)
def perspective_blockdev_close(params):
  """Closes the given block devices.

  Every element of params is a serialized disk; the rebuilt disks are
  passed as one list to backend.CloseBlockDevices.

  """
  to_close = []
  for disk_data in params:
    to_close.append(objects.Disk.FromDict(disk_data))
  return backend.CloseBlockDevices(to_close)
207 # export/import --------------------------
def perspective_snapshot_export(params):
  """Export a given snapshot.

  params[0] is the serialized disk, params[1] the destination node and
  params[2] the serialized instance. Disk and instance are rebuilt
  before being handed to backend.ExportSnapshot.

  """
  snapshot = objects.Disk.FromDict(params[0])
  target_node = params[1]
  inst = objects.Instance.FromDict(params[2])
  return backend.ExportSnapshot(snapshot, target_node, inst)
def perspective_finalize_export(params):
  """Expose the finalize export functionality.

  params[0] is the serialized instance and params[1] an iterable of
  serialized snapshot disks; both are deserialized and passed to
  backend.FinalizeExport.

  """
  inst = objects.Instance.FromDict(params[0])
  snapshots = []
  for disk_data in params[1]:
    snapshots.append(objects.Disk.FromDict(disk_data))
  return backend.FinalizeExport(inst, snapshots)
230 def perspective_export_info(params):
231 """Query information about an existing export on this node.
233 The given path may not contain an export, in which case we return
238 einfo = backend.ExportInfo(path)
# NOTE(review): the assignment of `path` and the way `einfo` is turned
# into the return value are not visible in this listing; presumably path
# comes from params -- verify against the full file before changing this.
def perspective_export_list(params):
  """List the available exports on this node.

  Note that as opposed to export_info, which may query data about an
  export in any path, this only queries the standard Ganeti path
  (constants.EXPORT_DIR).

  params is not used.

  """
  exports = backend.ListExports()
  return exports
def perspective_export_remove(params):
  """Remove an export.

  The export to remove is identified by the first element of params,
  which is passed unchanged to backend.RemoveExport.

  """
  return backend.RemoveExport(params[0])
262 # volume --------------------------
def perspective_volume_list(params):
  """Query the list of logical volumes in a given volume group.

  The volume group name is the first element of params and is passed
  unchanged to backend.GetVolumeList.

  """
  return backend.GetVolumeList(params[0])
def perspective_vg_list(params):
  """Query the list of volume groups.

  params is not used; the result of backend.ListVolumeGroups is
  returned as is.

  """
  volume_groups = backend.ListVolumeGroups()
  return volume_groups
279 # bridge --------------------------
def perspective_bridges_exist(params):
  """Check if all bridges given exist on this node.

  The list of bridges is the first element of params and is passed
  unchanged to backend.BridgesExist.

  """
  return backend.BridgesExist(params[0])
289 # instance --------------------------
def perspective_instance_os_add(params):
  """Install an OS on a given instance.

  params must unpack into exactly three items: the serialized
  instance, the OS disk and the swap disk. The instance is rebuilt
  with objects.Instance.FromDict before calling
  backend.AddOSToInstance.

  """
  instance_data, os_disk, swap_disk = params
  instance = objects.Instance.FromDict(instance_data)
  return backend.AddOSToInstance(instance, os_disk, swap_disk)
def perspective_instance_run_rename(params):
  """Runs the OS rename script for an instance.

  params must unpack into exactly four items: the serialized instance,
  the old name, the OS disk and the swap disk. The instance is rebuilt
  with objects.Instance.FromDict before calling
  backend.RunRenameInstance.

  """
  instance_data, old_name, os_disk, swap_disk = params
  instance = objects.Instance.FromDict(instance_data)
  return backend.RunRenameInstance(instance, old_name, os_disk, swap_disk)
def perspective_instance_os_import(params):
  """Run the import function of an OS onto a given instance.

  params must unpack into exactly five items: the serialized instance,
  the OS disk, the swap disk, the source node and the source image.
  The instance is rebuilt with objects.Instance.FromDict before
  calling backend.ImportOSIntoInstance.

  """
  instance_data, os_disk, swap_disk, src_node, src_image = params
  instance = objects.Instance.FromDict(instance_data)
  import_args = (instance, os_disk, swap_disk, src_node, src_image)
  return backend.ImportOSIntoInstance(*import_args)
def perspective_instance_shutdown(params):
  """Shutdown an instance.

  The serialized instance is the first element of params; it is
  rebuilt with objects.Instance.FromDict and passed to
  backend.ShutdownInstance.

  """
  inst = objects.Instance.FromDict(params[0])
  return backend.ShutdownInstance(inst)
def perspective_instance_start(params):
  """Start an instance.

  params[0] is the serialized instance and params[1] the extra
  arguments; the rebuilt instance and the extra arguments are passed
  to backend.StartInstance.

  """
  inst = objects.Instance.FromDict(params[0])
  return backend.StartInstance(inst, params[1])
def perspective_instance_migrate(params):
  """Migrates an instance.

  params must unpack into exactly three items (instance, target,
  live), which are forwarded unchanged to backend.MigrateInstance.
  Unlike most of the neighbouring instance calls, the instance value
  is not passed through objects.Instance.FromDict here.

  """
  inst, target_node, is_live = params
  return backend.MigrateInstance(inst, target_node, is_live)
def perspective_instance_reboot(params):
  """Reboot an instance.

  params[0] is the serialized instance, params[1] the reboot type and
  params[2] the extra arguments; the rebuilt instance and the other
  two values are passed to backend.RebootInstance.

  """
  inst = objects.Instance.FromDict(params[0])
  return backend.RebootInstance(inst, params[1], params[2])
def perspective_instance_info(params):
  """Query instance information.

  The first element of params is passed unchanged to
  backend.GetInstanceInfo.

  """
  info = backend.GetInstanceInfo(params[0])
  return info
def perspective_all_instances_info(params):
  """Query information about all instances.

  params is not used; the result of backend.GetAllInstancesInfo is
  returned as is.

  """
  all_info = backend.GetAllInstancesInfo()
  return all_info
def perspective_instance_list(params):
  """Query the list of running instances.

  params is not used; the result of backend.GetInstanceList is
  returned as is.

  """
  running = backend.GetInstanceList()
  return running
375 # node --------------------------
def perspective_node_tcp_ping(params):
  """Do a TcpPing on the remote node.

  params is indexed as follows: params[0] is the source, params[1] and
  params[2] are the two positional arguments of utils.TcpPing,
  params[3] is the timeout and params[4] the live_port_needed flag.

  """
  ping_options = {
    "timeout": params[3],
    "live_port_needed": params[4],
    "source": params[0],
    }
  return utils.TcpPing(params[1], params[2], **ping_options)
def perspective_node_info(params):
  """Query node information.

  The volume group name is the first element of params and is passed
  unchanged to backend.GetNodeInfo.

  """
  return backend.GetNodeInfo(params[0])
def perspective_node_add(params):
  """Complete the registration of this node in the cluster.

  The first six elements of params are passed, in order, as the
  positional arguments of backend.AddNode.

  """
  # Explicit indexing keeps the IndexError on too-short input.
  add_args = [params[idx] for idx in range(6)]
  return backend.AddNode(*add_args)
def perspective_node_verify(params):
  """Run a verify sequence on this node.

  The first element of params is passed unchanged to
  backend.VerifyNode.

  """
  verify_result = backend.VerifyNode(params[0])
  return verify_result
def perspective_node_start_master(params):
  """Promote this node to master status.

  params is not used; the result of backend.StartMaster is returned.

  """
  started = backend.StartMaster()
  return started
def perspective_node_stop_master(params):
  """Demote this node from master status.

  params is not used; the result of backend.StopMaster is returned.

  """
  stopped = backend.StopMaster()
  return stopped
def perspective_node_leave_cluster(params):
  """Cleanup after leaving a cluster.

  params is not used; the result of backend.LeaveCluster is returned.

  """
  outcome = backend.LeaveCluster()
  return outcome
def perspective_node_volumes(params):
  """Query the node's volumes via backend.NodeVolumes.

  params is not used; whatever backend.NodeVolumes reports is returned
  as is.

  """
  volumes = backend.NodeVolumes()
  return volumes
436 # cluster --------------------------
def perspective_version(params):
  """Query version information.

  params is not used; the value returned is
  constants.PROTOCOL_VERSION.

  """
  protocol_version = constants.PROTOCOL_VERSION
  return protocol_version
def perspective_upload_file(params):
  """Upload a file.

  params is expanded into the positional arguments of
  backend.UploadFile. Note that the backend implementation imposes its
  own strict rules on what it accepts.

  """
  upload_result = backend.UploadFile(*params)
  return upload_result
456 # os -----------------------
def perspective_os_diagnose(params):
  """Query detailed information about existing OSes.

  params is not used. Every object returned by backend.DiagnoseOS is
  converted with its ToDict method and the list of results is
  returned.

  """
  serialized = []
  for os_obj in backend.DiagnoseOS():
    serialized.append(os_obj.ToDict())
  return serialized
466 def perspective_os_get(params):
467 """Query information about a given OS.
472 os_obj = backend.OSFromDisk(name)
473 except errors.InvalidOS, err:
474 os_obj = objects.OS.FromInvalidOS(err)
475 return os_obj.ToDict()
477 # hooks -----------------------
def perspective_hooks_runner(params):
  """Run hook scripts.

  params must unpack into exactly three items: the hooks path, the
  phase and the environment. They are passed to the RunHooks method of
  a freshly created backend.HooksRunner.

  """
  hooks_path, phase, env = params
  runner = backend.HooksRunner()
  return runner.RunHooks(hooks_path, phase, env)
488 # iallocator -----------------
def perspective_iallocator_runner(params):
  """Run an iallocator script.

  params must unpack into the allocator name and its input data, which
  are passed to the Run method of a freshly created
  backend.IAllocatorRunner.

  """
  name, idata = params
  runner = backend.IAllocatorRunner()
  return runner.Run(name, idata)
499 # test -----------------------
def perspective_test_delay(params):
  """Run the test delay.

  The duration is the first element of params and is passed unchanged
  to utils.TestDelay.

  """
  return utils.TestDelay(params[0])
def perspective_file_storage_dir_create(params):
  """Create the file storage directory.

  The directory is the first element of params and is passed unchanged
  to backend.CreateFileStorageDir.

  """
  return backend.CreateFileStorageDir(params[0])
def perspective_file_storage_dir_remove(params):
  """Remove the file storage directory.

  The directory is the first element of params and is passed unchanged
  to backend.RemoveFileStorageDir.

  """
  return backend.RemoveFileStorageDir(params[0])
def perspective_file_storage_dir_rename(params):
  """Rename the file storage directory.

  params[0] is the current directory and params[1] the new one; both
  are passed unchanged to backend.RenameFileStorageDir.

  """
  old_dir, new_dir = params[0], params[1]
  return backend.RenameFileStorageDir(old_dir, new_dir)
class NodeDaemonHttpServer(http.HTTPServer):
  """HTTP server whose requests are handled by NodeDaemonRequestHandler.

  """
  def __init__(self, server_address):
    handler_class = NodeDaemonRequestHandler
    http.HTTPServer.__init__(self, server_address, handler_class)
class ForkingHTTPServer(SocketServer.ForkingMixIn, NodeDaemonHttpServer):
  """Node daemon HTTP server that forks for each request.

  Combines SocketServer.ForkingMixIn with NodeDaemonHttpServer so that
  every request we handle is processed in a forked child. This allows
  more requests to be handled concurrently.

  """
551 """Parse the command line options.
554 (options, args) as from OptionParser.parse_args()
557 parser = OptionParser(description="Ganeti node daemon",
558 usage="%prog [-f] [-d]",
559 version="%%prog (ganeti) %s" %
560 constants.RELEASE_VERSION)
# -f/--foreground clears options.fork, which is True unless the flag is
# given.
562 parser.add_option("-f", "--foreground", dest="fork",
563 help="Don't detach from the current terminal",
564 default=True, action="store_false")
# -d/--debug sets options.debug, which is False unless the flag is given.
565 parser.add_option("-d", "--debug", dest="debug",
566 help="Enable some debug messages",
567 default=False, action="store_true")
# Called without arguments, so optparse parses sys.argv[1:].
568 options, args = parser.parse_args()
573 """Main function for the node daemon.
576 options, args = ParseOptions()
# Propagate the --debug flag to the utils module.
577 utils.debug = options.debug
# Startup prerequisite: every listed file (currently only the SSL
# certificate) must exist as a regular file.
578 for fname in (constants.SSL_CERT_FILE,):
579 if not os.path.isfile(fname):
580 print "config %s not there, will not run." % fname
# Read the daemon port and password from the simple store; a
# ConfigurationError is reported as an incomplete cluster configuration.
# NOTE(review): pwdata is not used in the visible part of this function --
# confirm against the full file whether it is still needed.
584 ss = ssconf.SimpleStore()
585 port = ss.GetNodeDaemonPort()
586 pwdata = ss.GetNodeDaemonPassword()
587 except errors.ConfigurationError, err:
588 print "Cluster configuration incomplete: '%s'" % str(err)
591 # create the various SUB_RUN_DIRS, if not existing, so that we handle the
592 # situation where RUN_DIR is tmpfs
593 for dir_name in constants.SUB_RUN_DIRS:
594 if not os.path.exists(dir_name):
596 os.mkdir(dir_name, 0755)
597 except EnvironmentError, err:
# EEXIST is tolerated here; any other errno is reported as a setup
# error.
598 if err.errno != errno.EEXIST:
599 print ("Node setup wrong, cannot create directory %s: %s" %
# The path may also exist as something other than a directory.
602 if not os.path.isdir(dir_name):
603 print ("Node setup wrong, %s is not a directory" % dir_name)
# NOTE(review): the condition guarding Daemonize is not visible in this
# listing; presumably it depends on options.fork -- verify.
608 utils.Daemonize(logfile=constants.LOG_NODESERVER)
# Log to stderr as well when running in the foreground (-f given).
610 logger.SetupDaemon(logfile=constants.LOG_NODESERVER, debug=options.debug,
611 stderr_logging=not options.fork)
612 logging.info("ganeti node daemon startup")
614 global _EXIT_GANETI_NODED
# Two server flavours are constructed below: the forking one and the plain
# NodeDaemonHttpServer. The condition choosing between them is not visible
# in this listing. Both listen on all interfaces ('') at the configured
# port.
617 httpd = ForkingHTTPServer(('', port))
619 httpd = NodeDaemonHttpServer(('', port))
621 # FIXME: updating _EXIT_GANETI_NODED doesn't work when forking
# Serve one request at a time until HandleRequest sets the exit flag.
622 while (not _EXIT_GANETI_NODED):
623 httpd.handle_request()
# Entry-point guard: the guarded body runs only when this file is executed
# as a script, not when it is imported as a module.
626 if __name__ == '__main__':