Import first version of Ganeti Remote API
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
26
27 import os
28 import sys
29 import traceback
30 import errno
31
32 from optparse import OptionParser
33
34
35 from ganeti import backend
36 from ganeti import logger
37 from ganeti import constants
38 from ganeti import objects
39 from ganeti import errors
40 from ganeti import ssconf
41 from ganeti import utils
42
43 from twisted.spread import pb
44 from twisted.internet import reactor
45 from twisted.cred import checkers, portal
46 from OpenSSL import SSL
47
48
49 class ServerContextFactory:
50   """SSL context factory class that uses a given certificate.
51
52   """
53   @staticmethod
54   def getContext():
55     """Return a customized context.
56
57     The context will be set to use our certificate.
58
59     """
60     ctx = SSL.Context(SSL.TLSv1_METHOD)
61     ctx.use_certificate_file(constants.SSL_CERT_FILE)
62     ctx.use_privatekey_file(constants.SSL_CERT_FILE)
63     return ctx
64
65 class ServerObject(pb.Avatar):
66   """The server implementation.
67
68   This class holds all methods exposed over the RPC interface.
69
70   """
71   def __init__(self, name):
72     self.name = name
73
74   def perspectiveMessageReceived(self, broker, message, args, kw):
75     """Custom message dispatching function.
76
77     This function overrides the pb.Avatar function in order to provide
78     a simple form of exception passing (as text only).
79
80     """
81     args = broker.unserialize(args, self)
82     kw = broker.unserialize(kw, self)
83     method = getattr(self, "perspective_%s" % message)
84     tb = None
85     state = None
86     try:
87       state = method(*args, **kw)
88     except:
89       tb = traceback.format_exc()
90
91     return broker.serialize((tb, state), self, method, args, kw)
92
93   # the new block devices  --------------------------
94
95   @staticmethod
96   def perspective_blockdev_create(params):
97     """Create a block device.
98
99     """
100     bdev_s, size, owner, on_primary, info = params
101     bdev = objects.Disk.FromDict(bdev_s)
102     if bdev is None:
103       raise ValueError("can't unserialize data!")
104     return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
105
106   @staticmethod
107   def perspective_blockdev_remove(params):
108     """Remove a block device.
109
110     """
111     bdev_s = params[0]
112     bdev = objects.Disk.FromDict(bdev_s)
113     return backend.RemoveBlockDevice(bdev)
114
115   @staticmethod
116   def perspective_blockdev_rename(params):
117     """Remove a block device.
118
119     """
120     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
121     return backend.RenameBlockDevices(devlist)
122
123   @staticmethod
124   def perspective_blockdev_assemble(params):
125     """Assemble a block device.
126
127     """
128     bdev_s, owner, on_primary = params
129     bdev = objects.Disk.FromDict(bdev_s)
130     if bdev is None:
131       raise ValueError("can't unserialize data!")
132     return backend.AssembleBlockDevice(bdev, owner, on_primary)
133
134   @staticmethod
135   def perspective_blockdev_shutdown(params):
136     """Shutdown a block device.
137
138     """
139     bdev_s = params[0]
140     bdev = objects.Disk.FromDict(bdev_s)
141     if bdev is None:
142       raise ValueError("can't unserialize data!")
143     return backend.ShutdownBlockDevice(bdev)
144
145   @staticmethod
146   def perspective_blockdev_addchildren(params):
147     """Add a child to a mirror device.
148
149     Note: this is only valid for mirror devices. It's the caller's duty
150     to send a correct disk, otherwise we raise an error.
151
152     """
153     bdev_s, ndev_s = params
154     bdev = objects.Disk.FromDict(bdev_s)
155     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
156     if bdev is None or ndevs.count(None) > 0:
157       raise ValueError("can't unserialize data!")
158     return backend.MirrorAddChildren(bdev, ndevs)
159
160   @staticmethod
161   def perspective_blockdev_removechildren(params):
162     """Remove a child from a mirror device.
163
164     This is only valid for mirror devices, of course. It's the callers
165     duty to send a correct disk, otherwise we raise an error.
166
167     """
168     bdev_s, ndev_s = params
169     bdev = objects.Disk.FromDict(bdev_s)
170     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
171     if bdev is None or ndevs.count(None) > 0:
172       raise ValueError("can't unserialize data!")
173     return backend.MirrorRemoveChildren(bdev, ndevs)
174
175   @staticmethod
176   def perspective_blockdev_getmirrorstatus(params):
177     """Return the mirror status for a list of disks.
178
179     """
180     disks = [objects.Disk.FromDict(dsk_s)
181             for dsk_s in params]
182     return backend.GetMirrorStatus(disks)
183
184   @staticmethod
185   def perspective_blockdev_find(params):
186     """Expose the FindBlockDevice functionality for a disk.
187
188     This will try to find but not activate a disk.
189
190     """
191     disk = objects.Disk.FromDict(params[0])
192     return backend.FindBlockDevice(disk)
193
194   @staticmethod
195   def perspective_blockdev_snapshot(params):
196     """Create a snapshot device.
197
198     Note that this is only valid for LVM disks, if we get passed
199     something else we raise an exception. The snapshot device can be
200     remove by calling the generic block device remove call.
201
202     """
203     cfbd = objects.Disk.FromDict(params[0])
204     return backend.SnapshotBlockDevice(cfbd)
205
206   # export/import  --------------------------
207
208   @staticmethod
209   def perspective_snapshot_export(params):
210     """Export a given snapshot.
211
212     """
213     disk = objects.Disk.FromDict(params[0])
214     dest_node = params[1]
215     instance = objects.Instance.FromDict(params[2])
216     return backend.ExportSnapshot(disk, dest_node, instance)
217
218   @staticmethod
219   def perspective_finalize_export(params):
220     """Expose the finalize export functionality.
221
222     """
223     instance = objects.Instance.FromDict(params[0])
224     snap_disks = [objects.Disk.FromDict(str_data)
225                   for str_data in params[1]]
226     return backend.FinalizeExport(instance, snap_disks)
227
228   @staticmethod
229   def perspective_export_info(params):
230     """Query information about an existing export on this node.
231
232     The given path may not contain an export, in which case we return
233     None.
234
235     """
236     path = params[0]
237     einfo = backend.ExportInfo(path)
238     if einfo is None:
239       return einfo
240     return einfo.Dumps()
241
242   @staticmethod
243   def perspective_export_list(params):
244     """List the available exports on this node.
245
246     Note that as opposed to export_info, which may query data about an
247     export in any path, this only queries the standard Ganeti path
248     (constants.EXPORT_DIR).
249
250     """
251     return backend.ListExports()
252
253   @staticmethod
254   def perspective_export_remove(params):
255     """Remove an export.
256
257     """
258     export = params[0]
259     return backend.RemoveExport(export)
260
261   # volume  --------------------------
262
263   @staticmethod
264   def perspective_volume_list(params):
265     """Query the list of logical volumes in a given volume group.
266
267     """
268     vgname = params[0]
269     return backend.GetVolumeList(vgname)
270
271   @staticmethod
272   def perspective_vg_list(params):
273     """Query the list of volume groups.
274
275     """
276     return backend.ListVolumeGroups()
277
278   # bridge  --------------------------
279
280   @staticmethod
281   def perspective_bridges_exist(params):
282     """Check if all bridges given exist on this node.
283
284     """
285     bridges_list = params[0]
286     return backend.BridgesExist(bridges_list)
287
288   # instance  --------------------------
289
290   @staticmethod
291   def perspective_instance_os_add(params):
292     """Install an OS on a given instance.
293
294     """
295     inst_s, os_disk, swap_disk = params
296     inst = objects.Instance.FromDict(inst_s)
297     return backend.AddOSToInstance(inst, os_disk, swap_disk)
298
299   @staticmethod
300   def perspective_instance_run_rename(params):
301     """Runs the OS rename script for an instance.
302
303     """
304     inst_s, old_name, os_disk, swap_disk = params
305     inst = objects.Instance.FromDict(inst_s)
306     return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
307
308   @staticmethod
309   def perspective_instance_os_import(params):
310     """Run the import function of an OS onto a given instance.
311
312     """
313     inst_s, os_disk, swap_disk, src_node, src_image = params
314     inst = objects.Instance.FromDict(inst_s)
315     return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
316                                         src_node, src_image)
317
318   @staticmethod
319   def perspective_instance_shutdown(params):
320     """Shutdown an instance.
321
322     """
323     instance = objects.Instance.FromDict(params[0])
324     return backend.ShutdownInstance(instance)
325
326   @staticmethod
327   def perspective_instance_start(params):
328     """Start an instance.
329
330     """
331     instance = objects.Instance.FromDict(params[0])
332     extra_args = params[1]
333     return backend.StartInstance(instance, extra_args)
334
335   @staticmethod
336   def perspective_instance_reboot(params):
337     """Reboot an instance.
338
339     """
340     instance = objects.Instance.FromDict(params[0])
341     reboot_type = params[1]
342     extra_args = params[2]
343     return backend.RebootInstance(instance, reboot_type, extra_args)
344
345   @staticmethod
346   def perspective_instance_info(params):
347     """Query instance information.
348
349     """
350     return backend.GetInstanceInfo(params[0])
351
352   @staticmethod
353   def perspective_all_instances_info(params):
354     """Query information about all instances.
355
356     """
357     return backend.GetAllInstancesInfo()
358
359   @staticmethod
360   def perspective_instance_list(params):
361     """Query the list of running instances.
362
363     """
364     return backend.GetInstanceList()
365
366   # node --------------------------
367
368   @staticmethod
369   def perspective_node_tcp_ping(params):
370     """Do a TcpPing on the remote node.
371
372     """
373     return utils.TcpPing(params[1], params[2], timeout=params[3],
374                          live_port_needed=params[4], source=params[0])
375
376   @staticmethod
377   def perspective_node_info(params):
378     """Query node information.
379
380     """
381     vgname = params[0]
382     return backend.GetNodeInfo(vgname)
383
384   @staticmethod
385   def perspective_node_add(params):
386     """Complete the registration of this node in the cluster.
387
388     """
389     return backend.AddNode(params[0], params[1], params[2],
390                            params[3], params[4], params[5])
391
392   @staticmethod
393   def perspective_node_verify(params):
394     """Run a verify sequence on this node.
395
396     """
397     return backend.VerifyNode(params[0])
398
399   @staticmethod
400   def perspective_node_start_master(params):
401     """Promote this node to master status.
402
403     """
404     return backend.StartMaster()
405
406   @staticmethod
407   def perspective_node_stop_master(params):
408     """Demote this node from master status.
409
410     """
411     return backend.StopMaster()
412
413   @staticmethod
414   def perspective_node_leave_cluster(params):
415     """Cleanup after leaving a cluster.
416
417     """
418     return backend.LeaveCluster()
419
420   @staticmethod
421   def perspective_node_volumes(params):
422     """Query the list of all logical volume groups.
423
424     """
425     return backend.NodeVolumes()
426
427   # cluster --------------------------
428
429   @staticmethod
430   def perspective_version(params):
431     """Query version information.
432
433     """
434     return constants.PROTOCOL_VERSION
435
436   @staticmethod
437   def perspective_upload_file(params):
438     """Upload a file.
439
440     Note that the backend implementation imposes strict rules on which
441     files are accepted.
442
443     """
444     return backend.UploadFile(*params)
445
446
447   # os -----------------------
448
449   @staticmethod
450   def perspective_os_diagnose(params):
451     """Query detailed information about existing OSes.
452
453     """
454     return [os.ToDict() for os in backend.DiagnoseOS()]
455
456   @staticmethod
457   def perspective_os_get(params):
458     """Query information about a given OS.
459
460     """
461     name = params[0]
462     try:
463       os_obj = backend.OSFromDisk(name)
464     except errors.InvalidOS, err:
465       os_obj = objects.OS.FromInvalidOS(err)
466     return os_obj.ToDict()
467
468   # hooks -----------------------
469
470   @staticmethod
471   def perspective_hooks_runner(params):
472     """Run hook scripts.
473
474     """
475     hpath, phase, env = params
476     hr = backend.HooksRunner()
477     return hr.RunHooks(hpath, phase, env)
478
479   # test -----------------------
480
481   @staticmethod
482   def perspective_test_delay(params):
483     """Run test delay.
484
485     """
486     duration = params[0]
487     return utils.TestDelay(duration)
488
489
490 class MyRealm:
491   """Simple realm that forwards all requests to a ServerObject.
492
493   """
494   __implements__ = portal.IRealm
495
496   def requestAvatar(self, avatarId, mind, *interfaces):
497     """Return an avatar based on our ServerObject class.
498
499     """
500     if pb.IPerspective not in interfaces:
501       raise NotImplementedError
502     return pb.IPerspective, ServerObject(avatarId), lambda:None
503
504
505 def ParseOptions():
506   """Parse the command line options.
507
508   Returns:
509     (options, args) as from OptionParser.parse_args()
510
511   """
512   parser = OptionParser(description="Ganeti node daemon",
513                         usage="%prog [-f] [-d]",
514                         version="%%prog (ganeti) %s" %
515                         constants.RELEASE_VERSION)
516
517   parser.add_option("-f", "--foreground", dest="fork",
518                     help="Don't detach from the current terminal",
519                     default=True, action="store_false")
520   parser.add_option("-d", "--debug", dest="debug",
521                     help="Enable some debug messages",
522                     default=False, action="store_true")
523   options, args = parser.parse_args()
524   return options, args
525
526
527 def main():
528   """Main function for the node daemon.
529
530   """
531   options, args = ParseOptions()
532   utils.debug = options.debug
533   for fname in (constants.SSL_CERT_FILE,):
534     if not os.path.isfile(fname):
535       print "config %s not there, will not run." % fname
536       sys.exit(5)
537
538   try:
539     ss = ssconf.SimpleStore()
540     port = ss.GetNodeDaemonPort()
541     pwdata = ss.GetNodeDaemonPassword()
542   except errors.ConfigurationError, err:
543     print "Cluster configuration incomplete: '%s'" % str(err)
544     sys.exit(5)
545
546   # create /var/run/ganeti if not existing, in order to take care of
547   # tmpfs /var/run
548   if not os.path.exists(constants.BDEV_CACHE_DIR):
549     try:
550       os.mkdir(constants.BDEV_CACHE_DIR, 0755)
551     except EnvironmentError, err:
552       if err.errno != errno.EEXIST:
553         print ("Node setup wrong, cannot create directory %s: %s" %
554                (constants.BDEV_CACHE_DIR, err))
555         sys.exit(5)
556   if not os.path.isdir(constants.BDEV_CACHE_DIR):
557     print ("Node setup wrong, %s is not a directory" %
558            constants.BDEV_CACHE_DIR)
559     sys.exit(5)
560
561   # become a daemon
562   if options.fork:
563     utils.Daemonize(logfile=constants.LOG_NODESERVER)
564
565   logger.SetupLogging(twisted_workaround=True, debug=options.debug,
566                       program="ganeti-noded")
567
568   p = portal.Portal(MyRealm())
569   p.registerChecker(
570     checkers.InMemoryUsernamePasswordDatabaseDontUse(master_node=pwdata))
571   reactor.listenSSL(port, pb.PBServerFactory(p), ServerContextFactory())
572   reactor.run()
573
574
575 if __name__ == '__main__':
576   main()