4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti node daemon"""
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
32 from optparse import OptionParser
34 from ganeti import backend
35 from ganeti import logger
36 from ganeti import constants
37 from ganeti import objects
38 from ganeti import errors
39 from ganeti import ssconf
40 from ganeti import utils
42 from twisted.spread import pb
43 from twisted.internet import reactor
44 from twisted.cred import checkers, portal
45 from OpenSSL import SSL
class ServerContextFactory:
    """SSL context factory class that uses a given certificate.

    An instance of this class is passed to ``reactor.listenSSL`` by the
    daemon start-up code.

    """

    @staticmethod
    def getContext():
        """Return a customized context.

        The context will be set to use our certificate.  Both the
        certificate and the private key are loaded from
        ``constants.SSL_CERT_FILE``.

        """
        ctx = SSL.Context(SSL.TLSv1_METHOD)
        ctx.use_certificate_file(constants.SSL_CERT_FILE)
        ctx.use_privatekey_file(constants.SSL_CERT_FILE)
        return ctx
65 class ServerObject(pb.Avatar):
66 """The server implementation.
68 This class holds all methods exposed over the RPC interface.
71 def __init__(self, name):
74 def perspectiveMessageReceived(self, broker, message, args, kw):
75 """Custom message dispatching function.
77 This function overrides the pb.Avatar function in order to provide
78 a simple form of exception passing (as text only).
81 args = broker.unserialize(args, self)
82 kw = broker.unserialize(kw, self)
83 method = getattr(self, "perspective_%s" % message)
87 state = method(*args, **kw)
89 tb = traceback.format_exc()
91 return broker.serialize((tb, state), self, method, args, kw)
93 # the new block devices --------------------------
96 def perspective_blockdev_create(params):
97 """Create a block device.
100 bdev_s, size, owner, on_primary, info = params
101 bdev = objects.Disk.FromDict(bdev_s)
103 raise ValueError("can't unserialize data!")
104 return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
107 def perspective_blockdev_remove(params):
108 """Remove a block device.
112 bdev = objects.Disk.FromDict(bdev_s)
113 return backend.RemoveBlockDevice(bdev)
116 def perspective_blockdev_rename(params):
117 """Remove a block device.
120 devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
121 return backend.RenameBlockDevices(devlist)
124 def perspective_blockdev_assemble(params):
125 """Assemble a block device.
128 bdev_s, owner, on_primary = params
129 bdev = objects.Disk.FromDict(bdev_s)
131 raise ValueError("can't unserialize data!")
132 return backend.AssembleBlockDevice(bdev, owner, on_primary)
135 def perspective_blockdev_shutdown(params):
136 """Shutdown a block device.
140 bdev = objects.Disk.FromDict(bdev_s)
142 raise ValueError("can't unserialize data!")
143 return backend.ShutdownBlockDevice(bdev)
146 def perspective_blockdev_addchildren(params):
147 """Add a child to a mirror device.
149 Note: this is only valid for mirror devices. It's the caller's duty
150 to send a correct disk, otherwise we raise an error.
153 bdev_s, ndev_s = params
154 bdev = objects.Disk.FromDict(bdev_s)
155 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
156 if bdev is None or ndevs.count(None) > 0:
157 raise ValueError("can't unserialize data!")
158 return backend.MirrorAddChildren(bdev, ndevs)
161 def perspective_blockdev_removechildren(params):
162 """Remove a child from a mirror device.
164 This is only valid for mirror devices, of course. It's the callers
165 duty to send a correct disk, otherwise we raise an error.
168 bdev_s, ndev_s = params
169 bdev = objects.Disk.FromDict(bdev_s)
170 ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
171 if bdev is None or ndevs.count(None) > 0:
172 raise ValueError("can't unserialize data!")
173 return backend.MirrorRemoveChildren(bdev, ndevs)
176 def perspective_blockdev_getmirrorstatus(params):
177 """Return the mirror status for a list of disks.
180 disks = [objects.Disk.FromDict(dsk_s)
182 return backend.GetMirrorStatus(disks)
185 def perspective_blockdev_find(params):
186 """Expose the FindBlockDevice functionality for a disk.
188 This will try to find but not activate a disk.
191 disk = objects.Disk.FromDict(params[0])
192 return backend.FindBlockDevice(disk)
195 def perspective_blockdev_snapshot(params):
196 """Create a snapshot device.
198 Note that this is only valid for LVM disks, if we get passed
199 something else we raise an exception. The snapshot device can be
200 remove by calling the generic block device remove call.
203 cfbd = objects.Disk.FromDict(params[0])
204 return backend.SnapshotBlockDevice(cfbd)
207 def perspective_blockdev_grow(params):
208 """Grow a stack of devices.
211 cfbd = objects.Disk.FromDict(params[0])
213 return backend.GrowBlockDevice(cfbd, amount)
216 def perspective_blockdev_close(params):
217 """Closes the given block devices.
220 disks = [objects.Disk.FromDict(cf) for cf in params[1]]
221 return backend.CloseBlockDevices(params[0], disks)
223 # blockdev/drbd specific methods ----------
226 def perspective_drbd_reconfig_net(params):
227 """Re-configures the network connection of drbd disks.
229 Note that this is only valid for drbd disks, so the members of the
230 disk list must all be drbd devices.
233 instance_name, disks, nodes_ip, multimaster = params
234 disks = [objects.Disk.FromDict(cf) for cf in disks]
235 return backend.DrbdReconfigNet(instance_name, disks, nodes_ip, multimaster)
237 # export/import --------------------------
240 def perspective_snapshot_export(params):
241 """Export a given snapshot.
244 disk = objects.Disk.FromDict(params[0])
245 dest_node = params[1]
246 instance = objects.Instance.FromDict(params[2])
247 return backend.ExportSnapshot(disk, dest_node, instance)
250 def perspective_finalize_export(params):
251 """Expose the finalize export functionality.
254 instance = objects.Instance.FromDict(params[0])
255 snap_disks = [objects.Disk.FromDict(str_data)
256 for str_data in params[1]]
257 return backend.FinalizeExport(instance, snap_disks)
260 def perspective_export_info(params):
261 """Query information about an existing export on this node.
263 The given path may not contain an export, in which case we return
268 einfo = backend.ExportInfo(path)
274 def perspective_export_list(params):
275 """List the available exports on this node.
277 Note that as opposed to export_info, which may query data about an
278 export in any path, this only queries the standard Ganeti path
279 (constants.EXPORT_DIR).
282 return backend.ListExports()
285 def perspective_export_remove(params):
290 return backend.RemoveExport(export)
292 # volume --------------------------
295 def perspective_volume_list(params):
296 """Query the list of logical volumes in a given volume group.
300 return backend.GetVolumeList(vgname)
303 def perspective_vg_list(params):
304 """Query the list of volume groups.
307 return backend.ListVolumeGroups()
309 # bridge --------------------------
312 def perspective_bridges_exist(params):
313 """Check if all bridges given exist on this node.
316 bridges_list = params[0]
317 return backend.BridgesExist(bridges_list)
319 # instance --------------------------
322 def perspective_instance_os_add(params):
323 """Install an OS on a given instance.
326 inst_s, os_disk, swap_disk = params
327 inst = objects.Instance.FromDict(inst_s)
328 return backend.AddOSToInstance(inst, os_disk, swap_disk)
331 def perspective_instance_run_rename(params):
332 """Runs the OS rename script for an instance.
335 inst_s, old_name, os_disk, swap_disk = params
336 inst = objects.Instance.FromDict(inst_s)
337 return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
340 def perspective_instance_os_import(params):
341 """Run the import function of an OS onto a given instance.
344 inst_s, os_disk, swap_disk, src_node, src_image = params
345 inst = objects.Instance.FromDict(inst_s)
346 return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
350 def perspective_instance_shutdown(params):
351 """Shutdown an instance.
354 instance = objects.Instance.FromDict(params[0])
355 return backend.ShutdownInstance(instance)
358 def perspective_instance_start(params):
359 """Start an instance.
362 instance = objects.Instance.FromDict(params[0])
363 extra_args = params[1]
364 return backend.StartInstance(instance, extra_args)
367 def perspective_instance_migrate(params):
368 """Migrates an instance.
371 instance, target, live = params
372 return backend.MigrateInstance(instance, target, live)
375 def perspective_instance_reboot(params):
376 """Reboot an instance.
379 instance = objects.Instance.FromDict(params[0])
380 reboot_type = params[1]
381 extra_args = params[2]
382 return backend.RebootInstance(instance, reboot_type, extra_args)
385 def perspective_instance_info(params):
386 """Query instance information.
389 return backend.GetInstanceInfo(params[0])
392 def perspective_all_instances_info(params):
393 """Query information about all instances.
396 return backend.GetAllInstancesInfo()
399 def perspective_instance_list(params):
400 """Query the list of running instances.
403 return backend.GetInstanceList()
405 # node --------------------------
408 def perspective_node_tcp_ping(params):
409 """Do a TcpPing on the remote node.
412 return utils.TcpPing(params[1], params[2], timeout=params[3],
413 live_port_needed=params[4], source=params[0])
416 def perspective_node_info(params):
417 """Query node information.
421 return backend.GetNodeInfo(vgname)
424 def perspective_node_add(params):
425 """Complete the registration of this node in the cluster.
428 return backend.AddNode(params[0], params[1], params[2],
429 params[3], params[4], params[5])
432 def perspective_node_verify(params):
433 """Run a verify sequence on this node.
436 return backend.VerifyNode(params[0])
439 def perspective_node_start_master(params):
440 """Promote this node to master status.
443 return backend.StartMaster()
446 def perspective_node_stop_master(params):
447 """Demote this node from master status.
450 return backend.StopMaster()
453 def perspective_node_leave_cluster(params):
454 """Cleanup after leaving a cluster.
457 return backend.LeaveCluster()
460 def perspective_node_volumes(params):
461 """Query the list of all logical volume groups.
464 return backend.NodeVolumes()
466 # cluster --------------------------
469 def perspective_version(params):
470 """Query version information.
473 return constants.PROTOCOL_VERSION
476 def perspective_upload_file(params):
479 Note that the backend implementation imposes strict rules on which
483 return backend.UploadFile(*params)
486 # os -----------------------
489 def perspective_os_diagnose(params):
490 """Query detailed information about existing OSes.
493 return [os.ToDict() for os in backend.DiagnoseOS()]
496 def perspective_os_get(params):
497 """Query information about a given OS.
502 os_obj = backend.OSFromDisk(name)
503 except errors.InvalidOS, err:
504 os_obj = objects.OS.FromInvalidOS(err)
505 return os_obj.ToDict()
507 # hooks -----------------------
510 def perspective_hooks_runner(params):
514 hpath, phase, env = params
515 hr = backend.HooksRunner()
516 return hr.RunHooks(hpath, phase, env)
518 # iallocator -----------------
521 def perspective_iallocator_runner(params):
522 """Run an iallocator script.
526 iar = backend.IAllocatorRunner()
527 return iar.Run(name, idata)
529 # test -----------------------
532 def perspective_test_delay(params):
537 return utils.TestDelay(duration)
class MyRealm:
    """Simple realm that forwards all requests to a ServerObject.

    """
    __implements__ = portal.IRealm

    def requestAvatar(self, avatarId, mind, *interfaces):
        """Return an avatar based on our ServerObject class.

        Only the ``pb.IPerspective`` interface is supported; a request that
        does not list it raises NotImplementedError.  The returned tuple is
        (interface, avatar, logout callable), where the logout callable
        does nothing.

        """
        if pb.IPerspective not in interfaces:
            raise NotImplementedError
        return pb.IPerspective, ServerObject(avatarId), lambda: None
def ParseOptions():
    """Parse the command line options.

    Two flags are recognised: ``-f``/``--foreground`` clears
    ``options.fork`` (which defaults to True) and ``-d``/``--debug`` sets
    ``options.debug`` (which defaults to False).

    Returns:
      (options, args) as from OptionParser.parse_args()

    """
    parser = OptionParser(description="Ganeti node daemon",
                          usage="%prog [-f] [-d]",
                          version="%%prog (ganeti) %s" %
                          constants.RELEASE_VERSION)

    parser.add_option("-f", "--foreground", dest="fork",
                      help="Don't detach from the current terminal",
                      default=True, action="store_false")
    parser.add_option("-d", "--debug", dest="debug",
                      help="Enable some debug messages",
                      default=False, action="store_true")
    options, args = parser.parse_args()
    return options, args
578 """Main function for the node daemon.
581 options, args = ParseOptions()
582 utils.debug = options.debug
583 for fname in (constants.SSL_CERT_FILE,):
584 if not os.path.isfile(fname):
585 print "config %s not there, will not run." % fname
589 ss = ssconf.SimpleStore()
590 port = ss.GetNodeDaemonPort()
591 pwdata = ss.GetNodeDaemonPassword()
592 except errors.ConfigurationError, err:
593 print "Cluster configuration incomplete: '%s'" % str(err)
596 # create the various SUB_RUN_DIRS, if not existing, so that we handle the
597 # situation where RUN_DIR is tmpfs
598 for dir_name in constants.SUB_RUN_DIRS:
599 if not os.path.exists(dir_name):
601 os.mkdir(dir_name, 0755)
602 except EnvironmentError, err:
603 if err.errno != errno.EEXIST:
604 print ("Node setup wrong, cannot create directory %s: %s" %
607 if not os.path.isdir(dir_name):
608 print ("Node setup wrong, %s is not a directory" % dir_name)
613 utils.Daemonize(logfile=constants.LOG_NODESERVER)
615 logger.SetupLogging(twisted_workaround=True, debug=options.debug,
616 program="ganeti-noded")
618 p = portal.Portal(MyRealm())
620 checkers.InMemoryUsernamePasswordDatabaseDontUse(master_node=pwdata))
621 reactor.listenSSL(port, pb.PBServerFactory(p), ServerContextFactory())
625 if __name__ == '__main__':