4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti node daemon"""
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
import errno
import logging
import os
import signal
import sys

from optparse import OptionParser

from ganeti import backend
from ganeti import constants
from ganeti import objects
from ganeti import errors
from ganeti import jstore
from ganeti import daemon
from ganeti import http
from ganeti import utils

import ganeti.http.server
# Job queue lock shared by all decorated functions; main() replaces this
# placeholder with the real lock object before any request is served.
queue_lock = None


def _RequireJobQueueLock(fn):
  """Decorator for job queue manipulating functions.

  The returned wrapper takes the module-level C{queue_lock} in exclusive,
  blocking mode (waiting up to QUEUE_LOCK_TIMEOUT seconds), calls C{fn}
  with the original arguments and releases the lock again, even if C{fn}
  raises.

  Args:
    fn: the function to protect

  Returns:
    a wrapper with the same call signature and return value as C{fn}

  """
  QUEUE_LOCK_TIMEOUT = 10

  def wrapper(*args, **kwargs):
    # Locking in exclusive, blocking mode because there could be several
    # children running at the same time. Waiting up to 10 seconds.
    queue_lock.Exclusive(blocking=True, timeout=QUEUE_LOCK_TIMEOUT)
    try:
      return fn(*args, **kwargs)
    finally:
      # NOTE(review): assumes the lock object offers Unlock() as the
      # counterpart of Exclusive() - confirm against the lock class
      # returned by jstore.InitAndVerifyQueue
      queue_lock.Unlock()

  return wrapper
70 class NodeHttpServer(http.server.HttpServer):
71 """The server implementation.
73 This class holds all methods exposed over the RPC interface.
76 def __init__(self, *args, **kwargs):
77 http.server.HttpServer.__init__(self, *args, **kwargs)
78 self.noded_pid = os.getpid()
80 def HandleRequest(self, req):
84 if req.request_method.upper() != http.HTTP_PUT:
85 raise http.HttpBadRequest()
87 path = req.request_path
88 if path.startswith("/"):
91 method = getattr(self, "perspective_%s" % path, None)
93 raise http.HttpNotFound()
97 return method(req.request_body)
99 logging.exception("Error in RPC call")
101 except errors.QuitGanetiException, err:
102 # Tell parent to quit
103 os.kill(self.noded_pid, signal.SIGTERM)
105 # the new block devices --------------------------
108 def perspective_blockdev_create(params):
109 """Create a block device.
112 bdev_s, size, owner, on_primary, info = params
113 bdev = objects.Disk.FromDict(bdev_s)
115 raise ValueError("can't unserialize data!")
116 return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
119 def perspective_blockdev_remove(params):
120 """Remove a block device.
124 bdev = objects.Disk.FromDict(bdev_s)
125 return backend.RemoveBlockDevice(bdev)
128 def perspective_blockdev_rename(params):
129 """Remove a block device.
132 devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
133 return backend.RenameBlockDevices(devlist)
136 def perspective_blockdev_assemble(params):
137 """Assemble a block device.
140 bdev_s, owner, on_primary = params
141 bdev = objects.Disk.FromDict(bdev_s)
143 raise ValueError("can't unserialize data!")
144 return backend.AssembleBlockDevice(bdev, owner, on_primary)
147 def perspective_blockdev_shutdown(params):
148 """Shutdown a block device.
152 bdev = objects.Disk.FromDict(bdev_s)
154 raise ValueError("can't unserialize data!")
155 return backend.ShutdownBlockDevice(bdev)
158 def perspective_blockdev_addchildren(params):
159 """Add a child to a mirror device.
161 Note: this is only valid for mirror devices. It's the caller's duty
162 to send a correct disk, otherwise we raise an error.
165 bdev_s, ndev_s = params
166 bdev = objects.Disk.FromDict(bdev_s)
167 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
168 if bdev is None or ndevs.count(None) > 0:
169 raise ValueError("can't unserialize data!")
170 return backend.MirrorAddChildren(bdev, ndevs)
173 def perspective_blockdev_removechildren(params):
174 """Remove a child from a mirror device.
176 This is only valid for mirror devices, of course. It's the callers
177 duty to send a correct disk, otherwise we raise an error.
180 bdev_s, ndev_s = params
181 bdev = objects.Disk.FromDict(bdev_s)
182 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
183 if bdev is None or ndevs.count(None) > 0:
184 raise ValueError("can't unserialize data!")
185 return backend.MirrorRemoveChildren(bdev, ndevs)
188 def perspective_blockdev_getmirrorstatus(params):
189 """Return the mirror status for a list of disks.
192 disks = [objects.Disk.FromDict(dsk_s)
194 return backend.GetMirrorStatus(disks)
197 def perspective_blockdev_find(params):
198 """Expose the FindBlockDevice functionality for a disk.
200 This will try to find but not activate a disk.
203 disk = objects.Disk.FromDict(params[0])
204 return backend.FindBlockDevice(disk)
207 def perspective_blockdev_snapshot(params):
208 """Create a snapshot device.
210 Note that this is only valid for LVM disks, if we get passed
211 something else we raise an exception. The snapshot device can be
212 remove by calling the generic block device remove call.
215 cfbd = objects.Disk.FromDict(params[0])
216 return backend.SnapshotBlockDevice(cfbd)
219 def perspective_blockdev_grow(params):
220 """Grow a stack of devices.
223 cfbd = objects.Disk.FromDict(params[0])
225 return backend.GrowBlockDevice(cfbd, amount)
228 def perspective_blockdev_close(params):
229 """Closes the given block devices.
232 disks = [objects.Disk.FromDict(cf) for cf in params]
233 return backend.CloseBlockDevices(disks)
235 # export/import --------------------------
238 def perspective_snapshot_export(params):
239 """Export a given snapshot.
242 disk = objects.Disk.FromDict(params[0])
243 dest_node = params[1]
244 instance = objects.Instance.FromDict(params[2])
245 cluster_name = params[3]
247 return backend.ExportSnapshot(disk, dest_node, instance,
248 cluster_name, dev_idx)
251 def perspective_finalize_export(params):
252 """Expose the finalize export functionality.
255 instance = objects.Instance.FromDict(params[0])
256 snap_disks = [objects.Disk.FromDict(str_data)
257 for str_data in params[1]]
258 return backend.FinalizeExport(instance, snap_disks)
261 def perspective_export_info(params):
262 """Query information about an existing export on this node.
264 The given path may not contain an export, in which case we return
269 einfo = backend.ExportInfo(path)
275 def perspective_export_list(params):
276 """List the available exports on this node.
278 Note that as opposed to export_info, which may query data about an
279 export in any path, this only queries the standard Ganeti path
280 (constants.EXPORT_DIR).
283 return backend.ListExports()
286 def perspective_export_remove(params):
291 return backend.RemoveExport(export)
293 # volume --------------------------
296 def perspective_volume_list(params):
297 """Query the list of logical volumes in a given volume group.
301 return backend.GetVolumeList(vgname)
304 def perspective_vg_list(params):
305 """Query the list of volume groups.
308 return backend.ListVolumeGroups()
310 # bridge --------------------------
313 def perspective_bridges_exist(params):
314 """Check if all bridges given exist on this node.
317 bridges_list = params[0]
318 return backend.BridgesExist(bridges_list)
320 # instance --------------------------
323 def perspective_instance_os_add(params):
324 """Install an OS on a given instance.
328 inst = objects.Instance.FromDict(inst_s)
329 return backend.AddOSToInstance(inst)
332 def perspective_instance_run_rename(params):
333 """Runs the OS rename script for an instance.
336 inst_s, old_name = params
337 inst = objects.Instance.FromDict(inst_s)
338 return backend.RunRenameInstance(inst, old_name)
341 def perspective_instance_os_import(params):
342 """Run the import function of an OS onto a given instance.
345 inst_s, src_node, src_images, cluster_name = params
346 inst = objects.Instance.FromDict(inst_s)
347 return backend.ImportOSIntoInstance(inst, src_node, src_images,
351 def perspective_instance_shutdown(params):
352 """Shutdown an instance.
355 instance = objects.Instance.FromDict(params[0])
356 return backend.ShutdownInstance(instance)
359 def perspective_instance_start(params):
360 """Start an instance.
363 instance = objects.Instance.FromDict(params[0])
364 extra_args = params[1]
365 return backend.StartInstance(instance, extra_args)
368 def perspective_instance_migrate(params):
369 """Migrates an instance.
372 instance, target, live = params
373 instance = objects.Instance.FromDict(instance)
374 return backend.MigrateInstance(instance, target, live)
377 def perspective_instance_reboot(params):
378 """Reboot an instance.
381 instance = objects.Instance.FromDict(params[0])
382 reboot_type = params[1]
383 extra_args = params[2]
384 return backend.RebootInstance(instance, reboot_type, extra_args)
387 def perspective_instance_info(params):
388 """Query instance information.
391 return backend.GetInstanceInfo(params[0], params[1])
394 def perspective_all_instances_info(params):
395 """Query information about all instances.
398 return backend.GetAllInstancesInfo(params[0])
401 def perspective_instance_list(params):
402 """Query the list of running instances.
405 return backend.GetInstanceList(params[0])
407 # node --------------------------
410 def perspective_node_tcp_ping(params):
411 """Do a TcpPing on the remote node.
414 return utils.TcpPing(params[1], params[2], timeout=params[3],
415 live_port_needed=params[4], source=params[0])
418 def perspective_node_has_ip_address(params):
419 """Checks if a node has the given ip address.
422 return utils.OwnIpAddress(params[0])
425 def perspective_node_info(params):
426 """Query node information.
429 vgname, hypervisor_type = params
430 return backend.GetNodeInfo(vgname, hypervisor_type)
433 def perspective_node_add(params):
434 """Complete the registration of this node in the cluster.
437 return backend.AddNode(params[0], params[1], params[2],
438 params[3], params[4], params[5])
441 def perspective_node_verify(params):
442 """Run a verify sequence on this node.
445 return backend.VerifyNode(params[0], params[1])
448 def perspective_node_start_master(params):
449 """Promote this node to master status.
452 return backend.StartMaster(params[0])
455 def perspective_node_stop_master(params):
456 """Demote this node from master status.
459 return backend.StopMaster(params[0])
462 def perspective_node_leave_cluster(params):
463 """Cleanup after leaving a cluster.
466 return backend.LeaveCluster()
469 def perspective_node_volumes(params):
470 """Query the list of all logical volume groups.
473 return backend.NodeVolumes()
476 def perspective_node_demote_from_mc(params):
477 """Demote a node from the master candidate role.
480 return backend.DemoteFromMC()
483 # cluster --------------------------
486 def perspective_version(params):
487 """Query version information.
490 return constants.PROTOCOL_VERSION
493 def perspective_upload_file(params):
496 Note that the backend implementation imposes strict rules on which
500 return backend.UploadFile(*params)
503 def perspective_master_info(params):
504 """Query master information.
507 return backend.GetMasterInfo()
510 def perspective_write_ssconf_files(params):
511 """Write ssconf files.
515 return backend.WriteSsconfFiles(values)
517 # os -----------------------
520 def perspective_os_diagnose(params):
521 """Query detailed information about existing OSes.
524 return [os.ToDict() for os in backend.DiagnoseOS()]
527 def perspective_os_get(params):
528 """Query information about a given OS.
533 os_obj = backend.OSFromDisk(name)
534 except errors.InvalidOS, err:
535 os_obj = objects.OS.FromInvalidOS(err)
536 return os_obj.ToDict()
538 # hooks -----------------------
541 def perspective_hooks_runner(params):
545 hpath, phase, env = params
546 hr = backend.HooksRunner()
547 return hr.RunHooks(hpath, phase, env)
549 # iallocator -----------------
552 def perspective_iallocator_runner(params):
553 """Run an iallocator script.
557 iar = backend.IAllocatorRunner()
558 return iar.Run(name, idata)
560 # test -----------------------
563 def perspective_test_delay(params):
568 return utils.TestDelay(duration)
570 # file storage ---------------
573 def perspective_file_storage_dir_create(params):
574 """Create the file storage directory.
577 file_storage_dir = params[0]
578 return backend.CreateFileStorageDir(file_storage_dir)
581 def perspective_file_storage_dir_remove(params):
582 """Remove the file storage directory.
585 file_storage_dir = params[0]
586 return backend.RemoveFileStorageDir(file_storage_dir)
589 def perspective_file_storage_dir_rename(params):
590 """Rename the file storage directory.
593 old_file_storage_dir = params[0]
594 new_file_storage_dir = params[1]
595 return backend.RenameFileStorageDir(old_file_storage_dir,
596 new_file_storage_dir)
598 # jobs ------------------------
601 @_RequireJobQueueLock
602 def perspective_jobqueue_update(params):
606 (file_name, content) = params
607 return backend.JobQueueUpdate(file_name, content)
610 @_RequireJobQueueLock
611 def perspective_jobqueue_purge(params):
615 return backend.JobQueuePurge()
618 @_RequireJobQueueLock
619 def perspective_jobqueue_rename(params):
620 """Rename a job queue file.
625 return backend.JobQueueRename(old, new)
628 def perspective_jobqueue_set_drain(params):
629 """Set/unset the queue drain flag.
632 drain_flag = params[0]
633 return backend.JobQueueSetDrainFlag(drain_flag)
636 # hypervisor ---------------
639 def perspective_hypervisor_validate_params(params):
640 """Validate the hypervisor parameters.
643 (hvname, hvparams) = params
644 return backend.ValidateHVParams(hvname, hvparams)
def ParseOptions():
  """Parse the command line options.

  Two flags are understood: -f/--foreground stores False in
  C{options.fork} (default True) and -d/--debug stores True in
  C{options.debug} (default False).

  Returns:
    (options, args) as from OptionParser.parse_args()

  """
  parser = OptionParser(description="Ganeti node daemon",
                        usage="%prog [-f] [-d]",
                        version="%%prog (ganeti) %s" %
                        constants.RELEASE_VERSION)

  parser.add_option("-f", "--foreground", dest="fork",
                    help="Don't detach from the current terminal",
                    default=True, action="store_false")
  parser.add_option("-d", "--debug", dest="debug",
                    help="Enable some debug messages",
                    default=False, action="store_true")
  options, args = parser.parse_args()
  return options, args
669 def EnsureRuntimeEnvironment():
670 """Ensure our run-time environment is complete.
672 Currently this creates directories which could be missing, either
673 due to directories being on a tmpfs mount, or due to incomplete
677 dirs = [(val, constants.RUN_DIRS_MODE) for val in constants.SUB_RUN_DIRS]
678 dirs.append((constants.LOG_OS_DIR, 0750))
679 for dir_name, dir_mode in dirs:
680 if not os.path.exists(dir_name):
682 os.mkdir(dir_name, dir_mode)
683 except EnvironmentError, err:
684 if err.errno != errno.EEXIST:
685 print ("Node setup wrong, cannot create directory '%s': %s" %
688 if not os.path.isdir(dir_name):
689 print ("Node setup wrong, '%s' is not a directory" % dir_name)
694 """Main function for the node daemon.
699 options, args = ParseOptions()
700 utils.debug = options.debug
701 for fname in (constants.SSL_CERT_FILE,):
702 if not os.path.isfile(fname):
703 print "config %s not there, will not run." % fname
707 port = utils.GetNodeDaemonPort()
708 except errors.ConfigurationError, err:
709 print "Cluster configuration incomplete: '%s'" % str(err)
712 EnsureRuntimeEnvironment()
716 utils.Daemonize(logfile=constants.LOG_NODESERVER)
718 utils.WritePidFile(constants.NODED_PID)
720 utils.SetupLogging(logfile=constants.LOG_NODESERVER, debug=options.debug,
721 stderr_logging=not options.fork)
722 logging.info("ganeti node daemon startup")
724 # Read SSL certificate
725 ssl_params = http.HttpSslParams(ssl_key_path=constants.SSL_CERT_FILE,
726 ssl_cert_path=constants.SSL_CERT_FILE)
729 queue_lock = jstore.InitAndVerifyQueue(must_lock=False)
731 mainloop = daemon.Mainloop()
732 server = NodeHttpServer(mainloop, "", port,
733 ssl_params=ssl_params, ssl_verify_peer=True)
740 utils.RemovePidFile(constants.NODED_PID)
743 if __name__ == '__main__':