# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Ganeti node daemon"""

# pylint: disable-msg=C0103,W0142

# C0103: Functions in this module need to have a given name structure,
# and the name of the daemon doesn't match
#
# W0142: Used * or ** magic, since we do use it extensively in this
# module
import os
import sys
import logging
import signal

from optparse import OptionParser

from ganeti import backend
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import daemon
from ganeti import http
from ganeti import utils
from ganeti import storage

import ganeti.http.server # pylint: disable-msg=W0611
55 def _RequireJobQueueLock(fn):
56 """Decorator for job queue manipulating functions.
59 QUEUE_LOCK_TIMEOUT = 10
61 def wrapper(*args, **kwargs):
62 # Locking in exclusive, blocking mode because there could be several
63 # children running at the same time. Waiting up to 10 seconds.
64 queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
66 return fn(*args, **kwargs)
73 class NodeHttpServer(http.server.HttpServer):
74 """The server implementation.
76 This class holds all methods exposed over the RPC interface.
79 # too many public methods, and unused args - all methods get params
81 # pylint: disable-msg=R0904,W0613
82 def __init__(self, *args, **kwargs):
83 http.server.HttpServer.__init__(self, *args, **kwargs)
84 self.noded_pid = os.getpid()
86 def HandleRequest(self, req):
90 if req.request_method.upper() != http.HTTP_PUT:
91 raise http.HttpBadRequest()
93 path = req.request_path
94 if path.startswith("/"):
97 method = getattr(self, "perspective_%s" % path, None)
99 raise http.HttpNotFound()
102 rvalue = method(req.request_body)
105 except backend.RPCFail, err:
106 # our custom failure exception; str(err) works fine if the
107 # exception was constructed with a single argument, and in
108 # this case, err.message == err.args[0] == str(err)
109 return (False, str(err))
110 except errors.QuitGanetiException, err:
111 # Tell parent to quit
112 logging.info("Shutting down the node daemon, arguments: %s",
114 os.kill(self.noded_pid, signal.SIGTERM)
115 # And return the error's arguments, which must be already in
116 # correct tuple format
118 except Exception, err:
119 logging.exception("Error in RPC call")
120 return False, "Error while executing backend function: %s" % str(err)
122 # the new block devices --------------------------
125 def perspective_blockdev_create(params):
126 """Create a block device.
129 bdev_s, size, owner, on_primary, info = params
130 bdev = objects.Disk.FromDict(bdev_s)
132 raise ValueError("can't unserialize data!")
133 return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
136 def perspective_blockdev_remove(params):
137 """Remove a block device.
141 bdev = objects.Disk.FromDict(bdev_s)
142 return backend.BlockdevRemove(bdev)
145 def perspective_blockdev_rename(params):
146 """Remove a block device.
149 devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
150 return backend.BlockdevRename(devlist)
153 def perspective_blockdev_assemble(params):
154 """Assemble a block device.
157 bdev_s, owner, on_primary = params
158 bdev = objects.Disk.FromDict(bdev_s)
160 raise ValueError("can't unserialize data!")
161 return backend.BlockdevAssemble(bdev, owner, on_primary)
164 def perspective_blockdev_shutdown(params):
165 """Shutdown a block device.
169 bdev = objects.Disk.FromDict(bdev_s)
171 raise ValueError("can't unserialize data!")
172 return backend.BlockdevShutdown(bdev)
175 def perspective_blockdev_addchildren(params):
176 """Add a child to a mirror device.
178 Note: this is only valid for mirror devices. It's the caller's duty
179 to send a correct disk, otherwise we raise an error.
182 bdev_s, ndev_s = params
183 bdev = objects.Disk.FromDict(bdev_s)
184 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
185 if bdev is None or ndevs.count(None) > 0:
186 raise ValueError("can't unserialize data!")
187 return backend.BlockdevAddchildren(bdev, ndevs)
190 def perspective_blockdev_removechildren(params):
191 """Remove a child from a mirror device.
193 This is only valid for mirror devices, of course. It's the callers
194 duty to send a correct disk, otherwise we raise an error.
197 bdev_s, ndev_s = params
198 bdev = objects.Disk.FromDict(bdev_s)
199 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
200 if bdev is None or ndevs.count(None) > 0:
201 raise ValueError("can't unserialize data!")
202 return backend.BlockdevRemovechildren(bdev, ndevs)
205 def perspective_blockdev_getmirrorstatus(params):
206 """Return the mirror status for a list of disks.
209 disks = [objects.Disk.FromDict(dsk_s)
211 return [status.ToDict()
212 for status in backend.BlockdevGetmirrorstatus(disks)]
215 def perspective_blockdev_find(params):
216 """Expose the FindBlockDevice functionality for a disk.
218 This will try to find but not activate a disk.
221 disk = objects.Disk.FromDict(params[0])
223 result = backend.BlockdevFind(disk)
227 return result.ToDict()
230 def perspective_blockdev_snapshot(params):
231 """Create a snapshot device.
233 Note that this is only valid for LVM disks, if we get passed
234 something else we raise an exception. The snapshot device can be
235 remove by calling the generic block device remove call.
238 cfbd = objects.Disk.FromDict(params[0])
239 return backend.BlockdevSnapshot(cfbd)
242 def perspective_blockdev_grow(params):
243 """Grow a stack of devices.
246 cfbd = objects.Disk.FromDict(params[0])
248 return backend.BlockdevGrow(cfbd, amount)
251 def perspective_blockdev_close(params):
252 """Closes the given block devices.
255 disks = [objects.Disk.FromDict(cf) for cf in params[1]]
256 return backend.BlockdevClose(params[0], disks)
259 def perspective_blockdev_getsize(params):
260 """Compute the sizes of the given block devices.
263 disks = [objects.Disk.FromDict(cf) for cf in params[0]]
264 return backend.BlockdevGetsize(disks)
267 def perspective_blockdev_export(params):
268 """Compute the sizes of the given block devices.
271 disk = objects.Disk.FromDict(params[0])
272 dest_node, dest_path, cluster_name = params[1:]
273 return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
275 # blockdev/drbd specific methods ----------
278 def perspective_drbd_disconnect_net(params):
279 """Disconnects the network connection of drbd disks.
281 Note that this is only valid for drbd disks, so the members of the
282 disk list must all be drbd devices.
285 nodes_ip, disks = params
286 disks = [objects.Disk.FromDict(cf) for cf in disks]
287 return backend.DrbdDisconnectNet(nodes_ip, disks)
290 def perspective_drbd_attach_net(params):
291 """Attaches the network connection of drbd disks.
293 Note that this is only valid for drbd disks, so the members of the
294 disk list must all be drbd devices.
297 nodes_ip, disks, instance_name, multimaster = params
298 disks = [objects.Disk.FromDict(cf) for cf in disks]
299 return backend.DrbdAttachNet(nodes_ip, disks,
300 instance_name, multimaster)
303 def perspective_drbd_wait_sync(params):
304 """Wait until DRBD disks are synched.
306 Note that this is only valid for drbd disks, so the members of the
307 disk list must all be drbd devices.
310 nodes_ip, disks = params
311 disks = [objects.Disk.FromDict(cf) for cf in disks]
312 return backend.DrbdWaitSync(nodes_ip, disks)
314 # export/import --------------------------
317 def perspective_snapshot_export(params):
318 """Export a given snapshot.
321 disk = objects.Disk.FromDict(params[0])
322 dest_node = params[1]
323 instance = objects.Instance.FromDict(params[2])
324 cluster_name = params[3]
326 return backend.ExportSnapshot(disk, dest_node, instance,
327 cluster_name, dev_idx)
330 def perspective_finalize_export(params):
331 """Expose the finalize export functionality.
334 instance = objects.Instance.FromDict(params[0])
335 snap_disks = [objects.Disk.FromDict(str_data)
336 for str_data in params[1]]
337 return backend.FinalizeExport(instance, snap_disks)
340 def perspective_export_info(params):
341 """Query information about an existing export on this node.
343 The given path may not contain an export, in which case we return
348 return backend.ExportInfo(path)
351 def perspective_export_list(params):
352 """List the available exports on this node.
354 Note that as opposed to export_info, which may query data about an
355 export in any path, this only queries the standard Ganeti path
356 (constants.EXPORT_DIR).
359 return backend.ListExports()
362 def perspective_export_remove(params):
367 return backend.RemoveExport(export)
369 # volume --------------------------
372 def perspective_lv_list(params):
373 """Query the list of logical volumes in a given volume group.
377 return backend.GetVolumeList(vgname)
380 def perspective_vg_list(params):
381 """Query the list of volume groups.
384 return backend.ListVolumeGroups()
386 # Storage --------------------------
389 def perspective_storage_list(params):
390 """Get list of storage units.
393 (su_name, su_args, name, fields) = params
394 return storage.GetStorage(su_name, *su_args).List(name, fields)
397 def perspective_storage_modify(params):
398 """Modify a storage unit.
401 (su_name, su_args, name, changes) = params
402 return storage.GetStorage(su_name, *su_args).Modify(name, changes)
405 def perspective_storage_execute(params):
406 """Execute an operation on a storage unit.
409 (su_name, su_args, name, op) = params
410 return storage.GetStorage(su_name, *su_args).Execute(name, op)
412 # bridge --------------------------
415 def perspective_bridges_exist(params):
416 """Check if all bridges given exist on this node.
419 bridges_list = params[0]
420 return backend.BridgesExist(bridges_list)
422 # instance --------------------------
425 def perspective_instance_os_add(params):
426 """Install an OS on a given instance.
430 inst = objects.Instance.FromDict(inst_s)
431 reinstall = params[1]
432 return backend.InstanceOsAdd(inst, reinstall)
435 def perspective_instance_run_rename(params):
436 """Runs the OS rename script for an instance.
439 inst_s, old_name = params
440 inst = objects.Instance.FromDict(inst_s)
441 return backend.RunRenameInstance(inst, old_name)
444 def perspective_instance_os_import(params):
445 """Run the import function of an OS onto a given instance.
448 inst_s, src_node, src_images, cluster_name = params
449 inst = objects.Instance.FromDict(inst_s)
450 return backend.ImportOSIntoInstance(inst, src_node, src_images,
454 def perspective_instance_shutdown(params):
455 """Shutdown an instance.
458 instance = objects.Instance.FromDict(params[0])
460 return backend.InstanceShutdown(instance, timeout)
463 def perspective_instance_start(params):
464 """Start an instance.
467 instance = objects.Instance.FromDict(params[0])
468 return backend.StartInstance(instance)
471 def perspective_migration_info(params):
472 """Gather information about an instance to be migrated.
475 instance = objects.Instance.FromDict(params[0])
476 return backend.MigrationInfo(instance)
479 def perspective_accept_instance(params):
480 """Prepare the node to accept an instance.
483 instance, info, target = params
484 instance = objects.Instance.FromDict(instance)
485 return backend.AcceptInstance(instance, info, target)
488 def perspective_finalize_migration(params):
489 """Finalize the instance migration.
492 instance, info, success = params
493 instance = objects.Instance.FromDict(instance)
494 return backend.FinalizeMigration(instance, info, success)
497 def perspective_instance_migrate(params):
498 """Migrates an instance.
501 instance, target, live = params
502 instance = objects.Instance.FromDict(instance)
503 return backend.MigrateInstance(instance, target, live)
506 def perspective_instance_reboot(params):
507 """Reboot an instance.
510 instance = objects.Instance.FromDict(params[0])
511 reboot_type = params[1]
512 shutdown_timeout = params[2]
513 return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
516 def perspective_instance_info(params):
517 """Query instance information.
520 return backend.GetInstanceInfo(params[0], params[1])
523 def perspective_instance_migratable(params):
524 """Query whether the specified instance can be migrated.
527 instance = objects.Instance.FromDict(params[0])
528 return backend.GetInstanceMigratable(instance)
531 def perspective_all_instances_info(params):
532 """Query information about all instances.
535 return backend.GetAllInstancesInfo(params[0])
538 def perspective_instance_list(params):
539 """Query the list of running instances.
542 return backend.GetInstanceList(params[0])
544 # node --------------------------
547 def perspective_node_tcp_ping(params):
548 """Do a TcpPing on the remote node.
551 return utils.TcpPing(params[1], params[2], timeout=params[3],
552 live_port_needed=params[4], source=params[0])
555 def perspective_node_has_ip_address(params):
556 """Checks if a node has the given ip address.
559 return utils.OwnIpAddress(params[0])
562 def perspective_node_info(params):
563 """Query node information.
566 vgname, hypervisor_type = params
567 return backend.GetNodeInfo(vgname, hypervisor_type)
570 def perspective_node_add(params):
571 """Complete the registration of this node in the cluster.
574 return backend.AddNode(params[0], params[1], params[2],
575 params[3], params[4], params[5])
578 def perspective_node_verify(params):
579 """Run a verify sequence on this node.
582 return backend.VerifyNode(params[0], params[1])
585 def perspective_node_start_master(params):
586 """Promote this node to master status.
589 return backend.StartMaster(params[0], params[1])
592 def perspective_node_stop_master(params):
593 """Demote this node from master status.
596 return backend.StopMaster(params[0])
599 def perspective_node_leave_cluster(params):
600 """Cleanup after leaving a cluster.
603 return backend.LeaveCluster(params[0])
606 def perspective_node_volumes(params):
607 """Query the list of all logical volume groups.
610 return backend.NodeVolumes()
613 def perspective_node_demote_from_mc(params):
614 """Demote a node from the master candidate role.
617 return backend.DemoteFromMC()
621 def perspective_node_powercycle(params):
622 """Tries to powercycle the nod.
625 hypervisor_type = params[0]
626 return backend.PowercycleNode(hypervisor_type)
629 # cluster --------------------------
632 def perspective_version(params):
633 """Query version information.
636 return constants.PROTOCOL_VERSION
639 def perspective_upload_file(params):
642 Note that the backend implementation imposes strict rules on which
646 return backend.UploadFile(*params)
649 def perspective_master_info(params):
650 """Query master information.
653 return backend.GetMasterInfo()
656 def perspective_write_ssconf_files(params):
657 """Write ssconf files.
661 return backend.WriteSsconfFiles(values)
663 # os -----------------------
666 def perspective_os_diagnose(params):
667 """Query detailed information about existing OSes.
670 return backend.DiagnoseOS()
673 def perspective_os_get(params):
674 """Query information about a given OS.
678 os_obj = backend.OSFromDisk(name)
679 return os_obj.ToDict()
681 # hooks -----------------------
684 def perspective_hooks_runner(params):
688 hpath, phase, env = params
689 hr = backend.HooksRunner()
690 return hr.RunHooks(hpath, phase, env)
692 # iallocator -----------------
695 def perspective_iallocator_runner(params):
696 """Run an iallocator script.
700 iar = backend.IAllocatorRunner()
701 return iar.Run(name, idata)
703 # test -----------------------
706 def perspective_test_delay(params):
711 status, rval = utils.TestDelay(duration)
713 raise backend.RPCFail(rval)
716 # file storage ---------------
719 def perspective_file_storage_dir_create(params):
720 """Create the file storage directory.
723 file_storage_dir = params[0]
724 return backend.CreateFileStorageDir(file_storage_dir)
727 def perspective_file_storage_dir_remove(params):
728 """Remove the file storage directory.
731 file_storage_dir = params[0]
732 return backend.RemoveFileStorageDir(file_storage_dir)
735 def perspective_file_storage_dir_rename(params):
736 """Rename the file storage directory.
739 old_file_storage_dir = params[0]
740 new_file_storage_dir = params[1]
741 return backend.RenameFileStorageDir(old_file_storage_dir,
742 new_file_storage_dir)
744 # jobs ------------------------
747 @_RequireJobQueueLock
748 def perspective_jobqueue_update(params):
752 (file_name, content) = params
753 return backend.JobQueueUpdate(file_name, content)
756 @_RequireJobQueueLock
757 def perspective_jobqueue_purge(params):
761 return backend.JobQueuePurge()
764 @_RequireJobQueueLock
765 def perspective_jobqueue_rename(params):
766 """Rename a job queue file.
769 # TODO: What if a file fails to rename?
770 return [backend.JobQueueRename(old, new) for old, new in params]
773 def perspective_jobqueue_set_drain(params):
774 """Set/unset the queue drain flag.
777 drain_flag = params[0]
778 return backend.JobQueueSetDrainFlag(drain_flag)
781 # hypervisor ---------------
784 def perspective_hypervisor_validate_params(params):
785 """Validate the hypervisor parameters.
788 (hvname, hvparams) = params
789 return backend.ValidateHVParams(hvname, hvparams)
792 def CheckNoded(_, args):
793 """Initial checks whether to run or exit with a failure.
796 if args: # noded doesn't take any arguments
797 print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
799 sys.exit(constants.EXIT_FAILURE)
802 def ExecNoded(options, _):
803 """Main node daemon function, executed with the PID file held.
806 global queue_lock # pylint: disable-msg=W0603
808 # Read SSL certificate
810 ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
811 ssl_cert_path=options.ssl_cert)
816 queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
818 mainloop = daemon.Mainloop()
819 server = NodeHttpServer(mainloop, options.bind_address, options.port,
820 ssl_params=ssl_params, ssl_verify_peer=True)
829 """Main function for the node daemon.
832 parser = OptionParser(description="Ganeti node daemon",
833 usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
834 version="%%prog (ganeti) %s" %
835 constants.RELEASE_VERSION)
836 dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
837 dirs.append((constants.LOG_OS_DIR, 0750))
838 dirs.append((constants.LOCK_DIR, 1777))
839 daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded,
840 default_ssl_cert=constants.SSL_CERT_FILE,
841 default_ssl_key=constants.SSL_CERT_FILE)
844 if __name__ == '__main__':