4 # Copyright (C) 2006, 2007, 2010 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti node daemon"""
24 # pylint: disable-msg=C0103,W0142
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
29 # W0142: Used * or ** magic, since we do use it extensively in this
import os
import sys
import logging
import signal

from optparse import OptionParser

from ganeti import backend
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import daemon
from ganeti import http
from ganeti import utils
from ganeti import storage
from ganeti import serializer
from ganeti import netutils

import ganeti.http.server # pylint: disable-msg=W0611
57 def _PrepareQueueLock():
58 """Try to prepare the queue lock.
60 @return: None for success, otherwise an exception object
63 global queue_lock # pylint: disable-msg=W0603
65 if queue_lock is not None:
70 queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
72 except EnvironmentError, err:
def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  Acquires the job queue lock exclusively around the wrapped function and
  releases it afterwards, so concurrent queue updates cannot interleave.

  @param fn: function manipulating the job queue
  @return: wrapped function

  """
  QUEUE_LOCK_TIMEOUT = 10

  def wrapper(*args, **kwargs):
    # Locking in exclusive, blocking mode because there could be several
    # children running at the same time. Waiting up to 10 seconds.
    if _PrepareQueueLock() is not None:
      raise errors.JobQueueError("Job queue failed initialization,"
                                 " cannot update jobs")
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      # Always release the lock, even if fn raises
      queue_lock.Unlock()

  return wrapper
def _DecodeImportExportIO(ieio, ieioargs):
  """Decodes import/export I/O information.

  @param ieio: I/O type (one of the constants.IEIO_* values)
  @param ieioargs: serialized I/O arguments as received over RPC
  @return: arguments with serialized disks converted to L{objects.Disk}

  """
  if ieio == constants.IEIO_RAW_DISK:
    assert len(ieioargs) == 1
    return (objects.Disk.FromDict(ieioargs[0]), )

  if ieio == constants.IEIO_SCRIPT:
    assert len(ieioargs) == 2
    return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])

  # Other I/O types (e.g. file-based) need no decoding; pass them through
  # instead of implicitly returning None
  return ieioargs
class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
  """Custom Request Executor class that ensures NodeHttpServer children are
  locked in ram.

  """
  def __init__(self, *args, **kwargs):
    # Lock this process's memory before handling any request, so the node
    # daemon keeps responding even under heavy host memory pressure
    utils.Mlockall()

    http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
123 class NodeHttpServer(http.server.HttpServer):
124 """The server implementation.
126 This class holds all methods exposed over the RPC interface.
129 # too many public methods, and unused args - all methods get params
131 # pylint: disable-msg=R0904,W0613
132 def __init__(self, *args, **kwargs):
133 http.server.HttpServer.__init__(self, *args, **kwargs)
134 self.noded_pid = os.getpid()
136 def HandleRequest(self, req):
140 if req.request_method.upper() != http.HTTP_PUT:
141 raise http.HttpBadRequest()
143 path = req.request_path
144 if path.startswith("/"):
147 method = getattr(self, "perspective_%s" % path, None)
149 raise http.HttpNotFound()
152 result = (True, method(serializer.LoadJson(req.request_body)))
154 except backend.RPCFail, err:
155 # our custom failure exception; str(err) works fine if the
156 # exception was constructed with a single argument, and in
157 # this case, err.message == err.args[0] == str(err)
158 result = (False, str(err))
159 except errors.QuitGanetiException, err:
160 # Tell parent to quit
161 logging.info("Shutting down the node daemon, arguments: %s",
163 os.kill(self.noded_pid, signal.SIGTERM)
164 # And return the error's arguments, which must be already in
165 # correct tuple format
167 except Exception, err:
168 logging.exception("Error in RPC call")
169 result = (False, "Error while executing backend function: %s" % str(err))
171 return serializer.DumpJson(result, indent=False)
173 # the new block devices --------------------------
176 def perspective_blockdev_create(params):
177 """Create a block device.
180 bdev_s, size, owner, on_primary, info = params
181 bdev = objects.Disk.FromDict(bdev_s)
183 raise ValueError("can't unserialize data!")
184 return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
187 def perspective_blockdev_remove(params):
188 """Remove a block device.
192 bdev = objects.Disk.FromDict(bdev_s)
193 return backend.BlockdevRemove(bdev)
196 def perspective_blockdev_rename(params):
197 """Remove a block device.
200 devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
201 return backend.BlockdevRename(devlist)
204 def perspective_blockdev_assemble(params):
205 """Assemble a block device.
208 bdev_s, owner, on_primary = params
209 bdev = objects.Disk.FromDict(bdev_s)
211 raise ValueError("can't unserialize data!")
212 return backend.BlockdevAssemble(bdev, owner, on_primary)
215 def perspective_blockdev_shutdown(params):
216 """Shutdown a block device.
220 bdev = objects.Disk.FromDict(bdev_s)
222 raise ValueError("can't unserialize data!")
223 return backend.BlockdevShutdown(bdev)
226 def perspective_blockdev_addchildren(params):
227 """Add a child to a mirror device.
229 Note: this is only valid for mirror devices. It's the caller's duty
230 to send a correct disk, otherwise we raise an error.
233 bdev_s, ndev_s = params
234 bdev = objects.Disk.FromDict(bdev_s)
235 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
236 if bdev is None or ndevs.count(None) > 0:
237 raise ValueError("can't unserialize data!")
238 return backend.BlockdevAddchildren(bdev, ndevs)
241 def perspective_blockdev_removechildren(params):
242 """Remove a child from a mirror device.
244 This is only valid for mirror devices, of course. It's the callers
245 duty to send a correct disk, otherwise we raise an error.
248 bdev_s, ndev_s = params
249 bdev = objects.Disk.FromDict(bdev_s)
250 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
251 if bdev is None or ndevs.count(None) > 0:
252 raise ValueError("can't unserialize data!")
253 return backend.BlockdevRemovechildren(bdev, ndevs)
256 def perspective_blockdev_getmirrorstatus(params):
257 """Return the mirror status for a list of disks.
260 disks = [objects.Disk.FromDict(dsk_s)
262 return [status.ToDict()
263 for status in backend.BlockdevGetmirrorstatus(disks)]
266 def perspective_blockdev_find(params):
267 """Expose the FindBlockDevice functionality for a disk.
269 This will try to find but not activate a disk.
272 disk = objects.Disk.FromDict(params[0])
274 result = backend.BlockdevFind(disk)
278 return result.ToDict()
281 def perspective_blockdev_snapshot(params):
282 """Create a snapshot device.
284 Note that this is only valid for LVM disks, if we get passed
285 something else we raise an exception. The snapshot device can be
286 remove by calling the generic block device remove call.
289 cfbd = objects.Disk.FromDict(params[0])
290 return backend.BlockdevSnapshot(cfbd)
293 def perspective_blockdev_grow(params):
294 """Grow a stack of devices.
297 cfbd = objects.Disk.FromDict(params[0])
299 return backend.BlockdevGrow(cfbd, amount)
302 def perspective_blockdev_close(params):
303 """Closes the given block devices.
306 disks = [objects.Disk.FromDict(cf) for cf in params[1]]
307 return backend.BlockdevClose(params[0], disks)
310 def perspective_blockdev_getsize(params):
311 """Compute the sizes of the given block devices.
314 disks = [objects.Disk.FromDict(cf) for cf in params[0]]
315 return backend.BlockdevGetsize(disks)
318 def perspective_blockdev_export(params):
319 """Compute the sizes of the given block devices.
322 disk = objects.Disk.FromDict(params[0])
323 dest_node, dest_path, cluster_name = params[1:]
324 return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
326 # blockdev/drbd specific methods ----------
329 def perspective_drbd_disconnect_net(params):
330 """Disconnects the network connection of drbd disks.
332 Note that this is only valid for drbd disks, so the members of the
333 disk list must all be drbd devices.
336 nodes_ip, disks = params
337 disks = [objects.Disk.FromDict(cf) for cf in disks]
338 return backend.DrbdDisconnectNet(nodes_ip, disks)
341 def perspective_drbd_attach_net(params):
342 """Attaches the network connection of drbd disks.
344 Note that this is only valid for drbd disks, so the members of the
345 disk list must all be drbd devices.
348 nodes_ip, disks, instance_name, multimaster = params
349 disks = [objects.Disk.FromDict(cf) for cf in disks]
350 return backend.DrbdAttachNet(nodes_ip, disks,
351 instance_name, multimaster)
354 def perspective_drbd_wait_sync(params):
355 """Wait until DRBD disks are synched.
357 Note that this is only valid for drbd disks, so the members of the
358 disk list must all be drbd devices.
361 nodes_ip, disks = params
362 disks = [objects.Disk.FromDict(cf) for cf in disks]
363 return backend.DrbdWaitSync(nodes_ip, disks)
366 def perspective_drbd_helper(params):
367 """Query drbd helper.
370 return backend.GetDrbdUsermodeHelper()
372 # export/import --------------------------
375 def perspective_finalize_export(params):
376 """Expose the finalize export functionality.
379 instance = objects.Instance.FromDict(params[0])
382 for disk in params[1]:
383 if isinstance(disk, bool):
384 snap_disks.append(disk)
386 snap_disks.append(objects.Disk.FromDict(disk))
388 return backend.FinalizeExport(instance, snap_disks)
391 def perspective_export_info(params):
392 """Query information about an existing export on this node.
394 The given path may not contain an export, in which case we return
399 return backend.ExportInfo(path)
402 def perspective_export_list(params):
403 """List the available exports on this node.
405 Note that as opposed to export_info, which may query data about an
406 export in any path, this only queries the standard Ganeti path
407 (constants.EXPORT_DIR).
410 return backend.ListExports()
413 def perspective_export_remove(params):
418 return backend.RemoveExport(export)
420 # volume --------------------------
423 def perspective_lv_list(params):
424 """Query the list of logical volumes in a given volume group.
428 return backend.GetVolumeList(vgname)
431 def perspective_vg_list(params):
432 """Query the list of volume groups.
435 return backend.ListVolumeGroups()
437 # Storage --------------------------
440 def perspective_storage_list(params):
441 """Get list of storage units.
444 (su_name, su_args, name, fields) = params
445 return storage.GetStorage(su_name, *su_args).List(name, fields)
448 def perspective_storage_modify(params):
449 """Modify a storage unit.
452 (su_name, su_args, name, changes) = params
453 return storage.GetStorage(su_name, *su_args).Modify(name, changes)
456 def perspective_storage_execute(params):
457 """Execute an operation on a storage unit.
460 (su_name, su_args, name, op) = params
461 return storage.GetStorage(su_name, *su_args).Execute(name, op)
463 # bridge --------------------------
466 def perspective_bridges_exist(params):
467 """Check if all bridges given exist on this node.
470 bridges_list = params[0]
471 return backend.BridgesExist(bridges_list)
473 # instance --------------------------
476 def perspective_instance_os_add(params):
477 """Install an OS on a given instance.
481 inst = objects.Instance.FromDict(inst_s)
482 reinstall = params[1]
484 return backend.InstanceOsAdd(inst, reinstall, debug)
487 def perspective_instance_run_rename(params):
488 """Runs the OS rename script for an instance.
491 inst_s, old_name, debug = params
492 inst = objects.Instance.FromDict(inst_s)
493 return backend.RunRenameInstance(inst, old_name, debug)
496 def perspective_instance_shutdown(params):
497 """Shutdown an instance.
500 instance = objects.Instance.FromDict(params[0])
502 return backend.InstanceShutdown(instance, timeout)
505 def perspective_instance_start(params):
506 """Start an instance.
509 instance = objects.Instance.FromDict(params[0])
510 return backend.StartInstance(instance)
513 def perspective_migration_info(params):
514 """Gather information about an instance to be migrated.
517 instance = objects.Instance.FromDict(params[0])
518 return backend.MigrationInfo(instance)
521 def perspective_accept_instance(params):
522 """Prepare the node to accept an instance.
525 instance, info, target = params
526 instance = objects.Instance.FromDict(instance)
527 return backend.AcceptInstance(instance, info, target)
530 def perspective_finalize_migration(params):
531 """Finalize the instance migration.
534 instance, info, success = params
535 instance = objects.Instance.FromDict(instance)
536 return backend.FinalizeMigration(instance, info, success)
539 def perspective_instance_migrate(params):
540 """Migrates an instance.
543 instance, target, live = params
544 instance = objects.Instance.FromDict(instance)
545 return backend.MigrateInstance(instance, target, live)
548 def perspective_instance_reboot(params):
549 """Reboot an instance.
552 instance = objects.Instance.FromDict(params[0])
553 reboot_type = params[1]
554 shutdown_timeout = params[2]
555 return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
558 def perspective_instance_info(params):
559 """Query instance information.
562 return backend.GetInstanceInfo(params[0], params[1])
565 def perspective_instance_migratable(params):
566 """Query whether the specified instance can be migrated.
569 instance = objects.Instance.FromDict(params[0])
570 return backend.GetInstanceMigratable(instance)
573 def perspective_all_instances_info(params):
574 """Query information about all instances.
577 return backend.GetAllInstancesInfo(params[0])
580 def perspective_instance_list(params):
581 """Query the list of running instances.
584 return backend.GetInstanceList(params[0])
586 # node --------------------------
589 def perspective_node_tcp_ping(params):
590 """Do a TcpPing on the remote node.
593 return netutils.TcpPing(params[1], params[2], timeout=params[3],
594 live_port_needed=params[4], source=params[0])
597 def perspective_node_has_ip_address(params):
598 """Checks if a node has the given ip address.
601 return netutils.IPAddress.Own(params[0])
604 def perspective_node_info(params):
605 """Query node information.
608 vgname, hypervisor_type = params
609 return backend.GetNodeInfo(vgname, hypervisor_type)
612 def perspective_node_verify(params):
613 """Run a verify sequence on this node.
616 return backend.VerifyNode(params[0], params[1])
619 def perspective_node_start_master(params):
620 """Promote this node to master status.
623 return backend.StartMaster(params[0], params[1])
626 def perspective_node_stop_master(params):
627 """Demote this node from master status.
630 return backend.StopMaster(params[0])
633 def perspective_node_leave_cluster(params):
634 """Cleanup after leaving a cluster.
637 return backend.LeaveCluster(params[0])
640 def perspective_node_volumes(params):
641 """Query the list of all logical volume groups.
644 return backend.NodeVolumes()
647 def perspective_node_demote_from_mc(params):
648 """Demote a node from the master candidate role.
651 return backend.DemoteFromMC()
655 def perspective_node_powercycle(params):
656 """Tries to powercycle the nod.
659 hypervisor_type = params[0]
660 return backend.PowercycleNode(hypervisor_type)
663 # cluster --------------------------
666 def perspective_version(params):
667 """Query version information.
670 return constants.PROTOCOL_VERSION
673 def perspective_upload_file(params):
676 Note that the backend implementation imposes strict rules on which
680 return backend.UploadFile(*params)
683 def perspective_master_info(params):
684 """Query master information.
687 return backend.GetMasterInfo()
690 def perspective_write_ssconf_files(params):
691 """Write ssconf files.
695 return backend.WriteSsconfFiles(values)
697 # os -----------------------
700 def perspective_os_diagnose(params):
701 """Query detailed information about existing OSes.
704 return backend.DiagnoseOS()
707 def perspective_os_get(params):
708 """Query information about a given OS.
712 os_obj = backend.OSFromDisk(name)
713 return os_obj.ToDict()
716 def perspective_os_validate(params):
717 """Run a given OS' validation routine.
720 required, name, checks, params = params
721 return backend.ValidateOS(required, name, checks, params)
723 # hooks -----------------------
726 def perspective_hooks_runner(params):
730 hpath, phase, env = params
731 hr = backend.HooksRunner()
732 return hr.RunHooks(hpath, phase, env)
734 # iallocator -----------------
737 def perspective_iallocator_runner(params):
738 """Run an iallocator script.
742 iar = backend.IAllocatorRunner()
743 return iar.Run(name, idata)
745 # test -----------------------
748 def perspective_test_delay(params):
753 status, rval = utils.TestDelay(duration)
755 raise backend.RPCFail(rval)
758 # file storage ---------------
761 def perspective_file_storage_dir_create(params):
762 """Create the file storage directory.
765 file_storage_dir = params[0]
766 return backend.CreateFileStorageDir(file_storage_dir)
769 def perspective_file_storage_dir_remove(params):
770 """Remove the file storage directory.
773 file_storage_dir = params[0]
774 return backend.RemoveFileStorageDir(file_storage_dir)
777 def perspective_file_storage_dir_rename(params):
778 """Rename the file storage directory.
781 old_file_storage_dir = params[0]
782 new_file_storage_dir = params[1]
783 return backend.RenameFileStorageDir(old_file_storage_dir,
784 new_file_storage_dir)
786 # jobs ------------------------
789 @_RequireJobQueueLock
790 def perspective_jobqueue_update(params):
794 (file_name, content) = params
795 return backend.JobQueueUpdate(file_name, content)
798 @_RequireJobQueueLock
799 def perspective_jobqueue_purge(params):
803 return backend.JobQueuePurge()
806 @_RequireJobQueueLock
807 def perspective_jobqueue_rename(params):
808 """Rename a job queue file.
811 # TODO: What if a file fails to rename?
812 return [backend.JobQueueRename(old, new) for old, new in params]
814 # hypervisor ---------------
817 def perspective_hypervisor_validate_params(params):
818 """Validate the hypervisor parameters.
821 (hvname, hvparams) = params
822 return backend.ValidateHVParams(hvname, hvparams)
827 def perspective_x509_cert_create(params):
828 """Creates a new X509 certificate for SSL/TLS.
831 (validity, ) = params
832 return backend.CreateX509Certificate(validity)
835 def perspective_x509_cert_remove(params):
836 """Removes a X509 certificate.
840 return backend.RemoveX509Certificate(name)
845 def perspective_import_start(params):
846 """Starts an import daemon.
849 (opts_s, instance, dest, dest_args) = params
851 opts = objects.ImportExportOptions.FromDict(opts_s)
853 return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
855 objects.Instance.FromDict(instance),
857 _DecodeImportExportIO(dest,
861 def perspective_export_start(params):
862 """Starts an export daemon.
865 (opts_s, host, port, instance, source, source_args) = params
867 opts = objects.ImportExportOptions.FromDict(opts_s)
869 return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
871 objects.Instance.FromDict(instance),
873 _DecodeImportExportIO(source,
877 def perspective_impexp_status(params):
878 """Retrieves the status of an import or export daemon.
881 return backend.GetImportExportStatus(params[0])
884 def perspective_impexp_abort(params):
885 """Aborts an import or export.
888 return backend.AbortImportExport(params[0])
891 def perspective_impexp_cleanup(params):
892 """Cleans up after an import or export.
895 return backend.CleanupImportExport(params[0])
def CheckNoded(_, args):
  """Initial checks whether to run or exit with a failure.

  Exits with C{constants.EXIT_FAILURE} if any positional arguments were
  given, since noded does not accept any.

  """
  if args: # noded doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
                          sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)
def ExecNoded(options, _):
  """Main node daemon function, executed with the PID file held.

  Sets up memory locking (unless disabled), SSL parameters and the job
  queue lock, then runs the HTTP server main loop until shutdown.

  """
  if options.mlock:
    request_executor_class = MlockallRequestExecutor
    try:
      utils.Mlockall()
    except errors.NoCtypesError:
      logging.warning("Cannot set memory lock, ctypes module not found")
      request_executor_class = http.server.HttpServerRequestExecutor
  else:
    request_executor_class = http.server.HttpServerRequestExecutor

  # Read SSL certificate
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)
  else:
    ssl_params = None

  err = _PrepareQueueLock()
  if err is not None:
    # this might be some kind of file-system/permission error; while
    # this breaks the job queue functionality, we shouldn't prevent
    # startup of the whole node daemon because of this
    logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)

  mainloop = daemon.Mainloop()
  server = NodeHttpServer(mainloop, options.bind_address, options.port,
                          ssl_params=ssl_params, ssl_verify_peer=True,
                          request_executor_class=request_executor_class)
  server.Start()
  try:
    mainloop.Run()
  finally:
    # Make sure the listening socket is closed on the way out
    server.Stop()
948 """Main function for the node daemon.
951 parser = OptionParser(description="Ganeti node daemon",
952 usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
953 version="%%prog (ganeti) %s" %
954 constants.RELEASE_VERSION)
955 parser.add_option("--no-mlock", dest="mlock",
956 help="Do not mlock the node memory in ram",
957 default=True, action="store_false")
959 dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
960 dirs.append((constants.LOG_OS_DIR, 0750))
961 dirs.append((constants.LOCK_DIR, 1777))
962 dirs.append((constants.CRYPTO_KEYS_DIR, constants.CRYPTO_KEYS_DIR_MODE))
963 dirs.append((constants.IMPORT_EXPORT_DIR, constants.IMPORT_EXPORT_DIR_MODE))
964 daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded,
965 default_ssl_cert=constants.NODED_CERT_FILE,
966 default_ssl_key=constants.NODED_CERT_FILE,
967 console_logging=True)
if __name__ == '__main__':
  # Entry point when run as a script
  Main()