4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti node daemon"""
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
35 from optparse import OptionParser
37 from ganeti import backend
38 from ganeti import logger
39 from ganeti import constants
40 from ganeti import objects
41 from ganeti import errors
42 from ganeti import ssconf
43 from ganeti import http
44 from ganeti import utils
47 class NodeDaemonRequestHandler(http.HTTPRequestHandler):
48 """The server implementation.
50 This class holds all methods exposed over the RPC interface.
53 def HandleRequest(self):
57 if self.command.upper() != "PUT":
58 raise http.HTTPBadRequest()
61 if path.startswith("/"):
64 method = getattr(self, "perspective_%s" % path, None)
66 raise httperror.HTTPNotFound()
69 return method(self.post_data)
70 except errors.QuitGanetiException, err:
72 os.kill(self.server.noded_pid, signal.SIGTERM)
74 # the new block devices --------------------------
77 def perspective_blockdev_create(params):
78 """Create a block device.
81 bdev_s, size, owner, on_primary, info = params
82 bdev = objects.Disk.FromDict(bdev_s)
84 raise ValueError("can't unserialize data!")
85 return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
88 def perspective_blockdev_remove(params):
89 """Remove a block device.
93 bdev = objects.Disk.FromDict(bdev_s)
94 return backend.RemoveBlockDevice(bdev)
97 def perspective_blockdev_rename(params):
98 """Remove a block device.
101 devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
102 return backend.RenameBlockDevices(devlist)
105 def perspective_blockdev_assemble(params):
106 """Assemble a block device.
109 bdev_s, owner, on_primary = params
110 bdev = objects.Disk.FromDict(bdev_s)
112 raise ValueError("can't unserialize data!")
113 return backend.AssembleBlockDevice(bdev, owner, on_primary)
116 def perspective_blockdev_shutdown(params):
117 """Shutdown a block device.
121 bdev = objects.Disk.FromDict(bdev_s)
123 raise ValueError("can't unserialize data!")
124 return backend.ShutdownBlockDevice(bdev)
127 def perspective_blockdev_addchildren(params):
128 """Add a child to a mirror device.
130 Note: this is only valid for mirror devices. It's the caller's duty
131 to send a correct disk, otherwise we raise an error.
134 bdev_s, ndev_s = params
135 bdev = objects.Disk.FromDict(bdev_s)
136 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
137 if bdev is None or ndevs.count(None) > 0:
138 raise ValueError("can't unserialize data!")
139 return backend.MirrorAddChildren(bdev, ndevs)
142 def perspective_blockdev_removechildren(params):
143 """Remove a child from a mirror device.
145 This is only valid for mirror devices, of course. It's the callers
146 duty to send a correct disk, otherwise we raise an error.
149 bdev_s, ndev_s = params
150 bdev = objects.Disk.FromDict(bdev_s)
151 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
152 if bdev is None or ndevs.count(None) > 0:
153 raise ValueError("can't unserialize data!")
154 return backend.MirrorRemoveChildren(bdev, ndevs)
157 def perspective_blockdev_getmirrorstatus(params):
158 """Return the mirror status for a list of disks.
161 disks = [objects.Disk.FromDict(dsk_s)
163 return backend.GetMirrorStatus(disks)
166 def perspective_blockdev_find(params):
167 """Expose the FindBlockDevice functionality for a disk.
169 This will try to find but not activate a disk.
172 disk = objects.Disk.FromDict(params[0])
173 return backend.FindBlockDevice(disk)
176 def perspective_blockdev_snapshot(params):
177 """Create a snapshot device.
179 Note that this is only valid for LVM disks, if we get passed
180 something else we raise an exception. The snapshot device can be
181 remove by calling the generic block device remove call.
184 cfbd = objects.Disk.FromDict(params[0])
185 return backend.SnapshotBlockDevice(cfbd)
188 def perspective_blockdev_grow(params):
189 """Grow a stack of devices.
192 cfbd = objects.Disk.FromDict(params[0])
194 return backend.GrowBlockDevice(cfbd, amount)
197 def perspective_blockdev_close(params):
198 """Closes the given block devices.
201 disks = [objects.Disk.FromDict(cf) for cf in params]
202 return backend.CloseBlockDevices(disks)
204 # export/import --------------------------
207 def perspective_snapshot_export(params):
208 """Export a given snapshot.
211 disk = objects.Disk.FromDict(params[0])
212 dest_node = params[1]
213 instance = objects.Instance.FromDict(params[2])
214 return backend.ExportSnapshot(disk, dest_node, instance)
217 def perspective_finalize_export(params):
218 """Expose the finalize export functionality.
221 instance = objects.Instance.FromDict(params[0])
222 snap_disks = [objects.Disk.FromDict(str_data)
223 for str_data in params[1]]
224 return backend.FinalizeExport(instance, snap_disks)
227 def perspective_export_info(params):
228 """Query information about an existing export on this node.
230 The given path may not contain an export, in which case we return
235 einfo = backend.ExportInfo(path)
241 def perspective_export_list(params):
242 """List the available exports on this node.
244 Note that as opposed to export_info, which may query data about an
245 export in any path, this only queries the standard Ganeti path
246 (constants.EXPORT_DIR).
249 return backend.ListExports()
252 def perspective_export_remove(params):
257 return backend.RemoveExport(export)
259 # volume --------------------------
262 def perspective_volume_list(params):
263 """Query the list of logical volumes in a given volume group.
267 return backend.GetVolumeList(vgname)
270 def perspective_vg_list(params):
271 """Query the list of volume groups.
274 return backend.ListVolumeGroups()
276 # bridge --------------------------
279 def perspective_bridges_exist(params):
280 """Check if all bridges given exist on this node.
283 bridges_list = params[0]
284 return backend.BridgesExist(bridges_list)
286 # instance --------------------------
289 def perspective_instance_os_add(params):
290 """Install an OS on a given instance.
293 inst_s, os_disk, swap_disk = params
294 inst = objects.Instance.FromDict(inst_s)
295 return backend.AddOSToInstance(inst, os_disk, swap_disk)
298 def perspective_instance_run_rename(params):
299 """Runs the OS rename script for an instance.
302 inst_s, old_name, os_disk, swap_disk = params
303 inst = objects.Instance.FromDict(inst_s)
304 return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
307 def perspective_instance_os_import(params):
308 """Run the import function of an OS onto a given instance.
311 inst_s, os_disk, swap_disk, src_node, src_image = params
312 inst = objects.Instance.FromDict(inst_s)
313 return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
317 def perspective_instance_shutdown(params):
318 """Shutdown an instance.
321 instance = objects.Instance.FromDict(params[0])
322 return backend.ShutdownInstance(instance)
325 def perspective_instance_start(params):
326 """Start an instance.
329 instance = objects.Instance.FromDict(params[0])
330 extra_args = params[1]
331 return backend.StartInstance(instance, extra_args)
334 def perspective_instance_migrate(params):
335 """Migrates an instance.
338 instance, target, live = params
339 return backend.MigrateInstance(instance, target, live)
342 def perspective_instance_reboot(params):
343 """Reboot an instance.
346 instance = objects.Instance.FromDict(params[0])
347 reboot_type = params[1]
348 extra_args = params[2]
349 return backend.RebootInstance(instance, reboot_type, extra_args)
352 def perspective_instance_info(params):
353 """Query instance information.
356 return backend.GetInstanceInfo(params[0])
359 def perspective_all_instances_info(params):
360 """Query information about all instances.
363 return backend.GetAllInstancesInfo()
366 def perspective_instance_list(params):
367 """Query the list of running instances.
370 return backend.GetInstanceList()
372 # node --------------------------
375 def perspective_node_tcp_ping(params):
376 """Do a TcpPing on the remote node.
379 return utils.TcpPing(params[1], params[2], timeout=params[3],
380 live_port_needed=params[4], source=params[0])
383 def perspective_node_info(params):
384 """Query node information.
388 return backend.GetNodeInfo(vgname)
391 def perspective_node_add(params):
392 """Complete the registration of this node in the cluster.
395 return backend.AddNode(params[0], params[1], params[2],
396 params[3], params[4], params[5])
399 def perspective_node_verify(params):
400 """Run a verify sequence on this node.
403 return backend.VerifyNode(params[0])
406 def perspective_node_start_master(params):
407 """Promote this node to master status.
410 return backend.StartMaster()
413 def perspective_node_stop_master(params):
414 """Demote this node from master status.
417 return backend.StopMaster()
420 def perspective_node_leave_cluster(params):
421 """Cleanup after leaving a cluster.
424 return backend.LeaveCluster()
427 def perspective_node_volumes(params):
428 """Query the list of all logical volume groups.
431 return backend.NodeVolumes()
433 # cluster --------------------------
436 def perspective_version(params):
437 """Query version information.
440 return constants.PROTOCOL_VERSION
443 def perspective_upload_file(params):
446 Note that the backend implementation imposes strict rules on which
450 return backend.UploadFile(*params)
453 # os -----------------------
456 def perspective_os_diagnose(params):
457 """Query detailed information about existing OSes.
460 return [os.ToDict() for os in backend.DiagnoseOS()]
463 def perspective_os_get(params):
464 """Query information about a given OS.
469 os_obj = backend.OSFromDisk(name)
470 except errors.InvalidOS, err:
471 os_obj = objects.OS.FromInvalidOS(err)
472 return os_obj.ToDict()
474 # hooks -----------------------
477 def perspective_hooks_runner(params):
481 hpath, phase, env = params
482 hr = backend.HooksRunner()
483 return hr.RunHooks(hpath, phase, env)
485 # iallocator -----------------
488 def perspective_iallocator_runner(params):
489 """Run an iallocator script.
493 iar = backend.IAllocatorRunner()
494 return iar.Run(name, idata)
496 # test -----------------------
499 def perspective_test_delay(params):
504 return utils.TestDelay(duration)
507 def perspective_file_storage_dir_create(params):
508 """Create the file storage directory.
511 file_storage_dir = params[0]
512 return backend.CreateFileStorageDir(file_storage_dir)
515 def perspective_file_storage_dir_remove(params):
516 """Remove the file storage directory.
519 file_storage_dir = params[0]
520 return backend.RemoveFileStorageDir(file_storage_dir)
523 def perspective_file_storage_dir_rename(params):
524 """Rename the file storage directory.
527 old_file_storage_dir = params[0]
528 new_file_storage_dir = params[1]
529 return backend.RenameFileStorageDir(old_file_storage_dir,
530 new_file_storage_dir)
533 class NodeDaemonHttpServer(http.HTTPServer):
534 def __init__(self, server_address):
535 http.HTTPServer.__init__(self, server_address, NodeDaemonRequestHandler)
536 self.noded_pid = os.getpid()
538 def serve_forever(self):
539 """Handle requests until told to quit."""
540 sighandler = utils.SignalHandler([signal.SIGINT, signal.SIGTERM])
542 while not sighandler.called:
543 self.handle_request()
544 # TODO: There could be children running at this point
549 class ForkingHTTPServer(SocketServer.ForkingMixIn, NodeDaemonHttpServer):
550 """Forking HTTP Server.
552 This inherits from ForkingMixIn and HTTPServer in order to fork for each
553 request we handle. This allows more requests to be handled concurrently.
559 """Parse the command line options.
562 (options, args) as from OptionParser.parse_args()
565 parser = OptionParser(description="Ganeti node daemon",
566 usage="%prog [-f] [-d]",
567 version="%%prog (ganeti) %s" %
568 constants.RELEASE_VERSION)
570 parser.add_option("-f", "--foreground", dest="fork",
571 help="Don't detach from the current terminal",
572 default=True, action="store_false")
573 parser.add_option("-d", "--debug", dest="debug",
574 help="Enable some debug messages",
575 default=False, action="store_true")
576 options, args = parser.parse_args()
581 """Main function for the node daemon.
584 options, args = ParseOptions()
585 utils.debug = options.debug
586 for fname in (constants.SSL_CERT_FILE,):
587 if not os.path.isfile(fname):
588 print "config %s not there, will not run." % fname
592 ss = ssconf.SimpleStore()
593 port = ss.GetNodeDaemonPort()
594 pwdata = ss.GetNodeDaemonPassword()
595 except errors.ConfigurationError, err:
596 print "Cluster configuration incomplete: '%s'" % str(err)
599 # create the various SUB_RUN_DIRS, if not existing, so that we handle the
600 # situation where RUN_DIR is tmpfs
601 for dir_name in constants.SUB_RUN_DIRS:
602 if not os.path.exists(dir_name):
604 os.mkdir(dir_name, 0755)
605 except EnvironmentError, err:
606 if err.errno != errno.EEXIST:
607 print ("Node setup wrong, cannot create directory %s: %s" %
610 if not os.path.isdir(dir_name):
611 print ("Node setup wrong, %s is not a directory" % dir_name)
616 utils.Daemonize(logfile=constants.LOG_NODESERVER)
618 utils.WritePidFile(constants.NODED_PID)
620 logger.SetupDaemon(logfile=constants.LOG_NODESERVER, debug=options.debug,
621 stderr_logging=not options.fork)
622 logging.info("ganeti node daemon startup")
625 server = ForkingHTTPServer(('', port))
627 server = NodeDaemonHttpServer(('', port))
630 server.serve_forever()
632 utils.RemovePidFile(constants.NODED_PID)
635 if __name__ == '__main__':