#
# Copyright (C) 2006, 2007, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Ganeti node daemon"""

# pylint: disable-msg=C0103,W0142

# C0103: Functions in this module need to have a given name structure,
# and the name of the daemon doesn't match

# W0142: Used * or ** magic, since we do use it extensively in this
# module
import os
import sys
import logging
import signal

from optparse import OptionParser

from ganeti import backend
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import daemon
from ganeti import http
from ganeti import utils
from ganeti import storage
from ganeti import serializer
from ganeti import netutils

import ganeti.http.server # pylint: disable-msg=W0611
57 def _PrepareQueueLock():
58 """Try to prepare the queue lock.
60 @return: None for success, otherwise an exception object
63 global queue_lock # pylint: disable-msg=W0603
65 if queue_lock is not None:
70 queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
72 except EnvironmentError, err:
76 def _RequireJobQueueLock(fn):
77 """Decorator for job queue manipulating functions.
80 QUEUE_LOCK_TIMEOUT = 10
82 def wrapper(*args, **kwargs):
83 # Locking in exclusive, blocking mode because there could be several
84 # children running at the same time. Waiting up to 10 seconds.
85 if _PrepareQueueLock() is not None:
86 raise errors.JobQueueError("Job queue failed initialization,"
87 " cannot update jobs")
88 queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
90 return fn(*args, **kwargs)
97 def _DecodeImportExportIO(ieio, ieioargs):
98 """Decodes import/export I/O information.
101 if ieio == constants.IEIO_RAW_DISK:
102 assert len(ieioargs) == 1
103 return (objects.Disk.FromDict(ieioargs[0]), )
105 if ieio == constants.IEIO_SCRIPT:
106 assert len(ieioargs) == 2
107 return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
112 class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
113 """Custom Request Executor class that ensures NodeHttpServer children are
117 def __init__(self, *args, **kwargs):
120 http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
123 class NodeHttpServer(http.server.HttpServer):
124 """The server implementation.
126 This class holds all methods exposed over the RPC interface.
129 # too many public methods, and unused args - all methods get params
131 # pylint: disable-msg=R0904,W0613
132 def __init__(self, *args, **kwargs):
133 http.server.HttpServer.__init__(self, *args, **kwargs)
134 self.noded_pid = os.getpid()
136 def HandleRequest(self, req):
140 if req.request_method.upper() != http.HTTP_PUT:
141 raise http.HttpBadRequest()
143 path = req.request_path
144 if path.startswith("/"):
147 method = getattr(self, "perspective_%s" % path, None)
149 raise http.HttpNotFound()
152 result = (True, method(serializer.LoadJson(req.request_body)))
154 except backend.RPCFail, err:
155 # our custom failure exception; str(err) works fine if the
156 # exception was constructed with a single argument, and in
157 # this case, err.message == err.args[0] == str(err)
158 result = (False, str(err))
159 except errors.QuitGanetiException, err:
160 # Tell parent to quit
161 logging.info("Shutting down the node daemon, arguments: %s",
163 os.kill(self.noded_pid, signal.SIGTERM)
164 # And return the error's arguments, which must be already in
165 # correct tuple format
167 except Exception, err:
168 logging.exception("Error in RPC call")
169 result = (False, "Error while executing backend function: %s" % str(err))
171 return serializer.DumpJson(result, indent=False)
173 # the new block devices --------------------------
176 def perspective_blockdev_create(params):
177 """Create a block device.
180 bdev_s, size, owner, on_primary, info = params
181 bdev = objects.Disk.FromDict(bdev_s)
183 raise ValueError("can't unserialize data!")
184 return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
187 def perspective_blockdev_pause_resume_sync(params):
188 """Pause/resume sync of a block device.
191 disks_s, pause = params
192 disks = [objects.Disk.FromDict(bdev_s) for bdev_s in disks_s]
193 return backend.BlockdevPauseResumeSync(disks, pause)
196 def perspective_blockdev_wipe(params):
197 """Wipe a block device.
200 bdev_s, offset, size = params
201 bdev = objects.Disk.FromDict(bdev_s)
202 return backend.BlockdevWipe(bdev, offset, size)
205 def perspective_blockdev_remove(params):
206 """Remove a block device.
210 bdev = objects.Disk.FromDict(bdev_s)
211 return backend.BlockdevRemove(bdev)
214 def perspective_blockdev_rename(params):
215 """Remove a block device.
218 devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
219 return backend.BlockdevRename(devlist)
222 def perspective_blockdev_assemble(params):
223 """Assemble a block device.
226 bdev_s, owner, on_primary = params
227 bdev = objects.Disk.FromDict(bdev_s)
229 raise ValueError("can't unserialize data!")
230 return backend.BlockdevAssemble(bdev, owner, on_primary)
233 def perspective_blockdev_shutdown(params):
234 """Shutdown a block device.
238 bdev = objects.Disk.FromDict(bdev_s)
240 raise ValueError("can't unserialize data!")
241 return backend.BlockdevShutdown(bdev)
244 def perspective_blockdev_addchildren(params):
245 """Add a child to a mirror device.
247 Note: this is only valid for mirror devices. It's the caller's duty
248 to send a correct disk, otherwise we raise an error.
251 bdev_s, ndev_s = params
252 bdev = objects.Disk.FromDict(bdev_s)
253 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
254 if bdev is None or ndevs.count(None) > 0:
255 raise ValueError("can't unserialize data!")
256 return backend.BlockdevAddchildren(bdev, ndevs)
259 def perspective_blockdev_removechildren(params):
260 """Remove a child from a mirror device.
262 This is only valid for mirror devices, of course. It's the callers
263 duty to send a correct disk, otherwise we raise an error.
266 bdev_s, ndev_s = params
267 bdev = objects.Disk.FromDict(bdev_s)
268 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
269 if bdev is None or ndevs.count(None) > 0:
270 raise ValueError("can't unserialize data!")
271 return backend.BlockdevRemovechildren(bdev, ndevs)
274 def perspective_blockdev_getmirrorstatus(params):
275 """Return the mirror status for a list of disks.
278 disks = [objects.Disk.FromDict(dsk_s)
280 return [status.ToDict()
281 for status in backend.BlockdevGetmirrorstatus(disks)]
284 def perspective_blockdev_getmirrorstatus_multi(params):
285 """Return the mirror status for a list of disks.
288 (node_disks, ) = params
290 node_name = netutils.Hostname.GetSysName()
292 disks = [objects.Disk.FromDict(dsk_s)
293 for dsk_s in node_disks.get(node_name, [])]
297 for (success, status) in backend.BlockdevGetmirrorstatusMulti(disks):
299 result.append((success, status.ToDict()))
301 result.append((success, status))
306 def perspective_blockdev_find(params):
307 """Expose the FindBlockDevice functionality for a disk.
309 This will try to find but not activate a disk.
312 disk = objects.Disk.FromDict(params[0])
314 result = backend.BlockdevFind(disk)
318 return result.ToDict()
321 def perspective_blockdev_snapshot(params):
322 """Create a snapshot device.
324 Note that this is only valid for LVM disks, if we get passed
325 something else we raise an exception. The snapshot device can be
326 remove by calling the generic block device remove call.
329 cfbd = objects.Disk.FromDict(params[0])
330 return backend.BlockdevSnapshot(cfbd)
333 def perspective_blockdev_grow(params):
334 """Grow a stack of devices.
337 cfbd = objects.Disk.FromDict(params[0])
339 return backend.BlockdevGrow(cfbd, amount)
342 def perspective_blockdev_close(params):
343 """Closes the given block devices.
346 disks = [objects.Disk.FromDict(cf) for cf in params[1]]
347 return backend.BlockdevClose(params[0], disks)
350 def perspective_blockdev_getsize(params):
351 """Compute the sizes of the given block devices.
354 disks = [objects.Disk.FromDict(cf) for cf in params[0]]
355 return backend.BlockdevGetsize(disks)
358 def perspective_blockdev_export(params):
359 """Compute the sizes of the given block devices.
362 disk = objects.Disk.FromDict(params[0])
363 dest_node, dest_path, cluster_name = params[1:]
364 return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
366 # blockdev/drbd specific methods ----------
369 def perspective_drbd_disconnect_net(params):
370 """Disconnects the network connection of drbd disks.
372 Note that this is only valid for drbd disks, so the members of the
373 disk list must all be drbd devices.
376 nodes_ip, disks = params
377 disks = [objects.Disk.FromDict(cf) for cf in disks]
378 return backend.DrbdDisconnectNet(nodes_ip, disks)
381 def perspective_drbd_attach_net(params):
382 """Attaches the network connection of drbd disks.
384 Note that this is only valid for drbd disks, so the members of the
385 disk list must all be drbd devices.
388 nodes_ip, disks, instance_name, multimaster = params
389 disks = [objects.Disk.FromDict(cf) for cf in disks]
390 return backend.DrbdAttachNet(nodes_ip, disks,
391 instance_name, multimaster)
394 def perspective_drbd_wait_sync(params):
395 """Wait until DRBD disks are synched.
397 Note that this is only valid for drbd disks, so the members of the
398 disk list must all be drbd devices.
401 nodes_ip, disks = params
402 disks = [objects.Disk.FromDict(cf) for cf in disks]
403 return backend.DrbdWaitSync(nodes_ip, disks)
406 def perspective_drbd_helper(params):
407 """Query drbd helper.
410 return backend.GetDrbdUsermodeHelper()
412 # export/import --------------------------
415 def perspective_finalize_export(params):
416 """Expose the finalize export functionality.
419 instance = objects.Instance.FromDict(params[0])
422 for disk in params[1]:
423 if isinstance(disk, bool):
424 snap_disks.append(disk)
426 snap_disks.append(objects.Disk.FromDict(disk))
428 return backend.FinalizeExport(instance, snap_disks)
431 def perspective_export_info(params):
432 """Query information about an existing export on this node.
434 The given path may not contain an export, in which case we return
439 return backend.ExportInfo(path)
442 def perspective_export_list(params):
443 """List the available exports on this node.
445 Note that as opposed to export_info, which may query data about an
446 export in any path, this only queries the standard Ganeti path
447 (constants.EXPORT_DIR).
450 return backend.ListExports()
453 def perspective_export_remove(params):
458 return backend.RemoveExport(export)
460 # volume --------------------------
463 def perspective_lv_list(params):
464 """Query the list of logical volumes in a given volume group.
468 return backend.GetVolumeList(vgname)
471 def perspective_vg_list(params):
472 """Query the list of volume groups.
475 return backend.ListVolumeGroups()
477 # Storage --------------------------
480 def perspective_storage_list(params):
481 """Get list of storage units.
484 (su_name, su_args, name, fields) = params
485 return storage.GetStorage(su_name, *su_args).List(name, fields)
488 def perspective_storage_modify(params):
489 """Modify a storage unit.
492 (su_name, su_args, name, changes) = params
493 return storage.GetStorage(su_name, *su_args).Modify(name, changes)
496 def perspective_storage_execute(params):
497 """Execute an operation on a storage unit.
500 (su_name, su_args, name, op) = params
501 return storage.GetStorage(su_name, *su_args).Execute(name, op)
503 # bridge --------------------------
506 def perspective_bridges_exist(params):
507 """Check if all bridges given exist on this node.
510 bridges_list = params[0]
511 return backend.BridgesExist(bridges_list)
513 # instance --------------------------
516 def perspective_instance_os_add(params):
517 """Install an OS on a given instance.
521 inst = objects.Instance.FromDict(inst_s)
522 reinstall = params[1]
524 return backend.InstanceOsAdd(inst, reinstall, debug)
527 def perspective_instance_run_rename(params):
528 """Runs the OS rename script for an instance.
531 inst_s, old_name, debug = params
532 inst = objects.Instance.FromDict(inst_s)
533 return backend.RunRenameInstance(inst, old_name, debug)
536 def perspective_instance_shutdown(params):
537 """Shutdown an instance.
540 instance = objects.Instance.FromDict(params[0])
542 return backend.InstanceShutdown(instance, timeout)
545 def perspective_instance_start(params):
546 """Start an instance.
549 instance = objects.Instance.FromDict(params[0])
550 return backend.StartInstance(instance)
553 def perspective_migration_info(params):
554 """Gather information about an instance to be migrated.
557 instance = objects.Instance.FromDict(params[0])
558 return backend.MigrationInfo(instance)
561 def perspective_accept_instance(params):
562 """Prepare the node to accept an instance.
565 instance, info, target = params
566 instance = objects.Instance.FromDict(instance)
567 return backend.AcceptInstance(instance, info, target)
570 def perspective_finalize_migration(params):
571 """Finalize the instance migration.
574 instance, info, success = params
575 instance = objects.Instance.FromDict(instance)
576 return backend.FinalizeMigration(instance, info, success)
579 def perspective_instance_migrate(params):
580 """Migrates an instance.
583 instance, target, live = params
584 instance = objects.Instance.FromDict(instance)
585 return backend.MigrateInstance(instance, target, live)
588 def perspective_instance_reboot(params):
589 """Reboot an instance.
592 instance = objects.Instance.FromDict(params[0])
593 reboot_type = params[1]
594 shutdown_timeout = params[2]
595 return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
598 def perspective_instance_info(params):
599 """Query instance information.
602 return backend.GetInstanceInfo(params[0], params[1])
605 def perspective_instance_migratable(params):
606 """Query whether the specified instance can be migrated.
609 instance = objects.Instance.FromDict(params[0])
610 return backend.GetInstanceMigratable(instance)
613 def perspective_all_instances_info(params):
614 """Query information about all instances.
617 return backend.GetAllInstancesInfo(params[0])
620 def perspective_instance_list(params):
621 """Query the list of running instances.
624 return backend.GetInstanceList(params[0])
626 # node --------------------------
629 def perspective_node_tcp_ping(params):
630 """Do a TcpPing on the remote node.
633 return netutils.TcpPing(params[1], params[2], timeout=params[3],
634 live_port_needed=params[4], source=params[0])
637 def perspective_node_has_ip_address(params):
638 """Checks if a node has the given ip address.
641 return netutils.IPAddress.Own(params[0])
644 def perspective_node_info(params):
645 """Query node information.
648 vgname, hypervisor_type = params
649 return backend.GetNodeInfo(vgname, hypervisor_type)
652 def perspective_etc_hosts_modify(params):
653 """Modify a node entry in /etc/hosts.
656 backend.EtcHostsModify(params[0], params[1], params[2])
661 def perspective_node_verify(params):
662 """Run a verify sequence on this node.
665 return backend.VerifyNode(params[0], params[1])
668 def perspective_node_start_master(params):
669 """Promote this node to master status.
672 return backend.StartMaster(params[0], params[1])
675 def perspective_node_stop_master(params):
676 """Demote this node from master status.
679 return backend.StopMaster(params[0])
682 def perspective_node_leave_cluster(params):
683 """Cleanup after leaving a cluster.
686 return backend.LeaveCluster(params[0])
689 def perspective_node_volumes(params):
690 """Query the list of all logical volume groups.
693 return backend.NodeVolumes()
696 def perspective_node_demote_from_mc(params):
697 """Demote a node from the master candidate role.
700 return backend.DemoteFromMC()
704 def perspective_node_powercycle(params):
705 """Tries to powercycle the nod.
708 hypervisor_type = params[0]
709 return backend.PowercycleNode(hypervisor_type)
712 # cluster --------------------------
715 def perspective_version(params):
716 """Query version information.
719 return constants.PROTOCOL_VERSION
722 def perspective_upload_file(params):
725 Note that the backend implementation imposes strict rules on which
729 return backend.UploadFile(*params)
732 def perspective_master_info(params):
733 """Query master information.
736 return backend.GetMasterInfo()
739 def perspective_run_oob(params):
743 output = backend.RunOob(params[0], params[1], params[2], params[3])
745 result = serializer.LoadJson(output)
751 def perspective_write_ssconf_files(params):
752 """Write ssconf files.
756 return backend.WriteSsconfFiles(values)
758 # os -----------------------
761 def perspective_os_diagnose(params):
762 """Query detailed information about existing OSes.
765 return backend.DiagnoseOS()
768 def perspective_os_get(params):
769 """Query information about a given OS.
773 os_obj = backend.OSFromDisk(name)
774 return os_obj.ToDict()
777 def perspective_os_validate(params):
778 """Run a given OS' validation routine.
781 required, name, checks, params = params
782 return backend.ValidateOS(required, name, checks, params)
784 # hooks -----------------------
787 def perspective_hooks_runner(params):
791 hpath, phase, env = params
792 hr = backend.HooksRunner()
793 return hr.RunHooks(hpath, phase, env)
795 # iallocator -----------------
798 def perspective_iallocator_runner(params):
799 """Run an iallocator script.
803 iar = backend.IAllocatorRunner()
804 return iar.Run(name, idata)
806 # test -----------------------
809 def perspective_test_delay(params):
814 status, rval = utils.TestDelay(duration)
816 raise backend.RPCFail(rval)
819 # file storage ---------------
822 def perspective_file_storage_dir_create(params):
823 """Create the file storage directory.
826 file_storage_dir = params[0]
827 return backend.CreateFileStorageDir(file_storage_dir)
830 def perspective_file_storage_dir_remove(params):
831 """Remove the file storage directory.
834 file_storage_dir = params[0]
835 return backend.RemoveFileStorageDir(file_storage_dir)
838 def perspective_file_storage_dir_rename(params):
839 """Rename the file storage directory.
842 old_file_storage_dir = params[0]
843 new_file_storage_dir = params[1]
844 return backend.RenameFileStorageDir(old_file_storage_dir,
845 new_file_storage_dir)
847 # jobs ------------------------
850 @_RequireJobQueueLock
851 def perspective_jobqueue_update(params):
855 (file_name, content) = params
856 return backend.JobQueueUpdate(file_name, content)
859 @_RequireJobQueueLock
860 def perspective_jobqueue_purge(params):
864 return backend.JobQueuePurge()
867 @_RequireJobQueueLock
868 def perspective_jobqueue_rename(params):
869 """Rename a job queue file.
872 # TODO: What if a file fails to rename?
873 return [backend.JobQueueRename(old, new) for old, new in params]
875 # hypervisor ---------------
878 def perspective_hypervisor_validate_params(params):
879 """Validate the hypervisor parameters.
882 (hvname, hvparams) = params
883 return backend.ValidateHVParams(hvname, hvparams)
888 def perspective_x509_cert_create(params):
889 """Creates a new X509 certificate for SSL/TLS.
892 (validity, ) = params
893 return backend.CreateX509Certificate(validity)
896 def perspective_x509_cert_remove(params):
897 """Removes a X509 certificate.
901 return backend.RemoveX509Certificate(name)
906 def perspective_import_start(params):
907 """Starts an import daemon.
910 (opts_s, instance, dest, dest_args) = params
912 opts = objects.ImportExportOptions.FromDict(opts_s)
914 return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
916 objects.Instance.FromDict(instance),
918 _DecodeImportExportIO(dest,
922 def perspective_export_start(params):
923 """Starts an export daemon.
926 (opts_s, host, port, instance, source, source_args) = params
928 opts = objects.ImportExportOptions.FromDict(opts_s)
930 return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
932 objects.Instance.FromDict(instance),
934 _DecodeImportExportIO(source,
938 def perspective_impexp_status(params):
939 """Retrieves the status of an import or export daemon.
942 return backend.GetImportExportStatus(params[0])
945 def perspective_impexp_abort(params):
946 """Aborts an import or export.
949 return backend.AbortImportExport(params[0])
952 def perspective_impexp_cleanup(params):
953 """Cleans up after an import or export.
956 return backend.CleanupImportExport(params[0])
959 def CheckNoded(_, args):
960 """Initial checks whether to run or exit with a failure.
963 if args: # noded doesn't take any arguments
964 print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
966 sys.exit(constants.EXIT_FAILURE)
969 def PrepNoded(options, _):
970 """Preparation node daemon function, executed with the PID file held.
974 request_executor_class = MlockallRequestExecutor
977 except errors.NoCtypesError:
978 logging.warning("Cannot set memory lock, ctypes module not found")
979 request_executor_class = http.server.HttpServerRequestExecutor
981 request_executor_class = http.server.HttpServerRequestExecutor
983 # Read SSL certificate
985 ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
986 ssl_cert_path=options.ssl_cert)
990 err = _PrepareQueueLock()
992 # this might be some kind of file-system/permission error; while
993 # this breaks the job queue functionality, we shouldn't prevent
994 # startup of the whole node daemon because of this
995 logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
997 mainloop = daemon.Mainloop()
998 server = NodeHttpServer(mainloop, options.bind_address, options.port,
999 ssl_params=ssl_params, ssl_verify_peer=True,
1000 request_executor_class=request_executor_class)
1002 return (mainloop, server)
1005 def ExecNoded(options, args, prep_data): # pylint: disable-msg=W0613
1006 """Main node daemon function, executed with the PID file held.
1009 (mainloop, server) = prep_data
1017 """Main function for the node daemon.
1020 parser = OptionParser(description="Ganeti node daemon",
1021 usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
1022 version="%%prog (ganeti) %s" %
1023 constants.RELEASE_VERSION)
1024 parser.add_option("--no-mlock", dest="mlock",
1025 help="Do not mlock the node memory in ram",
1026 default=True, action="store_false")
1028 daemon.GenericMain(constants.NODED, parser, CheckNoded, PrepNoded, ExecNoded,
1029 default_ssl_cert=constants.NODED_CERT_FILE,
1030 default_ssl_key=constants.NODED_CERT_FILE,
1031 console_logging=True)