4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti node daemon"""
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
35 from optparse import OptionParser
37 from ganeti import backend
38 from ganeti import constants
39 from ganeti import objects
40 from ganeti import errors
41 from ganeti import jstore
42 from ganeti import daemon
43 from ganeti import http
44 from ganeti import utils
46 import ganeti.http.server
52 def _RequireJobQueueLock(fn):
53 """Decorator for job queue manipulating functions.
56 QUEUE_LOCK_TIMEOUT = 10
58 def wrapper(*args, **kwargs):
59 # Locking in exclusive, blocking mode because there could be several
60 # children running at the same time. Waiting up to 10 seconds.
61 queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
63 return fn(*args, **kwargs)
70 class NodeHttpServer(http.server.HttpServer):
71 """The server implementation.
73 This class holds all methods exposed over the RPC interface.
76 def __init__(self, *args, **kwargs):
77 http.server.HttpServer.__init__(self, *args, **kwargs)
78 self.noded_pid = os.getpid()
80 def HandleRequest(self, req):
84 if req.request_method.upper() != http.HTTP_PUT:
85 raise http.HttpBadRequest()
87 path = req.request_path
88 if path.startswith("/"):
91 method = getattr(self, "perspective_%s" % path, None)
93 raise http.HttpNotFound()
97 return method(req.request_body)
99 logging.exception("Error in RPC call")
101 except errors.QuitGanetiException, err:
102 # Tell parent to quit
103 os.kill(self.noded_pid, signal.SIGTERM)
105 # the new block devices --------------------------
108 def perspective_blockdev_create(params):
109 """Create a block device.
112 bdev_s, size, owner, on_primary, info = params
113 bdev = objects.Disk.FromDict(bdev_s)
115 raise ValueError("can't unserialize data!")
116 return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
119 def perspective_blockdev_remove(params):
120 """Remove a block device.
124 bdev = objects.Disk.FromDict(bdev_s)
125 return backend.BlockdevRemove(bdev)
128 def perspective_blockdev_rename(params):
129 """Remove a block device.
132 devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
133 return backend.BlockdevRename(devlist)
136 def perspective_blockdev_assemble(params):
137 """Assemble a block device.
140 bdev_s, owner, on_primary = params
141 bdev = objects.Disk.FromDict(bdev_s)
143 raise ValueError("can't unserialize data!")
144 return backend.BlockdevAssemble(bdev, owner, on_primary)
147 def perspective_blockdev_shutdown(params):
148 """Shutdown a block device.
152 bdev = objects.Disk.FromDict(bdev_s)
154 raise ValueError("can't unserialize data!")
155 return backend.BlockdevShutdown(bdev)
158 def perspective_blockdev_addchildren(params):
159 """Add a child to a mirror device.
161 Note: this is only valid for mirror devices. It's the caller's duty
162 to send a correct disk, otherwise we raise an error.
165 bdev_s, ndev_s = params
166 bdev = objects.Disk.FromDict(bdev_s)
167 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
168 if bdev is None or ndevs.count(None) > 0:
169 raise ValueError("can't unserialize data!")
170 return backend.BlockdevAddchildren(bdev, ndevs)
173 def perspective_blockdev_removechildren(params):
174 """Remove a child from a mirror device.
176 This is only valid for mirror devices, of course. It's the callers
177 duty to send a correct disk, otherwise we raise an error.
180 bdev_s, ndev_s = params
181 bdev = objects.Disk.FromDict(bdev_s)
182 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
183 if bdev is None or ndevs.count(None) > 0:
184 raise ValueError("can't unserialize data!")
185 return backend.BlockdevRemovechildren(bdev, ndevs)
188 def perspective_blockdev_getmirrorstatus(params):
189 """Return the mirror status for a list of disks.
192 disks = [objects.Disk.FromDict(dsk_s)
194 return backend.BlockdevGetmirrorstatus(disks)
197 def perspective_blockdev_find(params):
198 """Expose the FindBlockDevice functionality for a disk.
200 This will try to find but not activate a disk.
203 disk = objects.Disk.FromDict(params[0])
204 return backend.BlockdevFind(disk)
207 def perspective_blockdev_snapshot(params):
208 """Create a snapshot device.
210 Note that this is only valid for LVM disks, if we get passed
211 something else we raise an exception. The snapshot device can be
212 remove by calling the generic block device remove call.
215 cfbd = objects.Disk.FromDict(params[0])
216 return backend.BlockdevSnapshot(cfbd)
219 def perspective_blockdev_grow(params):
220 """Grow a stack of devices.
223 cfbd = objects.Disk.FromDict(params[0])
225 return backend.BlockdevGrow(cfbd, amount)
228 def perspective_blockdev_close(params):
229 """Closes the given block devices.
232 disks = [objects.Disk.FromDict(cf) for cf in params[1]]
233 return backend.BlockdevClose(params[0], disks)
236 def perspective_blockdev_getsize(params):
237 """Compute the sizes of the given block devices.
240 disks = [objects.Disk.FromDict(cf) for cf in params[0]]
241 return backend.BlockdevGetsize(disks)
243 # blockdev/drbd specific methods ----------
246 def perspective_drbd_disconnect_net(params):
247 """Disconnects the network connection of drbd disks.
249 Note that this is only valid for drbd disks, so the members of the
250 disk list must all be drbd devices.
253 nodes_ip, disks = params
254 disks = [objects.Disk.FromDict(cf) for cf in disks]
255 return backend.DrbdDisconnectNet(nodes_ip, disks)
258 def perspective_drbd_attach_net(params):
259 """Attaches the network connection of drbd disks.
261 Note that this is only valid for drbd disks, so the members of the
262 disk list must all be drbd devices.
265 nodes_ip, disks, instance_name, multimaster = params
266 disks = [objects.Disk.FromDict(cf) for cf in disks]
267 return backend.DrbdAttachNet(nodes_ip, disks,
268 instance_name, multimaster)
271 def perspective_drbd_wait_sync(params):
272 """Wait until DRBD disks are synched.
274 Note that this is only valid for drbd disks, so the members of the
275 disk list must all be drbd devices.
278 nodes_ip, disks = params
279 disks = [objects.Disk.FromDict(cf) for cf in disks]
280 return backend.DrbdWaitSync(nodes_ip, disks)
282 # export/import --------------------------
285 def perspective_snapshot_export(params):
286 """Export a given snapshot.
289 disk = objects.Disk.FromDict(params[0])
290 dest_node = params[1]
291 instance = objects.Instance.FromDict(params[2])
292 cluster_name = params[3]
294 return backend.ExportSnapshot(disk, dest_node, instance,
295 cluster_name, dev_idx)
298 def perspective_finalize_export(params):
299 """Expose the finalize export functionality.
302 instance = objects.Instance.FromDict(params[0])
303 snap_disks = [objects.Disk.FromDict(str_data)
304 for str_data in params[1]]
305 return backend.FinalizeExport(instance, snap_disks)
308 def perspective_export_info(params):
309 """Query information about an existing export on this node.
311 The given path may not contain an export, in which case we return
316 einfo = backend.ExportInfo(path)
322 def perspective_export_list(params):
323 """List the available exports on this node.
325 Note that as opposed to export_info, which may query data about an
326 export in any path, this only queries the standard Ganeti path
327 (constants.EXPORT_DIR).
330 return backend.ListExports()
333 def perspective_export_remove(params):
338 return backend.RemoveExport(export)
340 # volume --------------------------
343 def perspective_volume_list(params):
344 """Query the list of logical volumes in a given volume group.
348 return backend.GetVolumeList(vgname)
351 def perspective_vg_list(params):
352 """Query the list of volume groups.
355 return backend.ListVolumeGroups()
357 # bridge --------------------------
360 def perspective_bridges_exist(params):
361 """Check if all bridges given exist on this node.
364 bridges_list = params[0]
365 return backend.BridgesExist(bridges_list)
367 # instance --------------------------
370 def perspective_instance_os_add(params):
371 """Install an OS on a given instance.
375 inst = objects.Instance.FromDict(inst_s)
376 return backend.InstanceOsAdd(inst)
379 def perspective_instance_run_rename(params):
380 """Runs the OS rename script for an instance.
383 inst_s, old_name = params
384 inst = objects.Instance.FromDict(inst_s)
385 return backend.RunRenameInstance(inst, old_name)
388 def perspective_instance_os_import(params):
389 """Run the import function of an OS onto a given instance.
392 inst_s, src_node, src_images, cluster_name = params
393 inst = objects.Instance.FromDict(inst_s)
394 return backend.ImportOSIntoInstance(inst, src_node, src_images,
398 def perspective_instance_shutdown(params):
399 """Shutdown an instance.
402 instance = objects.Instance.FromDict(params[0])
403 return backend.InstanceShutdown(instance)
406 def perspective_instance_start(params):
407 """Start an instance.
410 instance = objects.Instance.FromDict(params[0])
411 return backend.StartInstance(instance)
414 def perspective_migration_info(params):
415 """Gather information about an instance to be migrated.
418 instance = objects.Instance.FromDict(params[0])
419 return backend.MigrationInfo(instance)
422 def perspective_accept_instance(params):
423 """Prepare the node to accept an instance.
426 instance, info, target = params
427 instance = objects.Instance.FromDict(instance)
428 return backend.AcceptInstance(instance, info, target)
431 def perspective_finalize_migration(params):
432 """Finalize the instance migration.
435 instance, info, success = params
436 instance = objects.Instance.FromDict(instance)
437 return backend.FinalizeMigration(instance, info, success)
440 def perspective_instance_migrate(params):
441 """Migrates an instance.
444 instance, target, live = params
445 instance = objects.Instance.FromDict(instance)
446 return backend.MigrateInstance(instance, target, live)
449 def perspective_instance_reboot(params):
450 """Reboot an instance.
453 instance = objects.Instance.FromDict(params[0])
454 reboot_type = params[1]
455 return backend.InstanceReboot(instance, reboot_type)
458 def perspective_instance_info(params):
459 """Query instance information.
462 return backend.GetInstanceInfo(params[0], params[1])
465 def perspective_instance_migratable(params):
466 """Query whether the specified instance can be migrated.
469 instance = objects.Instance.FromDict(params[0])
470 return backend.GetInstanceMigratable(instance)
473 def perspective_all_instances_info(params):
474 """Query information about all instances.
477 return backend.GetAllInstancesInfo(params[0])
480 def perspective_instance_list(params):
481 """Query the list of running instances.
484 return backend.GetInstanceList(params[0])
486 # node --------------------------
489 def perspective_node_tcp_ping(params):
490 """Do a TcpPing on the remote node.
493 return utils.TcpPing(params[1], params[2], timeout=params[3],
494 live_port_needed=params[4], source=params[0])
497 def perspective_node_has_ip_address(params):
498 """Checks if a node has the given ip address.
501 return utils.OwnIpAddress(params[0])
504 def perspective_node_info(params):
505 """Query node information.
508 vgname, hypervisor_type = params
509 return backend.GetNodeInfo(vgname, hypervisor_type)
512 def perspective_node_add(params):
513 """Complete the registration of this node in the cluster.
516 return backend.AddNode(params[0], params[1], params[2],
517 params[3], params[4], params[5])
520 def perspective_node_verify(params):
521 """Run a verify sequence on this node.
524 return backend.VerifyNode(params[0], params[1])
527 def perspective_node_start_master(params):
528 """Promote this node to master status.
531 return backend.StartMaster(params[0], params[1])
534 def perspective_node_stop_master(params):
535 """Demote this node from master status.
538 return backend.StopMaster(params[0])
541 def perspective_node_leave_cluster(params):
542 """Cleanup after leaving a cluster.
545 return backend.LeaveCluster()
548 def perspective_node_volumes(params):
549 """Query the list of all logical volume groups.
552 return backend.NodeVolumes()
555 def perspective_node_demote_from_mc(params):
556 """Demote a node from the master candidate role.
559 return backend.DemoteFromMC()
562 # cluster --------------------------
565 def perspective_version(params):
566 """Query version information.
569 return constants.PROTOCOL_VERSION
572 def perspective_upload_file(params):
575 Note that the backend implementation imposes strict rules on which
579 return backend.UploadFile(*params)
582 def perspective_master_info(params):
583 """Query master information.
586 return backend.GetMasterInfo()
589 def perspective_write_ssconf_files(params):
590 """Write ssconf files.
594 return backend.WriteSsconfFiles(values)
596 # os -----------------------
599 def perspective_os_diagnose(params):
600 """Query detailed information about existing OSes.
603 return [os_obj.ToDict() for os_obj in backend.DiagnoseOS()]
606 def perspective_os_get(params):
607 """Query information about a given OS.
612 os_obj = backend.OSFromDisk(name)
613 except errors.InvalidOS, err:
614 os_obj = objects.OS.FromInvalidOS(err)
615 return os_obj.ToDict()
617 # hooks -----------------------
620 def perspective_hooks_runner(params):
624 hpath, phase, env = params
625 hr = backend.HooksRunner()
626 return hr.RunHooks(hpath, phase, env)
628 # iallocator -----------------
631 def perspective_iallocator_runner(params):
632 """Run an iallocator script.
636 iar = backend.IAllocatorRunner()
637 return iar.Run(name, idata)
639 # test -----------------------
642 def perspective_test_delay(params):
647 return utils.TestDelay(duration)
649 # file storage ---------------
652 def perspective_file_storage_dir_create(params):
653 """Create the file storage directory.
656 file_storage_dir = params[0]
657 return backend.CreateFileStorageDir(file_storage_dir)
660 def perspective_file_storage_dir_remove(params):
661 """Remove the file storage directory.
664 file_storage_dir = params[0]
665 return backend.RemoveFileStorageDir(file_storage_dir)
668 def perspective_file_storage_dir_rename(params):
669 """Rename the file storage directory.
672 old_file_storage_dir = params[0]
673 new_file_storage_dir = params[1]
674 return backend.RenameFileStorageDir(old_file_storage_dir,
675 new_file_storage_dir)
677 # jobs ------------------------
680 @_RequireJobQueueLock
681 def perspective_jobqueue_update(params):
685 (file_name, content) = params
686 return backend.JobQueueUpdate(file_name, content)
689 @_RequireJobQueueLock
690 def perspective_jobqueue_purge(params):
694 return backend.JobQueuePurge()
697 @_RequireJobQueueLock
698 def perspective_jobqueue_rename(params):
699 """Rename a job queue file.
702 # TODO: What if a file fails to rename?
703 return [backend.JobQueueRename(old, new) for old, new in params]
706 def perspective_jobqueue_set_drain(params):
707 """Set/unset the queue drain flag.
710 drain_flag = params[0]
711 return backend.JobQueueSetDrainFlag(drain_flag)
714 # hypervisor ---------------
717 def perspective_hypervisor_validate_params(params):
718 """Validate the hypervisor parameters.
721 (hvname, hvparams) = params
722 return backend.ValidateHVParams(hvname, hvparams)
726 """Parse the command line options.
728 @return: (options, args) as from OptionParser.parse_args()
731 parser = OptionParser(description="Ganeti node daemon",
732 usage="%prog [-f] [-d] [-b ADDRESS]",
733 version="%%prog (ganeti) %s" %
734 constants.RELEASE_VERSION)
736 parser.add_option("-f", "--foreground", dest="fork",
737 help="Don't detach from the current terminal",
738 default=True, action="store_false")
739 parser.add_option("-d", "--debug", dest="debug",
740 help="Enable some debug messages",
741 default=False, action="store_true")
742 parser.add_option("-b", "--bind", dest="bind_address",
744 default="", metavar="ADDRESS")
746 options, args = parser.parse_args()
751 """Main function for the node daemon.
756 options, args = ParseOptions()
761 for fname in (constants.SSL_CERT_FILE,):
762 if not os.path.isfile(fname):
763 print "config %s not there, will not run." % fname
764 sys.exit(constants.EXIT_NOTCLUSTER)
766 port = utils.GetNodeDaemonPort()
768 dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
769 dirs.append((constants.LOG_OS_DIR, 0750))
770 dirs.append((constants.LOCK_DIR, 1777))
771 utils.EnsureDirs(dirs)
775 utils.Daemonize(logfile=constants.LOG_NODESERVER)
777 utils.WritePidFile(constants.NODED_PID)
779 utils.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
780 stderr_logging=not options.fork)
781 logging.info("ganeti node daemon startup")
783 # Read SSL certificate
784 ssl_params = http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
785 ssl_cert_path=constants.SSL_CERT_FILE)
788 queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
790 mainloop = daemon.Mainloop()
791 server = NodeHttpServer(mainloop, options.bind_address, port,
792 ssl_params=ssl_params, ssl_verify_peer=True)
799 utils.RemovePidFile(constants.NODED_PID)
802 if __name__ == '__main__':