4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti node daemon"""
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
import os
import logging
import signal

from optparse import OptionParser

from ganeti import backend
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import daemon
from ganeti import http
from ganeti import utils
from ganeti import storage

import ganeti.http.server
53 def _RequireJobQueueLock(fn):
54 """Decorator for job queue manipulating functions.
57 QUEUE_LOCK_TIMEOUT = 10
59 def wrapper(*args, **kwargs):
60 # Locking in exclusive, blocking mode because there could be several
61 # children running at the same time. Waiting up to 10 seconds.
62 queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
64 return fn(*args, **kwargs)
71 class NodeHttpServer(http.server.HttpServer):
72 """The server implementation.
74 This class holds all methods exposed over the RPC interface.
77 def __init__(self, *args, **kwargs):
78 http.server.HttpServer.__init__(self, *args, **kwargs)
79 self.noded_pid = os.getpid()
81 def HandleRequest(self, req):
85 if req.request_method.upper() != http.HTTP_PUT:
86 raise http.HttpBadRequest()
88 path = req.request_path
89 if path.startswith("/"):
92 method = getattr(self, "perspective_%s" % path, None)
94 raise http.HttpNotFound()
97 rvalue = method(req.request_body)
100 except backend.RPCFail, err:
101 # our custom failure exception; str(err) works fine if the
102 # exception was constructed with a single argument, and in
103 # this case, err.message == err.args[0] == str(err)
104 return (False, str(err))
105 except errors.QuitGanetiException, err:
106 # Tell parent to quit
107 logging.info("Shutting down the node daemon, arguments: %s",
109 os.kill(self.noded_pid, signal.SIGTERM)
110 # And return the error's arguments, which must be already in
111 # correct tuple format
113 except Exception, err:
114 logging.exception("Error in RPC call")
115 return False, "Error while executing backend function: %s" % str(err)
117 # the new block devices --------------------------
120 def perspective_blockdev_create(params):
121 """Create a block device.
124 bdev_s, size, owner, on_primary, info = params
125 bdev = objects.Disk.FromDict(bdev_s)
127 raise ValueError("can't unserialize data!")
128 return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
131 def perspective_blockdev_remove(params):
132 """Remove a block device.
136 bdev = objects.Disk.FromDict(bdev_s)
137 return backend.BlockdevRemove(bdev)
140 def perspective_blockdev_rename(params):
141 """Remove a block device.
144 devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
145 return backend.BlockdevRename(devlist)
148 def perspective_blockdev_assemble(params):
149 """Assemble a block device.
152 bdev_s, owner, on_primary = params
153 bdev = objects.Disk.FromDict(bdev_s)
155 raise ValueError("can't unserialize data!")
156 return backend.BlockdevAssemble(bdev, owner, on_primary)
159 def perspective_blockdev_shutdown(params):
160 """Shutdown a block device.
164 bdev = objects.Disk.FromDict(bdev_s)
166 raise ValueError("can't unserialize data!")
167 return backend.BlockdevShutdown(bdev)
170 def perspective_blockdev_addchildren(params):
171 """Add a child to a mirror device.
173 Note: this is only valid for mirror devices. It's the caller's duty
174 to send a correct disk, otherwise we raise an error.
177 bdev_s, ndev_s = params
178 bdev = objects.Disk.FromDict(bdev_s)
179 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
180 if bdev is None or ndevs.count(None) > 0:
181 raise ValueError("can't unserialize data!")
182 return backend.BlockdevAddchildren(bdev, ndevs)
185 def perspective_blockdev_removechildren(params):
186 """Remove a child from a mirror device.
188 This is only valid for mirror devices, of course. It's the callers
189 duty to send a correct disk, otherwise we raise an error.
192 bdev_s, ndev_s = params
193 bdev = objects.Disk.FromDict(bdev_s)
194 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
195 if bdev is None or ndevs.count(None) > 0:
196 raise ValueError("can't unserialize data!")
197 return backend.BlockdevRemovechildren(bdev, ndevs)
200 def perspective_blockdev_getmirrorstatus(params):
201 """Return the mirror status for a list of disks.
204 disks = [objects.Disk.FromDict(dsk_s)
206 return backend.BlockdevGetmirrorstatus(disks)
209 def perspective_blockdev_find(params):
210 """Expose the FindBlockDevice functionality for a disk.
212 This will try to find but not activate a disk.
215 disk = objects.Disk.FromDict(params[0])
216 return backend.BlockdevFind(disk)
219 def perspective_blockdev_snapshot(params):
220 """Create a snapshot device.
222 Note that this is only valid for LVM disks, if we get passed
223 something else we raise an exception. The snapshot device can be
224 remove by calling the generic block device remove call.
227 cfbd = objects.Disk.FromDict(params[0])
228 return backend.BlockdevSnapshot(cfbd)
231 def perspective_blockdev_grow(params):
232 """Grow a stack of devices.
235 cfbd = objects.Disk.FromDict(params[0])
237 return backend.BlockdevGrow(cfbd, amount)
240 def perspective_blockdev_close(params):
241 """Closes the given block devices.
244 disks = [objects.Disk.FromDict(cf) for cf in params[1]]
245 return backend.BlockdevClose(params[0], disks)
247 # blockdev/drbd specific methods ----------
250 def perspective_drbd_disconnect_net(params):
251 """Disconnects the network connection of drbd disks.
253 Note that this is only valid for drbd disks, so the members of the
254 disk list must all be drbd devices.
257 nodes_ip, disks = params
258 disks = [objects.Disk.FromDict(cf) for cf in disks]
259 return backend.DrbdDisconnectNet(nodes_ip, disks)
262 def perspective_drbd_attach_net(params):
263 """Attaches the network connection of drbd disks.
265 Note that this is only valid for drbd disks, so the members of the
266 disk list must all be drbd devices.
269 nodes_ip, disks, instance_name, multimaster = params
270 disks = [objects.Disk.FromDict(cf) for cf in disks]
271 return backend.DrbdAttachNet(nodes_ip, disks,
272 instance_name, multimaster)
275 def perspective_drbd_wait_sync(params):
276 """Wait until DRBD disks are synched.
278 Note that this is only valid for drbd disks, so the members of the
279 disk list must all be drbd devices.
282 nodes_ip, disks = params
283 disks = [objects.Disk.FromDict(cf) for cf in disks]
284 return backend.DrbdWaitSync(nodes_ip, disks)
286 # export/import --------------------------
289 def perspective_snapshot_export(params):
290 """Export a given snapshot.
293 disk = objects.Disk.FromDict(params[0])
294 dest_node = params[1]
295 instance = objects.Instance.FromDict(params[2])
296 cluster_name = params[3]
298 return backend.ExportSnapshot(disk, dest_node, instance,
299 cluster_name, dev_idx)
302 def perspective_finalize_export(params):
303 """Expose the finalize export functionality.
306 instance = objects.Instance.FromDict(params[0])
307 snap_disks = [objects.Disk.FromDict(str_data)
308 for str_data in params[1]]
309 return backend.FinalizeExport(instance, snap_disks)
312 def perspective_export_info(params):
313 """Query information about an existing export on this node.
315 The given path may not contain an export, in which case we return
320 return backend.ExportInfo(path)
323 def perspective_export_list(params):
324 """List the available exports on this node.
326 Note that as opposed to export_info, which may query data about an
327 export in any path, this only queries the standard Ganeti path
328 (constants.EXPORT_DIR).
331 return backend.ListExports()
334 def perspective_export_remove(params):
339 return backend.RemoveExport(export)
341 # volume --------------------------
344 def perspective_lv_list(params):
345 """Query the list of logical volumes in a given volume group.
349 return backend.GetVolumeList(vgname)
352 def perspective_vg_list(params):
353 """Query the list of volume groups.
356 return backend.ListVolumeGroups()
358 # Storage --------------------------
361 def perspective_storage_list(params):
362 """Get list of storage units.
365 (su_name, su_args, name, fields) = params
366 return storage.GetStorage(su_name, *su_args).List(name, fields)
369 def perspective_storage_modify(params):
370 """Modify a storage unit.
373 (su_name, su_args, name, changes) = params
374 return storage.GetStorage(su_name, *su_args).Modify(name, changes)
376 # bridge --------------------------
379 def perspective_bridges_exist(params):
380 """Check if all bridges given exist on this node.
383 bridges_list = params[0]
384 return backend.BridgesExist(bridges_list)
386 # instance --------------------------
389 def perspective_instance_os_add(params):
390 """Install an OS on a given instance.
394 inst = objects.Instance.FromDict(inst_s)
395 reinstall = params[1]
396 return backend.InstanceOsAdd(inst, reinstall)
399 def perspective_instance_run_rename(params):
400 """Runs the OS rename script for an instance.
403 inst_s, old_name = params
404 inst = objects.Instance.FromDict(inst_s)
405 return backend.RunRenameInstance(inst, old_name)
408 def perspective_instance_os_import(params):
409 """Run the import function of an OS onto a given instance.
412 inst_s, src_node, src_images, cluster_name = params
413 inst = objects.Instance.FromDict(inst_s)
414 return backend.ImportOSIntoInstance(inst, src_node, src_images,
418 def perspective_instance_shutdown(params):
419 """Shutdown an instance.
422 instance = objects.Instance.FromDict(params[0])
423 return backend.InstanceShutdown(instance)
426 def perspective_instance_start(params):
427 """Start an instance.
430 instance = objects.Instance.FromDict(params[0])
431 return backend.StartInstance(instance)
434 def perspective_migration_info(params):
435 """Gather information about an instance to be migrated.
438 instance = objects.Instance.FromDict(params[0])
439 return backend.MigrationInfo(instance)
442 def perspective_accept_instance(params):
443 """Prepare the node to accept an instance.
446 instance, info, target = params
447 instance = objects.Instance.FromDict(instance)
448 return backend.AcceptInstance(instance, info, target)
451 def perspective_finalize_migration(params):
452 """Finalize the instance migration.
455 instance, info, success = params
456 instance = objects.Instance.FromDict(instance)
457 return backend.FinalizeMigration(instance, info, success)
460 def perspective_instance_migrate(params):
461 """Migrates an instance.
464 instance, target, live = params
465 instance = objects.Instance.FromDict(instance)
466 return backend.MigrateInstance(instance, target, live)
469 def perspective_instance_reboot(params):
470 """Reboot an instance.
473 instance = objects.Instance.FromDict(params[0])
474 reboot_type = params[1]
475 return backend.InstanceReboot(instance, reboot_type)
478 def perspective_instance_info(params):
479 """Query instance information.
482 return backend.GetInstanceInfo(params[0], params[1])
485 def perspective_instance_migratable(params):
486 """Query whether the specified instance can be migrated.
489 instance = objects.Instance.FromDict(params[0])
490 return backend.GetInstanceMigratable(instance)
493 def perspective_all_instances_info(params):
494 """Query information about all instances.
497 return backend.GetAllInstancesInfo(params[0])
500 def perspective_instance_list(params):
501 """Query the list of running instances.
504 return backend.GetInstanceList(params[0])
506 # node --------------------------
509 def perspective_node_tcp_ping(params):
510 """Do a TcpPing on the remote node.
513 return utils.TcpPing(params[1], params[2], timeout=params[3],
514 live_port_needed=params[4], source=params[0])
517 def perspective_node_has_ip_address(params):
518 """Checks if a node has the given ip address.
521 return utils.OwnIpAddress(params[0])
524 def perspective_node_info(params):
525 """Query node information.
528 vgname, hypervisor_type = params
529 return backend.GetNodeInfo(vgname, hypervisor_type)
532 def perspective_node_add(params):
533 """Complete the registration of this node in the cluster.
536 return backend.AddNode(params[0], params[1], params[2],
537 params[3], params[4], params[5])
540 def perspective_node_verify(params):
541 """Run a verify sequence on this node.
544 return backend.VerifyNode(params[0], params[1])
547 def perspective_node_start_master(params):
548 """Promote this node to master status.
551 return backend.StartMaster(params[0], params[1])
554 def perspective_node_stop_master(params):
555 """Demote this node from master status.
558 return backend.StopMaster(params[0])
561 def perspective_node_leave_cluster(params):
562 """Cleanup after leaving a cluster.
565 return backend.LeaveCluster()
568 def perspective_node_volumes(params):
569 """Query the list of all logical volume groups.
572 return backend.NodeVolumes()
575 def perspective_node_demote_from_mc(params):
576 """Demote a node from the master candidate role.
579 return backend.DemoteFromMC()
583 def perspective_node_powercycle(params):
584 """Tries to powercycle the nod.
587 hypervisor_type = params[0]
588 return backend.PowercycleNode(hypervisor_type)
591 # cluster --------------------------
594 def perspective_version(params):
595 """Query version information.
598 return constants.PROTOCOL_VERSION
601 def perspective_upload_file(params):
604 Note that the backend implementation imposes strict rules on which
608 return backend.UploadFile(*params)
611 def perspective_master_info(params):
612 """Query master information.
615 return backend.GetMasterInfo()
618 def perspective_write_ssconf_files(params):
619 """Write ssconf files.
623 return backend.WriteSsconfFiles(values)
625 # os -----------------------
628 def perspective_os_diagnose(params):
629 """Query detailed information about existing OSes.
632 return backend.DiagnoseOS()
635 def perspective_os_get(params):
636 """Query information about a given OS.
640 os_obj = backend.OSFromDisk(name)
641 return os_obj.ToDict()
643 # hooks -----------------------
646 def perspective_hooks_runner(params):
650 hpath, phase, env = params
651 hr = backend.HooksRunner()
652 return hr.RunHooks(hpath, phase, env)
654 # iallocator -----------------
657 def perspective_iallocator_runner(params):
658 """Run an iallocator script.
662 iar = backend.IAllocatorRunner()
663 return iar.Run(name, idata)
665 # test -----------------------
668 def perspective_test_delay(params):
673 status, rval = utils.TestDelay(duration)
675 raise backend.RPCFail(rval)
678 # file storage ---------------
681 def perspective_file_storage_dir_create(params):
682 """Create the file storage directory.
685 file_storage_dir = params[0]
686 return backend.CreateFileStorageDir(file_storage_dir)
689 def perspective_file_storage_dir_remove(params):
690 """Remove the file storage directory.
693 file_storage_dir = params[0]
694 return backend.RemoveFileStorageDir(file_storage_dir)
697 def perspective_file_storage_dir_rename(params):
698 """Rename the file storage directory.
701 old_file_storage_dir = params[0]
702 new_file_storage_dir = params[1]
703 return backend.RenameFileStorageDir(old_file_storage_dir,
704 new_file_storage_dir)
706 # jobs ------------------------
709 @_RequireJobQueueLock
710 def perspective_jobqueue_update(params):
714 (file_name, content) = params
715 return backend.JobQueueUpdate(file_name, content)
718 @_RequireJobQueueLock
719 def perspective_jobqueue_purge(params):
723 return backend.JobQueuePurge()
726 @_RequireJobQueueLock
727 def perspective_jobqueue_rename(params):
728 """Rename a job queue file.
731 # TODO: What if a file fails to rename?
732 return [backend.JobQueueRename(old, new) for old, new in params]
735 def perspective_jobqueue_set_drain(params):
736 """Set/unset the queue drain flag.
739 drain_flag = params[0]
740 return backend.JobQueueSetDrainFlag(drain_flag)
743 # hypervisor ---------------
746 def perspective_hypervisor_validate_params(params):
747 """Validate the hypervisor parameters.
750 (hvname, hvparams) = params
751 return backend.ValidateHVParams(hvname, hvparams)
def ExecNODED(options, args):
  """Main NODED function, executed with the pidfile held.

  @param options: parsed command-line options
  @param args: positional arguments (unused)

  """
  # the decorator _RequireJobQueueLock reads this module-level name, so
  # it must be declared global before assignment below
  global queue_lock

  # Read SSL certificate
  if options.ssl:
    ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                    ssl_cert_path=options.ssl_cert)
  else:
    ssl_params = None

  # Prepare the job queue lock used by the jobqueue RPC handlers
  queue_lock = jstore.InitAndVerifyQueue(must_lock=False)

  mainloop = daemon.Mainloop()
  server = NodeHttpServer(mainloop, options.bind_address, options.port,
                          ssl_params=ssl_params, ssl_verify_peer=True)
  server.Start()
  try:
    mainloop.Run()
  finally:
    # make sure the listening socket is closed on the way out
    server.Stop()
781 """Main function for the node daemon.
784 parser = OptionParser(description="Ganeti node daemon",
785 usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
786 version="%%prog (ganeti) %s" %
787 constants.RELEASE_VERSION)
788 dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
789 dirs.append((constants.LOG_OS_DIR, 0750))
790 dirs.append((constants.LOCK_DIR, 1777))
791 daemon.GenericMain(constants.NODED, parser, dirs, None, ExecNODED)
if __name__ == '__main__':
  main()