Implement device to instance mapping cache
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
26
27 import os
28 import sys
29 import resource
30 import traceback
31
32 from optparse import OptionParser
33
34
35 from ganeti import backend
36 from ganeti import logger
37 from ganeti import constants
38 from ganeti import objects
39 from ganeti import errors
40 from ganeti import ssconf
41 from ganeti import utils
42
43 from twisted.spread import pb
44 from twisted.internet import reactor
45 from twisted.cred import checkers, portal
46 from OpenSSL import SSL
47
48
49 class ServerContextFactory:
50   """SSL context factory class that uses a given certificate.
51
52   """
53   @staticmethod
54   def getContext():
55     """Return a customized context.
56
57     The context will be set to use our certificate.
58
59     """
60     ctx = SSL.Context(SSL.TLSv1_METHOD)
61     ctx.use_certificate_file(constants.SSL_CERT_FILE)
62     ctx.use_privatekey_file(constants.SSL_CERT_FILE)
63     return ctx
64
65 class ServerObject(pb.Avatar):
66   """The server implementation.
67
68   This class holds all methods exposed over the RPC interface.
69
70   """
71   def __init__(self, name):
72     self.name = name
73
74   def perspectiveMessageReceived(self, broker, message, args, kw):
75     """Custom message dispatching function.
76
77     This function overrides the pb.Avatar function in order to provide
78     a simple form of exception passing (as text only).
79
80     """
81     args = broker.unserialize(args, self)
82     kw = broker.unserialize(kw, self)
83     method = getattr(self, "perspective_%s" % message)
84     tb = None
85     state = None
86     try:
87       state = method(*args, **kw)
88     except:
89       tb = traceback.format_exc()
90
91     return broker.serialize((tb, state), self, method, args, kw)
92
93   # the new block devices  --------------------------
94
95   @staticmethod
96   def perspective_blockdev_create(params):
97     """Create a block device.
98
99     """
100     bdev_s, size, owner, on_primary, info = params
101     bdev = objects.Disk.FromDict(bdev_s)
102     if bdev is None:
103       raise ValueError("can't unserialize data!")
104     return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
105
106   @staticmethod
107   def perspective_blockdev_remove(params):
108     """Remove a block device.
109
110     """
111     bdev_s = params[0]
112     bdev = objects.Disk.FromDict(bdev_s)
113     return backend.RemoveBlockDevice(bdev)
114
115   @staticmethod
116   def perspective_blockdev_rename(params):
117     """Remove a block device.
118
119     """
120     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
121     return backend.RenameBlockDevices(devlist)
122
123   @staticmethod
124   def perspective_blockdev_assemble(params):
125     """Assemble a block device.
126
127     """
128     bdev_s, owner, on_primary = params
129     bdev = objects.Disk.FromDict(bdev_s)
130     if bdev is None:
131       raise ValueError("can't unserialize data!")
132     return backend.AssembleBlockDevice(bdev, owner, on_primary)
133
134   @staticmethod
135   def perspective_blockdev_shutdown(params):
136     """Shutdown a block device.
137
138     """
139     bdev_s = params[0]
140     bdev = objects.Disk.FromDict(bdev_s)
141     if bdev is None:
142       raise ValueError("can't unserialize data!")
143     return backend.ShutdownBlockDevice(bdev)
144
145   @staticmethod
146   def perspective_blockdev_addchildren(params):
147     """Add a child to a mirror device.
148
149     Note: this is only valid for mirror devices. It's the caller's duty
150     to send a correct disk, otherwise we raise an error.
151
152     """
153     bdev_s, ndev_s = params
154     bdev = objects.Disk.FromDict(bdev_s)
155     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
156     if bdev is None or ndevs.count(None) > 0:
157       raise ValueError("can't unserialize data!")
158     return backend.MirrorAddChildren(bdev, ndevs)
159
160   @staticmethod
161   def perspective_blockdev_removechildren(params):
162     """Remove a child from a mirror device.
163
164     This is only valid for mirror devices, of course. It's the callers
165     duty to send a correct disk, otherwise we raise an error.
166
167     """
168     bdev_s, ndev_s = params
169     bdev = objects.Disk.FromDict(bdev_s)
170     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
171     if bdev is None or ndevs.count(None) > 0:
172       raise ValueError("can't unserialize data!")
173     return backend.MirrorRemoveChildren(bdev, ndevs)
174
175   @staticmethod
176   def perspective_blockdev_getmirrorstatus(params):
177     """Return the mirror status for a list of disks.
178
179     """
180     disks = [objects.Disk.FromDict(dsk_s)
181             for dsk_s in params]
182     return backend.GetMirrorStatus(disks)
183
184   @staticmethod
185   def perspective_blockdev_find(params):
186     """Expose the FindBlockDevice functionality for a disk.
187
188     This will try to find but not activate a disk.
189
190     """
191     disk = objects.Disk.FromDict(params[0])
192     return backend.FindBlockDevice(disk)
193
194   @staticmethod
195   def perspective_blockdev_snapshot(params):
196     """Create a snapshot device.
197
198     Note that this is only valid for LVM disks, if we get passed
199     something else we raise an exception. The snapshot device can be
200     remove by calling the generic block device remove call.
201
202     """
203     cfbd = objects.Disk.FromDict(params[0])
204     return backend.SnapshotBlockDevice(cfbd)
205
206   # export/import  --------------------------
207
208   @staticmethod
209   def perspective_snapshot_export(params):
210     """Export a given snapshot.
211
212     """
213     disk = objects.Disk.FromDict(params[0])
214     dest_node = params[1]
215     instance = objects.Instance.FromDict(params[2])
216     return backend.ExportSnapshot(disk, dest_node, instance)
217
218   @staticmethod
219   def perspective_finalize_export(params):
220     """Expose the finalize export functionality.
221
222     """
223     instance = objects.Instance.FromDict(params[0])
224     snap_disks = [objects.Disk.FromDict(str_data)
225                   for str_data in params[1]]
226     return backend.FinalizeExport(instance, snap_disks)
227
228   @staticmethod
229   def perspective_export_info(params):
230     """Query information about an existing export on this node.
231
232     The given path may not contain an export, in which case we return
233     None.
234
235     """
236     path = params[0]
237     einfo = backend.ExportInfo(path)
238     if einfo is None:
239       return einfo
240     return einfo.Dumps()
241
242   @staticmethod
243   def perspective_export_list(params):
244     """List the available exports on this node.
245
246     Note that as opposed to export_info, which may query data about an
247     export in any path, this only queries the standard Ganeti path
248     (constants.EXPORT_DIR).
249
250     """
251     return backend.ListExports()
252
253   @staticmethod
254   def perspective_export_remove(params):
255     """Remove an export.
256
257     """
258     export = params[0]
259     return backend.RemoveExport(export)
260
261   # volume  --------------------------
262
263   @staticmethod
264   def perspective_volume_list(params):
265     """Query the list of logical volumes in a given volume group.
266
267     """
268     vgname = params[0]
269     return backend.GetVolumeList(vgname)
270
271   @staticmethod
272   def perspective_vg_list(params):
273     """Query the list of volume groups.
274
275     """
276     return backend.ListVolumeGroups()
277
278   # bridge  --------------------------
279
280   @staticmethod
281   def perspective_bridges_exist(params):
282     """Check if all bridges given exist on this node.
283
284     """
285     bridges_list = params[0]
286     return backend.BridgesExist(bridges_list)
287
288   # instance  --------------------------
289
290   @staticmethod
291   def perspective_instance_os_add(params):
292     """Install an OS on a given instance.
293
294     """
295     inst_s, os_disk, swap_disk = params
296     inst = objects.Instance.FromDict(inst_s)
297     return backend.AddOSToInstance(inst, os_disk, swap_disk)
298
299   @staticmethod
300   def perspective_instance_run_rename(params):
301     """Runs the OS rename script for an instance.
302
303     """
304     inst_s, old_name, os_disk, swap_disk = params
305     inst = objects.Instance.FromDict(inst_s)
306     return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
307
308   @staticmethod
309   def perspective_instance_os_import(params):
310     """Run the import function of an OS onto a given instance.
311
312     """
313     inst_s, os_disk, swap_disk, src_node, src_image = params
314     inst = objects.Instance.FromDict(inst_s)
315     return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
316                                         src_node, src_image)
317
318   @staticmethod
319   def perspective_instance_shutdown(params):
320     """Shutdown an instance.
321
322     """
323     instance = objects.Instance.FromDict(params[0])
324     return backend.ShutdownInstance(instance)
325
326   @staticmethod
327   def perspective_instance_start(params):
328     """Start an instance.
329
330     """
331     instance = objects.Instance.FromDict(params[0])
332     extra_args = params[1]
333     return backend.StartInstance(instance, extra_args)
334
335   @staticmethod
336   def perspective_instance_reboot(params):
337     """Reboot an instance.
338
339     """
340     instance = objects.Instance.FromDict(params[0])
341     reboot_type = params[1]
342     extra_args = params[2]
343     return backend.RebootInstance(instance, reboot_type, extra_args)
344
345   @staticmethod
346   def perspective_instance_info(params):
347     """Query instance information.
348
349     """
350     return backend.GetInstanceInfo(params[0])
351
352   @staticmethod
353   def perspective_all_instances_info(params):
354     """Query information about all instances.
355
356     """
357     return backend.GetAllInstancesInfo()
358
359   @staticmethod
360   def perspective_instance_list(params):
361     """Query the list of running instances.
362
363     """
364     return backend.GetInstanceList()
365
366   # node --------------------------
367
368   @staticmethod
369   def perspective_node_tcp_ping(params):
370     """Do a TcpPing on the remote node.
371
372     """
373     return utils.TcpPing(params[0], params[1], params[2],
374                          timeout=params[3], live_port_needed=params[4])
375
376   @staticmethod
377   def perspective_node_info(params):
378     """Query node information.
379
380     """
381     vgname = params[0]
382     return backend.GetNodeInfo(vgname)
383
384   @staticmethod
385   def perspective_node_add(params):
386     """Complete the registration of this node in the cluster.
387
388     """
389     return backend.AddNode(params[0], params[1], params[2],
390                            params[3], params[4], params[5])
391
392   @staticmethod
393   def perspective_node_verify(params):
394     """Run a verify sequence on this node.
395
396     """
397     return backend.VerifyNode(params[0])
398
399   @staticmethod
400   def perspective_node_start_master(params):
401     """Promote this node to master status.
402
403     """
404     return backend.StartMaster()
405
406   @staticmethod
407   def perspective_node_stop_master(params):
408     """Demote this node from master status.
409
410     """
411     return backend.StopMaster()
412
413   @staticmethod
414   def perspective_node_leave_cluster(params):
415     """Cleanup after leaving a cluster.
416
417     """
418     return backend.LeaveCluster()
419
420   @staticmethod
421   def perspective_node_volumes(params):
422     """Query the list of all logical volume groups.
423
424     """
425     return backend.NodeVolumes()
426
427   # cluster --------------------------
428
429   @staticmethod
430   def perspective_version(params):
431     """Query version information.
432
433     """
434     return constants.PROTOCOL_VERSION
435
436   @staticmethod
437   def perspective_upload_file(params):
438     """Upload a file.
439
440     Note that the backend implementation imposes strict rules on which
441     files are accepted.
442
443     """
444     return backend.UploadFile(*params)
445
446
447   # os -----------------------
448
449   @staticmethod
450   def perspective_os_diagnose(params):
451     """Query detailed information about existing OSes.
452
453     """
454     os_list = backend.DiagnoseOS()
455     if not os_list:
456       # this catches also return values of 'False',
457       # for which we can't iterate over
458       return os_list
459     result = []
460     for data in os_list:
461       if isinstance(data, objects.OS):
462         result.append(data.ToDict())
463       elif isinstance(data, errors.InvalidOS):
464         result.append(data.args)
465       else:
466         raise errors.ProgrammerError("Invalid result from backend.DiagnoseOS"
467                                      " (class %s, %s)" %
468                                      (str(data.__class__), data))
469
470     return result
471
472   @staticmethod
473   def perspective_os_get(params):
474     """Query information about a given OS.
475
476     """
477     name = params[0]
478     try:
479       os_obj = backend.OSFromDisk(name).ToDict()
480     except errors.InvalidOS, err:
481       os_obj = err.args
482     return os_obj
483
484   # hooks -----------------------
485
486   @staticmethod
487   def perspective_hooks_runner(params):
488     """Run hook scripts.
489
490     """
491     hpath, phase, env = params
492     hr = backend.HooksRunner()
493     return hr.RunHooks(hpath, phase, env)
494
495
496 class MyRealm:
497   """Simple realm that forwards all requests to a ServerObject.
498
499   """
500   __implements__ = portal.IRealm
501
502   def requestAvatar(self, avatarId, mind, *interfaces):
503     """Return an avatar based on our ServerObject class.
504
505     """
506     if pb.IPerspective not in interfaces:
507       raise NotImplementedError
508     return pb.IPerspective, ServerObject(avatarId), lambda:None
509
510
511 def ParseOptions():
512   """Parse the command line options.
513
514   Returns:
515     (options, args) as from OptionParser.parse_args()
516
517   """
518   parser = OptionParser(description="Ganeti node daemon",
519                         usage="%prog [-f] [-d]",
520                         version="%%prog (ganeti) %s" %
521                         constants.RELEASE_VERSION)
522
523   parser.add_option("-f", "--foreground", dest="fork",
524                     help="Don't detach from the current terminal",
525                     default=True, action="store_false")
526   parser.add_option("-d", "--debug", dest="debug",
527                     help="Enable some debug messages",
528                     default=False, action="store_true")
529   options, args = parser.parse_args()
530   return options, args
531
532
533 def main():
534   """Main function for the node daemon.
535
536   """
537   options, args = ParseOptions()
538   for fname in (constants.SSL_CERT_FILE,):
539     if not os.path.isfile(fname):
540       print "config %s not there, will not run." % fname
541       sys.exit(5)
542
543   try:
544     ss = ssconf.SimpleStore()
545     port = ss.GetNodeDaemonPort()
546     pwdata = ss.GetNodeDaemonPassword()
547   except errors.ConfigurationError, err:
548     print "Cluster configuration incomplete: '%s'" % str(err)
549     sys.exit(5)
550
551   # become a daemon
552   if options.fork:
553     createDaemon()
554
555   logger.SetupLogging(twisted_workaround=True, debug=options.debug,
556                       program="ganeti-noded")
557
558   p = portal.Portal(MyRealm())
559   p.registerChecker(
560     checkers.InMemoryUsernamePasswordDatabaseDontUse(master_node=pwdata))
561   reactor.listenSSL(port, pb.PBServerFactory(p), ServerContextFactory())
562   reactor.run()
563
564
565 def createDaemon():
566   """Detach a process from the controlling terminal and run it in the
567   background as a daemon.
568
569   """
570   UMASK = 077
571   WORKDIR = "/"
572   # Default maximum for the number of available file descriptors.
573   if 'SC_OPEN_MAX' in os.sysconf_names:
574     try:
575       MAXFD = os.sysconf('SC_OPEN_MAX')
576       if MAXFD < 0:
577         MAXFD = 1024
578     except OSError:
579       MAXFD = 1024
580   else:
581     MAXFD = 1024
582   # The standard I/O file descriptors are redirected to /dev/null by default.
583   #REDIRECT_TO = getattr(os, "devnull", "/dev/null")
584   REDIRECT_TO = constants.LOG_NODESERVER
585   try:
586     pid = os.fork()
587   except OSError, e:
588     raise Exception("%s [%d]" % (e.strerror, e.errno))
589   if (pid == 0):  # The first child.
590     os.setsid()
591     try:
592       pid = os.fork() # Fork a second child.
593     except OSError, e:
594       raise Exception("%s [%d]" % (e.strerror, e.errno))
595     if (pid == 0):  # The second child.
596       os.chdir(WORKDIR)
597       os.umask(UMASK)
598     else:
599       # exit() or _exit()?  See below.
600       os._exit(0) # Exit parent (the first child) of the second child.
601   else:
602     os._exit(0) # Exit parent of the first child.
603   maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
604   if (maxfd == resource.RLIM_INFINITY):
605     maxfd = MAXFD
606
607   # Iterate through and close all file descriptors.
608   for fd in range(0, maxfd):
609     try:
610       os.close(fd)
611     except OSError: # ERROR, fd wasn't open to begin with (ignored)
612       pass
613   os.open(REDIRECT_TO, os.O_RDWR|os.O_CREAT|os.O_APPEND, 0600)
614   # Duplicate standard input to standard output and standard error.
615   os.dup2(0, 1)     # standard output (1)
616   os.dup2(0, 2)     # standard error (2)
617   return(0)
618
619
620 if __name__ == '__main__':
621   main()