4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti node daemon"""
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
35 from optparse import OptionParser
37 from ganeti import backend
38 from ganeti import constants
39 from ganeti import objects
40 from ganeti import errors
41 from ganeti import jstore
42 from ganeti import daemon
43 from ganeti import http
44 from ganeti import utils
45 from ganeti import storage
47 import ganeti.http.server
def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  The wrapped function runs while the module-level ``queue_lock`` is held
  in exclusive mode.  The lock is released again when the function returns
  or raises, so one failing call cannot block later job queue calls.

  fn: the function to wrap.
  Returns a wrapper that forwards all positional and keyword arguments to
  ``fn`` and returns its result.

  """
  QUEUE_LOCK_TIMEOUT = 10

  def wrapper(*args, **kwargs):
    # Locking in exclusive, blocking mode because there could be several
    # children running at the same time. Waiting up to 10 seconds.
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      # Always give the lock back, including when fn raised.
      queue_lock.Unlock()

  return wrapper
class NodeHttpServer(http.server.HttpServer):
  """The server implementation.

  This class holds all methods exposed over the RPC interface.  Each
  ``perspective_<name>`` static method is one remote procedure:
  HandleRequest maps the request path ``/<name>`` to it and passes the
  request body as the single ``params`` argument.

  """
  def __init__(self, *args, **kwargs):
    """Initialize the HTTP server and remember this process' PID.

    All arguments are forwarded unchanged to http.server.HttpServer.  The
    PID is kept so HandleRequest can send SIGTERM to it when a call raises
    errors.QuitGanetiException.

    """
    http.server.HttpServer.__init__(self, *args, **kwargs)
    self.noded_pid = os.getpid()

  def HandleRequest(self, req):
    """Dispatch one request to the matching ``perspective_*`` method.

    req: request object; ``request_method``, ``request_path`` and
      ``request_body`` are read from it.

    Returns ``(True, result)`` when the call succeeds, ``(False, message)``
    when it fails, or the arguments of a QuitGanetiException.

    Raises http.HttpBadRequest for anything but a PUT request and
    http.HttpNotFound when no method matches the path.

    """
    if req.request_method.upper() != http.HTTP_PUT:
      raise http.HttpBadRequest()

    path = req.request_path
    if path.startswith("/"):
      # The method name is the path without its leading slash
      path = path[1:]

    method = getattr(self, "perspective_%s" % path, None)
    if method is None:
      raise http.HttpNotFound()

    try:
      rvalue = method(req.request_body)
      return True, rvalue

    except backend.RPCFail as err:
      # our custom failure exception; str(err) works fine if the
      # exception was constructed with a single argument, and in
      # this case, err.message == err.args[0] == str(err)
      return (False, str(err))
    except errors.QuitGanetiException as err:
      # Tell parent to quit
      logging.info("Shutting down the node daemon, arguments: %s",
                   str(err.args))
      os.kill(self.noded_pid, signal.SIGTERM)
      # And return the error's arguments, which must be already in
      # correct tuple format
      return err.args
    except Exception as err:
      logging.exception("Error in RPC call")
      return False, "Error while executing backend function: %s" % str(err)

  # the new block devices --------------------------

  @staticmethod
  def perspective_blockdev_create(params):
    """Create a block device.

    params: (serialized disk, size, owner, on_primary, info).
    Raises ValueError if the disk cannot be deserialized.

    """
    bdev_s, size, owner, on_primary, info = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevCreate(bdev, size, owner, on_primary, info)

  @staticmethod
  def perspective_blockdev_remove(params):
    """Remove a block device.

    params: the serialized disk is the first element.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    return backend.BlockdevRemove(bdev)

  @staticmethod
  def perspective_blockdev_rename(params):
    """Rename a list of block devices.

    params: list of (serialized disk, unique id) pairs.

    """
    devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
    return backend.BlockdevRename(devlist)

  @staticmethod
  def perspective_blockdev_assemble(params):
    """Assemble a block device.

    params: (serialized disk, owner, on_primary).
    Raises ValueError if the disk cannot be deserialized.

    """
    bdev_s, owner, on_primary = params
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAssemble(bdev, owner, on_primary)

  @staticmethod
  def perspective_blockdev_shutdown(params):
    """Shutdown a block device.

    params: the serialized disk is the first element.
    Raises ValueError if the disk cannot be deserialized.

    """
    bdev_s = params[0]
    bdev = objects.Disk.FromDict(bdev_s)
    if bdev is None:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevShutdown(bdev)

  @staticmethod
  def perspective_blockdev_addchildren(params):
    """Add a child to a mirror device.

    Note: this is only valid for mirror devices. It's the caller's duty
    to send a correct disk, otherwise we raise an error.

    params: (serialized parent disk, list of serialized child disks).

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevAddchildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_removechildren(params):
    """Remove a child from a mirror device.

    This is only valid for mirror devices, of course. It's the caller's
    duty to send a correct disk, otherwise we raise an error.

    params: (serialized parent disk, list of serialized child disks).

    """
    bdev_s, ndev_s = params
    bdev = objects.Disk.FromDict(bdev_s)
    ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
    if bdev is None or ndevs.count(None) > 0:
      raise ValueError("can't unserialize data!")
    return backend.BlockdevRemovechildren(bdev, ndevs)

  @staticmethod
  def perspective_blockdev_getmirrorstatus(params):
    """Return the mirror status for a list of disks.

    Returns the ToDict() form of every status the backend reports.

    """
    # NOTE(review): the iteration source was lost from the listing; params
    # itself is assumed to be the list of serialized disks -- confirm.
    disks = [objects.Disk.FromDict(dsk_s)
             for dsk_s in params]
    return [status.ToDict()
            for status in backend.BlockdevGetmirrorstatus(disks)]

  @staticmethod
  def perspective_blockdev_find(params):
    """Expose the FindBlockDevice functionality for a disk.

    This will try to find but not activate a disk.

    params: the serialized disk is the first element.

    """
    disk = objects.Disk.FromDict(params[0])

    result = backend.BlockdevFind(disk)
    # NOTE(review): guard reconstructed -- presumably the backend returns
    # None when the device is not found; confirm against backend.
    if result is None:
      return None

    return result.ToDict()

  @staticmethod
  def perspective_blockdev_snapshot(params):
    """Create a snapshot device.

    Note that this is only valid for LVM disks, if we get passed
    something else we raise an exception. The snapshot device can be
    removed by calling the generic block device remove call.

    """
    cfbd = objects.Disk.FromDict(params[0])
    return backend.BlockdevSnapshot(cfbd)

  @staticmethod
  def perspective_blockdev_grow(params):
    """Grow a stack of devices.

    params: (serialized disk, amount).

    """
    cfbd = objects.Disk.FromDict(params[0])
    amount = params[1]
    return backend.BlockdevGrow(cfbd, amount)

  @staticmethod
  def perspective_blockdev_close(params):
    """Closes the given block devices.

    params: the first element is passed through to the backend unchanged,
      the second is the list of serialized disks.

    """
    disks = [objects.Disk.FromDict(cf) for cf in params[1]]
    return backend.BlockdevClose(params[0], disks)

  # blockdev/drbd specific methods ----------

  @staticmethod
  def perspective_drbd_disconnect_net(params):
    """Disconnects the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    params: (nodes_ip, list of serialized disks).

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdDisconnectNet(nodes_ip, disks)

  @staticmethod
  def perspective_drbd_attach_net(params):
    """Attaches the network connection of drbd disks.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    params: (nodes_ip, list of serialized disks, instance_name,
      multimaster).

    """
    nodes_ip, disks, instance_name, multimaster = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdAttachNet(nodes_ip, disks,
                                 instance_name, multimaster)

  @staticmethod
  def perspective_drbd_wait_sync(params):
    """Wait until DRBD disks are synched.

    Note that this is only valid for drbd disks, so the members of the
    disk list must all be drbd devices.

    params: (nodes_ip, list of serialized disks).

    """
    nodes_ip, disks = params
    disks = [objects.Disk.FromDict(cf) for cf in disks]
    return backend.DrbdWaitSync(nodes_ip, disks)

  # export/import --------------------------

  @staticmethod
  def perspective_snapshot_export(params):
    """Export a given snapshot.

    params: (serialized disk, dest_node, serialized instance,
      cluster_name, dev_idx).

    """
    disk = objects.Disk.FromDict(params[0])
    dest_node = params[1]
    instance = objects.Instance.FromDict(params[2])
    cluster_name = params[3]
    dev_idx = params[4]
    return backend.ExportSnapshot(disk, dest_node, instance,
                                  cluster_name, dev_idx)

  @staticmethod
  def perspective_finalize_export(params):
    """Expose the finalize export functionality.

    params: (serialized instance, list of serialized snapshot disks).

    """
    instance = objects.Instance.FromDict(params[0])
    snap_disks = [objects.Disk.FromDict(str_data)
                  for str_data in params[1]]
    return backend.FinalizeExport(instance, snap_disks)

  @staticmethod
  def perspective_export_info(params):
    """Query information about an existing export on this node.

    The given path may not contain an export; whatever the backend
    reports for that case is returned unchanged.

    params: the path is the first element.

    """
    path = params[0]
    return backend.ExportInfo(path)

  @staticmethod
  def perspective_export_list(params):
    """List the available exports on this node.

    Note that as opposed to export_info, which may query data about an
    export in any path, this only queries the standard Ganeti path
    (constants.EXPORT_DIR).

    """
    return backend.ListExports()

  @staticmethod
  def perspective_export_remove(params):
    """Remove an export.

    params: the export to remove is the first element.

    """
    export = params[0]
    return backend.RemoveExport(export)

  # volume --------------------------

  @staticmethod
  def perspective_lv_list(params):
    """Query the list of logical volumes in a given volume group.

    params: the volume group name is the first element.

    """
    vgname = params[0]
    return backend.GetVolumeList(vgname)

  @staticmethod
  def perspective_vg_list(params):
    """Query the list of volume groups.

    """
    return backend.ListVolumeGroups()

  # Storage --------------------------

  @staticmethod
  def perspective_storage_list(params):
    """Get list of storage units.

    params: (su_name, su_args, name, fields); su_args is expanded into
      positional arguments of storage.GetStorage.

    """
    (su_name, su_args, name, fields) = params
    return storage.GetStorage(su_name, *su_args).List(name, fields)

  @staticmethod
  def perspective_storage_modify(params):
    """Modify a storage unit.

    params: (su_name, su_args, name, changes); su_args is expanded into
      positional arguments of storage.GetStorage.

    """
    (su_name, su_args, name, changes) = params
    return storage.GetStorage(su_name, *su_args).Modify(name, changes)

  # bridge --------------------------

  @staticmethod
  def perspective_bridges_exist(params):
    """Check if all bridges given exist on this node.

    params: the list of bridges is the first element.

    """
    bridges_list = params[0]
    return backend.BridgesExist(bridges_list)

  # instance --------------------------

  @staticmethod
  def perspective_instance_os_add(params):
    """Install an OS on a given instance.

    params: (serialized instance, reinstall).

    """
    inst_s = params[0]
    inst = objects.Instance.FromDict(inst_s)
    reinstall = params[1]
    return backend.InstanceOsAdd(inst, reinstall)

  @staticmethod
  def perspective_instance_run_rename(params):
    """Runs the OS rename script for an instance.

    params: (serialized instance, old_name).

    """
    inst_s, old_name = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.RunRenameInstance(inst, old_name)

  @staticmethod
  def perspective_instance_os_import(params):
    """Run the import function of an OS onto a given instance.

    params: (serialized instance, src_node, src_images, cluster_name).

    """
    inst_s, src_node, src_images, cluster_name = params
    inst = objects.Instance.FromDict(inst_s)
    return backend.ImportOSIntoInstance(inst, src_node, src_images,
                                        cluster_name)

  @staticmethod
  def perspective_instance_shutdown(params):
    """Shutdown an instance.

    params: the serialized instance is the first element.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.InstanceShutdown(instance)

  @staticmethod
  def perspective_instance_start(params):
    """Start an instance.

    params: the serialized instance is the first element.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.StartInstance(instance)

  @staticmethod
  def perspective_migration_info(params):
    """Gather information about an instance to be migrated.

    params: the serialized instance is the first element.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.MigrationInfo(instance)

  @staticmethod
  def perspective_accept_instance(params):
    """Prepare the node to accept an instance.

    params: (serialized instance, info, target).

    """
    instance, info, target = params
    instance = objects.Instance.FromDict(instance)
    return backend.AcceptInstance(instance, info, target)

  @staticmethod
  def perspective_finalize_migration(params):
    """Finalize the instance migration.

    params: (serialized instance, info, success).

    """
    instance, info, success = params
    instance = objects.Instance.FromDict(instance)
    return backend.FinalizeMigration(instance, info, success)

  @staticmethod
  def perspective_instance_migrate(params):
    """Migrates an instance.

    params: (serialized instance, target, live).

    """
    instance, target, live = params
    instance = objects.Instance.FromDict(instance)
    return backend.MigrateInstance(instance, target, live)

  @staticmethod
  def perspective_instance_reboot(params):
    """Reboot an instance.

    params: (serialized instance, reboot_type).

    """
    instance = objects.Instance.FromDict(params[0])
    reboot_type = params[1]
    return backend.InstanceReboot(instance, reboot_type)

  @staticmethod
  def perspective_instance_info(params):
    """Query instance information.

    params: the first two elements are passed to backend.GetInstanceInfo.

    """
    return backend.GetInstanceInfo(params[0], params[1])

  @staticmethod
  def perspective_instance_migratable(params):
    """Query whether the specified instance can be migrated.

    params: the serialized instance is the first element.

    """
    instance = objects.Instance.FromDict(params[0])
    return backend.GetInstanceMigratable(instance)

  @staticmethod
  def perspective_all_instances_info(params):
    """Query information about all instances.

    params: the first element is passed to backend.GetAllInstancesInfo.

    """
    return backend.GetAllInstancesInfo(params[0])

  @staticmethod
  def perspective_instance_list(params):
    """Query the list of running instances.

    params: the first element is passed to backend.GetInstanceList.

    """
    return backend.GetInstanceList(params[0])

  # node --------------------------

  @staticmethod
  def perspective_node_tcp_ping(params):
    """Do a TcpPing on the remote node.

    params: (source, target, port, timeout, live_port_needed), in that
      order; the first element is the ``source`` keyword of utils.TcpPing.

    """
    return utils.TcpPing(params[1], params[2], timeout=params[3],
                         live_port_needed=params[4], source=params[0])

  @staticmethod
  def perspective_node_has_ip_address(params):
    """Checks if a node has the given ip address.

    params: the address is the first element.

    """
    return utils.OwnIpAddress(params[0])

  @staticmethod
  def perspective_node_info(params):
    """Query node information.

    params: (vgname, hypervisor_type).

    """
    vgname, hypervisor_type = params
    return backend.GetNodeInfo(vgname, hypervisor_type)

  @staticmethod
  def perspective_node_add(params):
    """Complete the registration of this node in the cluster.

    params: the first six elements are passed, in order, to
      backend.AddNode.

    """
    return backend.AddNode(params[0], params[1], params[2],
                           params[3], params[4], params[5])

  @staticmethod
  def perspective_node_verify(params):
    """Run a verify sequence on this node.

    params: the first two elements are passed to backend.VerifyNode.

    """
    return backend.VerifyNode(params[0], params[1])

  @staticmethod
  def perspective_node_start_master(params):
    """Promote this node to master status.

    params: the first two elements are passed to backend.StartMaster.

    """
    return backend.StartMaster(params[0], params[1])

  @staticmethod
  def perspective_node_stop_master(params):
    """Demote this node from master status.

    params: the first element is passed to backend.StopMaster.

    """
    return backend.StopMaster(params[0])

  @staticmethod
  def perspective_node_leave_cluster(params):
    """Cleanup after leaving a cluster.

    """
    return backend.LeaveCluster()

  @staticmethod
  def perspective_node_volumes(params):
    """Query the list of all logical volume groups.

    """
    return backend.NodeVolumes()

  @staticmethod
  def perspective_node_demote_from_mc(params):
    """Demote a node from the master candidate role.

    """
    return backend.DemoteFromMC()

  @staticmethod
  def perspective_node_powercycle(params):
    """Tries to powercycle the node.

    params: the hypervisor type is the first element.

    """
    hypervisor_type = params[0]
    return backend.PowercycleNode(hypervisor_type)

  # cluster --------------------------

  @staticmethod
  def perspective_version(params):
    """Query version information.

    Returns constants.PROTOCOL_VERSION.

    """
    return constants.PROTOCOL_VERSION

  @staticmethod
  def perspective_upload_file(params):
    """Upload a file.

    Note that the backend implementation imposes strict rules on which
    files are accepted.

    params: expanded into the positional arguments of backend.UploadFile.

    """
    return backend.UploadFile(*params)

  @staticmethod
  def perspective_master_info(params):
    """Query master information.

    """
    return backend.GetMasterInfo()

  @staticmethod
  def perspective_write_ssconf_files(params):
    """Write ssconf files.

    params: a single-element sequence holding the values to write.

    """
    # NOTE(review): the unpacking line was lost from the listing; a
    # one-element params is assumed -- confirm against the caller.
    (values,) = params
    return backend.WriteSsconfFiles(values)

  # os -----------------------

  @staticmethod
  def perspective_os_diagnose(params):
    """Query detailed information about existing OSes.

    """
    return backend.DiagnoseOS()

  @staticmethod
  def perspective_os_get(params):
    """Query information about a given OS.

    params: the OS name is the first element.
    Returns the ToDict() form of the object the backend loads.

    """
    name = params[0]
    os_obj = backend.OSFromDisk(name)
    return os_obj.ToDict()

  # hooks -----------------------

  @staticmethod
  def perspective_hooks_runner(params):
    """Run hook scripts.

    params: (hpath, phase, env).

    """
    hpath, phase, env = params
    hr = backend.HooksRunner()
    return hr.RunHooks(hpath, phase, env)

  # iallocator -----------------

  @staticmethod
  def perspective_iallocator_runner(params):
    """Run an iallocator script.

    params: (name, idata).

    """
    name, idata = params
    iar = backend.IAllocatorRunner()
    return iar.Run(name, idata)

  # test -----------------------

  @staticmethod
  def perspective_test_delay(params):
    """Run test delay.

    params: the duration is the first element.
    Raises backend.RPCFail, carrying the value utils.TestDelay returned,
    when the reported status is false.

    """
    duration = params[0]
    status, rval = utils.TestDelay(duration)
    if not status:
      raise backend.RPCFail(rval)
    return rval

  # file storage ---------------

  @staticmethod
  def perspective_file_storage_dir_create(params):
    """Create the file storage directory.

    params: the directory is the first element.

    """
    file_storage_dir = params[0]
    return backend.CreateFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_remove(params):
    """Remove the file storage directory.

    params: the directory is the first element.

    """
    file_storage_dir = params[0]
    return backend.RemoveFileStorageDir(file_storage_dir)

  @staticmethod
  def perspective_file_storage_dir_rename(params):
    """Rename the file storage directory.

    params: (old directory, new directory).

    """
    old_file_storage_dir = params[0]
    new_file_storage_dir = params[1]
    return backend.RenameFileStorageDir(old_file_storage_dir,
                                        new_file_storage_dir)

  # jobs ------------------------

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_update(params):
    """Update job queue.

    params: (file_name, content).

    """
    (file_name, content) = params
    return backend.JobQueueUpdate(file_name, content)

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_purge(params):
    """Purge job queue.

    """
    return backend.JobQueuePurge()

  @staticmethod
  @_RequireJobQueueLock
  def perspective_jobqueue_rename(params):
    """Rename a job queue file.

    params: list of (old, new) pairs; one backend result is returned per
      pair, in order.

    """
    # TODO: What if a file fails to rename?
    return [backend.JobQueueRename(old, new) for old, new in params]

  @staticmethod
  def perspective_jobqueue_set_drain(params):
    """Set/unset the queue drain flag.

    params: the flag is the first element.

    """
    drain_flag = params[0]
    return backend.JobQueueSetDrainFlag(drain_flag)

  # hypervisor ---------------

  @staticmethod
  def perspective_hypervisor_validate_params(params):
    """Validate the hypervisor parameters.

    params: (hvname, hvparams).

    """
    (hvname, hvparams) = params
    return backend.ValidateHVParams(hvname, hvparams)
def ExecNODED(options, args):
  """Main NODED function, executed with the pidfile held.

  options: parsed command line options; ``ssl_key``, ``ssl_cert``,
    ``bind_address`` and ``port`` are read here.
  args: positional arguments; not used in the visible code.

  """
  # _RequireJobQueueLock reads the module-level name, so the assignment
  # below must rebind the global instead of creating a local variable.
  global queue_lock

  # Read SSL certificate
  # NOTE(review): the listing drops lines around this statement (original
  # lines 767 and 770-773); presumably the SSL setup was conditional --
  # restore from the canonical file.
  ssl_params = http.HttpSslParams(ssl_key_path=options.ssl_key,
                                  ssl_cert_path=options.ssl_cert)

  queue_lock = jstore.InitAndVerifyQueue(must_lock=False)

  mainloop = daemon.Mainloop()
  server = NodeHttpServer(mainloop, options.bind_address, options.port,
                          ssl_params=ssl_params, ssl_verify_peer=True)
  # NOTE(review): original lines 779-783 are missing from the listing;
  # they presumably start the server and run the main loop.  They cannot
  # be recovered from this view and must be restored before use.
def main():
  """Main function for the node daemon.

  Builds the command line parser and the list of (directory, mode) pairs,
  then hands both to daemon.GenericMain together with ExecNODED.

  """
  parser = OptionParser(description="Ganeti node daemon",
                        usage="%prog [-f] [-d] [-p port] [-b ADDRESS]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)
  dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
  dirs.append((constants.LOG_OS_DIR, 0o750))
  # Permission modes are octal: sticky bit plus rwx for everyone.  A bare
  # decimal 1777 would select an unrelated set of permission bits.
  dirs.append((constants.LOCK_DIR, 0o1777))
  daemon.GenericMain(constants.NODED, parser, dirs, None, ExecNODED)


if __name__ == '__main__':
  main()