# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Ganeti node daemon"""

# functions in this module need to have a given name structure, so:
# pylint: disable-msg=C0103
import os
import sys
import errno
import logging
import signal

from optparse import OptionParser

from ganeti import backend
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import daemon
from ganeti import http
from ganeti import utils

import ganeti.http.server
def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  Serializes job queue access: the wrapped function only runs while
  holding the module-global queue lock, which is always released
  afterwards, even if the function raises.

  """
  QUEUE_LOCK_TIMEOUT = 10

  def wrapper(*args, **kwargs):
    # Locking in exclusive, blocking mode because there could be several
    # children running at the same time. Waiting up to 10 seconds.
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      # always release, otherwise a failing RPC call would deadlock all
      # subsequent job queue operations
      queue_lock.Release()

  return wrapper
70 class NodeHttpServer(http.server.HttpServer):
71 """The server implementation.
73 This class holds all methods exposed over the RPC interface.
76 def __init__(self, *args, **kwargs):
77 http.server.HttpServer.__init__(self, *args, **kwargs)
78 self.noded_pid = os.getpid()
80 def HandleRequest(self, req):
84 if req.request_method.upper() != http.HTTP_PUT:
85 raise http.HttpBadRequest()
87 path = req.request_path
88 if path.startswith("/"):
91 method = getattr(self, "perspective_%s" % path, None)
93 raise http.HttpNotFound()
97 return method(req.request_body)
99 logging.exception("Error in RPC call")
101 except errors.QuitGanetiException, err:
102 # Tell parent to quit
103 os.kill(self.noded_pid, signal.SIGTERM)
105 # the new block devices --------------------------
108 def perspective_blockdev_create(params):
109 """Create a block device.
112 bdev_s, size, owner, on_primary, info = params
113 bdev = objects.Disk.FromDict(bdev_s)
115 raise ValueError("can't unserialize data!")
116 return backend.BlockdevCreate(bdev, size, owner, on_primary, info)
119 def perspective_blockdev_remove(params):
120 """Remove a block device.
124 bdev = objects.Disk.FromDict(bdev_s)
125 return backend.BlockdevRemove(bdev)
128 def perspective_blockdev_rename(params):
129 """Remove a block device.
132 devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
133 return backend.BlockdevRename(devlist)
136 def perspective_blockdev_assemble(params):
137 """Assemble a block device.
140 bdev_s, owner, on_primary = params
141 bdev = objects.Disk.FromDict(bdev_s)
143 raise ValueError("can't unserialize data!")
144 return backend.BlockdevAssemble(bdev, owner, on_primary)
147 def perspective_blockdev_shutdown(params):
148 """Shutdown a block device.
152 bdev = objects.Disk.FromDict(bdev_s)
154 raise ValueError("can't unserialize data!")
155 return backend.BlockdevShutdown(bdev)
158 def perspective_blockdev_addchildren(params):
159 """Add a child to a mirror device.
161 Note: this is only valid for mirror devices. It's the caller's duty
162 to send a correct disk, otherwise we raise an error.
165 bdev_s, ndev_s = params
166 bdev = objects.Disk.FromDict(bdev_s)
167 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
168 if bdev is None or ndevs.count(None) > 0:
169 raise ValueError("can't unserialize data!")
170 return backend.BlockdevAddchildren(bdev, ndevs)
173 def perspective_blockdev_removechildren(params):
174 """Remove a child from a mirror device.
176 This is only valid for mirror devices, of course. It's the callers
177 duty to send a correct disk, otherwise we raise an error.
180 bdev_s, ndev_s = params
181 bdev = objects.Disk.FromDict(bdev_s)
182 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
183 if bdev is None or ndevs.count(None) > 0:
184 raise ValueError("can't unserialize data!")
185 return backend.BlockdevRemovechildren(bdev, ndevs)
188 def perspective_blockdev_getmirrorstatus(params):
189 """Return the mirror status for a list of disks.
192 disks = [objects.Disk.FromDict(dsk_s)
194 return backend.BlockdevGetmirrorstatus(disks)
197 def perspective_blockdev_find(params):
198 """Expose the FindBlockDevice functionality for a disk.
200 This will try to find but not activate a disk.
203 disk = objects.Disk.FromDict(params[0])
204 return backend.BlockdevFind(disk)
207 def perspective_blockdev_snapshot(params):
208 """Create a snapshot device.
210 Note that this is only valid for LVM disks, if we get passed
211 something else we raise an exception. The snapshot device can be
212 remove by calling the generic block device remove call.
215 cfbd = objects.Disk.FromDict(params[0])
216 return backend.BlockdevSnapshot(cfbd)
219 def perspective_blockdev_grow(params):
220 """Grow a stack of devices.
223 cfbd = objects.Disk.FromDict(params[0])
225 return backend.BlockdevGrow(cfbd, amount)
228 def perspective_blockdev_close(params):
229 """Closes the given block devices.
232 disks = [objects.Disk.FromDict(cf) for cf in params[1]]
233 return backend.BlockdevClose(params[0], disks)
235 # blockdev/drbd specific methods ----------
238 def perspective_drbd_disconnect_net(params):
239 """Disconnects the network connection of drbd disks.
241 Note that this is only valid for drbd disks, so the members of the
242 disk list must all be drbd devices.
245 nodes_ip, disks = params
246 disks = [objects.Disk.FromDict(cf) for cf in disks]
247 return backend.DrbdDisconnectNet(nodes_ip, disks)
250 def perspective_drbd_attach_net(params):
251 """Attaches the network connection of drbd disks.
253 Note that this is only valid for drbd disks, so the members of the
254 disk list must all be drbd devices.
257 nodes_ip, disks, instance_name, multimaster = params
258 disks = [objects.Disk.FromDict(cf) for cf in disks]
259 return backend.DrbdAttachNet(nodes_ip, disks,
260 instance_name, multimaster)
263 def perspective_drbd_wait_sync(params):
264 """Wait until DRBD disks are synched.
266 Note that this is only valid for drbd disks, so the members of the
267 disk list must all be drbd devices.
270 nodes_ip, disks = params
271 disks = [objects.Disk.FromDict(cf) for cf in disks]
272 return backend.DrbdWaitSync(nodes_ip, disks)
274 # export/import --------------------------
277 def perspective_snapshot_export(params):
278 """Export a given snapshot.
281 disk = objects.Disk.FromDict(params[0])
282 dest_node = params[1]
283 instance = objects.Instance.FromDict(params[2])
284 cluster_name = params[3]
286 return backend.ExportSnapshot(disk, dest_node, instance,
287 cluster_name, dev_idx)
290 def perspective_finalize_export(params):
291 """Expose the finalize export functionality.
294 instance = objects.Instance.FromDict(params[0])
295 snap_disks = [objects.Disk.FromDict(str_data)
296 for str_data in params[1]]
297 return backend.FinalizeExport(instance, snap_disks)
300 def perspective_export_info(params):
301 """Query information about an existing export on this node.
303 The given path may not contain an export, in which case we return
308 einfo = backend.ExportInfo(path)
314 def perspective_export_list(params):
315 """List the available exports on this node.
317 Note that as opposed to export_info, which may query data about an
318 export in any path, this only queries the standard Ganeti path
319 (constants.EXPORT_DIR).
322 return backend.ListExports()
325 def perspective_export_remove(params):
330 return backend.RemoveExport(export)
332 # volume --------------------------
335 def perspective_volume_list(params):
336 """Query the list of logical volumes in a given volume group.
340 return backend.GetVolumeList(vgname)
343 def perspective_vg_list(params):
344 """Query the list of volume groups.
347 return backend.ListVolumeGroups()
349 # bridge --------------------------
352 def perspective_bridges_exist(params):
353 """Check if all bridges given exist on this node.
356 bridges_list = params[0]
357 return backend.BridgesExist(bridges_list)
359 # instance --------------------------
362 def perspective_instance_os_add(params):
363 """Install an OS on a given instance.
367 inst = objects.Instance.FromDict(inst_s)
368 return backend.InstanceOsAdd(inst)
371 def perspective_instance_run_rename(params):
372 """Runs the OS rename script for an instance.
375 inst_s, old_name = params
376 inst = objects.Instance.FromDict(inst_s)
377 return backend.RunRenameInstance(inst, old_name)
380 def perspective_instance_os_import(params):
381 """Run the import function of an OS onto a given instance.
384 inst_s, src_node, src_images, cluster_name = params
385 inst = objects.Instance.FromDict(inst_s)
386 return backend.ImportOSIntoInstance(inst, src_node, src_images,
390 def perspective_instance_shutdown(params):
391 """Shutdown an instance.
394 instance = objects.Instance.FromDict(params[0])
395 return backend.ShutdownInstance(instance)
398 def perspective_instance_start(params):
399 """Start an instance.
402 instance = objects.Instance.FromDict(params[0])
403 extra_args = params[1]
404 return backend.StartInstance(instance, extra_args)
407 def perspective_migration_info(params):
408 """Gather information about an instance to be migrated.
411 instance = objects.Instance.FromDict(params[0])
412 return backend.MigrationInfo(instance)
415 def perspective_accept_instance(params):
416 """Prepare the node to accept an instance.
419 instance, info, target = params
420 instance = objects.Instance.FromDict(instance)
421 return backend.AcceptInstance(instance, info, target)
424 def perspective_finalize_migration(params):
425 """Finalize the instance migration.
428 instance, info, success = params
429 instance = objects.Instance.FromDict(instance)
430 return backend.FinalizeMigration(instance, info, success)
433 def perspective_instance_migrate(params):
434 """Migrates an instance.
437 instance, target, live = params
438 instance = objects.Instance.FromDict(instance)
439 return backend.MigrateInstance(instance, target, live)
442 def perspective_instance_reboot(params):
443 """Reboot an instance.
446 instance = objects.Instance.FromDict(params[0])
447 reboot_type = params[1]
448 extra_args = params[2]
449 return backend.InstanceReboot(instance, reboot_type, extra_args)
452 def perspective_instance_info(params):
453 """Query instance information.
456 return backend.GetInstanceInfo(params[0], params[1])
459 def perspective_instance_migratable(params):
460 """Query whether the specified instance can be migrated.
463 instance = objects.Instance.FromDict(params[0])
464 return backend.GetInstanceMigratable(instance)
467 def perspective_all_instances_info(params):
468 """Query information about all instances.
471 return backend.GetAllInstancesInfo(params[0])
474 def perspective_instance_list(params):
475 """Query the list of running instances.
478 return backend.GetInstanceList(params[0])
480 # node --------------------------
483 def perspective_node_tcp_ping(params):
484 """Do a TcpPing on the remote node.
487 return utils.TcpPing(params[1], params[2], timeout=params[3],
488 live_port_needed=params[4], source=params[0])
491 def perspective_node_has_ip_address(params):
492 """Checks if a node has the given ip address.
495 return utils.OwnIpAddress(params[0])
498 def perspective_node_info(params):
499 """Query node information.
502 vgname, hypervisor_type = params
503 return backend.GetNodeInfo(vgname, hypervisor_type)
506 def perspective_node_add(params):
507 """Complete the registration of this node in the cluster.
510 return backend.AddNode(params[0], params[1], params[2],
511 params[3], params[4], params[5])
514 def perspective_node_verify(params):
515 """Run a verify sequence on this node.
518 return backend.VerifyNode(params[0], params[1])
521 def perspective_node_start_master(params):
522 """Promote this node to master status.
525 return backend.StartMaster(params[0])
528 def perspective_node_stop_master(params):
529 """Demote this node from master status.
532 return backend.StopMaster(params[0])
535 def perspective_node_leave_cluster(params):
536 """Cleanup after leaving a cluster.
539 return backend.LeaveCluster()
542 def perspective_node_volumes(params):
543 """Query the list of all logical volume groups.
546 return backend.NodeVolumes()
549 def perspective_node_demote_from_mc(params):
550 """Demote a node from the master candidate role.
553 return backend.DemoteFromMC()
556 # cluster --------------------------
559 def perspective_version(params):
560 """Query version information.
563 return constants.PROTOCOL_VERSION
566 def perspective_upload_file(params):
569 Note that the backend implementation imposes strict rules on which
573 return backend.UploadFile(*params)
576 def perspective_master_info(params):
577 """Query master information.
580 return backend.GetMasterInfo()
583 def perspective_write_ssconf_files(params):
584 """Write ssconf files.
588 return backend.WriteSsconfFiles(values)
590 # os -----------------------
593 def perspective_os_diagnose(params):
594 """Query detailed information about existing OSes.
597 return [os_obj.ToDict() for os_obj in backend.DiagnoseOS()]
600 def perspective_os_get(params):
601 """Query information about a given OS.
606 os_obj = backend.OSFromDisk(name)
607 except errors.InvalidOS, err:
608 os_obj = objects.OS.FromInvalidOS(err)
609 return os_obj.ToDict()
611 # hooks -----------------------
614 def perspective_hooks_runner(params):
618 hpath, phase, env = params
619 hr = backend.HooksRunner()
620 return hr.RunHooks(hpath, phase, env)
622 # iallocator -----------------
625 def perspective_iallocator_runner(params):
626 """Run an iallocator script.
630 iar = backend.IAllocatorRunner()
631 return iar.Run(name, idata)
633 # test -----------------------
636 def perspective_test_delay(params):
641 return utils.TestDelay(duration)
643 # file storage ---------------
646 def perspective_file_storage_dir_create(params):
647 """Create the file storage directory.
650 file_storage_dir = params[0]
651 return backend.CreateFileStorageDir(file_storage_dir)
654 def perspective_file_storage_dir_remove(params):
655 """Remove the file storage directory.
658 file_storage_dir = params[0]
659 return backend.RemoveFileStorageDir(file_storage_dir)
662 def perspective_file_storage_dir_rename(params):
663 """Rename the file storage directory.
666 old_file_storage_dir = params[0]
667 new_file_storage_dir = params[1]
668 return backend.RenameFileStorageDir(old_file_storage_dir,
669 new_file_storage_dir)
671 # jobs ------------------------
674 @_RequireJobQueueLock
675 def perspective_jobqueue_update(params):
679 (file_name, content) = params
680 return backend.JobQueueUpdate(file_name, content)
683 @_RequireJobQueueLock
684 def perspective_jobqueue_purge(params):
688 return backend.JobQueuePurge()
691 @_RequireJobQueueLock
692 def perspective_jobqueue_rename(params):
693 """Rename a job queue file.
696 # TODO: What if a file fails to rename?
697 return [backend.JobQueueRename(old, new) for old, new in params]
700 def perspective_jobqueue_set_drain(params):
701 """Set/unset the queue drain flag.
704 drain_flag = params[0]
705 return backend.JobQueueSetDrainFlag(drain_flag)
708 # hypervisor ---------------
711 def perspective_hypervisor_validate_params(params):
712 """Validate the hypervisor parameters.
715 (hvname, hvparams) = params
716 return backend.ValidateHVParams(hvname, hvparams)
def ParseOptions():
  """Parse the command line options.

  @return: (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti node daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                                constants.RELEASE_VERSION)

  parser.add_option("-f", "--foreground", dest="fork",
                    help="Don't detach from the current terminal",
                    default=True, action="store_false")
  parser.add_option("-d", "--debug", dest="debug",
                    help="Enable some debug messages",
                    default=False, action="store_true")
  options, args = parser.parse_args()
  return options, args
740 def EnsureRuntimeEnvironment():
741 """Ensure our run-time environment is complete.
743 Currently this creates directories which could be missing, either
744 due to directories being on a tmpfs mount, or due to incomplete
748 dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
749 dirs.append((constants.LOG_OS_DIR, 0750))
750 dirs.append((constants.LOCK_DIR, 1777))
751 for dir_name, dir_mode in dirs:
752 if not os.path.exists(dir_name):
754 os.mkdir(dir_name, dir_mode)
755 except EnvironmentError, err:
756 if err.errno != errno.EEXIST:
757 print ("Node setup wrong, cannot create directory '%s': %s" %
760 if not os.path.isdir(dir_name):
761 print ("Node setup wrong, '%s' is not a directory" % dir_name)
766 """Main function for the node daemon.
771 options, args = ParseOptions()
772 utils.debug = options.debug
777 for fname in (constants.SSL_CERT_FILE,):
778 if not os.path.isfile(fname):
779 print "config %s not there, will not run." % fname
783 port = utils.GetNodeDaemonPort()
784 except errors.ConfigurationError, err:
785 print "Cluster configuration incomplete: '%s'" % str(err)
788 EnsureRuntimeEnvironment()
792 utils.Daemonize(logfile=constants.LOG_NODESERVER)
794 utils.WritePidFile(constants.NODED_PID)
796 utils.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
797 stderr_logging=not options.fork)
798 logging.info("ganeti node daemon startup")
800 # Read SSL certificate
801 ssl_params = http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
802 ssl_cert_path=constants.SSL_CERT_FILE)
805 queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
807 mainloop = daemon.Mainloop()
808 server = NodeHttpServer(mainloop, "", port,
809 ssl_params=ssl_params, ssl_verify_peer=True)
816 utils.RemovePidFile(constants.NODED_PID)
if __name__ == '__main__':
  main()