4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
22 """Ganeti node daemon"""
24 # pylint: disable-msg=C0103,W0142
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
# W0142: Used * or ** magic, since we do use it extensively in this
# module
import os
import sys
import logging
import signal

from optparse import OptionParser

from ganeti import backend
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import daemon
from ganeti import http
from ganeti import utils
from ganeti import storage
from ganeti import serializer

import ganeti.http.server # pylint: disable-msg=W0611
56 def _PrepareQueueLock():
57 """Try to prepare the queue lock.
59 @return: None for success, otherwise an exception object
62 global queue_lock # pylint: disable-msg=W0603
64 if queue_lock is not None:
69 queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
71 except EnvironmentError, err:
75 def _RequireJobQueueLock(fn):
76 """Decorator for job queue manipulating functions.
79 QUEUE_LOCK_TIMEOUT = 10
81 def wrapper(*args, **kwargs):
82 # Locking in exclusive, blocking mode because there could be several
83 # children running at the same time. Waiting up to 10 seconds.
84 if _PrepareQueueLock() is not None:
85 raise errors.JobQueueError("Job queue failed initialization,"
86 " cannot update jobs")
87 queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
89 return fn(*args, **kwargs)
96 def _DecodeImportExportIO(ieio, ieioargs):
97 """Decodes import/export I/O information.
100 if ieio == constants.IEIO_RAW_DISK:
101 assert len(ieioargs) == 1
102 return (objects.Disk.FromDict(ieioargs[0]), )
104 if ieio == constants.IEIO_SCRIPT:
105 assert len(ieioargs) == 2
106 return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
111 class NodeHttpServer(http.server.HttpServer):
112 """The server implementation.
114 This class holds all methods exposed over the RPC interface.
117 # too many public methods, and unused args - all methods get params
119 # pylint: disable-msg=R0904,W0613
120 def __init__(self, *args, **kwargs):
121 http.server.HttpServer.__init__(self, *args, **kwargs)
122 self.noded_pid = os.getpid()
124 def HandleRequest(self, req):
128 if req.request_method.upper() != http.HTTP_PUT:
129 raise http.HttpBadRequest()
131 path = req.request_path
132 if path.startswith("/"):
135 method = getattr(self, "perspective_%s" % path, None)
137 raise http.HttpNotFound()
140 result = (True, method(serializer.LoadJson(req.request_body)))
142 except backend.RPCFail, err:
143 # our custom failure exception; str(err) works fine if the
144 # exception was constructed with a single argument, and in
145 # this case, err.message == err.args[0] == str(err)
146 result = (False, str(err))
147 except errors.QuitGanetiException, err:
148 # Tell parent to quit
149 logging.info("Shutting down the node daemon, arguments: %s",
151 os.kill(self.noded_pid, signal.SIGTERM)
152 # And return the error's arguments, which must be already in
153 # correct tuple format
155 except Exception, err:
156 logging.exception("Error in RPC call")
157 result = (False, "Error while executing backend function: %s" % str(err))
159 return serializer.DumpJson(result, indent=False)
161 # the new block devices --------------------------
164 def perspective_blockdev_create(params):
165 """Create a block device.
168 bdev_s, size, owner, on_primary, info = params
169 bdev = objects.Disk.FromDict(bdev_s)
171 raise ValueError("can't unserialize data!")
172 return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
175 def perspective_blockdev_remove(params):
176 """Remove a block device.
180 bdev = objects.Disk.FromDict(bdev_s)
181 return backend.BlockdevRemove(bdev)
184 def perspective_blockdev_rename(params):
185 """Remove a block device.
188 devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
189 return backend.BlockdevRename(devlist)
192 def perspective_blockdev_assemble(params):
193 """Assemble a block device.
196 bdev_s, owner, on_primary = params
197 bdev = objects.Disk.FromDict(bdev_s)
199 raise ValueError("can't unserialize data!")
200 return backend.BlockdevAssemble(bdev, owner, on_primary)
203 def perspective_blockdev_shutdown(params):
204 """Shutdown a block device.
208 bdev = objects.Disk.FromDict(bdev_s)
210 raise ValueError("can't unserialize data!")
211 return backend.BlockdevShutdown(bdev)
214 def perspective_blockdev_addchildren(params):
215 """Add a child to a mirror device.
217 Note: this is only valid for mirror devices. It's the caller's duty
218 to send a correct disk, otherwise we raise an error.
221 bdev_s, ndev_s = params
222 bdev = objects.Disk.FromDict(bdev_s)
223 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
224 if bdev is None or ndevs.count(None) > 0:
225 raise ValueError("can't unserialize data!")
226 return backend.BlockdevAddchildren(bdev, ndevs)
229 def perspective_blockdev_removechildren(params):
230 """Remove a child from a mirror device.
232 This is only valid for mirror devices, of course. It's the callers
233 duty to send a correct disk, otherwise we raise an error.
236 bdev_s, ndev_s = params
237 bdev = objects.Disk.FromDict(bdev_s)
238 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
239 if bdev is None or ndevs.count(None) > 0:
240 raise ValueError("can't unserialize data!")
241 return backend.BlockdevRemovechildren(bdev, ndevs)
244 def perspective_blockdev_getmirrorstatus(params):
245 """Return the mirror status for a list of disks.
248 disks = [objects.Disk.FromDict(dsk_s)
250 return [status.ToDict()
251 for status in backend.BlockdevGetmirrorstatus(disks)]
254 def perspective_blockdev_find(params):
255 """Expose the FindBlockDevice functionality for a disk.
257 This will try to find but not activate a disk.
260 disk = objects.Disk.FromDict(params[0])
262 result = backend.BlockdevFind(disk)
266 return result.ToDict()
269 def perspective_blockdev_snapshot(params):
270 """Create a snapshot device.
272 Note that this is only valid for LVM disks, if we get passed
273 something else we raise an exception. The snapshot device can be
274 remove by calling the generic block device remove call.
277 cfbd = objects.Disk.FromDict(params[0])
278 return backend.BlockdevSnapshot(cfbd)
281 def perspective_blockdev_grow(params):
282 """Grow a stack of devices.
285 cfbd = objects.Disk.FromDict(params[0])
287 return backend.BlockdevGrow(cfbd, amount)
290 def perspective_blockdev_close(params):
291 """Closes the given block devices.
294 disks = [objects.Disk.FromDict(cf) for cf in params[1]]
295 return backend.BlockdevClose(params[0], disks)
298 def perspective_blockdev_getsize(params):
299 """Compute the sizes of the given block devices.
302 disks = [objects.Disk.FromDict(cf) for cf in params[0]]
303 return backend.BlockdevGetsize(disks)
306 def perspective_blockdev_export(params):
307 """Compute the sizes of the given block devices.
310 disk = objects.Disk.FromDict(params[0])
311 dest_node, dest_path, cluster_name = params[1:]
312 return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
314 # blockdev/drbd specific methods ----------
317 def perspective_drbd_disconnect_net(params):
318 """Disconnects the network connection of drbd disks.
320 Note that this is only valid for drbd disks, so the members of the
321 disk list must all be drbd devices.
324 nodes_ip, disks = params
325 disks = [objects.Disk.FromDict(cf) for cf in disks]
326 return backend.DrbdDisconnectNet(nodes_ip, disks)
329 def perspective_drbd_attach_net(params):
330 """Attaches the network connection of drbd disks.
332 Note that this is only valid for drbd disks, so the members of the
333 disk list must all be drbd devices.
336 nodes_ip, disks, instance_name, multimaster = params
337 disks = [objects.Disk.FromDict(cf) for cf in disks]
338 return backend.DrbdAttachNet(nodes_ip, disks,
339 instance_name, multimaster)
342 def perspective_drbd_wait_sync(params):
343 """Wait until DRBD disks are synched.
345 Note that this is only valid for drbd disks, so the members of the
346 disk list must all be drbd devices.
349 nodes_ip, disks = params
350 disks = [objects.Disk.FromDict(cf) for cf in disks]
351 return backend.DrbdWaitSync(nodes_ip, disks)
353 # export/import --------------------------
356 def perspective_snapshot_export(params):
357 """Export a given snapshot.
360 disk = objects.Disk.FromDict(params[0])
361 dest_node = params[1]
362 instance = objects.Instance.FromDict(params[2])
363 cluster_name = params[3]
366 return backend.ExportSnapshot(disk, dest_node, instance,
367 cluster_name, dev_idx, debug)
370 def perspective_finalize_export(params):
371 """Expose the finalize export functionality.
374 instance = objects.Instance.FromDict(params[0])
377 for disk in params[1]:
378 if isinstance(disk, bool):
379 snap_disks.append(disk)
381 snap_disks.append(objects.Disk.FromDict(disk))
383 return backend.FinalizeExport(instance, snap_disks)
386 def perspective_export_info(params):
387 """Query information about an existing export on this node.
389 The given path may not contain an export, in which case we return
394 return backend.ExportInfo(path)
397 def perspective_export_list(params):
398 """List the available exports on this node.
400 Note that as opposed to export_info, which may query data about an
401 export in any path, this only queries the standard Ganeti path
402 (constants.EXPORT_DIR).
405 return backend.ListExports()
408 def perspective_export_remove(params):
413 return backend.RemoveExport(export)
415 # volume --------------------------
418 def perspective_lv_list(params):
419 """Query the list of logical volumes in a given volume group.
423 return backend.GetVolumeList(vgname)
426 def perspective_vg_list(params):
427 """Query the list of volume groups.
430 return backend.ListVolumeGroups()
432 # Storage --------------------------
435 def perspective_storage_list(params):
436 """Get list of storage units.
439 (su_name, su_args, name, fields) = params
440 return storage.GetStorage(su_name, *su_args).List(name, fields)
443 def perspective_storage_modify(params):
444 """Modify a storage unit.
447 (su_name, su_args, name, changes) = params
448 return storage.GetStorage(su_name, *su_args).Modify(name, changes)
451 def perspective_storage_execute(params):
452 """Execute an operation on a storage unit.
455 (su_name, su_args, name, op) = params
456 return storage.GetStorage(su_name, *su_args).Execute(name, op)
458 # bridge --------------------------
461 def perspective_bridges_exist(params):
462 """Check if all bridges given exist on this node.
465 bridges_list = params[0]
466 return backend.BridgesExist(bridges_list)
468 # instance --------------------------
471 def perspective_instance_os_add(params):
472 """Install an OS on a given instance.
476 inst = objects.Instance.FromDict(inst_s)
477 reinstall = params[1]
479 return backend.InstanceOsAdd(inst, reinstall, debug)
482 def perspective_instance_run_rename(params):
483 """Runs the OS rename script for an instance.
486 inst_s, old_name, debug = params
487 inst = objects.Instance.FromDict(inst_s)
488 return backend.RunRenameInstance(inst, old_name, debug)
491 def perspective_instance_os_import(params):
492 """Run the import function of an OS onto a given instance.
495 inst_s, src_node, src_images, cluster_name, debug = params
496 inst = objects.Instance.FromDict(inst_s)
497 return backend.ImportOSIntoInstance(inst, src_node, src_images,
501 def perspective_instance_shutdown(params):
502 """Shutdown an instance.
505 instance = objects.Instance.FromDict(params[0])
507 return backend.InstanceShutdown(instance, timeout)
510 def perspective_instance_start(params):
511 """Start an instance.
514 instance = objects.Instance.FromDict(params[0])
515 return backend.StartInstance(instance)
518 def perspective_migration_info(params):
519 """Gather information about an instance to be migrated.
522 instance = objects.Instance.FromDict(params[0])
523 return backend.MigrationInfo(instance)
526 def perspective_accept_instance(params):
527 """Prepare the node to accept an instance.
530 instance, info, target = params
531 instance = objects.Instance.FromDict(instance)
532 return backend.AcceptInstance(instance, info, target)
535 def perspective_finalize_migration(params):
536 """Finalize the instance migration.
539 instance, info, success = params
540 instance = objects.Instance.FromDict(instance)
541 return backend.FinalizeMigration(instance, info, success)
544 def perspective_instance_migrate(params):
545 """Migrates an instance.
548 instance, target, live = params
549 instance = objects.Instance.FromDict(instance)
550 return backend.MigrateInstance(instance, target, live)
553 def perspective_instance_reboot(params):
554 """Reboot an instance.
557 instance = objects.Instance.FromDict(params[0])
558 reboot_type = params[1]
559 shutdown_timeout = params[2]
560 return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
563 def perspective_instance_info(params):
564 """Query instance information.
567 return backend.GetInstanceInfo(params[0], params[1])
570 def perspective_instance_migratable(params):
571 """Query whether the specified instance can be migrated.
574 instance = objects.Instance.FromDict(params[0])
575 return backend.GetInstanceMigratable(instance)
578 def perspective_all_instances_info(params):
579 """Query information about all instances.
582 return backend.GetAllInstancesInfo(params[0])
585 def perspective_instance_list(params):
586 """Query the list of running instances.
589 return backend.GetInstanceList(params[0])
591 # node --------------------------
594 def perspective_node_tcp_ping(params):
595 """Do a TcpPing on the remote node.
598 return utils.TcpPing(params[1], params[2], timeout=params[3],
599 live_port_needed=params[4], source=params[0])
602 def perspective_node_has_ip_address(params):
603 """Checks if a node has the given ip address.
606 return utils.OwnIpAddress(params[0])
609 def perspective_node_info(params):
610 """Query node information.
613 vgname, hypervisor_type = params
614 return backend.GetNodeInfo(vgname, hypervisor_type)
617 def perspective_node_add(params):
618 """Complete the registration of this node in the cluster.
621 return backend.AddNode(params[0], params[1], params[2],
622 params[3], params[4], params[5])
625 def perspective_node_verify(params):
626 """Run a verify sequence on this node.
629 return backend.VerifyNode(params[0], params[1])
632 def perspective_node_start_master(params):
633 """Promote this node to master status.
636 return backend.StartMaster(params[0], params[1])
639 def perspective_node_stop_master(params):
640 """Demote this node from master status.
643 return backend.StopMaster(params[0])
646 def perspective_node_leave_cluster(params):
647 """Cleanup after leaving a cluster.
650 return backend.LeaveCluster(params[0])
653 def perspective_node_volumes(params):
654 """Query the list of all logical volume groups.
657 return backend.NodeVolumes()
660 def perspective_node_demote_from_mc(params):
661 """Demote a node from the master candidate role.
664 return backend.DemoteFromMC()
668 def perspective_node_powercycle(params):
669 """Tries to powercycle the nod.
672 hypervisor_type = params[0]
673 return backend.PowercycleNode(hypervisor_type)
676 # cluster --------------------------
679 def perspective_version(params):
680 """Query version information.
683 return constants.PROTOCOL_VERSION
686 def perspective_upload_file(params):
689 Note that the backend implementation imposes strict rules on which
693 return backend.UploadFile(*params)
696 def perspective_master_info(params):
697 """Query master information.
700 return backend.GetMasterInfo()
703 def perspective_write_ssconf_files(params):
704 """Write ssconf files.
708 return backend.WriteSsconfFiles(values)
710 # os -----------------------
713 def perspective_os_diagnose(params):
714 """Query detailed information about existing OSes.
717 return backend.DiagnoseOS()
720 def perspective_os_get(params):
721 """Query information about a given OS.
725 os_obj = backend.OSFromDisk(name)
726 return os_obj.ToDict()
728 # hooks -----------------------
731 def perspective_hooks_runner(params):
735 hpath, phase, env = params
736 hr = backend.HooksRunner()
737 return hr.RunHooks(hpath, phase, env)
739 # iallocator -----------------
742 def perspective_iallocator_runner(params):
743 """Run an iallocator script.
747 iar = backend.IAllocatorRunner()
748 return iar.Run(name, idata)
750 # test -----------------------
753 def perspective_test_delay(params):
758 status, rval = utils.TestDelay(duration)
760 raise backend.RPCFail(rval)
763 # file storage ---------------
766 def perspective_file_storage_dir_create(params):
767 """Create the file storage directory.
770 file_storage_dir = params[0]
771 return backend.CreateFileStorageDir(file_storage_dir)
774 def perspective_file_storage_dir_remove(params):
775 """Remove the file storage directory.
778 file_storage_dir = params[0]
779 return backend.RemoveFileStorageDir(file_storage_dir)
782 def perspective_file_storage_dir_rename(params):
783 """Rename the file storage directory.
786 old_file_storage_dir = params[0]
787 new_file_storage_dir = params[1]
788 return backend.RenameFileStorageDir(old_file_storage_dir,
789 new_file_storage_dir)
791 # jobs ------------------------
794 @_RequireJobQueueLock
795 def perspective_jobqueue_update(params):
799 (file_name, content) = params
800 return backend.JobQueueUpdate(file_name, content)
803 @_RequireJobQueueLock
804 def perspective_jobqueue_purge(params):
808 return backend.JobQueuePurge()
811 @_RequireJobQueueLock
812 def perspective_jobqueue_rename(params):
813 """Rename a job queue file.
816 # TODO: What if a file fails to rename?
817 return [backend.JobQueueRename(old, new) for old, new in params]
820 def perspective_jobqueue_set_drain(params):
821 """Set/unset the queue drain flag.
824 drain_flag = params[0]
825 return backend.JobQueueSetDrainFlag(drain_flag)
828 # hypervisor ---------------
831 def perspective_hypervisor_validate_params(params):
832 """Validate the hypervisor parameters.
835 (hvname, hvparams) = params
836 return backend.ValidateHVParams(hvname, hvparams)
841 def perspective_x509_cert_create(params):
842 """Creates a new X509 certificate for SSL/TLS.
845 (validity, ) = params
846 return backend.CreateX509Certificate(validity)
849 def perspective_x509_cert_remove(params):
850 """Removes a X509 certificate.
854 return backend.RemoveX509Certificate(name)
859 def perspective_import_start(params):
860 """Starts an import daemon.
863 (x509_key_name, source_x509_ca, instance, dest, dest_args) = params
864 return backend.StartImportExportDaemon(constants.IEM_IMPORT,
865 x509_key_name, source_x509_ca,
867 objects.Instance.FromDict(instance),
869 _DecodeImportExportIO(dest,
872 def perspective_export_start(params):
873 """Starts an export daemon.
876 (x509_key_name, dest_x509_ca, host, port, instance,
877 source, source_args) = params
878 return backend.StartImportExportDaemon(constants.IEM_EXPORT,
879 x509_key_name, dest_x509_ca,
881 objects.Instance.FromDict(instance),
883 _DecodeImportExportIO(source,
887 def perspective_impexp_status(params):
888 """Retrieves the status of an import or export daemon.
891 return backend.GetImportExportStatus(params[0])
894 def perspective_impexp_cleanup(params):
895 """Cleans up after an import or export.
898 return backend.CleanupImportExport(params[0])
901 def CheckNoded(_, args):
902 """Initial checks whether to run or exit with a failure.
905 if args: # noded doesn't take any arguments
906 print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
908 sys.exit(constants.EXIT_FAILURE)
911 def ExecNoded(options, _):
912 """Main node daemon function, executed with the PID file held.
915 # Read SSL certificate
917 ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
918 ssl_cert_path=options.ssl_cert)
922 err = _PrepareQueueLock()
924 # this might be some kind of file-system/permission error; while
925 # this breaks the job queue functionality, we shouldn't prevent
926 # startup of the whole node daemon because of this
927 logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
929 mainloop = daemon.Mainloop()
930 server = NodeHttpServer(mainloop, options.bind_address, options.port,
931 ssl_params=ssl_params, ssl_verify_peer=True)
940 """Main function for the node daemon.
943 parser = OptionParser(description="Ganeti node daemon",
944 usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
945 version="%%prog (ganeti) %s" %
946 constants.RELEASE_VERSION)
947 dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
948 dirs.append((constants.LOG_OS_DIR, 0750))
949 dirs.append((constants.LOCK_DIR, 1777))
950 dirs.append((constants.CRYPTO_KEYS_DIR, constants.CRYPTO_KEYS_DIR_MODE))
951 dirs.append((constants.IMPORT_EXPORT_DIR, constants.IMPORT_EXPORT_DIR_MODE))
952 daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded,
953 default_ssl_cert=constants.NODED_CERT_FILE,
954 default_ssl_key=constants.NODED_CERT_FILE)
957 if __name__ == '__main__':