4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti node daemon"""
24 # pylint: disable-msg=C0103,W0142
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
29 # W0142: Used * or ** magic, since we do use it extensively in this
import logging
import os
import signal
import sys

from optparse import OptionParser

from ganeti import backend
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import daemon
from ganeti import http
from ganeti import utils
from ganeti import storage
from ganeti import serializer

import ganeti.http.server # pylint: disable-msg=W0611
56 def _PrepareQueueLock():
57 """Try to prepare the queue lock.
59 @return: None for success, otherwise an exception object
62 global queue_lock # pylint: disable-msg=W0603
64 if queue_lock is not None:
69 queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
71 except EnvironmentError, err:
75 def _RequireJobQueueLock(fn):
76 """Decorator for job queue manipulating functions.
79 QUEUE_LOCK_TIMEOUT = 10
81 def wrapper(*args, **kwargs):
82 # Locking in exclusive, blocking mode because there could be several
83 # children running at the same time. Waiting up to 10 seconds.
84 if _PrepareQueueLock() is not None:
85 raise errors.JobQueueError("Job queue failed initialization,"
86 " cannot update jobs")
87 queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
89 return fn(*args, **kwargs)
96 def _DecodeImportExportIO(ieio, ieioargs):
97 """Decodes import/export I/O information.
100 if ieio == constants.IEIO_RAW_DISK:
101 assert len(ieioargs) == 1
102 return (objects.Disk.FromDict(ieioargs[0]), )
104 if ieio == constants.IEIO_SCRIPT:
105 assert len(ieioargs) == 2
106 return (objects.Disk.FromDict(ieioargs[0]), ieioargs[1])
111 class MlockallRequestExecutor(http.server.HttpServerRequestExecutor):
112 """Custom Request Executor class that ensures NodeHttpServer children are
116 def __init__(self, *args, **kwargs):
119 http.server.HttpServerRequestExecutor.__init__(self, *args, **kwargs)
122 class NodeHttpServer(http.server.HttpServer):
123 """The server implementation.
125 This class holds all methods exposed over the RPC interface.
128 # too many public methods, and unused args - all methods get params
130 # pylint: disable-msg=R0904,W0613
131 def __init__(self, *args, **kwargs):
132 http.server.HttpServer.__init__(self, *args, **kwargs)
133 self.noded_pid = os.getpid()
135 def HandleRequest(self, req):
139 if req.request_method.upper() != http.HTTP_PUT:
140 raise http.HttpBadRequest()
142 path = req.request_path
143 if path.startswith("/"):
146 method = getattr(self, "perspective_%s" % path, None)
148 raise http.HttpNotFound()
151 result = (True, method(serializer.LoadJson(req.request_body)))
153 except backend.RPCFail, err:
154 # our custom failure exception; str(err) works fine if the
155 # exception was constructed with a single argument, and in
156 # this case, err.message == err.args[0] == str(err)
157 result = (False, str(err))
158 except errors.QuitGanetiException, err:
159 # Tell parent to quit
160 logging.info("Shutting down the node daemon, arguments: %s",
162 os.kill(self.noded_pid, signal.SIGTERM)
163 # And return the error's arguments, which must be already in
164 # correct tuple format
166 except Exception, err:
167 logging.exception("Error in RPC call")
168 result = (False, "Error while executing backend function: %s" % str(err))
170 return serializer.DumpJson(result, indent=False)
172 # the new block devices --------------------------
175 def perspective_blockdev_create(params):
176 """Create a block device.
179 bdev_s, size, owner, on_primary, info = params
180 bdev = objects.Disk.FromDict(bdev_s)
182 raise ValueError("can't unserialize data!")
183 return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
186 def perspective_blockdev_remove(params):
187 """Remove a block device.
191 bdev = objects.Disk.FromDict(bdev_s)
192 return backend.BlockdevRemove(bdev)
195 def perspective_blockdev_rename(params):
196 """Remove a block device.
199 devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
200 return backend.BlockdevRename(devlist)
203 def perspective_blockdev_assemble(params):
204 """Assemble a block device.
207 bdev_s, owner, on_primary = params
208 bdev = objects.Disk.FromDict(bdev_s)
210 raise ValueError("can't unserialize data!")
211 return backend.BlockdevAssemble(bdev, owner, on_primary)
214 def perspective_blockdev_shutdown(params):
215 """Shutdown a block device.
219 bdev = objects.Disk.FromDict(bdev_s)
221 raise ValueError("can't unserialize data!")
222 return backend.BlockdevShutdown(bdev)
225 def perspective_blockdev_addchildren(params):
226 """Add a child to a mirror device.
228 Note: this is only valid for mirror devices. It's the caller's duty
229 to send a correct disk, otherwise we raise an error.
232 bdev_s, ndev_s = params
233 bdev = objects.Disk.FromDict(bdev_s)
234 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
235 if bdev is None or ndevs.count(None) > 0:
236 raise ValueError("can't unserialize data!")
237 return backend.BlockdevAddchildren(bdev, ndevs)
240 def perspective_blockdev_removechildren(params):
241 """Remove a child from a mirror device.
243 This is only valid for mirror devices, of course. It's the callers
244 duty to send a correct disk, otherwise we raise an error.
247 bdev_s, ndev_s = params
248 bdev = objects.Disk.FromDict(bdev_s)
249 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
250 if bdev is None or ndevs.count(None) > 0:
251 raise ValueError("can't unserialize data!")
252 return backend.BlockdevRemovechildren(bdev, ndevs)
255 def perspective_blockdev_getmirrorstatus(params):
256 """Return the mirror status for a list of disks.
259 disks = [objects.Disk.FromDict(dsk_s)
261 return [status.ToDict()
262 for status in backend.BlockdevGetmirrorstatus(disks)]
265 def perspective_blockdev_find(params):
266 """Expose the FindBlockDevice functionality for a disk.
268 This will try to find but not activate a disk.
271 disk = objects.Disk.FromDict(params[0])
273 result = backend.BlockdevFind(disk)
277 return result.ToDict()
280 def perspective_blockdev_snapshot(params):
281 """Create a snapshot device.
283 Note that this is only valid for LVM disks, if we get passed
284 something else we raise an exception. The snapshot device can be
285 remove by calling the generic block device remove call.
288 cfbd = objects.Disk.FromDict(params[0])
289 return backend.BlockdevSnapshot(cfbd)
292 def perspective_blockdev_grow(params):
293 """Grow a stack of devices.
296 cfbd = objects.Disk.FromDict(params[0])
298 return backend.BlockdevGrow(cfbd, amount)
301 def perspective_blockdev_close(params):
302 """Closes the given block devices.
305 disks = [objects.Disk.FromDict(cf) for cf in params[1]]
306 return backend.BlockdevClose(params[0], disks)
309 def perspective_blockdev_getsize(params):
310 """Compute the sizes of the given block devices.
313 disks = [objects.Disk.FromDict(cf) for cf in params[0]]
314 return backend.BlockdevGetsize(disks)
317 def perspective_blockdev_export(params):
318 """Compute the sizes of the given block devices.
321 disk = objects.Disk.FromDict(params[0])
322 dest_node, dest_path, cluster_name = params[1:]
323 return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
325 # blockdev/drbd specific methods ----------
328 def perspective_drbd_disconnect_net(params):
329 """Disconnects the network connection of drbd disks.
331 Note that this is only valid for drbd disks, so the members of the
332 disk list must all be drbd devices.
335 nodes_ip, disks = params
336 disks = [objects.Disk.FromDict(cf) for cf in disks]
337 return backend.DrbdDisconnectNet(nodes_ip, disks)
340 def perspective_drbd_attach_net(params):
341 """Attaches the network connection of drbd disks.
343 Note that this is only valid for drbd disks, so the members of the
344 disk list must all be drbd devices.
347 nodes_ip, disks, instance_name, multimaster = params
348 disks = [objects.Disk.FromDict(cf) for cf in disks]
349 return backend.DrbdAttachNet(nodes_ip, disks,
350 instance_name, multimaster)
353 def perspective_drbd_wait_sync(params):
354 """Wait until DRBD disks are synched.
356 Note that this is only valid for drbd disks, so the members of the
357 disk list must all be drbd devices.
360 nodes_ip, disks = params
361 disks = [objects.Disk.FromDict(cf) for cf in disks]
362 return backend.DrbdWaitSync(nodes_ip, disks)
364 # export/import --------------------------
367 def perspective_finalize_export(params):
368 """Expose the finalize export functionality.
371 instance = objects.Instance.FromDict(params[0])
374 for disk in params[1]:
375 if isinstance(disk, bool):
376 snap_disks.append(disk)
378 snap_disks.append(objects.Disk.FromDict(disk))
380 return backend.FinalizeExport(instance, snap_disks)
383 def perspective_export_info(params):
384 """Query information about an existing export on this node.
386 The given path may not contain an export, in which case we return
391 return backend.ExportInfo(path)
394 def perspective_export_list(params):
395 """List the available exports on this node.
397 Note that as opposed to export_info, which may query data about an
398 export in any path, this only queries the standard Ganeti path
399 (constants.EXPORT_DIR).
402 return backend.ListExports()
405 def perspective_export_remove(params):
410 return backend.RemoveExport(export)
412 # volume --------------------------
415 def perspective_lv_list(params):
416 """Query the list of logical volumes in a given volume group.
420 return backend.GetVolumeList(vgname)
423 def perspective_vg_list(params):
424 """Query the list of volume groups.
427 return backend.ListVolumeGroups()
429 # Storage --------------------------
432 def perspective_storage_list(params):
433 """Get list of storage units.
436 (su_name, su_args, name, fields) = params
437 return storage.GetStorage(su_name, *su_args).List(name, fields)
440 def perspective_storage_modify(params):
441 """Modify a storage unit.
444 (su_name, su_args, name, changes) = params
445 return storage.GetStorage(su_name, *su_args).Modify(name, changes)
448 def perspective_storage_execute(params):
449 """Execute an operation on a storage unit.
452 (su_name, su_args, name, op) = params
453 return storage.GetStorage(su_name, *su_args).Execute(name, op)
455 # bridge --------------------------
458 def perspective_bridges_exist(params):
459 """Check if all bridges given exist on this node.
462 bridges_list = params[0]
463 return backend.BridgesExist(bridges_list)
465 # instance --------------------------
468 def perspective_instance_os_add(params):
469 """Install an OS on a given instance.
473 inst = objects.Instance.FromDict(inst_s)
474 reinstall = params[1]
476 return backend.InstanceOsAdd(inst, reinstall, debug)
479 def perspective_instance_run_rename(params):
480 """Runs the OS rename script for an instance.
483 inst_s, old_name, debug = params
484 inst = objects.Instance.FromDict(inst_s)
485 return backend.RunRenameInstance(inst, old_name, debug)
488 def perspective_instance_shutdown(params):
489 """Shutdown an instance.
492 instance = objects.Instance.FromDict(params[0])
494 return backend.InstanceShutdown(instance, timeout)
497 def perspective_instance_start(params):
498 """Start an instance.
501 instance = objects.Instance.FromDict(params[0])
502 return backend.StartInstance(instance)
505 def perspective_migration_info(params):
506 """Gather information about an instance to be migrated.
509 instance = objects.Instance.FromDict(params[0])
510 return backend.MigrationInfo(instance)
513 def perspective_accept_instance(params):
514 """Prepare the node to accept an instance.
517 instance, info, target = params
518 instance = objects.Instance.FromDict(instance)
519 return backend.AcceptInstance(instance, info, target)
522 def perspective_finalize_migration(params):
523 """Finalize the instance migration.
526 instance, info, success = params
527 instance = objects.Instance.FromDict(instance)
528 return backend.FinalizeMigration(instance, info, success)
531 def perspective_instance_migrate(params):
532 """Migrates an instance.
535 instance, target, live = params
536 instance = objects.Instance.FromDict(instance)
537 return backend.MigrateInstance(instance, target, live)
540 def perspective_instance_reboot(params):
541 """Reboot an instance.
544 instance = objects.Instance.FromDict(params[0])
545 reboot_type = params[1]
546 shutdown_timeout = params[2]
547 return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
550 def perspective_instance_info(params):
551 """Query instance information.
554 return backend.GetInstanceInfo(params[0], params[1])
557 def perspective_instance_migratable(params):
558 """Query whether the specified instance can be migrated.
561 instance = objects.Instance.FromDict(params[0])
562 return backend.GetInstanceMigratable(instance)
565 def perspective_all_instances_info(params):
566 """Query information about all instances.
569 return backend.GetAllInstancesInfo(params[0])
572 def perspective_instance_list(params):
573 """Query the list of running instances.
576 return backend.GetInstanceList(params[0])
578 # node --------------------------
581 def perspective_node_tcp_ping(params):
582 """Do a TcpPing on the remote node.
585 return utils.TcpPing(params[1], params[2], timeout=params[3],
586 live_port_needed=params[4], source=params[0])
589 def perspective_node_has_ip_address(params):
590 """Checks if a node has the given ip address.
593 return utils.OwnIpAddress(params[0])
596 def perspective_node_info(params):
597 """Query node information.
600 vgname, hypervisor_type = params
601 return backend.GetNodeInfo(vgname, hypervisor_type)
604 def perspective_node_add(params):
605 """Complete the registration of this node in the cluster.
608 return backend.AddNode(params[0], params[1], params[2],
609 params[3], params[4], params[5])
612 def perspective_node_verify(params):
613 """Run a verify sequence on this node.
616 return backend.VerifyNode(params[0], params[1])
619 def perspective_node_start_master(params):
620 """Promote this node to master status.
623 return backend.StartMaster(params[0], params[1])
626 def perspective_node_stop_master(params):
627 """Demote this node from master status.
630 return backend.StopMaster(params[0])
633 def perspective_node_leave_cluster(params):
634 """Cleanup after leaving a cluster.
637 return backend.LeaveCluster(params[0])
640 def perspective_node_volumes(params):
641 """Query the list of all logical volume groups.
644 return backend.NodeVolumes()
647 def perspective_node_demote_from_mc(params):
648 """Demote a node from the master candidate role.
651 return backend.DemoteFromMC()
655 def perspective_node_powercycle(params):
656 """Tries to powercycle the nod.
659 hypervisor_type = params[0]
660 return backend.PowercycleNode(hypervisor_type)
663 # cluster --------------------------
666 def perspective_version(params):
667 """Query version information.
670 return constants.PROTOCOL_VERSION
673 def perspective_upload_file(params):
676 Note that the backend implementation imposes strict rules on which
680 return backend.UploadFile(*params)
683 def perspective_master_info(params):
684 """Query master information.
687 return backend.GetMasterInfo()
690 def perspective_write_ssconf_files(params):
691 """Write ssconf files.
695 return backend.WriteSsconfFiles(values)
697 # os -----------------------
700 def perspective_os_diagnose(params):
701 """Query detailed information about existing OSes.
704 return backend.DiagnoseOS()
707 def perspective_os_get(params):
708 """Query information about a given OS.
712 os_obj = backend.OSFromDisk(name)
713 return os_obj.ToDict()
715 # hooks -----------------------
718 def perspective_hooks_runner(params):
722 hpath, phase, env = params
723 hr = backend.HooksRunner()
724 return hr.RunHooks(hpath, phase, env)
726 # iallocator -----------------
729 def perspective_iallocator_runner(params):
730 """Run an iallocator script.
734 iar = backend.IAllocatorRunner()
735 return iar.Run(name, idata)
737 # test -----------------------
740 def perspective_test_delay(params):
745 status, rval = utils.TestDelay(duration)
747 raise backend.RPCFail(rval)
750 # file storage ---------------
753 def perspective_file_storage_dir_create(params):
754 """Create the file storage directory.
757 file_storage_dir = params[0]
758 return backend.CreateFileStorageDir(file_storage_dir)
761 def perspective_file_storage_dir_remove(params):
762 """Remove the file storage directory.
765 file_storage_dir = params[0]
766 return backend.RemoveFileStorageDir(file_storage_dir)
769 def perspective_file_storage_dir_rename(params):
770 """Rename the file storage directory.
773 old_file_storage_dir = params[0]
774 new_file_storage_dir = params[1]
775 return backend.RenameFileStorageDir(old_file_storage_dir,
776 new_file_storage_dir)
778 # jobs ------------------------
781 @_RequireJobQueueLock
782 def perspective_jobqueue_update(params):
786 (file_name, content) = params
787 return backend.JobQueueUpdate(file_name, content)
790 @_RequireJobQueueLock
791 def perspective_jobqueue_purge(params):
795 return backend.JobQueuePurge()
798 @_RequireJobQueueLock
799 def perspective_jobqueue_rename(params):
800 """Rename a job queue file.
803 # TODO: What if a file fails to rename?
804 return [backend.JobQueueRename(old, new) for old, new in params]
807 def perspective_jobqueue_set_drain(params):
808 """Set/unset the queue drain flag.
811 drain_flag = params[0]
812 return backend.JobQueueSetDrainFlag(drain_flag)
815 # hypervisor ---------------
818 def perspective_hypervisor_validate_params(params):
819 """Validate the hypervisor parameters.
822 (hvname, hvparams) = params
823 return backend.ValidateHVParams(hvname, hvparams)
828 def perspective_x509_cert_create(params):
829 """Creates a new X509 certificate for SSL/TLS.
832 (validity, ) = params
833 return backend.CreateX509Certificate(validity)
836 def perspective_x509_cert_remove(params):
837 """Removes a X509 certificate.
841 return backend.RemoveX509Certificate(name)
846 def perspective_import_start(params):
847 """Starts an import daemon.
850 (opts_s, instance, dest, dest_args) = params
852 opts = objects.ImportExportOptions.FromDict(opts_s)
854 return backend.StartImportExportDaemon(constants.IEM_IMPORT, opts,
856 objects.Instance.FromDict(instance),
858 _DecodeImportExportIO(dest,
862 def perspective_export_start(params):
863 """Starts an export daemon.
866 (opts_s, host, port, instance, source, source_args) = params
868 opts = objects.ImportExportOptions.FromDict(opts_s)
870 return backend.StartImportExportDaemon(constants.IEM_EXPORT, opts,
872 objects.Instance.FromDict(instance),
874 _DecodeImportExportIO(source,
878 def perspective_impexp_status(params):
879 """Retrieves the status of an import or export daemon.
882 return backend.GetImportExportStatus(params[0])
885 def perspective_impexp_abort(params):
886 """Aborts an import or export.
889 return backend.AbortImportExport(params[0])
892 def perspective_impexp_cleanup(params):
893 """Cleans up after an import or export.
896 return backend.CleanupImportExport(params[0])
899 def CheckNoded(_, args):
900 """Initial checks whether to run or exit with a failure.
903 if args: # noded doesn't take any arguments
904 print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
906 sys.exit(constants.EXIT_FAILURE)
909 def ExecNoded(options, _):
910 """Main node daemon function, executed with the PID file held.
915 request_executor_class = MlockallRequestExecutor
917 request_executor_class = http.server.HttpServerRequestExecutor
919 # Read SSL certificate
921 ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
922 ssl_cert_path=options.ssl_cert)
926 err = _PrepareQueueLock()
928 # this might be some kind of file-system/permission error; while
929 # this breaks the job queue functionality, we shouldn't prevent
930 # startup of the whole node daemon because of this
931 logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
933 mainloop = daemon.Mainloop()
934 server = NodeHttpServer(mainloop, options.bind_address, options.port,
935 ssl_params=ssl_params, ssl_verify_peer=True,
936 request_executor_class=request_executor_class)
945 """Main function for the node daemon.
948 parser = OptionParser(description="Ganeti node daemon",
949 usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
950 version="%%prog (ganeti) %s" %
951 constants.RELEASE_VERSION)
952 parser.add_option("--no-mlock", dest="mlock",
953 help="Do not mlock the node memory in ram",
954 default=True, action="store_false")
956 dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
957 dirs.append((constants.LOG_OS_DIR, 0750))
958 dirs.append((constants.LOCK_DIR, 1777))
959 dirs.append((constants.CRYPTO_KEYS_DIR, constants.CRYPTO_KEYS_DIR_MODE))
960 dirs.append((constants.IMPORT_EXPORT_DIR, constants.IMPORT_EXPORT_DIR_MODE))
961 daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded,
962 default_ssl_cert=constants.NODED_CERT_FILE,
963 default_ssl_key=constants.NODED_CERT_FILE,
964 console_logging=True)
967 if __name__ == '__main__':