# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Ganeti node daemon"""

# pylint: disable-msg=C0103,W0142

# C0103: Functions in this module need to have a given name structure,
# and the name of the daemon doesn't match
# W0142: Used * or ** magic, since we do use it extensively in this
# module
import os
import sys
import logging
import signal

from optparse import OptionParser

from ganeti import backend
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import daemon
from ganeti import http
from ganeti import utils
from ganeti import storage
from ganeti import serializer

import ganeti.http.server # pylint: disable-msg=W0611
56 def _RequireJobQueueLock(fn):
57 """Decorator for job queue manipulating functions.
60 QUEUE_LOCK_TIMEOUT = 10
62 def wrapper(*args, **kwargs):
63 # Locking in exclusive, blocking mode because there could be several
64 # children running at the same time. Waiting up to 10 seconds.
65 queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
67 return fn(*args, **kwargs)
74 class NodeHttpServer(http.server.HttpServer):
75 """The server implementation.
77 This class holds all methods exposed over the RPC interface.
80 # too many public methods, and unused args - all methods get params
82 # pylint: disable-msg=R0904,W0613
83 def __init__(self, *args, **kwargs):
84 http.server.HttpServer.__init__(self, *args, **kwargs)
85 self.noded_pid = os.getpid()
87 def HandleRequest(self, req):
91 if req.request_method.upper() != http.HTTP_PUT:
92 raise http.HttpBadRequest()
94 path = req.request_path
95 if path.startswith("/"):
98 method = getattr(self, "perspective_%s" % path, None)
100 raise http.HttpNotFound()
103 result = (True, method(serializer.LoadJson(req.request_body)))
105 except backend.RPCFail, err:
106 # our custom failure exception; str(err) works fine if the
107 # exception was constructed with a single argument, and in
108 # this case, err.message == err.args[0] == str(err)
109 result = (False, str(err))
110 except errors.QuitGanetiException, err:
111 # Tell parent to quit
112 logging.info("Shutting down the node daemon, arguments: %s",
114 os.kill(self.noded_pid, signal.SIGTERM)
115 # And return the error's arguments, which must be already in
116 # correct tuple format
118 except Exception, err:
119 logging.exception("Error in RPC call")
120 result = (False, "Error while executing backend function: %s" % str(err))
122 return serializer.DumpJson(result, indent=False)
124 # the new block devices --------------------------
127 def perspective_blockdev_create(params):
128 """Create a block device.
131 bdev_s, size, owner, on_primary, info = params
132 bdev = objects.Disk.FromDict(bdev_s)
134 raise ValueError("can't unserialize data!")
135 return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
138 def perspective_blockdev_remove(params):
139 """Remove a block device.
143 bdev = objects.Disk.FromDict(bdev_s)
144 return backend.BlockdevRemove(bdev)
147 def perspective_blockdev_rename(params):
148 """Remove a block device.
151 devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
152 return backend.BlockdevRename(devlist)
155 def perspective_blockdev_assemble(params):
156 """Assemble a block device.
159 bdev_s, owner, on_primary = params
160 bdev = objects.Disk.FromDict(bdev_s)
162 raise ValueError("can't unserialize data!")
163 return backend.BlockdevAssemble(bdev, owner, on_primary)
166 def perspective_blockdev_shutdown(params):
167 """Shutdown a block device.
171 bdev = objects.Disk.FromDict(bdev_s)
173 raise ValueError("can't unserialize data!")
174 return backend.BlockdevShutdown(bdev)
177 def perspective_blockdev_addchildren(params):
178 """Add a child to a mirror device.
180 Note: this is only valid for mirror devices. It's the caller's duty
181 to send a correct disk, otherwise we raise an error.
184 bdev_s, ndev_s = params
185 bdev = objects.Disk.FromDict(bdev_s)
186 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
187 if bdev is None or ndevs.count(None) > 0:
188 raise ValueError("can't unserialize data!")
189 return backend.BlockdevAddchildren(bdev, ndevs)
192 def perspective_blockdev_removechildren(params):
193 """Remove a child from a mirror device.
195 This is only valid for mirror devices, of course. It's the callers
196 duty to send a correct disk, otherwise we raise an error.
199 bdev_s, ndev_s = params
200 bdev = objects.Disk.FromDict(bdev_s)
201 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
202 if bdev is None or ndevs.count(None) > 0:
203 raise ValueError("can't unserialize data!")
204 return backend.BlockdevRemovechildren(bdev, ndevs)
207 def perspective_blockdev_getmirrorstatus(params):
208 """Return the mirror status for a list of disks.
211 disks = [objects.Disk.FromDict(dsk_s)
213 return [status.ToDict()
214 for status in backend.BlockdevGetmirrorstatus(disks)]
217 def perspective_blockdev_find(params):
218 """Expose the FindBlockDevice functionality for a disk.
220 This will try to find but not activate a disk.
223 disk = objects.Disk.FromDict(params[0])
225 result = backend.BlockdevFind(disk)
229 return result.ToDict()
232 def perspective_blockdev_snapshot(params):
233 """Create a snapshot device.
235 Note that this is only valid for LVM disks, if we get passed
236 something else we raise an exception. The snapshot device can be
237 remove by calling the generic block device remove call.
240 cfbd = objects.Disk.FromDict(params[0])
241 return backend.BlockdevSnapshot(cfbd)
244 def perspective_blockdev_grow(params):
245 """Grow a stack of devices.
248 cfbd = objects.Disk.FromDict(params[0])
250 return backend.BlockdevGrow(cfbd, amount)
253 def perspective_blockdev_close(params):
254 """Closes the given block devices.
257 disks = [objects.Disk.FromDict(cf) for cf in params[1]]
258 return backend.BlockdevClose(params[0], disks)
261 def perspective_blockdev_getsize(params):
262 """Compute the sizes of the given block devices.
265 disks = [objects.Disk.FromDict(cf) for cf in params[0]]
266 return backend.BlockdevGetsize(disks)
269 def perspective_blockdev_export(params):
270 """Compute the sizes of the given block devices.
273 disk = objects.Disk.FromDict(params[0])
274 dest_node, dest_path, cluster_name = params[1:]
275 return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
277 # blockdev/drbd specific methods ----------
280 def perspective_drbd_disconnect_net(params):
281 """Disconnects the network connection of drbd disks.
283 Note that this is only valid for drbd disks, so the members of the
284 disk list must all be drbd devices.
287 nodes_ip, disks = params
288 disks = [objects.Disk.FromDict(cf) for cf in disks]
289 return backend.DrbdDisconnectNet(nodes_ip, disks)
292 def perspective_drbd_attach_net(params):
293 """Attaches the network connection of drbd disks.
295 Note that this is only valid for drbd disks, so the members of the
296 disk list must all be drbd devices.
299 nodes_ip, disks, instance_name, multimaster = params
300 disks = [objects.Disk.FromDict(cf) for cf in disks]
301 return backend.DrbdAttachNet(nodes_ip, disks,
302 instance_name, multimaster)
305 def perspective_drbd_wait_sync(params):
306 """Wait until DRBD disks are synched.
308 Note that this is only valid for drbd disks, so the members of the
309 disk list must all be drbd devices.
312 nodes_ip, disks = params
313 disks = [objects.Disk.FromDict(cf) for cf in disks]
314 return backend.DrbdWaitSync(nodes_ip, disks)
316 # export/import --------------------------
319 def perspective_snapshot_export(params):
320 """Export a given snapshot.
323 disk = objects.Disk.FromDict(params[0])
324 dest_node = params[1]
325 instance = objects.Instance.FromDict(params[2])
326 cluster_name = params[3]
328 return backend.ExportSnapshot(disk, dest_node, instance,
329 cluster_name, dev_idx)
332 def perspective_finalize_export(params):
333 """Expose the finalize export functionality.
336 instance = objects.Instance.FromDict(params[0])
337 snap_disks = [objects.Disk.FromDict(str_data)
338 for str_data in params[1]]
339 return backend.FinalizeExport(instance, snap_disks)
342 def perspective_export_info(params):
343 """Query information about an existing export on this node.
345 The given path may not contain an export, in which case we return
350 return backend.ExportInfo(path)
353 def perspective_export_list(params):
354 """List the available exports on this node.
356 Note that as opposed to export_info, which may query data about an
357 export in any path, this only queries the standard Ganeti path
358 (constants.EXPORT_DIR).
361 return backend.ListExports()
364 def perspective_export_remove(params):
369 return backend.RemoveExport(export)
371 # volume --------------------------
374 def perspective_lv_list(params):
375 """Query the list of logical volumes in a given volume group.
379 return backend.GetVolumeList(vgname)
382 def perspective_vg_list(params):
383 """Query the list of volume groups.
386 return backend.ListVolumeGroups()
388 # Storage --------------------------
391 def perspective_storage_list(params):
392 """Get list of storage units.
395 (su_name, su_args, name, fields) = params
396 return storage.GetStorage(su_name, *su_args).List(name, fields)
399 def perspective_storage_modify(params):
400 """Modify a storage unit.
403 (su_name, su_args, name, changes) = params
404 return storage.GetStorage(su_name, *su_args).Modify(name, changes)
407 def perspective_storage_execute(params):
408 """Execute an operation on a storage unit.
411 (su_name, su_args, name, op) = params
412 return storage.GetStorage(su_name, *su_args).Execute(name, op)
414 # bridge --------------------------
417 def perspective_bridges_exist(params):
418 """Check if all bridges given exist on this node.
421 bridges_list = params[0]
422 return backend.BridgesExist(bridges_list)
424 # instance --------------------------
427 def perspective_instance_os_add(params):
428 """Install an OS on a given instance.
432 inst = objects.Instance.FromDict(inst_s)
433 reinstall = params[1]
434 return backend.InstanceOsAdd(inst, reinstall)
437 def perspective_instance_run_rename(params):
438 """Runs the OS rename script for an instance.
441 inst_s, old_name = params
442 inst = objects.Instance.FromDict(inst_s)
443 return backend.RunRenameInstance(inst, old_name)
446 def perspective_instance_os_import(params):
447 """Run the import function of an OS onto a given instance.
450 inst_s, src_node, src_images, cluster_name = params
451 inst = objects.Instance.FromDict(inst_s)
452 return backend.ImportOSIntoInstance(inst, src_node, src_images,
456 def perspective_instance_shutdown(params):
457 """Shutdown an instance.
460 instance = objects.Instance.FromDict(params[0])
462 return backend.InstanceShutdown(instance, timeout)
465 def perspective_instance_start(params):
466 """Start an instance.
469 instance = objects.Instance.FromDict(params[0])
470 return backend.StartInstance(instance)
473 def perspective_migration_info(params):
474 """Gather information about an instance to be migrated.
477 instance = objects.Instance.FromDict(params[0])
478 return backend.MigrationInfo(instance)
481 def perspective_accept_instance(params):
482 """Prepare the node to accept an instance.
485 instance, info, target = params
486 instance = objects.Instance.FromDict(instance)
487 return backend.AcceptInstance(instance, info, target)
490 def perspective_finalize_migration(params):
491 """Finalize the instance migration.
494 instance, info, success = params
495 instance = objects.Instance.FromDict(instance)
496 return backend.FinalizeMigration(instance, info, success)
499 def perspective_instance_migrate(params):
500 """Migrates an instance.
503 instance, target, live = params
504 instance = objects.Instance.FromDict(instance)
505 return backend.MigrateInstance(instance, target, live)
508 def perspective_instance_reboot(params):
509 """Reboot an instance.
512 instance = objects.Instance.FromDict(params[0])
513 reboot_type = params[1]
514 shutdown_timeout = params[2]
515 return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
518 def perspective_instance_info(params):
519 """Query instance information.
522 return backend.GetInstanceInfo(params[0], params[1])
525 def perspective_instance_migratable(params):
526 """Query whether the specified instance can be migrated.
529 instance = objects.Instance.FromDict(params[0])
530 return backend.GetInstanceMigratable(instance)
533 def perspective_all_instances_info(params):
534 """Query information about all instances.
537 return backend.GetAllInstancesInfo(params[0])
540 def perspective_instance_list(params):
541 """Query the list of running instances.
544 return backend.GetInstanceList(params[0])
546 # node --------------------------
549 def perspective_node_tcp_ping(params):
550 """Do a TcpPing on the remote node.
553 return utils.TcpPing(params[1], params[2], timeout=params[3],
554 live_port_needed=params[4], source=params[0])
557 def perspective_node_has_ip_address(params):
558 """Checks if a node has the given ip address.
561 return utils.OwnIpAddress(params[0])
564 def perspective_node_info(params):
565 """Query node information.
568 vgname, hypervisor_type = params
569 return backend.GetNodeInfo(vgname, hypervisor_type)
572 def perspective_node_add(params):
573 """Complete the registration of this node in the cluster.
576 return backend.AddNode(params[0], params[1], params[2],
577 params[3], params[4], params[5])
580 def perspective_node_verify(params):
581 """Run a verify sequence on this node.
584 return backend.VerifyNode(params[0], params[1])
587 def perspective_node_start_master(params):
588 """Promote this node to master status.
591 return backend.StartMaster(params[0], params[1])
594 def perspective_node_stop_master(params):
595 """Demote this node from master status.
598 return backend.StopMaster(params[0])
601 def perspective_node_leave_cluster(params):
602 """Cleanup after leaving a cluster.
605 return backend.LeaveCluster(params[0])
608 def perspective_node_volumes(params):
609 """Query the list of all logical volume groups.
612 return backend.NodeVolumes()
615 def perspective_node_demote_from_mc(params):
616 """Demote a node from the master candidate role.
619 return backend.DemoteFromMC()
623 def perspective_node_powercycle(params):
624 """Tries to powercycle the nod.
627 hypervisor_type = params[0]
628 return backend.PowercycleNode(hypervisor_type)
631 # cluster --------------------------
634 def perspective_version(params):
635 """Query version information.
638 return constants.PROTOCOL_VERSION
641 def perspective_upload_file(params):
644 Note that the backend implementation imposes strict rules on which
648 return backend.UploadFile(*params)
651 def perspective_master_info(params):
652 """Query master information.
655 return backend.GetMasterInfo()
658 def perspective_write_ssconf_files(params):
659 """Write ssconf files.
663 return backend.WriteSsconfFiles(values)
665 # os -----------------------
668 def perspective_os_diagnose(params):
669 """Query detailed information about existing OSes.
672 return backend.DiagnoseOS()
675 def perspective_os_get(params):
676 """Query information about a given OS.
680 os_obj = backend.OSFromDisk(name)
681 return os_obj.ToDict()
683 # hooks -----------------------
686 def perspective_hooks_runner(params):
690 hpath, phase, env = params
691 hr = backend.HooksRunner()
692 return hr.RunHooks(hpath, phase, env)
694 # iallocator -----------------
697 def perspective_iallocator_runner(params):
698 """Run an iallocator script.
702 iar = backend.IAllocatorRunner()
703 return iar.Run(name, idata)
705 # test -----------------------
708 def perspective_test_delay(params):
713 status, rval = utils.TestDelay(duration)
715 raise backend.RPCFail(rval)
718 # file storage ---------------
721 def perspective_file_storage_dir_create(params):
722 """Create the file storage directory.
725 file_storage_dir = params[0]
726 return backend.CreateFileStorageDir(file_storage_dir)
729 def perspective_file_storage_dir_remove(params):
730 """Remove the file storage directory.
733 file_storage_dir = params[0]
734 return backend.RemoveFileStorageDir(file_storage_dir)
737 def perspective_file_storage_dir_rename(params):
738 """Rename the file storage directory.
741 old_file_storage_dir = params[0]
742 new_file_storage_dir = params[1]
743 return backend.RenameFileStorageDir(old_file_storage_dir,
744 new_file_storage_dir)
746 # jobs ------------------------
749 @_RequireJobQueueLock
750 def perspective_jobqueue_update(params):
754 (file_name, content) = params
755 return backend.JobQueueUpdate(file_name, content)
758 @_RequireJobQueueLock
759 def perspective_jobqueue_purge(params):
763 return backend.JobQueuePurge()
766 @_RequireJobQueueLock
767 def perspective_jobqueue_rename(params):
768 """Rename a job queue file.
771 # TODO: What if a file fails to rename?
772 return [backend.JobQueueRename(old, new) for old, new in params]
775 def perspective_jobqueue_set_drain(params):
776 """Set/unset the queue drain flag.
779 drain_flag = params[0]
780 return backend.JobQueueSetDrainFlag(drain_flag)
783 # hypervisor ---------------
786 def perspective_hypervisor_validate_params(params):
787 """Validate the hypervisor parameters.
790 (hvname, hvparams) = params
791 return backend.ValidateHVParams(hvname, hvparams)
def CheckNoded(_, args):
  """Initial checks whether to run or exit with a failure.

  Exits with EXIT_FAILURE (after printing usage) when positional
  arguments were given, since noded takes none.

  """
  if args: # noded doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
                          sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)
def ExecNoded(options, _):
  """Main node daemon function, executed with the PID file held.

  Sets up SSL, verifies the job queue, then runs the HTTP server's
  mainloop until interrupted, stopping the server on the way out.

  """
  global queue_lock # pylint: disable-msg=W0603

  # Read SSL certificate
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)
  else:
    ssl_params = None

  # Prepare the job queue lock used by the perspective_jobqueue_* calls
  queue_lock = jstore.InitAndVerifyQueue(must_lock=False)

  mainloop = daemon.Mainloop()
  server = NodeHttpServer(mainloop, options.bind_address, options.port,
                          ssl_params=ssl_params, ssl_verify_peer=True)
  server.Start()
  try:
    mainloop.Run()
  finally:
    # always stop the server, even when the mainloop raises
    server.Stop()
831 """Main function for the node daemon.
834 parser = OptionParser(description="Ganeti node daemon",
835 usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
836 version="%%prog (ganeti) %s" %
837 constants.RELEASE_VERSION)
838 dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
839 dirs.append((constants.LOG_OS_DIR, 0750))
840 dirs.append((constants.LOCK_DIR, 1777))
841 daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded,
842 default_ssl_cert=constants.SSL_CERT_FILE,
843 default_ssl_key=constants.SSL_CERT_FILE)
if __name__ == '__main__':
  # script entry point: run the daemon
  main()