# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Ganeti node daemon"""

# functions in this module need to have a given name structure, so:
# pylint: disable-msg=C0103
import os
import sys
import logging
import signal

from optparse import OptionParser

from ganeti import backend
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import daemon
from ganeti import http
from ganeti import utils

import ganeti.http.server
52 def _RequireJobQueueLock(fn):
53 """Decorator for job queue manipulating functions.
56 QUEUE_LOCK_TIMEOUT = 10
58 def wrapper(*args, **kwargs):
59 # Locking in exclusive, blocking mode because there could be several
60 # children running at the same time. Waiting up to 10 seconds.
61 queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
63 return fn(*args, **kwargs)
70 class NodeHttpServer(http.server.HttpServer):
71 """The server implementation.
73 This class holds all methods exposed over the RPC interface.
76 def __init__(self, *args, **kwargs):
77 http.server.HttpServer.__init__(self, *args, **kwargs)
78 self.noded_pid = os.getpid()
80 def HandleRequest(self, req):
84 if req.request_method.upper() != http.HTTP_PUT:
85 raise http.HttpBadRequest()
87 path = req.request_path
88 if path.startswith("/"):
91 method = getattr(self, "perspective_%s" % path, None)
93 raise http.HttpNotFound()
96 return method(req.request_body)
97 except backend.RPCFail, err:
98 # our custom failure exception; str(err) works fine if the
99 # exception was constructed with a single argument, and in
100 # this case, err.message == err.args[0] == str(err)
101 return (False, str(err))
102 except errors.QuitGanetiException, err:
103 # Tell parent to quit
104 logging.info("Shutting down the node daemon, arguments: %s",
106 os.kill(self.noded_pid, signal.SIGTERM)
107 # And return the error's arguments, which must be already in
108 # correct tuple format
111 logging.exception("Error in RPC call")
114 # the new block devices --------------------------
117 def perspective_blockdev_create(params):
118 """Create a block device.
121 bdev_s, size, owner, on_primary, info = params
122 bdev = objects.Disk.FromDict(bdev_s)
124 raise ValueError("can't unserialize data!")
125 return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
128 def perspective_blockdev_remove(params):
129 """Remove a block device.
133 bdev = objects.Disk.FromDict(bdev_s)
134 return backend.BlockdevRemove(bdev)
137 def perspective_blockdev_rename(params):
138 """Remove a block device.
141 devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
142 return backend.BlockdevRename(devlist)
145 def perspective_blockdev_assemble(params):
146 """Assemble a block device.
149 bdev_s, owner, on_primary = params
150 bdev = objects.Disk.FromDict(bdev_s)
152 raise ValueError("can't unserialize data!")
153 return backend.BlockdevAssemble(bdev, owner, on_primary)
156 def perspective_blockdev_shutdown(params):
157 """Shutdown a block device.
161 bdev = objects.Disk.FromDict(bdev_s)
163 raise ValueError("can't unserialize data!")
164 return backend.BlockdevShutdown(bdev)
167 def perspective_blockdev_addchildren(params):
168 """Add a child to a mirror device.
170 Note: this is only valid for mirror devices. It's the caller's duty
171 to send a correct disk, otherwise we raise an error.
174 bdev_s, ndev_s = params
175 bdev = objects.Disk.FromDict(bdev_s)
176 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
177 if bdev is None or ndevs.count(None) > 0:
178 raise ValueError("can't unserialize data!")
179 return backend.BlockdevAddchildren(bdev, ndevs)
182 def perspective_blockdev_removechildren(params):
183 """Remove a child from a mirror device.
185 This is only valid for mirror devices, of course. It's the callers
186 duty to send a correct disk, otherwise we raise an error.
189 bdev_s, ndev_s = params
190 bdev = objects.Disk.FromDict(bdev_s)
191 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
192 if bdev is None or ndevs.count(None) > 0:
193 raise ValueError("can't unserialize data!")
194 return backend.BlockdevRemovechildren(bdev, ndevs)
197 def perspective_blockdev_getmirrorstatus(params):
198 """Return the mirror status for a list of disks.
201 disks = [objects.Disk.FromDict(dsk_s)
203 return backend.BlockdevGetmirrorstatus(disks)
206 def perspective_blockdev_find(params):
207 """Expose the FindBlockDevice functionality for a disk.
209 This will try to find but not activate a disk.
212 disk = objects.Disk.FromDict(params[0])
213 return backend.BlockdevFind(disk)
216 def perspective_blockdev_snapshot(params):
217 """Create a snapshot device.
219 Note that this is only valid for LVM disks, if we get passed
220 something else we raise an exception. The snapshot device can be
221 remove by calling the generic block device remove call.
224 cfbd = objects.Disk.FromDict(params[0])
225 return backend.BlockdevSnapshot(cfbd)
228 def perspective_blockdev_grow(params):
229 """Grow a stack of devices.
232 cfbd = objects.Disk.FromDict(params[0])
234 return backend.BlockdevGrow(cfbd, amount)
237 def perspective_blockdev_close(params):
238 """Closes the given block devices.
241 disks = [objects.Disk.FromDict(cf) for cf in params[1]]
242 return backend.BlockdevClose(params[0], disks)
244 # blockdev/drbd specific methods ----------
247 def perspective_drbd_disconnect_net(params):
248 """Disconnects the network connection of drbd disks.
250 Note that this is only valid for drbd disks, so the members of the
251 disk list must all be drbd devices.
254 nodes_ip, disks = params
255 disks = [objects.Disk.FromDict(cf) for cf in disks]
256 return backend.DrbdDisconnectNet(nodes_ip, disks)
259 def perspective_drbd_attach_net(params):
260 """Attaches the network connection of drbd disks.
262 Note that this is only valid for drbd disks, so the members of the
263 disk list must all be drbd devices.
266 nodes_ip, disks, instance_name, multimaster = params
267 disks = [objects.Disk.FromDict(cf) for cf in disks]
268 return backend.DrbdAttachNet(nodes_ip, disks,
269 instance_name, multimaster)
272 def perspective_drbd_wait_sync(params):
273 """Wait until DRBD disks are synched.
275 Note that this is only valid for drbd disks, so the members of the
276 disk list must all be drbd devices.
279 nodes_ip, disks = params
280 disks = [objects.Disk.FromDict(cf) for cf in disks]
281 return backend.DrbdWaitSync(nodes_ip, disks)
283 # export/import --------------------------
286 def perspective_snapshot_export(params):
287 """Export a given snapshot.
290 disk = objects.Disk.FromDict(params[0])
291 dest_node = params[1]
292 instance = objects.Instance.FromDict(params[2])
293 cluster_name = params[3]
295 return backend.ExportSnapshot(disk, dest_node, instance,
296 cluster_name, dev_idx)
299 def perspective_finalize_export(params):
300 """Expose the finalize export functionality.
303 instance = objects.Instance.FromDict(params[0])
304 snap_disks = [objects.Disk.FromDict(str_data)
305 for str_data in params[1]]
306 return backend.FinalizeExport(instance, snap_disks)
309 def perspective_export_info(params):
310 """Query information about an existing export on this node.
312 The given path may not contain an export, in which case we return
317 return backend.ExportInfo(path)
320 def perspective_export_list(params):
321 """List the available exports on this node.
323 Note that as opposed to export_info, which may query data about an
324 export in any path, this only queries the standard Ganeti path
325 (constants.EXPORT_DIR).
328 return backend.ListExports()
331 def perspective_export_remove(params):
336 return backend.RemoveExport(export)
338 # volume --------------------------
341 def perspective_volume_list(params):
342 """Query the list of logical volumes in a given volume group.
346 return True, backend.GetVolumeList(vgname)
349 def perspective_vg_list(params):
350 """Query the list of volume groups.
353 return backend.ListVolumeGroups()
355 # bridge --------------------------
358 def perspective_bridges_exist(params):
359 """Check if all bridges given exist on this node.
362 bridges_list = params[0]
363 return backend.BridgesExist(bridges_list)
365 # instance --------------------------
368 def perspective_instance_os_add(params):
369 """Install an OS on a given instance.
373 inst = objects.Instance.FromDict(inst_s)
374 reinstall = params[1]
375 return backend.InstanceOsAdd(inst, reinstall)
378 def perspective_instance_run_rename(params):
379 """Runs the OS rename script for an instance.
382 inst_s, old_name = params
383 inst = objects.Instance.FromDict(inst_s)
384 return backend.RunRenameInstance(inst, old_name)
387 def perspective_instance_os_import(params):
388 """Run the import function of an OS onto a given instance.
391 inst_s, src_node, src_images, cluster_name = params
392 inst = objects.Instance.FromDict(inst_s)
393 return backend.ImportOSIntoInstance(inst, src_node, src_images,
397 def perspective_instance_shutdown(params):
398 """Shutdown an instance.
401 instance = objects.Instance.FromDict(params[0])
402 return backend.InstanceShutdown(instance)
405 def perspective_instance_start(params):
406 """Start an instance.
409 instance = objects.Instance.FromDict(params[0])
410 return backend.StartInstance(instance)
413 def perspective_migration_info(params):
414 """Gather information about an instance to be migrated.
417 instance = objects.Instance.FromDict(params[0])
418 return backend.MigrationInfo(instance)
421 def perspective_accept_instance(params):
422 """Prepare the node to accept an instance.
425 instance, info, target = params
426 instance = objects.Instance.FromDict(instance)
427 return backend.AcceptInstance(instance, info, target)
430 def perspective_finalize_migration(params):
431 """Finalize the instance migration.
434 instance, info, success = params
435 instance = objects.Instance.FromDict(instance)
436 return backend.FinalizeMigration(instance, info, success)
439 def perspective_instance_migrate(params):
440 """Migrates an instance.
443 instance, target, live = params
444 instance = objects.Instance.FromDict(instance)
445 return backend.MigrateInstance(instance, target, live)
448 def perspective_instance_reboot(params):
449 """Reboot an instance.
452 instance = objects.Instance.FromDict(params[0])
453 reboot_type = params[1]
454 return backend.InstanceReboot(instance, reboot_type)
457 def perspective_instance_info(params):
458 """Query instance information.
461 return backend.GetInstanceInfo(params[0], params[1])
464 def perspective_instance_migratable(params):
465 """Query whether the specified instance can be migrated.
468 instance = objects.Instance.FromDict(params[0])
469 return backend.GetInstanceMigratable(instance)
472 def perspective_all_instances_info(params):
473 """Query information about all instances.
476 return backend.GetAllInstancesInfo(params[0])
479 def perspective_instance_list(params):
480 """Query the list of running instances.
483 return True, backend.GetInstanceList(params[0])
485 # node --------------------------
488 def perspective_node_tcp_ping(params):
489 """Do a TcpPing on the remote node.
492 return utils.TcpPing(params[1], params[2], timeout=params[3],
493 live_port_needed=params[4], source=params[0])
496 def perspective_node_has_ip_address(params):
497 """Checks if a node has the given ip address.
500 return True, utils.OwnIpAddress(params[0])
503 def perspective_node_info(params):
504 """Query node information.
507 vgname, hypervisor_type = params
508 return backend.GetNodeInfo(vgname, hypervisor_type)
511 def perspective_node_add(params):
512 """Complete the registration of this node in the cluster.
515 return backend.AddNode(params[0], params[1], params[2],
516 params[3], params[4], params[5])
519 def perspective_node_verify(params):
520 """Run a verify sequence on this node.
523 return backend.VerifyNode(params[0], params[1])
526 def perspective_node_start_master(params):
527 """Promote this node to master status.
530 return backend.StartMaster(params[0])
533 def perspective_node_stop_master(params):
534 """Demote this node from master status.
537 return backend.StopMaster(params[0])
540 def perspective_node_leave_cluster(params):
541 """Cleanup after leaving a cluster.
544 return backend.LeaveCluster()
547 def perspective_node_volumes(params):
548 """Query the list of all logical volume groups.
551 return backend.NodeVolumes()
554 def perspective_node_demote_from_mc(params):
555 """Demote a node from the master candidate role.
558 return backend.DemoteFromMC()
562 def perspective_node_powercycle(params):
563 """Tries to powercycle the nod.
566 hypervisor_type = params[0]
567 return backend.PowercycleNode(hypervisor_type)
570 # cluster --------------------------
573 def perspective_version(params):
574 """Query version information.
577 return True, constants.PROTOCOL_VERSION
580 def perspective_upload_file(params):
583 Note that the backend implementation imposes strict rules on which
587 return backend.UploadFile(*params)
590 def perspective_master_info(params):
591 """Query master information.
594 return backend.GetMasterInfo()
597 def perspective_write_ssconf_files(params):
598 """Write ssconf files.
602 return backend.WriteSsconfFiles(values)
604 # os -----------------------
607 def perspective_os_diagnose(params):
608 """Query detailed information about existing OSes.
611 return True, [os_obj.ToDict() for os_obj in backend.DiagnoseOS()]
614 def perspective_os_get(params):
615 """Query information about a given OS.
620 os_obj = backend.OSFromDisk(name)
621 except errors.InvalidOS, err:
622 os_obj = objects.OS.FromInvalidOS(err)
623 return os_obj.ToDict()
625 # hooks -----------------------
628 def perspective_hooks_runner(params):
632 hpath, phase, env = params
633 hr = backend.HooksRunner()
634 return hr.RunHooks(hpath, phase, env)
636 # iallocator -----------------
639 def perspective_iallocator_runner(params):
640 """Run an iallocator script.
644 iar = backend.IAllocatorRunner()
645 return iar.Run(name, idata)
647 # test -----------------------
650 def perspective_test_delay(params):
655 return utils.TestDelay(duration)
657 # file storage ---------------
660 def perspective_file_storage_dir_create(params):
661 """Create the file storage directory.
664 file_storage_dir = params[0]
665 return backend.CreateFileStorageDir(file_storage_dir)
668 def perspective_file_storage_dir_remove(params):
669 """Remove the file storage directory.
672 file_storage_dir = params[0]
673 return backend.RemoveFileStorageDir(file_storage_dir)
676 def perspective_file_storage_dir_rename(params):
677 """Rename the file storage directory.
680 old_file_storage_dir = params[0]
681 new_file_storage_dir = params[1]
682 return backend.RenameFileStorageDir(old_file_storage_dir,
683 new_file_storage_dir)
685 # jobs ------------------------
688 @_RequireJobQueueLock
689 def perspective_jobqueue_update(params):
693 (file_name, content) = params
694 return backend.JobQueueUpdate(file_name, content)
697 @_RequireJobQueueLock
698 def perspective_jobqueue_purge(params):
702 return backend.JobQueuePurge()
705 @_RequireJobQueueLock
706 def perspective_jobqueue_rename(params):
707 """Rename a job queue file.
710 # TODO: What if a file fails to rename?
711 return [backend.JobQueueRename(old, new) for old, new in params]
714 def perspective_jobqueue_set_drain(params):
715 """Set/unset the queue drain flag.
718 drain_flag = params[0]
719 return backend.JobQueueSetDrainFlag(drain_flag)
722 # hypervisor ---------------
725 def perspective_hypervisor_validate_params(params):
726 """Validate the hypervisor parameters.
729 (hvname, hvparams) = params
730 return backend.ValidateHVParams(hvname, hvparams)
734 """Parse the command line options.
736 @return: (options, args) as from OptionParser.parse_args()
739 parser = OptionParser(description="Ganeti node daemon",
740 usage="%prog [-f] [-d] [-b ADDRESS]",
741 version="%%prog (ganeti) %s" %
742 constants.RELEASE_VERSION)
744 parser.add_option("-f", "--foreground", dest="fork",
745 help="Don't detach from the current terminal",
746 default=True, action="store_false")
747 parser.add_option("-d", "--debug", dest="debug",
748 help="Enable some debug messages",
749 default=False, action="store_true")
750 parser.add_option("-b", "--bind", dest="bind_address",
752 default="", metavar="ADDRESS")
754 options, args = parser.parse_args()
759 """Main function for the node daemon.
764 options, args = ParseOptions()
765 utils.debug = options.debug
770 for fname in (constants.SSL_CERT_FILE,):
771 if not os.path.isfile(fname):
772 print "config %s not there, will not run." % fname
776 port = utils.GetNodeDaemonPort()
777 except errors.ConfigurationError, err:
778 print "Cluster configuration incomplete: '%s'" % str(err)
781 dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
782 dirs.append((constants.LOG_OS_DIR, 0750))
783 dirs.append((constants.LOCK_DIR, 1777))
784 utils.EnsureDirs(dirs)
788 utils.Daemonize(logfile=constants.LOG_NODESERVER)
790 utils.WritePidFile(constants.NODED_PID)
792 utils.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
793 stderr_logging=not options.fork)
794 logging.info("ganeti node daemon startup")
796 # Read SSL certificate
797 ssl_params = http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
798 ssl_cert_path=constants.SSL_CERT_FILE)
801 queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
803 mainloop = daemon.Mainloop()
804 server = NodeHttpServer(mainloop, options.bind_address, port,
805 ssl_params=ssl_params, ssl_verify_peer=True)
812 utils.RemovePidFile(constants.NODED_PID)
815 if __name__ == '__main__':