# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Ganeti node daemon"""

# functions in this module need to have a given name structure, so:
# pylint: disable-msg=C0103
import os
import signal
import logging

from optparse import OptionParser

from ganeti import backend
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import daemon
from ganeti import http
from ganeti import utils
from ganeti import storage

import ganeti.http.server
def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  Takes the module-level job queue lock (set up by ExecNoded) in
  exclusive, blocking mode before calling the wrapped function, and
  always releases it afterwards, even if the function raises.

  """
  QUEUE_LOCK_TIMEOUT = 10

  def wrapper(*args, **kwargs):
    # Locking in exclusive, blocking mode because there could be several
    # children running at the same time. Waiting up to 10 seconds.
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      # release the lock unconditionally so a failing job queue
      # operation cannot wedge all future queue manipulations
      queue_lock.Unlock()

  return wrapper
class NodeHttpServer(http.server.HttpServer):
  """The server implementation.

  This class holds all methods exposed over the RPC interface.

  All perspective_* methods receive the already-unserialized request
  body as their single argument and return a value that HandleRequest
  wraps into the (success, payload) RPC result tuple.

  """
  def __init__(self, *args, **kwargs):
    http.server.HttpServer.__init__(self, *args, **kwargs)
    # remember our own pid so HandleRequest can signal the daemon to
    # shut down on QuitGanetiException
    self.noded_pid = os.getpid()

  def HandleRequest(self, req):
    """Handle an HTTP request.

    Dispatches PUT requests to the matching perspective_* method and
    converts exceptions into the (False, message) RPC error format.

    """
    if req.request_method.upper() != http.HTTP_PUT:
      raise http.HttpBadRequest()

    path = req.request_path
    if path.startswith("/"):
      path = path[1:]

    method = getattr(self, "perspective_%s" % path, None)
    if method is None:
      raise http.HttpNotFound()

    try:
      rvalue = method(req.request_body)
      return True, rvalue

    except backend.RPCFail as err:
      # our custom failure exception; str(err) works fine if the
      # exception was constructed with a single argument, and in
      # this case, err.message == err.args[0] == str(err)
      return (False, str(err))
    except errors.QuitGanetiException as err:
      # Tell parent to quit
      logging.info("Shutting down the node daemon, arguments: %s",
                   str(err.args))
      os.kill(self.noded_pid, signal.SIGTERM)
      # And return the error's arguments, which must be already in
      # correct tuple format
      return err.args
    except Exception as err:
      logging.exception("Error in RPC call")
      return False, "Error while executing backend function: %s" % str(err)

  # the new block devices  --------------------------

  @staticmethod
  def perspective_blockdev_create(params):
    """Create a block device.

    """
    bdev_s, size, owner, on_primary, info = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info)

  @staticmethod
  def perspective_blockdev_remove(params):
    """Remove a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    return backend.BlockdevRemove(bdev)

  @staticmethod
  def perspective_blockdev_rename(params):
    """Rename a block device.

    """
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
    return backend.BlockdevRename(devlist)

  @staticmethod
  def perspective_blockdev_assemble(params):
    """Assemble a block device.

    """
    bdev_s, owner, on_primary = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAssemble(bdev, owner, on_primary)

  @staticmethod
  def perspective_blockdev_shutdown(params):
    """Shutdown a block device.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevShutdown(bdev)

  @staticmethod
  def perspective_blockdev_addchildren(params):
    """Add a child to a mirror device.

    Note: this is only valid for mirror devices. It's the caller's duty
    to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAddchildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_removechildren(params):
    """Remove a child from a mirror device.

    This is only valid for mirror devices, of course. It's the callers
    duty to send a correct disk, otherwise we raise an error.

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevRemovechildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_getmirrorstatus(params):
    """Return the mirror status for a list of disks.

    """
    disks = [objects.Disk.FromDict(dsk_s)
             for dsk_s in params]
    return [status.ToDict()
            for status in backend.BlockdevGetmirrorstatus(disks)]

  @staticmethod
  def perspective_blockdev_find(params):
    """Expose the FindBlockDevice functionality for a disk.

    This will try to find but not activate a disk.

    """
    disk = objects.Disk.FromDict(params[0])
    result = backend.BlockdevFind(disk)
    if result is None:
      return None
    return result.ToDict()

  @staticmethod
  def perspective_blockdev_snapshot(params):
    """Create a snapshot device.

    Note that this is only valid for LVM disks, if we get passed
    something else we raise an exception. The snapshot device can be
    remove by calling the generic block device remove call.

    """
    cfbd = objects.Disk.FromDict(params[0])
    return backend.BlockdevSnapshot(cfbd)

  @staticmethod
  def perspective_blockdev_grow(params):
    """Grow a stack of devices.

    """
    cfbd = objects.Disk.FromDict(params[0])
    amount = params[1]
    return backend.BlockdevGrow(cfbd, amount)

  @staticmethod
  def perspective_blockdev_close(params):
    """Closes the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
    return backend.BlockdevClose(params[0], disks)

  @staticmethod
  def perspective_blockdev_getsize(params):
    """Compute the sizes of the given block devices.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[0]]
    return backend.BlockdevGetsize(disks)

  @staticmethod
  def perspective_blockdev_export(params):
    """Compute the sizes of the given block devices.

    """
    disk = objects.Disk.FromDict(params[0])
    dest_node, dest_path, cluster_name = params[1:]
    return backend.BlockdevExport(disk, dest_node, dest_path, cluster_name)

  # blockdev/drbd specific methods ----------

  @staticmethod
  def perspective_drbd_disconnect_net(params):
    """Disconnects the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdDisconnectNet(nodes_ip, disks)

  @staticmethod
  def perspective_drbd_attach_net(params):
    """Attaches the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks, instance_name, multimaster = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdAttachNet(nodes_ip, disks,
                                 instance_name, multimaster)

  @staticmethod
  def perspective_drbd_wait_sync(params):
    """Wait until DRBD disks are synched.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdWaitSync(nodes_ip, disks)

  # export/import  --------------------------

  @staticmethod
  def perspective_snapshot_export(params):
    """Export a given snapshot.

    """
    disk = objects.Disk.FromDict(params[0])
    dest_node = params[1]
    instance = objects.Instance.FromDict(params[2])
    cluster_name = params[3]
    dev_idx = params[4]
    return backend.ExportSnapshot(disk, dest_node, instance,
                                  cluster_name, dev_idx)

  @staticmethod
  def perspective_finalize_export(params):
    """Expose the finalize export functionality.

    """
    instance = objects.Instance.FromDict(params[0])
    snap_disks = [objects.Disk.FromDict(str_data)
                  for str_data in params[1]]
    return backend.FinalizeExport(instance, snap_disks)

  @staticmethod
  def perspective_export_info(params):
    """Query information about an existing export on this node.

    The given path may not contain an export, in which case we return
    None.

    """
    path = params[0]
    return backend.ExportInfo(path)

  @staticmethod
  def perspective_export_list(params):
    """List the available exports on this node.

    Note that as opposed to export_info, which may query data about an
    export in any path, this only queries the standard Ganeti path
    (constants.EXPORT_DIR).

    """
    return backend.ListExports()

  @staticmethod
  def perspective_export_remove(params):
    """Remove an export.

    """
    export = params[0]
    return backend.RemoveExport(export)

  # volume  --------------------------

  @staticmethod
  def perspective_lv_list(params):
    """Query the list of logical volumes in a given volume group.

    """
    vgname = params[0]
    return backend.GetVolumeList(vgname)

  @staticmethod
  def perspective_vg_list(params):
    """Query the list of volume groups.

    """
    return backend.ListVolumeGroups()

  # Storage --------------------------

  @staticmethod
  def perspective_storage_list(params):
    """Get list of storage units.

    """
    (su_name, su_args, name, fields) = params
    return storage.GetStorage(su_name, *su_args).List(name, fields)

  @staticmethod
  def perspective_storage_modify(params):
    """Modify a storage unit.

    """
    (su_name, su_args, name, changes) = params
    return storage.GetStorage(su_name, *su_args).Modify(name, changes)

  @staticmethod
  def perspective_storage_execute(params):
    """Execute an operation on a storage unit.

    """
    (su_name, su_args, name, op) = params
    return storage.GetStorage(su_name, *su_args).Execute(name, op)

  # bridge  --------------------------

  @staticmethod
  def perspective_bridges_exist(params):
    """Check if all bridges given exist on this node.

    """
    bridges_list = params[0]
    return backend.BridgesExist(bridges_list)

  # instance  --------------------------

  @staticmethod
  def perspective_instance_os_add(params):
    """Install an OS on a given instance.

    """
    inst_s = params[0]
    inst = objects.Instance.FromDict(inst_s)
    reinstall = params[1]
    return backend.InstanceOsAdd(inst, reinstall)

  @staticmethod
  def perspective_instance_run_rename(params):
    """Runs the OS rename script for an instance.

    """
    inst_s, old_name = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.RunRenameInstance(inst, old_name)

  @staticmethod
  def perspective_instance_os_import(params):
    """Run the import function of an OS onto a given instance.

    """
    inst_s, src_node, src_images, cluster_name = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.ImportOSIntoInstance(inst, src_node, src_images,
                                        cluster_name)

  @staticmethod
  def perspective_instance_shutdown(params):
    """Shutdown an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    timeout = params[1]
    return backend.InstanceShutdown(instance, timeout)

  @staticmethod
  def perspective_instance_start(params):
    """Start an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.StartInstance(instance)

  @staticmethod
  def perspective_migration_info(params):
    """Gather information about an instance to be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.MigrationInfo(instance)

  @staticmethod
  def perspective_accept_instance(params):
    """Prepare the node to accept an instance.

    """
    instance, info, target = params
    instance = objects.Instance.FromDict(instance)
    return backend.AcceptInstance(instance, info, target)

  @staticmethod
  def perspective_finalize_migration(params):
    """Finalize the instance migration.

    """
    instance, info, success = params
    instance = objects.Instance.FromDict(instance)
    return backend.FinalizeMigration(instance, info, success)

  @staticmethod
  def perspective_instance_migrate(params):
    """Migrates an instance.

    """
    instance, target, live = params
    instance = objects.Instance.FromDict(instance)
    return backend.MigrateInstance(instance, target, live)

  @staticmethod
  def perspective_instance_reboot(params):
    """Reboot an instance.

    """
    instance = objects.Instance.FromDict(params[0])
    reboot_type = params[1]
    shutdown_timeout = params[2]
    return backend.InstanceReboot(instance, reboot_type, shutdown_timeout)

  @staticmethod
  def perspective_instance_info(params):
    """Query instance information.

    """
    return backend.GetInstanceInfo(params[0], params[1])

  @staticmethod
  def perspective_instance_migratable(params):
    """Query whether the specified instance can be migrated.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.GetInstanceMigratable(instance)

  @staticmethod
  def perspective_all_instances_info(params):
    """Query information about all instances.

    """
    return backend.GetAllInstancesInfo(params[0])

  @staticmethod
  def perspective_instance_list(params):
    """Query the list of running instances.

    """
    return backend.GetInstanceList(params[0])

  # node --------------------------

  @staticmethod
  def perspective_node_tcp_ping(params):
    """Do a TcpPing on the remote node.

    """
    return utils.TcpPing(params[1], params[2], timeout=params[3],
                         live_port_needed=params[4], source=params[0])

  @staticmethod
  def perspective_node_has_ip_address(params):
    """Checks if a node has the given ip address.

    """
    return utils.OwnIpAddress(params[0])

  @staticmethod
  def perspective_node_info(params):
    """Query node information.

    """
    vgname, hypervisor_type = params
    return backend.GetNodeInfo(vgname, hypervisor_type)

  @staticmethod
  def perspective_node_add(params):
    """Complete the registration of this node in the cluster.

    """
    return backend.AddNode(params[0], params[1], params[2],
                           params[3], params[4], params[5])

  @staticmethod
  def perspective_node_verify(params):
    """Run a verify sequence on this node.

    """
    return backend.VerifyNode(params[0], params[1])

  @staticmethod
  def perspective_node_start_master(params):
    """Promote this node to master status.

    """
    return backend.StartMaster(params[0], params[1])

  @staticmethod
  def perspective_node_stop_master(params):
    """Demote this node from master status.

    """
    return backend.StopMaster(params[0])

  @staticmethod
  def perspective_node_leave_cluster(params):
    """Cleanup after leaving a cluster.

    """
    return backend.LeaveCluster(params[0])

  @staticmethod
  def perspective_node_volumes(params):
    """Query the list of all logical volume groups.

    """
    return backend.NodeVolumes()

  @staticmethod
  def perspective_node_demote_from_mc(params):
    """Demote a node from the master candidate role.

    """
    return backend.DemoteFromMC()

  @staticmethod
  def perspective_node_powercycle(params):
    """Tries to powercycle the node.

    """
    hypervisor_type = params[0]
    return backend.PowercycleNode(hypervisor_type)

  # cluster --------------------------

  @staticmethod
  def perspective_version(params):
    """Query version information.

    """
    return constants.PROTOCOL_VERSION

  @staticmethod
  def perspective_upload_file(params):
    """Upload a file.

    Note that the backend implementation imposes strict rules on which
    files are accepted.

    """
    return backend.UploadFile(*params)

  @staticmethod
  def perspective_master_info(params):
    """Query master information.

    """
    return backend.GetMasterInfo()

  @staticmethod
  def perspective_write_ssconf_files(params):
    """Write ssconf files.

    """
    (values,) = params
    return backend.WriteSsconfFiles(values)

  # os -----------------------

  @staticmethod
  def perspective_os_diagnose(params):
    """Query detailed information about existing OSes.

    """
    return backend.DiagnoseOS()

  @staticmethod
  def perspective_os_get(params):
    """Query information about a given OS.

    """
    name = params[0]
    os_obj = backend.OSFromDisk(name)
    return os_obj.ToDict()

  # hooks -----------------------

  @staticmethod
  def perspective_hooks_runner(params):
    """Run hook scripts.

    """
    hpath, phase, env = params
    hr = backend.HooksRunner()
    return hr.RunHooks(hpath, phase, env)

  # iallocator -----------------

  @staticmethod
  def perspective_iallocator_runner(params):
    """Run an iallocator script.

    """
    name, idata = params
    iar = backend.IAllocatorRunner()
    return iar.Run(name, idata)

  # test -----------------------

  @staticmethod
  def perspective_test_delay(params):
    """Run test delay.

    """
    duration = params[0]
    status, rval = utils.TestDelay(duration)
    if not status:
      raise backend.RPCFail(rval)
    return rval

  # file storage ---------------

  @staticmethod
  def perspective_file_storage_dir_create(params):
    """Create the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.CreateFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_remove(params):
    """Remove the file storage directory.

    """
    file_storage_dir = params[0]
    return backend.RemoveFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_rename(params):
    """Rename the file storage directory.

    """
    old_file_storage_dir = params[0]
    new_file_storage_dir = params[1]
    return backend.RenameFileStorageDir(old_file_storage_dir,
                                        new_file_storage_dir)

  # jobs ------------------------

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_update(params):
    """Update job queue.

    """
    (file_name, content) = params
    return backend.JobQueueUpdate(file_name, content)

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_purge(params):
    """Purge job queue.

    """
    return backend.JobQueuePurge()

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_rename(params):
    """Rename a job queue file.

    """
    # TODO: What if a file fails to rename?
    return [backend.JobQueueRename(old, new) for old, new in params]

  @staticmethod
  def perspective_jobqueue_set_drain(params):
    """Set/unset the queue drain flag.

    """
    drain_flag = params[0]
    return backend.JobQueueSetDrainFlag(drain_flag)

  # hypervisor ---------------

  @staticmethod
  def perspective_hypervisor_validate_params(params):
    """Validate the hypervisor parameters.

    """
    (hvname, hvparams) = params
    return backend.ValidateHVParams(hvname, hvparams)
def ExecNoded(options, args):
  """Main node daemon function, executed with the PID file held.

  Sets up the SSL parameters, initializes the job queue lock used by
  the _RequireJobQueueLock decorator, and runs the HTTP server main
  loop until termination.

  """
  # the job queue lock is looked up as a module global by
  # _RequireJobQueueLock's wrapper, so it must be assigned here
  global queue_lock

  # Read SSL certificate
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)
  else:
    ssl_params = None

  # Prepare job queue
  queue_lock = jstore.InitAndVerifyQueue(must_lock=False)

  mainloop = daemon.Mainloop()
  server = NodeHttpServer(mainloop, options.bind_address, options.port,
                          ssl_params=ssl_params, ssl_verify_peer=True)
  server.Start()
  try:
    mainloop.Run()
  finally:
    # make sure the listening socket is closed even if the main loop
    # exits with an exception
    server.Stop()
def main():
  """Main function for the node daemon.

  Parses the command line and hands control to daemon.GenericMain,
  which daemonizes, sets up logging/pidfile and finally calls
  ExecNoded.

  """
  parser = OptionParser(description="Ganeti node daemon",
                        usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
  # explicit octal literals: the previous code used the decimal value
  # 1777 for the lock dir, which is not the intended sticky-bit
  # world-writable mode 01777
  dirs.append((constants.LOG_OS_DIR, 0o750))
  dirs.append((constants.LOCK_DIR, 0o1777))
  daemon.GenericMain(constants.NODED, parser, dirs, None, ExecNoded,
                     default_ssl_cert=constants.SSL_CERT_FILE,
                     default_ssl_key=constants.SSL_CERT_FILE)
if __name__ == '__main__':
  main()