4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti node daemon"""
24 # pylint: disable-msg=C0103,W0142
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
29 # W0142: Used * or ** magic, since we do use it extensively in this
37 from optparse import OptionParser
39 from ganeti import backend
40 from ganeti import constants
41 from ganeti import objects
42 from ganeti import errors
43 from ganeti import jstore
44 from ganeti import daemon
45 from ganeti import http
46 from ganeti import utils
47 from ganeti import storage
48 from ganeti import serializer
50 import ganeti.http.server # pylint: disable-msg=W0611
# NOTE(review): this is a line-numbered, elided extract — the original file's
# line numbers prefix each line, and the numbering gaps (58, 60-61, 63, 65-68,
# 70) show that the `try:` opener, the early `return None`, and the
# success/failure `return` statements have been dropped from this dump.
# Confirm against the full file before editing.
56 def _PrepareQueueLock():
57 """Try to prepare the queue lock.
59 @return: None for success, otherwise an exception object
# Rebinds the module-level `queue_lock`; W0603 (global statement) suppressed.
62 global queue_lock # pylint: disable-msg=W0603
# Idempotent: once the lock exists, later calls are no-ops (elided return).
64 if queue_lock is not None:
# must_lock=False: verify the queue without holding the lock exclusively.
69 queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
# Python 2 except syntax; EnvironmentError covers IOError/OSError — e.g.
# permission or filesystem problems creating the lock file.
71 except EnvironmentError, err:
# Decorator applied to the perspective_jobqueue_* RPC handlers below: it
# guarantees the job queue lock is initialized and held exclusively for the
# duration of the wrapped call.
75 def _RequireJobQueueLock(fn):
76 """Decorator for job queue manipulating functions.
79 QUEUE_LOCK_TIMEOUT = 10
81 def wrapper(*args, **kwargs):
82 # Locking in exclusive, blocking mode because there could be several
83 # children running at the same time. Waiting up to 10 seconds.
# _PrepareQueueLock returns None on success, an exception object on failure.
84 if _PrepareQueueLock() is not None:
85 raise errors.JobQueueError("Job queue failed initialization,"
86 " cannot update jobs")
87 queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
# NOTE(review): line 88 is elided here — presumably a `try:` whose matching
# `finally: queue_lock.Unlock()` is also missing from this dump, and the
# closing `return wrapper` (after line 89) is elided too. Confirm against
# the full file.
89 return fn(*args, **kwargs)
# RPC surface of the node daemon: each perspective_* method maps 1:1 to an
# HTTP PUT path (see HandleRequest) and mostly thin-wraps a backend.* call.
# NOTE(review): this dump is elided — the numbering gaps show that the
# `@staticmethod` decorators on the perspective_* methods, many closing
# docstring quotes, several `try:` lines, and assorted body lines are missing.
96 class NodeHttpServer(http.server.HttpServer):
97 """The server implementation.
99 This class holds all methods exposed over the RPC interface.
102 # too many public methods, and unused args - all methods get params
104 # pylint: disable-msg=R0904,W0613
# Remember our own PID so handlers can signal the daemon (see
# QuitGanetiException handling below).
105 def __init__(self, *args, **kwargs):
106 http.server.HttpServer.__init__(self, *args, **kwargs)
107 self.noded_pid = os.getpid()
# Dispatcher: only PUT is accepted; the URL path (minus leading "/") selects
# a perspective_<path> method; the JSON body is the single `params` argument;
# the reply is a JSON-encoded (success_flag, payload) tuple.
109 def HandleRequest(self, req):
113 if req.request_method.upper() != http.HTTP_PUT:
114 raise http.HttpBadRequest()
116 path = req.request_path
117 if path.startswith("/"):
120 method = getattr(self, "perspective_%s" % path, None)
122 raise http.HttpNotFound()
125 result = (True, method(serializer.LoadJson(req.request_body)))
127 except backend.RPCFail, err:
128 # our custom failure exception; str(err) works fine if the
129 # exception was constructed with a single argument, and in
130 # this case, err.message == err.args[0] == str(err)
131 result = (False, str(err))
132 except errors.QuitGanetiException, err:
133 # Tell parent to quit
134 logging.info("Shutting down the node daemon, arguments: %s",
# SIGTERM to our own PID lets the mainloop shut the daemon down cleanly.
136 os.kill(self.noded_pid, signal.SIGTERM)
137 # And return the error's arguments, which must be already in
138 # correct tuple format
# Catch-all: any other exception becomes a (False, message) RPC failure
# rather than killing the request handler.
140 except Exception, err:
141 logging.exception("Error in RPC call")
142 result = (False, "Error while executing backend function: %s" % str(err))
144 return serializer.DumpJson(result, indent=False)
146 # the new block devices --------------------------
149 def perspective_blockdev_create(params):
150 """Create a block device.
153 bdev_s, size, owner, on_primary, info = params
154 bdev = objects.Disk.FromDict(bdev_s)
156 raise ValueError("can't unserialize data!")
157 return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
160 def perspective_blockdev_remove(params):
161 """Remove a block device.
# NOTE(review): the line assigning bdev_s (original line 164) is elided here.
165 bdev = objects.Disk.FromDict(bdev_s)
166 return backend.BlockdevRemove(bdev)
169 def perspective_blockdev_rename(params):
# NOTE(review): docstring says "Remove" but the method renames — presumably a
# copy/paste slip in the original; confirm before fixing.
170 """Remove a block device.
173 devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
174 return backend.BlockdevRename(devlist)
177 def perspective_blockdev_assemble(params):
178 """Assemble a block device.
181 bdev_s, owner, on_primary = params
182 bdev = objects.Disk.FromDict(bdev_s)
184 raise ValueError("can't unserialize data!")
185 return backend.BlockdevAssemble(bdev, owner, on_primary)
188 def perspective_blockdev_shutdown(params):
189 """Shutdown a block device.
193 bdev = objects.Disk.FromDict(bdev_s)
195 raise ValueError("can't unserialize data!")
196 return backend.BlockdevShutdown(bdev)
199 def perspective_blockdev_addchildren(params):
200 """Add a child to a mirror device.
202 Note: this is only valid for mirror devices. It's the caller's duty
203 to send a correct disk, otherwise we raise an error.
206 bdev_s, ndev_s = params
207 bdev = objects.Disk.FromDict(bdev_s)
208 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
# FromDict returning None for any entry means the payload was malformed.
209 if bdev is None or ndevs.count(None) > 0:
210 raise ValueError("can't unserialize data!")
211 return backend.BlockdevAddchildren(bdev, ndevs)
214 def perspective_blockdev_removechildren(params):
215 """Remove a child from a mirror device.
217 This is only valid for mirror devices, of course. It's the callers
218 duty to send a correct disk, otherwise we raise an error.
221 bdev_s, ndev_s = params
222 bdev = objects.Disk.FromDict(bdev_s)
223 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
224 if bdev is None or ndevs.count(None) > 0:
225 raise ValueError("can't unserialize data!")
226 return backend.BlockdevRemovechildren(bdev, ndevs)
229 def perspective_blockdev_getmirrorstatus(params):
230 """Return the mirror status for a list of disks.
233 disks = [objects.Disk.FromDict(dsk_s)
235 return [status.ToDict()
236 for status in backend.BlockdevGetmirrorstatus(disks)]
239 def perspective_blockdev_find(params):
240 """Expose the FindBlockDevice functionality for a disk.
242 This will try to find but not activate a disk.
245 disk = objects.Disk.FromDict(params[0])
247 result = backend.BlockdevFind(disk)
# NOTE(review): lines 248-250 elided — presumably a None check before ToDict.
251 return result.ToDict()
254 def perspective_blockdev_snapshot(params):
255 """Create a snapshot device.
257 Note that this is only valid for LVM disks, if we get passed
258 something else we raise an exception. The snapshot device can be
259 remove by calling the generic block device remove call.
262 cfbd = objects.Disk.FromDict(params[0])
263 return backend.BlockdevSnapshot(cfbd)
266 def perspective_blockdev_grow(params):
267 """Grow a stack of devices.
270 cfbd = objects.Disk.FromDict(params[0])
# NOTE(review): the line assigning `amount` (original line 271) is elided.
272 return backend.BlockdevGrow(cfbd, amount)
275 def perspective_blockdev_close(params):
276 """Closes the given block devices.
279 disks = [objects.Disk.FromDict(cf) for cf in params[1]]
280 return backend.BlockdevClose(params[0], disks)
283 def perspective_blockdev_getsize(params):
284 """Compute the sizes of the given block devices.
287 disks = [objects.Disk.FromDict(cf) for cf in params[0]]
288 return backend.BlockdevGetsize(disks)
291 def perspective_blockdev_export(params):
# NOTE(review): docstring duplicated from getsize above; this method exports
# a block device to a remote node.
292 """Compute the sizes of the given block devices.
295 disk = objects.Disk.FromDict(params[0])
296 dest_node, dest_path, cluster_name = params[1:]
297 return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
299 # blockdev/drbd specific methods ----------
302 def perspective_drbd_disconnect_net(params):
303 """Disconnects the network connection of drbd disks.
305 Note that this is only valid for drbd disks, so the members of the
306 disk list must all be drbd devices.
309 nodes_ip, disks = params
310 disks = [objects.Disk.FromDict(cf) for cf in disks]
311 return backend.DrbdDisconnectNet(nodes_ip, disks)
314 def perspective_drbd_attach_net(params):
315 """Attaches the network connection of drbd disks.
317 Note that this is only valid for drbd disks, so the members of the
318 disk list must all be drbd devices.
321 nodes_ip, disks, instance_name, multimaster = params
322 disks = [objects.Disk.FromDict(cf) for cf in disks]
323 return backend.DrbdAttachNet(nodes_ip, disks,
324 instance_name, multimaster)
327 def perspective_drbd_wait_sync(params):
328 """Wait until DRBD disks are synched.
330 Note that this is only valid for drbd disks, so the members of the
331 disk list must all be drbd devices.
334 nodes_ip, disks = params
335 disks = [objects.Disk.FromDict(cf) for cf in disks]
336 return backend.DrbdWaitSync(nodes_ip, disks)
338 # export/import --------------------------
341 def perspective_snapshot_export(params):
342 """Export a given snapshot.
345 disk = objects.Disk.FromDict(params[0])
346 dest_node = params[1]
347 instance = objects.Instance.FromDict(params[2])
348 cluster_name = params[3]
# NOTE(review): lines 349-350 (assigning dev_idx and debug) are elided.
351 return backend.ExportSnapshot(disk, dest_node, instance,
352 cluster_name, dev_idx, debug)
355 def perspective_finalize_export(params):
356 """Expose the finalize export functionality.
359 instance = objects.Instance.FromDict(params[0])
360 snap_disks = [objects.Disk.FromDict(str_data)
361 for str_data in params[1]]
362 return backend.FinalizeExport(instance, snap_disks)
365 def perspective_export_info(params):
366 """Query information about an existing export on this node.
368 The given path may not contain an export, in which case we return
# NOTE(review): the `path = params[...]` assignment is elided.
373 return backend.ExportInfo(path)
376 def perspective_export_list(params):
377 """List the available exports on this node.
379 Note that as opposed to export_info, which may query data about an
380 export in any path, this only queries the standard Ganeti path
381 (constants.EXPORT_DIR).
384 return backend.ListExports()
387 def perspective_export_remove(params):
# NOTE(review): docstring and the `export = params[0]` line are elided.
392 return backend.RemoveExport(export)
394 # volume --------------------------
397 def perspective_lv_list(params):
398 """Query the list of logical volumes in a given volume group.
# NOTE(review): the `vgname = params[0]` line is elided.
402 return backend.GetVolumeList(vgname)
405 def perspective_vg_list(params):
406 """Query the list of volume groups.
409 return backend.ListVolumeGroups()
411 # Storage --------------------------
# The storage_* handlers build a storage-unit object from (type, args) and
# invoke one operation on it.
414 def perspective_storage_list(params):
415 """Get list of storage units.
418 (su_name, su_args, name, fields) = params
419 return storage.GetStorage(su_name, *su_args).List(name, fields)
422 def perspective_storage_modify(params):
423 """Modify a storage unit.
426 (su_name, su_args, name, changes) = params
427 return storage.GetStorage(su_name, *su_args).Modify(name, changes)
430 def perspective_storage_execute(params):
431 """Execute an operation on a storage unit.
434 (su_name, su_args, name, op) = params
435 return storage.GetStorage(su_name, *su_args).Execute(name, op)
437 # bridge --------------------------
440 def perspective_bridges_exist(params):
441 """Check if all bridges given exist on this node.
444 bridges_list = params[0]
445 return backend.BridgesExist(bridges_list)
447 # instance --------------------------
450 def perspective_instance_os_add(params):
451 """Install an OS on a given instance.
# NOTE(review): the `inst_s = params[0]` and `debug = params[2]` lines are
# elided here.
455 inst = objects.Instance.FromDict(inst_s)
456 reinstall = params[1]
458 return backend.InstanceOsAdd(inst, reinstall, debug)
461 def perspective_instance_run_rename(params):
462 """Runs the OS rename script for an instance.
465 inst_s, old_name, debug = params
466 inst = objects.Instance.FromDict(inst_s)
467 return backend.RunRenameInstance(inst, old_name, debug)
470 def perspective_instance_os_import(params):
471 """Run the import function of an OS onto a given instance.
474 inst_s, src_node, src_images, cluster_name, debug = params
475 inst = objects.Instance.FromDict(inst_s)
476 return backend.ImportOSIntoInstance(inst, src_node, src_images,
480 def perspective_instance_shutdown(params):
481 """Shutdown an instance.
484 instance = objects.Instance.FromDict(params[0])
# NOTE(review): the `timeout = params[1]` line (original 485) is elided.
486 return backend.InstanceShutdown(instance, timeout)
489 def perspective_instance_start(params):
490 """Start an instance.
493 instance = objects.Instance.FromDict(params[0])
494 return backend.StartInstance(instance)
497 def perspective_migration_info(params):
498 """Gather information about an instance to be migrated.
501 instance = objects.Instance.FromDict(params[0])
502 return backend.MigrationInfo(instance)
505 def perspective_accept_instance(params):
506 """Prepare the node to accept an instance.
509 instance, info, target = params
510 instance = objects.Instance.FromDict(instance)
511 return backend.AcceptInstance(instance, info, target)
514 def perspective_finalize_migration(params):
515 """Finalize the instance migration.
518 instance, info, success = params
519 instance = objects.Instance.FromDict(instance)
520 return backend.FinalizeMigration(instance, info, success)
523 def perspective_instance_migrate(params):
524 """Migrates an instance.
527 instance, target, live = params
528 instance = objects.Instance.FromDict(instance)
529 return backend.MigrateInstance(instance, target, live)
532 def perspective_instance_reboot(params):
533 """Reboot an instance.
536 instance = objects.Instance.FromDict(params[0])
537 reboot_type = params[1]
538 shutdown_timeout = params[2]
539 return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
542 def perspective_instance_info(params):
543 """Query instance information.
546 return backend.GetInstanceInfo(params[0], params[1])
549 def perspective_instance_migratable(params):
550 """Query whether the specified instance can be migrated.
553 instance = objects.Instance.FromDict(params[0])
554 return backend.GetInstanceMigratable(instance)
557 def perspective_all_instances_info(params):
558 """Query information about all instances.
561 return backend.GetAllInstancesInfo(params[0])
564 def perspective_instance_list(params):
565 """Query the list of running instances.
568 return backend.GetInstanceList(params[0])
570 # node --------------------------
573 def perspective_node_tcp_ping(params):
574 """Do a TcpPing on the remote node.
577 return utils.TcpPing(params[1], params[2], timeout=params[3],
578 live_port_needed=params[4], source=params[0])
581 def perspective_node_has_ip_address(params):
582 """Checks if a node has the given ip address.
585 return utils.OwnIpAddress(params[0])
588 def perspective_node_info(params):
589 """Query node information.
592 vgname, hypervisor_type = params
593 return backend.GetNodeInfo(vgname, hypervisor_type)
596 def perspective_node_add(params):
597 """Complete the registration of this node in the cluster.
600 return backend.AddNode(params[0], params[1], params[2],
601 params[3], params[4], params[5])
604 def perspective_node_verify(params):
605 """Run a verify sequence on this node.
608 return backend.VerifyNode(params[0], params[1])
611 def perspective_node_start_master(params):
612 """Promote this node to master status.
615 return backend.StartMaster(params[0], params[1])
618 def perspective_node_stop_master(params):
619 """Demote this node from master status.
622 return backend.StopMaster(params[0])
625 def perspective_node_leave_cluster(params):
626 """Cleanup after leaving a cluster.
629 return backend.LeaveCluster(params[0])
632 def perspective_node_volumes(params):
633 """Query the list of all logical volume groups.
636 return backend.NodeVolumes()
639 def perspective_node_demote_from_mc(params):
640 """Demote a node from the master candidate role.
643 return backend.DemoteFromMC()
647 def perspective_node_powercycle(params):
# NOTE(review): "nod" is a typo for "node" in the original docstring.
648 """Tries to powercycle the nod.
651 hypervisor_type = params[0]
652 return backend.PowercycleNode(hypervisor_type)
655 # cluster --------------------------
658 def perspective_version(params):
659 """Query version information.
662 return constants.PROTOCOL_VERSION
665 def perspective_upload_file(params):
668 Note that the backend implementation imposes strict rules on which
672 return backend.UploadFile(*params)
675 def perspective_master_info(params):
676 """Query master information.
679 return backend.GetMasterInfo()
682 def perspective_write_ssconf_files(params):
683 """Write ssconf files.
# NOTE(review): the `(values,) = params` unpacking line is elided.
687 return backend.WriteSsconfFiles(values)
689 # os -----------------------
692 def perspective_os_diagnose(params):
693 """Query detailed information about existing OSes.
696 return backend.DiagnoseOS()
699 def perspective_os_get(params):
700 """Query information about a given OS.
# NOTE(review): the `name = params[0]` line is elided.
704 os_obj = backend.OSFromDisk(name)
705 return os_obj.ToDict()
707 # hooks -----------------------
710 def perspective_hooks_runner(params):
714 hpath, phase, env = params
715 hr = backend.HooksRunner()
716 return hr.RunHooks(hpath, phase, env)
718 # iallocator -----------------
721 def perspective_iallocator_runner(params):
722 """Run an iallocator script.
# NOTE(review): the `(name, idata) = params` unpacking line is elided.
726 iar = backend.IAllocatorRunner()
727 return iar.Run(name, idata)
729 # test -----------------------
732 def perspective_test_delay(params):
# NOTE(review): docstring and the `(duration,) = params` line are elided;
# on failure the delay result is raised as an RPCFail.
737 status, rval = utils.TestDelay(duration)
739 raise backend.RPCFail(rval)
742 # file storage ---------------
745 def perspective_file_storage_dir_create(params):
746 """Create the file storage directory.
749 file_storage_dir = params[0]
750 return backend.CreateFileStorageDir(file_storage_dir)
753 def perspective_file_storage_dir_remove(params):
754 """Remove the file storage directory.
757 file_storage_dir = params[0]
758 return backend.RemoveFileStorageDir(file_storage_dir)
761 def perspective_file_storage_dir_rename(params):
762 """Rename the file storage directory.
765 old_file_storage_dir = params[0]
766 new_file_storage_dir = params[1]
767 return backend.RenameFileStorageDir(old_file_storage_dir,
768 new_file_storage_dir)
770 # jobs ------------------------
# The three decorated handlers below mutate job queue files and therefore
# must hold the exclusive queue lock (see _RequireJobQueueLock above).
773 @_RequireJobQueueLock
774 def perspective_jobqueue_update(params):
778 (file_name, content) = params
779 return backend.JobQueueUpdate(file_name, content)
782 @_RequireJobQueueLock
783 def perspective_jobqueue_purge(params):
787 return backend.JobQueuePurge()
790 @_RequireJobQueueLock
791 def perspective_jobqueue_rename(params):
792 """Rename a job queue file.
795 # TODO: What if a file fails to rename?
796 return [backend.JobQueueRename(old, new) for old, new in params]
799 def perspective_jobqueue_set_drain(params):
800 """Set/unset the queue drain flag.
803 drain_flag = params[0]
804 return backend.JobQueueSetDrainFlag(drain_flag)
807 # hypervisor ---------------
810 def perspective_hypervisor_validate_params(params):
811 """Validate the hypervisor parameters.
814 (hvname, hvparams) = params
815 return backend.ValidateHVParams(hvname, hvparams)
# Pre-flight check hook passed to daemon.GenericMain; rejects any positional
# arguments with a usage message and a failure exit code.
818 def CheckNoded(_, args):
819 """Initial checks whether to run or exit with a failure.
822 if args: # noded doesn't take any arguments
# Python 2 print-to-stderr syntax. NOTE(review): line 824 (presumably
# supplying sys.argv[0] for the %s) is elided from this dump.
823 print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
825 sys.exit(constants.EXIT_FAILURE)
# Main-body hook passed to daemon.GenericMain; runs with the PID file held.
828 def ExecNoded(options, _):
829 """Main node daemon function, executed with the PID file held.
832 # Read SSL certificate
# NOTE(review): line 833 is elided — presumably an `if options.ssl:` guard
# with an `else: ssl_params = None` branch (lines 836-838 also missing).
834 ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
835 ssl_cert_path=options.ssl_cert)
839 err = _PrepareQueueLock()
# NOTE(review): the `if err is not None:` line (840) is elided.
841 # this might be some kind of file-system/permission error; while
842 # this breaks the job queue functionality, we shouldn't prevent
843 # startup of the whole node daemon because of this
844 logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)
846 mainloop = daemon.Mainloop()
# NOTE(review): the server Start() call and the mainloop.Run()/server.Stop()
# try/finally (lines 849-854) are elided from this dump.
847 server = NodeHttpServer(mainloop, options.bind_address, options.port,
848 ssl_params=ssl_params, ssl_verify_peer=True)
# NOTE(review): the `def main():` line (856) is elided from this dump — what
# follows is the body of the daemon entry point: build the option parser,
# declare the runtime directories, and hand off to daemon.GenericMain.
857 """Main function for the node daemon.
860 parser = OptionParser(description="Ganeti node daemon",
861 usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
862 version="%%prog (ganeti) %s" %
863 constants.RELEASE_VERSION)
864 dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
# Octal literals (Python 2 style): 0750 for the OS log dir, 1777 (sticky,
# world-writable) for the lock dir.
865 dirs.append((constants.LOG_OS_DIR, 0750))
866 dirs.append((constants.LOCK_DIR, 1777))
# GenericMain handles daemonizing, logging setup, PID file, and then calls
# CheckNoded / ExecNoded defined above. The same file serves as both SSL
# cert and key (SSL_CERT_FILE for both defaults).
867 daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded,
868 default_ssl_cert=constants.SSL_CERT_FILE,
869 default_ssl_key=constants.SSL_CERT_FILE)
# NOTE(review): the `main()` call under the guard (line 873) is elided.
872 if __name__ == '__main__':