4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti node daemon"""
24 # pylint: disable-msg=C0103,W0142
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
# W0142: Used * or ** magic, since we do use it extensively in this module
import os
import sys
import logging
import signal

from optparse import OptionParser

from ganeti import backend
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import daemon
from ganeti import http
from ganeti import utils
from ganeti import storage

import ganeti.http.server
def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  Serializes access to the job queue by taking the module-level
  queue_lock around the wrapped call and guaranteeing its release.

  """
  QUEUE_LOCK_TIMEOUT = 10

  def wrapper(*args, **kwargs):
    # Locking in exclusive, blocking mode because there could be several
    # children running at the same time. Waiting up to 10 seconds.
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      # always release, even if fn raises, so the queue is never
      # left locked by a failed RPC
      queue_lock.Unlock()

  return wrapper
74 class NodeHttpServer(http.server.HttpServer):
75 """The server implementation.
77 This class holds all methods exposed over the RPC interface.
80 # too many public methods, and unused args - all methods get params
82 # pylint: disable-msg=R0904,W0613
83 def __init__(self, *args, **kwargs):
84 http.server.HttpServer.__init__(self, *args, **kwargs)
85 self.noded_pid = os.getpid()
87 def HandleRequest(self, req):
91 if req.request_method.upper() != http.HTTP_PUT:
92 raise http.HttpBadRequest()
94 path = req.request_path
95 if path.startswith("/"):
98 method = getattr(self, "perspective_%s" % path, None)
100 raise http.HttpNotFound()
103 rvalue = method(req.request_body)
106 except backend.RPCFail, err:
107 # our custom failure exception; str(err) works fine if the
108 # exception was constructed with a single argument, and in
109 # this case, err.message == err.args[0] == str(err)
110 return (False, str(err))
111 except errors.QuitGanetiException, err:
112 # Tell parent to quit
113 logging.info("Shutting down the node daemon, arguments: %s",
115 os.kill(self.noded_pid, signal.SIGTERM)
116 # And return the error's arguments, which must be already in
117 # correct tuple format
119 except Exception, err:
120 logging.exception("Error in RPC call")
121 return False, "Error while executing backend function: %s" % str(err)
123 # the new block devices --------------------------
126 def perspective_blockdev_create(params):
127 """Create a block device.
130 bdev_s, size, owner, on_primary, info = params
131 bdev = objects.Disk.FromDict(bdev_s)
133 raise ValueError("can't unserialize data!")
134 return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
137 def perspective_blockdev_remove(params):
138 """Remove a block device.
142 bdev = objects.Disk.FromDict(bdev_s)
143 return backend.BlockdevRemove(bdev)
146 def perspective_blockdev_rename(params):
147 """Remove a block device.
150 devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
151 return backend.BlockdevRename(devlist)
154 def perspective_blockdev_assemble(params):
155 """Assemble a block device.
158 bdev_s, owner, on_primary = params
159 bdev = objects.Disk.FromDict(bdev_s)
161 raise ValueError("can't unserialize data!")
162 return backend.BlockdevAssemble(bdev, owner, on_primary)
165 def perspective_blockdev_shutdown(params):
166 """Shutdown a block device.
170 bdev = objects.Disk.FromDict(bdev_s)
172 raise ValueError("can't unserialize data!")
173 return backend.BlockdevShutdown(bdev)
176 def perspective_blockdev_addchildren(params):
177 """Add a child to a mirror device.
179 Note: this is only valid for mirror devices. It's the caller's duty
180 to send a correct disk, otherwise we raise an error.
183 bdev_s, ndev_s = params
184 bdev = objects.Disk.FromDict(bdev_s)
185 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
186 if bdev is None or ndevs.count(None) > 0:
187 raise ValueError("can't unserialize data!")
188 return backend.BlockdevAddchildren(bdev, ndevs)
191 def perspective_blockdev_removechildren(params):
192 """Remove a child from a mirror device.
194 This is only valid for mirror devices, of course. It's the callers
195 duty to send a correct disk, otherwise we raise an error.
198 bdev_s, ndev_s = params
199 bdev = objects.Disk.FromDict(bdev_s)
200 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
201 if bdev is None or ndevs.count(None) > 0:
202 raise ValueError("can't unserialize data!")
203 return backend.BlockdevRemovechildren(bdev, ndevs)
206 def perspective_blockdev_getmirrorstatus(params):
207 """Return the mirror status for a list of disks.
210 disks = [objects.Disk.FromDict(dsk_s)
212 return [status.ToDict()
213 for status in backend.BlockdevGetmirrorstatus(disks)]
216 def perspective_blockdev_find(params):
217 """Expose the FindBlockDevice functionality for a disk.
219 This will try to find but not activate a disk.
222 disk = objects.Disk.FromDict(params[0])
224 result = backend.BlockdevFind(disk)
228 return result.ToDict()
231 def perspective_blockdev_snapshot(params):
232 """Create a snapshot device.
234 Note that this is only valid for LVM disks, if we get passed
235 something else we raise an exception. The snapshot device can be
236 remove by calling the generic block device remove call.
239 cfbd = objects.Disk.FromDict(params[0])
240 return backend.BlockdevSnapshot(cfbd)
243 def perspective_blockdev_grow(params):
244 """Grow a stack of devices.
247 cfbd = objects.Disk.FromDict(params[0])
249 return backend.BlockdevGrow(cfbd, amount)
252 def perspective_blockdev_close(params):
253 """Closes the given block devices.
256 disks = [objects.Disk.FromDict(cf) for cf in params[1]]
257 return backend.BlockdevClose(params[0], disks)
260 def perspective_blockdev_getsize(params):
261 """Compute the sizes of the given block devices.
264 disks = [objects.Disk.FromDict(cf) for cf in params[0]]
265 return backend.BlockdevGetsize(disks)
268 def perspective_blockdev_export(params):
269 """Compute the sizes of the given block devices.
272 disk = objects.Disk.FromDict(params[0])
273 dest_node, dest_path, cluster_name = params[1:]
274 return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
276 # blockdev/drbd specific methods ----------
279 def perspective_drbd_disconnect_net(params):
280 """Disconnects the network connection of drbd disks.
282 Note that this is only valid for drbd disks, so the members of the
283 disk list must all be drbd devices.
286 nodes_ip, disks = params
287 disks = [objects.Disk.FromDict(cf) for cf in disks]
288 return backend.DrbdDisconnectNet(nodes_ip, disks)
291 def perspective_drbd_attach_net(params):
292 """Attaches the network connection of drbd disks.
294 Note that this is only valid for drbd disks, so the members of the
295 disk list must all be drbd devices.
298 nodes_ip, disks, instance_name, multimaster = params
299 disks = [objects.Disk.FromDict(cf) for cf in disks]
300 return backend.DrbdAttachNet(nodes_ip, disks,
301 instance_name, multimaster)
304 def perspective_drbd_wait_sync(params):
305 """Wait until DRBD disks are synched.
307 Note that this is only valid for drbd disks, so the members of the
308 disk list must all be drbd devices.
311 nodes_ip, disks = params
312 disks = [objects.Disk.FromDict(cf) for cf in disks]
313 return backend.DrbdWaitSync(nodes_ip, disks)
315 # export/import --------------------------
318 def perspective_snapshot_export(params):
319 """Export a given snapshot.
322 disk = objects.Disk.FromDict(params[0])
323 dest_node = params[1]
324 instance = objects.Instance.FromDict(params[2])
325 cluster_name = params[3]
327 return backend.ExportSnapshot(disk, dest_node, instance,
328 cluster_name, dev_idx)
331 def perspective_finalize_export(params):
332 """Expose the finalize export functionality.
335 instance = objects.Instance.FromDict(params[0])
336 snap_disks = [objects.Disk.FromDict(str_data)
337 for str_data in params[1]]
338 return backend.FinalizeExport(instance, snap_disks)
341 def perspective_export_info(params):
342 """Query information about an existing export on this node.
344 The given path may not contain an export, in which case we return
349 return backend.ExportInfo(path)
352 def perspective_export_list(params):
353 """List the available exports on this node.
355 Note that as opposed to export_info, which may query data about an
356 export in any path, this only queries the standard Ganeti path
357 (constants.EXPORT_DIR).
360 return backend.ListExports()
363 def perspective_export_remove(params):
368 return backend.RemoveExport(export)
370 # volume --------------------------
373 def perspective_lv_list(params):
374 """Query the list of logical volumes in a given volume group.
378 return backend.GetVolumeList(vgname)
381 def perspective_vg_list(params):
382 """Query the list of volume groups.
385 return backend.ListVolumeGroups()
387 # Storage --------------------------
390 def perspective_storage_list(params):
391 """Get list of storage units.
394 (su_name, su_args, name, fields) = params
395 return storage.GetStorage(su_name, *su_args).List(name, fields)
398 def perspective_storage_modify(params):
399 """Modify a storage unit.
402 (su_name, su_args, name, changes) = params
403 return storage.GetStorage(su_name, *su_args).Modify(name, changes)
406 def perspective_storage_execute(params):
407 """Execute an operation on a storage unit.
410 (su_name, su_args, name, op) = params
411 return storage.GetStorage(su_name, *su_args).Execute(name, op)
413 # bridge --------------------------
416 def perspective_bridges_exist(params):
417 """Check if all bridges given exist on this node.
420 bridges_list = params[0]
421 return backend.BridgesExist(bridges_list)
423 # instance --------------------------
426 def perspective_instance_os_add(params):
427 """Install an OS on a given instance.
431 inst = objects.Instance.FromDict(inst_s)
432 reinstall = params[1]
433 return backend.InstanceOsAdd(inst, reinstall)
436 def perspective_instance_run_rename(params):
437 """Runs the OS rename script for an instance.
440 inst_s, old_name = params
441 inst = objects.Instance.FromDict(inst_s)
442 return backend.RunRenameInstance(inst, old_name)
445 def perspective_instance_os_import(params):
446 """Run the import function of an OS onto a given instance.
449 inst_s, src_node, src_images, cluster_name = params
450 inst = objects.Instance.FromDict(inst_s)
451 return backend.ImportOSIntoInstance(inst, src_node, src_images,
455 def perspective_instance_shutdown(params):
456 """Shutdown an instance.
459 instance = objects.Instance.FromDict(params[0])
461 return backend.InstanceShutdown(instance, timeout)
464 def perspective_instance_start(params):
465 """Start an instance.
468 instance = objects.Instance.FromDict(params[0])
469 return backend.StartInstance(instance)
472 def perspective_migration_info(params):
473 """Gather information about an instance to be migrated.
476 instance = objects.Instance.FromDict(params[0])
477 return backend.MigrationInfo(instance)
480 def perspective_accept_instance(params):
481 """Prepare the node to accept an instance.
484 instance, info, target = params
485 instance = objects.Instance.FromDict(instance)
486 return backend.AcceptInstance(instance, info, target)
489 def perspective_finalize_migration(params):
490 """Finalize the instance migration.
493 instance, info, success = params
494 instance = objects.Instance.FromDict(instance)
495 return backend.FinalizeMigration(instance, info, success)
498 def perspective_instance_migrate(params):
499 """Migrates an instance.
502 instance, target, live = params
503 instance = objects.Instance.FromDict(instance)
504 return backend.MigrateInstance(instance, target, live)
507 def perspective_instance_reboot(params):
508 """Reboot an instance.
511 instance = objects.Instance.FromDict(params[0])
512 reboot_type = params[1]
513 shutdown_timeout = params[2]
514 return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
517 def perspective_instance_info(params):
518 """Query instance information.
521 return backend.GetInstanceInfo(params[0], params[1])
524 def perspective_instance_migratable(params):
525 """Query whether the specified instance can be migrated.
528 instance = objects.Instance.FromDict(params[0])
529 return backend.GetInstanceMigratable(instance)
532 def perspective_all_instances_info(params):
533 """Query information about all instances.
536 return backend.GetAllInstancesInfo(params[0])
539 def perspective_instance_list(params):
540 """Query the list of running instances.
543 return backend.GetInstanceList(params[0])
545 # node --------------------------
548 def perspective_node_tcp_ping(params):
549 """Do a TcpPing on the remote node.
552 return utils.TcpPing(params[1], params[2], timeout=params[3],
553 live_port_needed=params[4], source=params[0])
556 def perspective_node_has_ip_address(params):
557 """Checks if a node has the given ip address.
560 return utils.OwnIpAddress(params[0])
563 def perspective_node_info(params):
564 """Query node information.
567 vgname, hypervisor_type = params
568 return backend.GetNodeInfo(vgname, hypervisor_type)
571 def perspective_node_add(params):
572 """Complete the registration of this node in the cluster.
575 return backend.AddNode(params[0], params[1], params[2],
576 params[3], params[4], params[5])
579 def perspective_node_verify(params):
580 """Run a verify sequence on this node.
583 return backend.VerifyNode(params[0], params[1])
586 def perspective_node_start_master(params):
587 """Promote this node to master status.
590 return backend.StartMaster(params[0], params[1])
593 def perspective_node_stop_master(params):
594 """Demote this node from master status.
597 return backend.StopMaster(params[0])
600 def perspective_node_leave_cluster(params):
601 """Cleanup after leaving a cluster.
604 return backend.LeaveCluster(params[0])
607 def perspective_node_volumes(params):
608 """Query the list of all logical volume groups.
611 return backend.NodeVolumes()
614 def perspective_node_demote_from_mc(params):
615 """Demote a node from the master candidate role.
618 return backend.DemoteFromMC()
622 def perspective_node_powercycle(params):
623 """Tries to powercycle the nod.
626 hypervisor_type = params[0]
627 return backend.PowercycleNode(hypervisor_type)
630 # cluster --------------------------
633 def perspective_version(params):
634 """Query version information.
637 return constants.PROTOCOL_VERSION
640 def perspective_upload_file(params):
643 Note that the backend implementation imposes strict rules on which
647 return backend.UploadFile(*params)
650 def perspective_master_info(params):
651 """Query master information.
654 return backend.GetMasterInfo()
657 def perspective_write_ssconf_files(params):
658 """Write ssconf files.
662 return backend.WriteSsconfFiles(values)
664 # os -----------------------
667 def perspective_os_diagnose(params):
668 """Query detailed information about existing OSes.
671 return backend.DiagnoseOS()
674 def perspective_os_get(params):
675 """Query information about a given OS.
679 os_obj = backend.OSFromDisk(name)
680 return os_obj.ToDict()
682 # hooks -----------------------
685 def perspective_hooks_runner(params):
689 hpath, phase, env = params
690 hr = backend.HooksRunner()
691 return hr.RunHooks(hpath, phase, env)
693 # iallocator -----------------
696 def perspective_iallocator_runner(params):
697 """Run an iallocator script.
701 iar = backend.IAllocatorRunner()
702 return iar.Run(name, idata)
704 # test -----------------------
707 def perspective_test_delay(params):
712 status, rval = utils.TestDelay(duration)
714 raise backend.RPCFail(rval)
717 # file storage ---------------
720 def perspective_file_storage_dir_create(params):
721 """Create the file storage directory.
724 file_storage_dir = params[0]
725 return backend.CreateFileStorageDir(file_storage_dir)
728 def perspective_file_storage_dir_remove(params):
729 """Remove the file storage directory.
732 file_storage_dir = params[0]
733 return backend.RemoveFileStorageDir(file_storage_dir)
736 def perspective_file_storage_dir_rename(params):
737 """Rename the file storage directory.
740 old_file_storage_dir = params[0]
741 new_file_storage_dir = params[1]
742 return backend.RenameFileStorageDir(old_file_storage_dir,
743 new_file_storage_dir)
745 # jobs ------------------------
748 @_RequireJobQueueLock
749 def perspective_jobqueue_update(params):
753 (file_name, content) = params
754 return backend.JobQueueUpdate(file_name, content)
757 @_RequireJobQueueLock
758 def perspective_jobqueue_purge(params):
762 return backend.JobQueuePurge()
765 @_RequireJobQueueLock
766 def perspective_jobqueue_rename(params):
767 """Rename a job queue file.
770 # TODO: What if a file fails to rename?
771 return [backend.JobQueueRename(old, new) for old, new in params]
774 def perspective_jobqueue_set_drain(params):
775 """Set/unset the queue drain flag.
778 drain_flag = params[0]
779 return backend.JobQueueSetDrainFlag(drain_flag)
782 # hypervisor ---------------
785 def perspective_hypervisor_validate_params(params):
786 """Validate the hypervisor parameters.
789 (hvname, hvparams) = params
790 return backend.ValidateHVParams(hvname, hvparams)
def CheckNoded(_, args):
  """Initial checks whether to run or exit with a failure.

  Called by daemon.GenericMain before daemonizing; noded accepts no
  positional arguments, so any leftover args are a usage error.

  """
  if args: # noded doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
                          sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)
def ExecNoded(options, _):
  """Main node daemon function, executed with the PID file held.

  Sets up SSL parameters, initializes the job queue lock, and runs the
  HTTP server inside the main loop until shutdown, making sure the
  server is stopped on exit.

  """
  global queue_lock # pylint: disable-msg=W0603

  # Read SSL certificate
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)
  else:
    ssl_params = None

  # Prepare the job queue lock used by the jobqueue RPC methods
  queue_lock = jstore.InitAndVerifyQueue(must_lock=False)

  mainloop = daemon.Mainloop()
  server = NodeHttpServer(mainloop, options.bind_address, options.port,
                          ssl_params=ssl_params, ssl_verify_peer=True)
  server.Start()
  try:
    mainloop.Run()
  finally:
    # make sure the listening socket is released even on errors
    server.Stop()
830 """Main function for the node daemon.
833 parser = OptionParser(description="Ganeti node daemon",
834 usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
835 version="%%prog (ganeti) %s" %
836 constants.RELEASE_VERSION)
837 dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
838 dirs.append((constants.LOG_OS_DIR, 0750))
839 dirs.append((constants.LOCK_DIR, 1777))
840 daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded)
843 if __name__ == '__main__':