4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti node daemon"""
24 # pylint: disable-msg=C0103,W0142
26 # C0103: Functions in this module need to have a given name structure,
27 # and the name of the daemon doesn't match
29 # W0142: Used * or ** magic, since we do use it extensively in this
import os
import sys
import logging
import signal

from optparse import OptionParser

from ganeti import backend
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import daemon
from ganeti import http
from ganeti import utils
from ganeti import storage

import ganeti.http.server # pylint: disable-msg=W0611


# Module-level handle to the job queue lock; initialized lazily by
# _PrepareQueueLock() and required by the _RequireJobQueueLock decorator.
queue_lock = None
55 def _PrepareQueueLock():
56 """Try to prepare the queue lock.
58 @return: None for success, otherwise an exception object
61 global queue_lock # pylint: disable-msg=W0603
63 if queue_lock is not None:
68 queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
70 except EnvironmentError, err:
def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  Ensures the job queue is initialized and the exclusive queue lock is
  held around the call, and always released afterwards.

  @param fn: the function to wrap
  @return: the wrapped function

  """
  QUEUE_LOCK_TIMEOUT = 10

  def wrapper(*args, **kwargs):
    # Locking in exclusive, blocking mode because there could be several
    # children running at the same time. Waiting up to 10 seconds.
    if _PrepareQueueLock() is not None:
      raise errors.JobQueueError("Job queue failed initialization,"
                                 " cannot update jobs")
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      # always release, even if fn raised
      queue_lock.Unlock()

  return wrapper
95 class NodeHttpServer(http.server.HttpServer):
96 """The server implementation.
98 This class holds all methods exposed over the RPC interface.
101 # too many public methods, and unused args - all methods get params
103 # pylint: disable-msg=R0904,W0613
104 def __init__(self, *args, **kwargs):
105 http.server.HttpServer.__init__(self, *args, **kwargs)
106 self.noded_pid = os.getpid()
108 def HandleRequest(self, req):
112 if req.request_method.upper() != http.HTTP_PUT:
113 raise http.HttpBadRequest()
115 path = req.request_path
116 if path.startswith("/"):
119 method = getattr(self, "perspective_%s" % path, None)
121 raise http.HttpNotFound()
124 rvalue = method(req.request_body)
127 except backend.RPCFail, err:
128 # our custom failure exception; str(err) works fine if the
129 # exception was constructed with a single argument, and in
130 # this case, err.message == err.args[0] == str(err)
131 return (False, str(err))
132 except errors.QuitGanetiException, err:
133 # Tell parent to quit
134 logging.info("Shutting down the node daemon, arguments: %s",
136 os.kill(self.noded_pid, signal.SIGTERM)
137 # And return the error's arguments, which must be already in
138 # correct tuple format
140 except Exception, err:
141 logging.exception("Error in RPC call")
142 return False, "Error while executing backend function: %s" % str(err)
144 # the new block devices --------------------------
147 def perspective_blockdev_create(params):
148 """Create a block device.
151 bdev_s, size, owner, on_primary, info = params
152 bdev = objects.Disk.FromDict(bdev_s)
154 raise ValueError("can't unserialize data!")
155 return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
158 def perspective_blockdev_remove(params):
159 """Remove a block device.
163 bdev = objects.Disk.FromDict(bdev_s)
164 return backend.BlockdevRemove(bdev)
167 def perspective_blockdev_rename(params):
168 """Remove a block device.
171 devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
172 return backend.BlockdevRename(devlist)
175 def perspective_blockdev_assemble(params):
176 """Assemble a block device.
179 bdev_s, owner, on_primary = params
180 bdev = objects.Disk.FromDict(bdev_s)
182 raise ValueError("can't unserialize data!")
183 return backend.BlockdevAssemble(bdev, owner, on_primary)
186 def perspective_blockdev_shutdown(params):
187 """Shutdown a block device.
191 bdev = objects.Disk.FromDict(bdev_s)
193 raise ValueError("can't unserialize data!")
194 return backend.BlockdevShutdown(bdev)
197 def perspective_blockdev_addchildren(params):
198 """Add a child to a mirror device.
200 Note: this is only valid for mirror devices. It's the caller's duty
201 to send a correct disk, otherwise we raise an error.
204 bdev_s, ndev_s = params
205 bdev = objects.Disk.FromDict(bdev_s)
206 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
207 if bdev is None or ndevs.count(None) > 0:
208 raise ValueError("can't unserialize data!")
209 return backend.BlockdevAddchildren(bdev, ndevs)
212 def perspective_blockdev_removechildren(params):
213 """Remove a child from a mirror device.
215 This is only valid for mirror devices, of course. It's the callers
216 duty to send a correct disk, otherwise we raise an error.
219 bdev_s, ndev_s = params
220 bdev = objects.Disk.FromDict(bdev_s)
221 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
222 if bdev is None or ndevs.count(None) > 0:
223 raise ValueError("can't unserialize data!")
224 return backend.BlockdevRemovechildren(bdev, ndevs)
227 def perspective_blockdev_getmirrorstatus(params):
228 """Return the mirror status for a list of disks.
231 disks = [objects.Disk.FromDict(dsk_s)
233 return [status.ToDict()
234 for status in backend.BlockdevGetmirrorstatus(disks)]
237 def perspective_blockdev_find(params):
238 """Expose the FindBlockDevice functionality for a disk.
240 This will try to find but not activate a disk.
243 disk = objects.Disk.FromDict(params[0])
245 result = backend.BlockdevFind(disk)
249 return result.ToDict()
252 def perspective_blockdev_snapshot(params):
253 """Create a snapshot device.
255 Note that this is only valid for LVM disks, if we get passed
256 something else we raise an exception. The snapshot device can be
257 remove by calling the generic block device remove call.
260 cfbd = objects.Disk.FromDict(params[0])
261 return backend.BlockdevSnapshot(cfbd)
264 def perspective_blockdev_grow(params):
265 """Grow a stack of devices.
268 cfbd = objects.Disk.FromDict(params[0])
270 return backend.BlockdevGrow(cfbd, amount)
273 def perspective_blockdev_close(params):
274 """Closes the given block devices.
277 disks = [objects.Disk.FromDict(cf) for cf in params[1]]
278 return backend.BlockdevClose(params[0], disks)
281 def perspective_blockdev_getsize(params):
282 """Compute the sizes of the given block devices.
285 disks = [objects.Disk.FromDict(cf) for cf in params[0]]
286 return backend.BlockdevGetsize(disks)
289 def perspective_blockdev_export(params):
290 """Compute the sizes of the given block devices.
293 disk = objects.Disk.FromDict(params[0])
294 dest_node, dest_path, cluster_name = params[1:]
295 return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)
297 # blockdev/drbd specific methods ----------
300 def perspective_drbd_disconnect_net(params):
301 """Disconnects the network connection of drbd disks.
303 Note that this is only valid for drbd disks, so the members of the
304 disk list must all be drbd devices.
307 nodes_ip, disks = params
308 disks = [objects.Disk.FromDict(cf) for cf in disks]
309 return backend.DrbdDisconnectNet(nodes_ip, disks)
312 def perspective_drbd_attach_net(params):
313 """Attaches the network connection of drbd disks.
315 Note that this is only valid for drbd disks, so the members of the
316 disk list must all be drbd devices.
319 nodes_ip, disks, instance_name, multimaster = params
320 disks = [objects.Disk.FromDict(cf) for cf in disks]
321 return backend.DrbdAttachNet(nodes_ip, disks,
322 instance_name, multimaster)
325 def perspective_drbd_wait_sync(params):
326 """Wait until DRBD disks are synched.
328 Note that this is only valid for drbd disks, so the members of the
329 disk list must all be drbd devices.
332 nodes_ip, disks = params
333 disks = [objects.Disk.FromDict(cf) for cf in disks]
334 return backend.DrbdWaitSync(nodes_ip, disks)
336 # export/import --------------------------
339 def perspective_snapshot_export(params):
340 """Export a given snapshot.
343 disk = objects.Disk.FromDict(params[0])
344 dest_node = params[1]
345 instance = objects.Instance.FromDict(params[2])
346 cluster_name = params[3]
348 return backend.ExportSnapshot(disk, dest_node, instance,
349 cluster_name, dev_idx)
352 def perspective_finalize_export(params):
353 """Expose the finalize export functionality.
356 instance = objects.Instance.FromDict(params[0])
357 snap_disks = [objects.Disk.FromDict(str_data)
358 for str_data in params[1]]
359 return backend.FinalizeExport(instance, snap_disks)
362 def perspective_export_info(params):
363 """Query information about an existing export on this node.
365 The given path may not contain an export, in which case we return
370 return backend.ExportInfo(path)
373 def perspective_export_list(params):
374 """List the available exports on this node.
376 Note that as opposed to export_info, which may query data about an
377 export in any path, this only queries the standard Ganeti path
378 (constants.EXPORT_DIR).
381 return backend.ListExports()
384 def perspective_export_remove(params):
389 return backend.RemoveExport(export)
391 # volume --------------------------
394 def perspective_lv_list(params):
395 """Query the list of logical volumes in a given volume group.
399 return backend.GetVolumeList(vgname)
402 def perspective_vg_list(params):
403 """Query the list of volume groups.
406 return backend.ListVolumeGroups()
408 # Storage --------------------------
411 def perspective_storage_list(params):
412 """Get list of storage units.
415 (su_name, su_args, name, fields) = params
416 return storage.GetStorage(su_name, *su_args).List(name, fields)
419 def perspective_storage_modify(params):
420 """Modify a storage unit.
423 (su_name, su_args, name, changes) = params
424 return storage.GetStorage(su_name, *su_args).Modify(name, changes)
427 def perspective_storage_execute(params):
428 """Execute an operation on a storage unit.
431 (su_name, su_args, name, op) = params
432 return storage.GetStorage(su_name, *su_args).Execute(name, op)
434 # bridge --------------------------
437 def perspective_bridges_exist(params):
438 """Check if all bridges given exist on this node.
441 bridges_list = params[0]
442 return backend.BridgesExist(bridges_list)
444 # instance --------------------------
447 def perspective_instance_os_add(params):
448 """Install an OS on a given instance.
452 inst = objects.Instance.FromDict(inst_s)
453 reinstall = params[1]
454 return backend.InstanceOsAdd(inst, reinstall)
457 def perspective_instance_run_rename(params):
458 """Runs the OS rename script for an instance.
461 inst_s, old_name = params
462 inst = objects.Instance.FromDict(inst_s)
463 return backend.RunRenameInstance(inst, old_name)
466 def perspective_instance_os_import(params):
467 """Run the import function of an OS onto a given instance.
470 inst_s, src_node, src_images, cluster_name = params
471 inst = objects.Instance.FromDict(inst_s)
472 return backend.ImportOSIntoInstance(inst, src_node, src_images,
476 def perspective_instance_shutdown(params):
477 """Shutdown an instance.
480 instance = objects.Instance.FromDict(params[0])
482 return backend.InstanceShutdown(instance, timeout)
485 def perspective_instance_start(params):
486 """Start an instance.
489 instance = objects.Instance.FromDict(params[0])
490 return backend.StartInstance(instance)
493 def perspective_migration_info(params):
494 """Gather information about an instance to be migrated.
497 instance = objects.Instance.FromDict(params[0])
498 return backend.MigrationInfo(instance)
501 def perspective_accept_instance(params):
502 """Prepare the node to accept an instance.
505 instance, info, target = params
506 instance = objects.Instance.FromDict(instance)
507 return backend.AcceptInstance(instance, info, target)
510 def perspective_finalize_migration(params):
511 """Finalize the instance migration.
514 instance, info, success = params
515 instance = objects.Instance.FromDict(instance)
516 return backend.FinalizeMigration(instance, info, success)
519 def perspective_instance_migrate(params):
520 """Migrates an instance.
523 instance, target, live = params
524 instance = objects.Instance.FromDict(instance)
525 return backend.MigrateInstance(instance, target, live)
528 def perspective_instance_reboot(params):
529 """Reboot an instance.
532 instance = objects.Instance.FromDict(params[0])
533 reboot_type = params[1]
534 shutdown_timeout = params[2]
535 return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)
538 def perspective_instance_info(params):
539 """Query instance information.
542 return backend.GetInstanceInfo(params[0], params[1])
545 def perspective_instance_migratable(params):
546 """Query whether the specified instance can be migrated.
549 instance = objects.Instance.FromDict(params[0])
550 return backend.GetInstanceMigratable(instance)
553 def perspective_all_instances_info(params):
554 """Query information about all instances.
557 return backend.GetAllInstancesInfo(params[0])
560 def perspective_instance_list(params):
561 """Query the list of running instances.
564 return backend.GetInstanceList(params[0])
566 # node --------------------------
569 def perspective_node_tcp_ping(params):
570 """Do a TcpPing on the remote node.
573 return utils.TcpPing(params[1], params[2], timeout=params[3],
574 live_port_needed=params[4], source=params[0])
577 def perspective_node_has_ip_address(params):
578 """Checks if a node has the given ip address.
581 return utils.OwnIpAddress(params[0])
584 def perspective_node_info(params):
585 """Query node information.
588 vgname, hypervisor_type = params
589 return backend.GetNodeInfo(vgname, hypervisor_type)
592 def perspective_node_add(params):
593 """Complete the registration of this node in the cluster.
596 return backend.AddNode(params[0], params[1], params[2],
597 params[3], params[4], params[5])
600 def perspective_node_verify(params):
601 """Run a verify sequence on this node.
604 return backend.VerifyNode(params[0], params[1])
607 def perspective_node_start_master(params):
608 """Promote this node to master status.
611 return backend.StartMaster(params[0], params[1])
614 def perspective_node_stop_master(params):
615 """Demote this node from master status.
618 return backend.StopMaster(params[0])
621 def perspective_node_leave_cluster(params):
622 """Cleanup after leaving a cluster.
625 return backend.LeaveCluster(params[0])
628 def perspective_node_volumes(params):
629 """Query the list of all logical volume groups.
632 return backend.NodeVolumes()
635 def perspective_node_demote_from_mc(params):
636 """Demote a node from the master candidate role.
639 return backend.DemoteFromMC()
643 def perspective_node_powercycle(params):
644 """Tries to powercycle the nod.
647 hypervisor_type = params[0]
648 return backend.PowercycleNode(hypervisor_type)
651 # cluster --------------------------
654 def perspective_version(params):
655 """Query version information.
658 return constants.PROTOCOL_VERSION
661 def perspective_upload_file(params):
664 Note that the backend implementation imposes strict rules on which
668 return backend.UploadFile(*params)
671 def perspective_master_info(params):
672 """Query master information.
675 return backend.GetMasterInfo()
678 def perspective_write_ssconf_files(params):
679 """Write ssconf files.
683 return backend.WriteSsconfFiles(values)
685 # os -----------------------
688 def perspective_os_diagnose(params):
689 """Query detailed information about existing OSes.
692 return backend.DiagnoseOS()
695 def perspective_os_get(params):
696 """Query information about a given OS.
700 os_obj = backend.OSFromDisk(name)
701 return os_obj.ToDict()
703 # hooks -----------------------
706 def perspective_hooks_runner(params):
710 hpath, phase, env = params
711 hr = backend.HooksRunner()
712 return hr.RunHooks(hpath, phase, env)
714 # iallocator -----------------
717 def perspective_iallocator_runner(params):
718 """Run an iallocator script.
722 iar = backend.IAllocatorRunner()
723 return iar.Run(name, idata)
725 # test -----------------------
728 def perspective_test_delay(params):
733 status, rval = utils.TestDelay(duration)
735 raise backend.RPCFail(rval)
738 # file storage ---------------
741 def perspective_file_storage_dir_create(params):
742 """Create the file storage directory.
745 file_storage_dir = params[0]
746 return backend.CreateFileStorageDir(file_storage_dir)
749 def perspective_file_storage_dir_remove(params):
750 """Remove the file storage directory.
753 file_storage_dir = params[0]
754 return backend.RemoveFileStorageDir(file_storage_dir)
757 def perspective_file_storage_dir_rename(params):
758 """Rename the file storage directory.
761 old_file_storage_dir = params[0]
762 new_file_storage_dir = params[1]
763 return backend.RenameFileStorageDir(old_file_storage_dir,
764 new_file_storage_dir)
766 # jobs ------------------------
769 @_RequireJobQueueLock
770 def perspective_jobqueue_update(params):
774 (file_name, content) = params
775 return backend.JobQueueUpdate(file_name, content)
778 @_RequireJobQueueLock
779 def perspective_jobqueue_purge(params):
783 return backend.JobQueuePurge()
786 @_RequireJobQueueLock
787 def perspective_jobqueue_rename(params):
788 """Rename a job queue file.
791 # TODO: What if a file fails to rename?
792 return [backend.JobQueueRename(old, new) for old, new in params]
795 def perspective_jobqueue_set_drain(params):
796 """Set/unset the queue drain flag.
799 drain_flag = params[0]
800 return backend.JobQueueSetDrainFlag(drain_flag)
803 # hypervisor ---------------
806 def perspective_hypervisor_validate_params(params):
807 """Validate the hypervisor parameters.
810 (hvname, hvparams) = params
811 return backend.ValidateHVParams(hvname, hvparams)
def CheckNoded(_, args):
  """Initial checks whether to run or exit with a failure.

  @param args: positional command-line arguments (must be empty)

  """
  if args: # noded doesn't take any arguments
    print >> sys.stderr, ("Usage: %s [-f] [-d] [-p port] [-b ADDRESS]" %
                          sys.argv[0])
    sys.exit(constants.EXIT_FAILURE)
def ExecNoded(options, _):
  """Main node daemon function, executed with the PID file held.

  @param options: parsed command-line options

  """
  # Read SSL certificate
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)
  else:
    ssl_params = None

  err = _PrepareQueueLock()
  if err is not None:
    # this might be some kind of file-system/permission error; while
    # this breaks the job queue functionality, we shouldn't prevent
    # startup of the whole node daemon because of this
    logging.critical("Can't init/verify the queue, proceeding anyway: %s", err)

  mainloop = daemon.Mainloop()
  server = NodeHttpServer(mainloop, options.bind_address, options.port,
                          ssl_params=ssl_params, ssl_verify_peer=True)
  server.Start()
  try:
    mainloop.Run()
  finally:
    # make sure the listening socket is closed on the way out
    server.Stop()
853 """Main function for the node daemon.
856 parser = OptionParser(description="Ganeti node daemon",
857 usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
858 version="%%prog (ganeti) %s" %
859 constants.RELEASE_VERSION)
860 dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
861 dirs.append((constants.LOG_OS_DIR, 0750))
862 dirs.append((constants.LOCK_DIR, 1777))
863 daemon.GenericMain(constants.NODED, parser, dirs, CheckNoded, ExecNoded)
if __name__ == '__main__':
  main()