4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Ganeti node daemon"""
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
32 from optparse import OptionParser
35 from ganeti import backend
36 from ganeti import logger
37 from ganeti import constants
38 from ganeti import objects
39 from ganeti import errors
40 from ganeti import ssconf
41 from ganeti import utils
43 from twisted.spread import pb
44 from twisted.internet import reactor
45 from twisted.cred import checkers, portal
46 from OpenSSL import SSL
49 class ServerContextFactory:
50 """SSL context factory class that uses a given certificate.
55 """Return a customized context.
57 The context will be set to use our certificate.
60 ctx = SSL.Context(SSL.TLSv1_METHOD)
61 ctx.use_certificate_file(constants.SSL_CERT_FILE)
62 ctx.use_privatekey_file(constants.SSL_CERT_FILE)
65 class ServerObject(pb.Avatar):
66 """The server implementation.
68 This class holds all methods exposed over the RPC interface.
71 def __init__(self, name):
74 def perspectiveMessageReceived(self, broker, message, args, kw):
75 """Custom message dispatching function.
77 This function overrides the pb.Avatar function in order to provide
78 a simple form of exception passing (as text only).
81 args = broker.unserialize(args, self)
82 kw = broker.unserialize(kw, self)
83 method = getattr(self, "perspective_%s" % message)
87 state = method(*args, **kw)
89 tb = traceback.format_exc()
91 return broker.serialize((tb, state), self, method, args, kw)
93 # the new block devices --------------------------
96 def perspective_blockdev_create(params):
97 """Create a block device.
100 bdev_s, size, owner, on_primary, info = params
101 bdev = objects.Disk.FromDict(bdev_s)
103 raise ValueError("can't unserialize data!")
104 return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
107 def perspective_blockdev_remove(params):
108 """Remove a block device.
112 bdev = objects.Disk.FromDict(bdev_s)
113 return backend.RemoveBlockDevice(bdev)
def perspective_blockdev_rename(params):
  """Rename a batch of block devices.

  params is iterated as (serialized disk, uid) pairs.  Every disk is
  rebuilt with objects.Disk.FromDict and the resulting list of
  (disk, uid) tuples is handed to backend.RenameBlockDevices.

  """
  rename_pairs = []
  for disk_s, new_uid in params:
    rename_pairs.append((objects.Disk.FromDict(disk_s), new_uid))
  return backend.RenameBlockDevices(rename_pairs)
124 def perspective_blockdev_assemble(params):
125 """Assemble a block device.
128 bdev_s, owner, on_primary = params
129 bdev = objects.Disk.FromDict(bdev_s)
131 raise ValueError("can't unserialize data!")
132 return backend.AssembleBlockDevice(bdev, owner, on_primary)
135 def perspective_blockdev_shutdown(params):
136 """Shutdown a block device.
140 bdev = objects.Disk.FromDict(bdev_s)
142 raise ValueError("can't unserialize data!")
143 return backend.ShutdownBlockDevice(bdev)
def perspective_blockdev_addchildren(params):
  """Attach new children to a mirror device.

  params unpacks into the serialized parent disk and a sequence of
  serialized child disks.  All of them are rebuilt with
  objects.Disk.FromDict before backend.MirrorAddChildren is called.

  The call is only meaningful for mirror devices; sending a suitable
  disk is the caller's duty.

  Raises ValueError if the parent or any child deserializes to None.

  """
  parent_s, children_s = params
  parent = objects.Disk.FromDict(parent_s)
  children = [objects.Disk.FromDict(child_s) for child_s in children_s]
  if parent is None or None in children:
    raise ValueError("can't unserialize data!")
  return backend.MirrorAddChildren(parent, children)
def perspective_blockdev_removechildren(params):
  """Detach children from a mirror device.

  params unpacks into the serialized parent disk and a sequence of
  serialized child disks.  All of them are rebuilt with
  objects.Disk.FromDict before backend.MirrorRemoveChildren is called.

  The call is only meaningful for mirror devices; sending a suitable
  disk is the caller's duty.

  Raises ValueError if the parent or any child deserializes to None.

  """
  parent_s, children_s = params
  parent = objects.Disk.FromDict(parent_s)
  children = [objects.Disk.FromDict(child_s) for child_s in children_s]
  if parent is None or None in children:
    raise ValueError("can't unserialize data!")
  return backend.MirrorRemoveChildren(parent, children)
176 def perspective_blockdev_getmirrorstatus(params):
177 """Return the mirror status for a list of disks.
180 disks = [objects.Disk.FromDict(dsk_s)
182 return backend.GetMirrorStatus(disks)
def perspective_blockdev_find(params):
  """Look up a disk through backend.FindBlockDevice.

  The disk is rebuilt from params[0].  The lookup is meant to find the
  device without activating it.

  """
  wanted = objects.Disk.FromDict(params[0])
  found = backend.FindBlockDevice(wanted)
  return found
def perspective_blockdev_snapshot(params):
  """Create a snapshot of the disk described by params[0].

  Snapshots are only valid for LVM disks; for anything else an
  exception is expected (it is not raised in this function itself).
  The snapshot device can be removed by calling the generic block
  device remove call.

  """
  source_disk = objects.Disk.FromDict(params[0])
  snapshot = backend.SnapshotBlockDevice(source_disk)
  return snapshot
206 # export/import --------------------------
def perspective_snapshot_export(params):
  """Export a snapshot via backend.ExportSnapshot.

  params[0] is a serialized disk, params[1] the destination node
  (passed through unchanged) and params[2] a serialized instance.

  """
  snap_disk = objects.Disk.FromDict(params[0])
  target_node = params[1]
  owner = objects.Instance.FromDict(params[2])
  return backend.ExportSnapshot(snap_disk, target_node, owner)
def perspective_finalize_export(params):
  """Finish an export via backend.FinalizeExport.

  params[0] is a serialized instance; params[1] is a sequence of
  serialized disks.  Both are rebuilt into objects before the backend
  call.

  """
  exported = objects.Instance.FromDict(params[0])
  disks = []
  for disk_s in params[1]:
    disks.append(objects.Disk.FromDict(disk_s))
  return backend.FinalizeExport(exported, disks)
229 def perspective_export_info(params):
230 """Query information about an existing export on this node.
232 The given path may not contain an export, in which case we return
237 einfo = backend.ExportInfo(path)
def perspective_export_list(params):
  """Return the exports available on this node.

  Unlike export_info, which may be pointed at any path, this only looks
  at the standard Ganeti export location (constants.EXPORT_DIR).  The
  params argument is not used.

  """
  exports = backend.ListExports()
  return exports
254 def perspective_export_remove(params):
259 return backend.RemoveExport(export)
261 # volume --------------------------
264 def perspective_volume_list(params):
265 """Query the list of logical volumes in a given volume group.
269 return backend.GetVolumeList(vgname)
def perspective_vg_list(params):
  """Return the result of backend.ListVolumeGroups().

  The params argument is not used.

  """
  volume_groups = backend.ListVolumeGroups()
  return volume_groups
278 # bridge --------------------------
def perspective_bridges_exist(params):
  """Ask the backend whether all the given bridges exist on this node.

  The list of bridges is taken from params[0] and passed unchanged to
  backend.BridgesExist.

  """
  return backend.BridgesExist(params[0])
288 # instance --------------------------
def perspective_instance_os_add(params):
  """Install an OS on an instance via backend.AddOSToInstance.

  params must unpack into exactly three items: the serialized instance,
  the OS disk and the swap disk.  Only the instance is deserialized.

  """
  instance_s, os_dev, swap_dev = params
  target = objects.Instance.FromDict(instance_s)
  return backend.AddOSToInstance(target, os_dev, swap_dev)
def perspective_instance_run_rename(params):
  """Run the OS rename script via backend.RunRenameInstance.

  params must unpack into exactly four items: the serialized instance,
  the old name, the OS disk and the swap disk.  Only the instance is
  deserialized.

  """
  instance_s, previous_name, os_dev, swap_dev = params
  target = objects.Instance.FromDict(instance_s)
  return backend.RunRenameInstance(target, previous_name, os_dev, swap_dev)
309 def perspective_instance_os_import(params):
310 """Run the import function of an OS onto a given instance.
313 inst_s, os_disk, swap_disk, src_node, src_image = params
314 inst = objects.Instance.FromDict(inst_s)
315 return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
def perspective_instance_shutdown(params):
  """Shut down the instance serialized in params[0].

  The instance is rebuilt with objects.Instance.FromDict and passed to
  backend.ShutdownInstance.

  """
  target = objects.Instance.FromDict(params[0])
  outcome = backend.ShutdownInstance(target)
  return outcome
def perspective_instance_start(params):
  """Start the instance serialized in params[0].

  params[1] carries the extra arguments, which are forwarded unchanged
  to backend.StartInstance.

  """
  target = objects.Instance.FromDict(params[0])
  return backend.StartInstance(target, params[1])
def perspective_instance_reboot(params):
  """Reboot the instance serialized in params[0].

  params[1] is the reboot type and params[2] the extra arguments; both
  are forwarded unchanged to backend.RebootInstance.

  """
  target = objects.Instance.FromDict(params[0])
  how, extras = params[1], params[2]
  return backend.RebootInstance(target, how, extras)
def perspective_instance_info(params):
  """Query information about a single instance.

  Only params[0] is used; it is passed as-is to
  backend.GetInstanceInfo.

  """
  which = params[0]
  return backend.GetInstanceInfo(which)
def perspective_all_instances_info(params):
  """Return the result of backend.GetAllInstancesInfo().

  The params argument is not used.

  """
  everything = backend.GetAllInstancesInfo()
  return everything
def perspective_instance_list(params):
  """Return the running instances as reported by backend.GetInstanceList().

  The params argument is not used.

  """
  running = backend.GetInstanceList()
  return running
366 # node --------------------------
def perspective_node_tcp_ping(params):
  """Run utils.TcpPing with arguments taken from params.

  params[0], params[1] and params[2] are passed positionally, params[3]
  as the timeout keyword and params[4] as the live_port_needed keyword.

  """
  ping = utils.TcpPing
  first, second, third = params[0], params[1], params[2]
  return ping(first, second, third,
              timeout=params[3], live_port_needed=params[4])
377 def perspective_node_info(params):
378 """Query node information.
382 return backend.GetNodeInfo(vgname)
def perspective_node_add(params):
  """Complete the registration of this node in the cluster.

  The first six elements of params are passed, in order, as the
  positional arguments of backend.AddNode.

  """
  add_node = backend.AddNode
  # Index one element at a time so that a too-short params still fails on
  # the first missing element.
  node_args = tuple(params[idx] for idx in range(6))
  return add_node(*node_args)
def perspective_node_verify(params):
  """Run a verify sequence on this node.

  Only params[0] is used; it is passed as-is to backend.VerifyNode.

  """
  what = params[0]
  return backend.VerifyNode(what)
def perspective_node_start_master(params):
  """Promote this node to master by calling backend.StartMaster().

  The params argument is not used.

  """
  outcome = backend.StartMaster()
  return outcome
def perspective_node_stop_master(params):
  """Demote this node from master by calling backend.StopMaster().

  The params argument is not used.

  """
  outcome = backend.StopMaster()
  return outcome
def perspective_node_leave_cluster(params):
  """Clean up after leaving a cluster by calling backend.LeaveCluster().

  The params argument is not used.

  """
  outcome = backend.LeaveCluster()
  return outcome
def perspective_node_volumes(params):
  """Return the result of backend.NodeVolumes().

  The params argument is not used.

  """
  volumes = backend.NodeVolumes()
  return volumes
427 # cluster --------------------------
def perspective_version(params):
  """Report the protocol version, constants.PROTOCOL_VERSION.

  The params argument is not used.

  """
  protocol_version = constants.PROTOCOL_VERSION
  return protocol_version
def perspective_upload_file(params):
  """Upload a file through backend.UploadFile.

  All elements of params are passed on as positional arguments.  Note
  that the backend implementation is said to impose strict rules on
  which uploads it accepts; no checks are made here.

  """
  upload_args = params
  return backend.UploadFile(*upload_args)
447 # os -----------------------
450 def perspective_os_diagnose(params):
451 """Query detailed information about existing OSes.
454 os_list = backend.DiagnoseOS()
456 # this catches also return values of 'False',
457 # for which we can't iterate over
461 if isinstance(data, objects.OS):
462 result.append(data.ToDict())
463 elif isinstance(data, errors.InvalidOS):
464 result.append(data.args)
466 raise errors.ProgrammerError("Invalid result from backend.DiagnoseOS"
468 (str(data.__class__), data))
473 def perspective_os_get(params):
474 """Query information about a given OS.
479 os_obj = backend.OSFromDisk(name).ToDict()
480 except errors.InvalidOS, err:
484 # hooks -----------------------
def perspective_hooks_runner(params):
  """Run hooks through a fresh backend.HooksRunner.

  params must unpack into exactly three items: the hooks path, the phase
  and the environment.  They are passed unchanged to RunHooks, whose
  result is returned.

  """
  hooks_path, hooks_phase, hooks_env = params
  runner = backend.HooksRunner()
  outcome = runner.RunHooks(hooks_path, hooks_phase, hooks_env)
  return outcome
497 """Simple realm that forwards all requests to a ServerObject.
500 __implements__ = portal.IRealm
def requestAvatar(self, avatarId, mind, *interfaces):
  """Hand out a ServerObject avatar for a perspective request.

  Returns a tuple of pb.IPerspective, a new ServerObject built from
  avatarId, and a callable that does nothing.

  Raises NotImplementedError when pb.IPerspective is not among the
  requested interfaces.

  """
  if pb.IPerspective in interfaces:
    avatar = ServerObject(avatarId)
    return pb.IPerspective, avatar, lambda: None
  raise NotImplementedError
512 """Parse the command line options.
515 (options, args) as from OptionParser.parse_args()
518 parser = OptionParser(description="Ganeti node daemon",
519 usage="%prog [-f] [-d]",
520 version="%%prog (ganeti) %s" %
521 constants.RELEASE_VERSION)
523 parser.add_option("-f", "--foreground", dest="fork",
524 help="Don't detach from the current terminal",
525 default=True, action="store_false")
526 parser.add_option("-d", "--debug", dest="debug",
527 help="Enable some debug messages",
528 default=False, action="store_true")
529 options, args = parser.parse_args()
534 """Main function for the node daemon.
537 options, args = ParseOptions()
538 for fname in (constants.SSL_CERT_FILE,):
539 if not os.path.isfile(fname):
540 print "config %s not there, will not run." % fname
544 ss = ssconf.SimpleStore()
545 port = ss.GetNodeDaemonPort()
546 pwdata = ss.GetNodeDaemonPassword()
547 except errors.ConfigurationError, err:
548 print "Cluster configuration incomplete: '%s'" % str(err)
555 logger.SetupLogging(twisted_workaround=True, debug=options.debug,
556 program="ganeti-noded")
558 p = portal.Portal(MyRealm())
560 checkers.InMemoryUsernamePasswordDatabaseDontUse(master_node=pwdata))
561 reactor.listenSSL(port, pb.PBServerFactory(p), ServerContextFactory())
566 """Detach a process from the controlling terminal and run it in the
567 background as a daemon.
572 # Default maximum for the number of available file descriptors.
573 if 'SC_OPEN_MAX' in os.sysconf_names:
575 MAXFD = os.sysconf('SC_OPEN_MAX')
582 # The standard I/O file descriptors are redirected to /dev/null by default.
583 #REDIRECT_TO = getattr(os, "devnull", "/dev/null")
584 REDIRECT_TO = constants.LOG_NODESERVER
588 raise Exception("%s [%d]" % (e.strerror, e.errno))
589 if (pid == 0): # The first child.
592 pid = os.fork() # Fork a second child.
594 raise Exception("%s [%d]" % (e.strerror, e.errno))
595 if (pid == 0): # The second child.
599 # exit() or _exit()? See below.
600 os._exit(0) # Exit parent (the first child) of the second child.
602 os._exit(0) # Exit parent of the first child.
603 maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
604 if (maxfd == resource.RLIM_INFINITY):
607 # Iterate through and close all file descriptors.
608 for fd in range(0, maxfd):
611 except OSError: # ERROR, fd wasn't open to begin with (ignored)
613 os.open(REDIRECT_TO, os.O_RDWR|os.O_CREAT|os.O_APPEND, 0600)
614 # Duplicate standard input to standard output and standard error.
615 os.dup2(0, 1) # standard output (1)
616 os.dup2(0, 2) # standard error (2)
620 if __name__ == '__main__':