4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti node daemon"""
24 # pylint: disable-msg=C0103,W0142
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
29 # W0142: Used * or ** magic, since we do use it extensively in this
37 from optparse import OptionParser
39 from ganeti import backend
40 from ganeti import constants
41 from ganeti import objects
42 from ganeti import errors
43 from ganeti import jstore
44 from ganeti import daemon
45 from ganeti import http
46 from ganeti import utils
47 from ganeti import storage
48 from ganeti import serializer
50 import ganeti.http.server # pylint: disable-msg=W0611
def _PrepareQueueLock():
  """Try to prepare the queue lock.

  Lazily initializes the module-level C{queue_lock} via
  L{jstore.InitAndVerifyQueue}.

  @return: None for success, otherwise an exception object

  """
  # NOTE(review): several lines of this function appear to be elided from
  # this excerpt (e.g. an early return and the "try:" matching the
  # "except" below). The code is reproduced as-is; only comments added.
  global queue_lock # pylint: disable-msg=W0603
  if queue_lock is not None:
    queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
  except EnvironmentError, err:
def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  Ensures the job queue lock is initialized and acquired in exclusive
  mode before the wrapped function runs.

  @param fn: the function to wrap

  """
  QUEUE_LOCK_TIMEOUT = 10
  # NOTE(review): lines appear to be elided from this excerpt — most
  # likely the try/finally that releases queue_lock after the call and
  # the final "return wrapper". Code reproduced as-is; comments only.
  def wrapper(*args, **kwargs):
    # Locking in exclusive, blocking mode because there could be several
    # children running at the same time. Waiting up to 10 seconds.
    if _PrepareQueueLock() is not None:
      raise errors.JobQueueError("Job queue failed initialization,"
                                 " cannot update jobs")
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    return fn(*args, **kwargs)
def _DecodeImportExportIO(ieio, ieioargs):
  """Decodes import/export I/O information.

  @param ieio: I/O type, one of the C{constants.IEIO_*} values
  @param ieioargs: serialized I/O arguments received over RPC

  """
  # Raw disk: exactly one serialized Disk object.
  if ieio == constants.IEIO_RAW_DISK:
    assert len(ieioargs) == 1
    return (objects.Disk.FromDict(ieioargs[0]), )
  # Script: a serialized Disk object plus one extra argument, which is
  # passed through unchanged.
  if ieio == constants.IEIO_SCRIPT:
    assert len(ieioargs) == 2
    return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
  # NOTE(review): the handling for the remaining IEIO types appears to be
  # elided from this excerpt.
class NodeHttpServer(http.server.HttpServer):
  """The server implementation.

  This class holds all methods exposed over the RPC interface.  Each
  C{perspective_*} method is looked up by name from the request path in
  L{HandleRequest} and receives the JSON-decoded request body as its
  single C{params} argument.

  """
  # too many public methods, and unused args - all methods get params
  # pylint: disable-msg=R0904,W0613
  # NOTE(review): this excerpt appears to have many original lines elided
  # throughout the class: @staticmethod decorators on the perspective_*
  # methods (they take "params" without "self"), docstring terminators,
  # and assignment lines for some variables referenced in method bodies
  # (e.g. bdev_s, vgname, name, timeout). The code below is reproduced
  # as-is; only comments and docstrings have been added or completed.
  def __init__(self, *args, **kwargs):
    http.server.HttpServer.__init__(self, *args, **kwargs)
    # Remember our own PID so HandleRequest can signal this process to
    # terminate on QuitGanetiException.
    self.noded_pid = os.getpid()
  def HandleRequest(self, req):
    """Handle an RPC request: dispatch to the matching perspective_* method.

    """
    # Only HTTP PUT is accepted for RPC calls.
    if req.request_method.upper() != http.HTTP_PUT:
      raise http.HttpBadRequest()
    path = req.request_path
    if path.startswith("/"):
    method = getattr(self, "perspective_%s" % path, None)
      raise http.HttpNotFound()
      result = (True, method(serializer.LoadJson(req.request_body)))
    except backend.RPCFail, err:
      # our custom failure exception; str(err) works fine if the
      # exception was constructed with a single argument, and in
      # this case, err.message == err.args[0] == str(err)
      result = (False, str(err))
    except errors.QuitGanetiException, err:
      # Tell parent to quit
      logging.info("Shutting down the node daemon, arguments: %s",
      os.kill(self.noded_pid, signal.SIGTERM)
      # And return the error's arguments, which must be already in
      # correct tuple format
    except Exception, err:
      logging.exception("Error in RPC call")
      result = (False, "Error while executing backend function: %s" % str(err))
    return serializer.DumpJson(result, indent=False)
  # the new block devices --------------------------
  def perspective_blockdev_create(params):
    """Create a block device.

    """
    bdev_s, size, owner, on_primary, info = params
    bdev = objects.Disk.FromDict(bdev_s)
      raise ValueError("can't unserialize data!")
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
  def perspective_blockdev_remove(params):
    """Remove a block device.

    """
    bdev = objects.Disk.FromDict(bdev_s)
    return backend.BlockdevRemove(bdev)
  def perspective_blockdev_rename(params):
    """Rename a block device.

    """
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
    return backend.BlockdevRename(devlist)
  def perspective_blockdev_assemble(params):
    """Assemble a block device.

    """
    bdev_s, owner, on_primary = params
    bdev = objects.Disk.FromDict(bdev_s)
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAssemble(bdev, owner, on_primary)
  def perspective_blockdev_shutdown(params):
    """Shutdown a block device.

    """
    bdev = objects.Disk.FromDict(bdev_s)
      raise ValueError("can't unserialize data!")
    return backend.BlockdevShutdown(bdev)
  def perspective_blockdev_addchildren(params):
    """Add a child to a mirror device.

    Note: this is only valid for mirror devices. It's the caller's duty
    to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAddchildren(bdev, ndevs)
  def perspective_blockdev_removechildren(params):
    """Remove a child from a mirror device.

    This is only valid for mirror devices, of course. It's the callers
    duty to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevRemovechildren(bdev, ndevs)
  def perspective_blockdev_getmirrorstatus(params):
    """Return the mirror status for a list of disks.

    """
    disks = [objects.Disk.FromDict(dsk_s)
    return [status.ToDict()
            for status in backend.BlockdevGetmirrorstatus(disks)]
  def perspective_blockdev_find(params):
    """Expose the FindBlockDevice functionality for a disk.

    This will try to find but not activate a disk.

    """
    disk = objects.Disk.FromDict(params[0])
    result = backend.BlockdevFind(disk)
    return result.ToDict()
  def perspective_blockdev_snapshot(params):
    """Create a snapshot device.

    Note that this is only valid for LVM disks, if we get passed
    something else we raise an exception. The snapshot device can be
    remove by calling the generic block device remove call.

    """
    cfbd = objects.Disk.FromDict(params[0])
    return backend.BlockdevSnapshot(cfbd)
  def perspective_blockdev_grow(params):
    """Grow a stack of devices.

    """
    cfbd = objects.Disk.FromDict(params[0])
    return backend.BlockdevGrow(cfbd, amount)
  def perspective_blockdev_close(params):
    """Closes the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
    return backend.BlockdevClose(params[0], disks)
  def perspective_blockdev_getsize(params):
    """Compute the sizes of the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[0]]
    return backend.BlockdevGetsize(disks)
  def perspective_blockdev_export(params):
    """Export a block device to a remote node.

    """
    disk = objects.Disk.FromDict(params[0])
    dest_node, dest_path, cluster_name = params[1:]
    return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
  # blockdev/drbd specific methods ----------
  def perspective_drbd_disconnect_net(params):
    """Disconnects the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdDisconnectNet(nodes_ip, disks)
  def perspective_drbd_attach_net(params):
    """Attaches the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks, instance_name, multimaster = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdAttachNet(nodes_ip, disks,
                                 instance_name, multimaster)
  def perspective_drbd_wait_sync(params):
    """Wait until DRBD disks are synched.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdWaitSync(nodes_ip, disks)
  # export/import --------------------------
  def perspective_finalize_export(params):
    """Expose the finalize export functionality.

    """
    instance = objects.Instance.FromDict(params[0])
    # Booleans in the disk list are kept as-is, everything else is a
    # serialized Disk object.
    for disk in params[1]:
      if isinstance(disk, bool):
        snap_disks.append(disk)
        snap_disks.append(objects.Disk.FromDict(disk))
    return backend.FinalizeExport(instance, snap_disks)
  def perspective_export_info(params):
    """Query information about an existing export on this node.

    The given path may not contain an export, in which case we return

    """
    return backend.ExportInfo(path)
  def perspective_export_list(params):
    """List the available exports on this node.

    Note that as opposed to export_info, which may query data about an
    export in any path, this only queries the standard Ganeti path
    (constants.EXPORT_DIR).

    """
    return backend.ListExports()
  def perspective_export_remove(params):
    """Remove an export.

    """
    return backend.RemoveExport(export)
  # volume --------------------------
  def perspective_lv_list(params):
    """Query the list of logical volumes in a given volume group.

    """
    return backend.GetVolumeList(vgname)
  def perspective_vg_list(params):
    """Query the list of volume groups.

    """
    return backend.ListVolumeGroups()
  # Storage --------------------------
  def perspective_storage_list(params):
    """Get list of storage units.

    """
    (su_name, su_args, name, fields) = params
    return storage.GetStorage(su_name, *su_args).List(name, fields)
  def perspective_storage_modify(params):
    """Modify a storage unit.

    """
    (su_name, su_args, name, changes) = params
    return storage.GetStorage(su_name, *su_args).Modify(name, changes)
  def perspective_storage_execute(params):
    """Execute an operation on a storage unit.

    """
    (su_name, su_args, name, op) = params
    return storage.GetStorage(su_name, *su_args).Execute(name, op)
  # bridge --------------------------
  def perspective_bridges_exist(params):
    """Check if all bridges given exist on this node.

    """
    bridges_list = params[0]
    return backend.BridgesExist(bridges_list)
  # instance --------------------------
  def perspective_instance_os_add(params):
    """Install an OS on a given instance.

    """
    inst = objects.Instance.FromDict(inst_s)
    reinstall = params[1]
    return backend.InstanceOsAdd(inst, reinstall, debug)
  def perspective_instance_run_rename(params):
    """Runs the OS rename script for an instance.

    """
    inst_s, old_name, debug = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.RunRenameInstance(inst, old_name, debug)
  def perspective_instance_shutdown(params):
    """Shutdown an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.InstanceShutdown(instance, timeout)
  def perspective_instance_start(params):
    """Start an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.StartInstance(instance)
  def perspective_migration_info(params):
    """Gather information about an instance to be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.MigrationInfo(instance)
  def perspective_accept_instance(params):
    """Prepare the node to accept an instance.

    """
    instance, info, target = params
    instance = objects.Instance.FromDict(instance)
    return backend.AcceptInstance(instance, info, target)
  def perspective_finalize_migration(params):
    """Finalize the instance migration.

    """
    instance, info, success = params
    instance = objects.Instance.FromDict(instance)
    return backend.FinalizeMigration(instance, info, success)
  def perspective_instance_migrate(params):
    """Migrates an instance.

    """
    instance, target, live = params
    instance = objects.Instance.FromDict(instance)
    return backend.MigrateInstance(instance, target, live)
  def perspective_instance_reboot(params):
    """Reboot an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    reboot_type = params[1]
    shutdown_timeout = params[2]
    return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
  def perspective_instance_info(params):
    """Query instance information.

    """
    return backend.GetInstanceInfo(params[0], params[1])
  def perspective_instance_migratable(params):
    """Query whether the specified instance can be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.GetInstanceMigratable(instance)
  def perspective_all_instances_info(params):
    """Query information about all instances.

    """
    return backend.GetAllInstancesInfo(params[0])
  def perspective_instance_list(params):
    """Query the list of running instances.

    """
    return backend.GetInstanceList(params[0])
  # node --------------------------
  def perspective_node_tcp_ping(params):
    """Do a TcpPing on the remote node.

    """
    return utils.TcpPing(params[1], params[2], timeout=params[3],
                         live_port_needed=params[4], source=params[0])
  def perspective_node_has_ip_address(params):
    """Checks if a node has the given ip address.

    """
    return utils.OwnIpAddress(params[0])
  def perspective_node_info(params):
    """Query node information.

    """
    vgname, hypervisor_type = params
    return backend.GetNodeInfo(vgname, hypervisor_type)
  def perspective_node_add(params):
    """Complete the registration of this node in the cluster.

    """
    return backend.AddNode(params[0], params[1], params[2],
                           params[3], params[4], params[5])
  def perspective_node_verify(params):
    """Run a verify sequence on this node.

    """
    return backend.VerifyNode(params[0], params[1])
  def perspective_node_start_master(params):
    """Promote this node to master status.

    """
    return backend.StartMaster(params[0], params[1])
  def perspective_node_stop_master(params):
    """Demote this node from master status.

    """
    return backend.StopMaster(params[0])
  def perspective_node_leave_cluster(params):
    """Cleanup after leaving a cluster.

    """
    return backend.LeaveCluster(params[0])
  def perspective_node_volumes(params):
    """Query the list of all logical volume groups.

    """
    return backend.NodeVolumes()
  def perspective_node_demote_from_mc(params):
    """Demote a node from the master candidate role.

    """
    return backend.DemoteFromMC()
  def perspective_node_powercycle(params):
    """Tries to powercycle the node.

    """
    hypervisor_type = params[0]
    return backend.PowercycleNode(hypervisor_type)
  # cluster --------------------------
  def perspective_version(params):
    """Query version information.

    """
    return constants.PROTOCOL_VERSION
  def perspective_upload_file(params):
    """Upload a file.

    Note that the backend implementation imposes strict rules on which
    files may be uploaded this way.

    """
    return backend.UploadFile(*params)
  def perspective_master_info(params):
    """Query master information.

    """
    return backend.GetMasterInfo()
  def perspective_write_ssconf_files(params):
    """Write ssconf files.

    """
    return backend.WriteSsconfFiles(values)
  # os -----------------------
  def perspective_os_diagnose(params):
    """Query detailed information about existing OSes.

    """
    return backend.DiagnoseOS()
  def perspective_os_get(params):
    """Query information about a given OS.

    """
    os_obj = backend.OSFromDisk(name)
    return os_obj.ToDict()
  # hooks -----------------------
  def perspective_hooks_runner(params):
    """Run hook scripts.

    """
    hpath, phase, env = params
    hr = backend.HooksRunner()
    return hr.RunHooks(hpath, phase, env)
  # iallocator -----------------
  def perspective_iallocator_runner(params):
    """Run an iallocator script.

    """
    iar = backend.IAllocatorRunner()
    return iar.Run(name, idata)
  # test -----------------------
  def perspective_test_delay(params):
    """Run a test delay.

    """
    status, rval = utils.TestDelay(duration)
      raise backend.RPCFail(rval)
  # file storage ---------------
  def perspective_file_storage_dir_create(params):
    """Create the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.CreateFileStorageDir(file_storage_dir)
  def perspective_file_storage_dir_remove(params):
    """Remove the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.RemoveFileStorageDir(file_storage_dir)
  def perspective_file_storage_dir_rename(params):
    """Rename the file storage directory.

    """
    old_file_storage_dir = params[0]
    new_file_storage_dir = params[1]
    return backend.RenameFileStorageDir(old_file_storage_dir,
                                        new_file_storage_dir)
  # jobs ------------------------
  @_RequireJobQueueLock
  def perspective_jobqueue_update(params):
    """Update a job queue file.

    """
    (file_name, content) = params
    return backend.JobQueueUpdate(file_name, content)
  @_RequireJobQueueLock
  def perspective_jobqueue_purge(params):
    """Purge the job queue.

    """
    return backend.JobQueuePurge()
  @_RequireJobQueueLock
  def perspective_jobqueue_rename(params):
    """Rename a job queue file.

    """
    # TODO: What if a file fails to rename?
    return [backend.JobQueueRename(old, new) for old, new in params]
  def perspective_jobqueue_set_drain(params):
    """Set/unset the queue drain flag.

    """
    drain_flag = params[0]
    return backend.JobQueueSetDrainFlag(drain_flag)
  # hypervisor ---------------
  def perspective_hypervisor_validate_params(params):
    """Validate the hypervisor parameters.

    """
    (hvname, hvparams) = params
    return backend.ValidateHVParams(hvname, hvparams)
  def perspective_x509_cert_create(params):
    """Creates a new X509 certificate for SSL/TLS.

    """
    (validity, ) = params
    return backend.CreateX509Certificate(validity)
  def perspective_x509_cert_remove(params):
    """Removes a X509 certificate.

    """
    return backend.RemoveX509Certificate(name)
  def perspective_import_start(params):
    """Starts an import daemon.

    """
    (x509_key_name, source_x509_ca, instance, dest, dest_args) = params
    return backend.StartImportExportDaemon(constants.IEM_IMPORT,
                                           x509_key_name, source_x509_ca,
                                           objects.Instance.FromDict(instance),
                                           _DecodeImportExportIO(dest,
  def perspective_export_start(params):
    """Starts an export daemon.

    """
    (x509_key_name, dest_x509_ca, host, port, instance,
     source, source_args) = params
    return backend.StartImportExportDaemon(constants.IEM_EXPORT,
                                           x509_key_name, dest_x509_ca,
                                           objects.Instance.FromDict(instance),
                                           _DecodeImportExportIO(source,
  def perspective_impexp_status(params):
    """Retrieves the status of an import or export daemon.

    """
    return backend.GetImportExportStatus(params[0])
  def perspective_impexp_abort(params):
    """Aborts an import or export.

    """
    return backend.AbortImportExport(params[0])
  def perspective_impexp_cleanup(params):
    """Cleans up after an import or export.

    """
    return backend.CleanupImportExport(params[0])
def CheckNoded(_, args):
  """Initial checks whether to run or exit with a failure.

  Prints a usage message and exits with EXIT_FAILURE if any positional
  arguments were given (noded takes none).

  """
  # NOTE(review): the continuation line closing the print call below
  # (presumably "sys.argv[0])") appears to be elided from this excerpt.
  if args: # noded doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
    sys.exit(constants.EXIT_FAILURE)
def ExecNoded(options, _):
  """Main node daemon function, executed with the PID file held.

  @param options: parsed command-line options; ssl_key, ssl_cert,
      bind_address and port are read here

  """
  # NOTE(review): lines appear to be elided from this excerpt — e.g. a
  # guard on "err" before the logging.critical call and the code that
  # starts the server and runs the mainloop. Reproduced as-is.
  # Read SSL certificate
  ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                  ssl_cert_path=options.ssl_cert)
  err = _PrepareQueueLock()
  # this might be some kind of file-system/permission error; while
  # this breaks the job queue functionality, we shouldn't prevent
  # startup of the whole node daemon because of this
  logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
  mainloop = daemon.Mainloop()
  server = NodeHttpServer(mainloop, options.bind_address, options.port,
                          ssl_params=ssl_params, ssl_verify_peer=True)
  """Main function for the node daemon.

  Parses options, prepares the runtime directories and hands control to
  daemon.GenericMain with CheckNoded/ExecNoded as callbacks.

  """
  # NOTE(review): the enclosing "def main():" header and the call to
  # main() under the __main__ guard appear to be elided from this
  # excerpt; the statements below are reproduced as-is.
  parser = OptionParser(description="Ganeti node daemon",
                        usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  # Directories (with modes) that must exist before the daemon starts.
  dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
  dirs.append((constants.LOG_OS_DIR, 0750))
  dirs.append((constants.LOCK_DIR, 1777))
  dirs.append((constants.CRYPTO_KEYS_DIR, constants.CRYPTO_KEYS_DIR_MODE))
  dirs.append((constants.IMPORT_EXPORT_DIR, constants.IMPORT_EXPORT_DIR_MODE))
  daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded,
                     default_ssl_cert=constants.NODED_CERT_FILE,
                     default_ssl_key=constants.NODED_CERT_FILE)
if __name__ == '__main__':