Add a rpc call for drbd network reconfiguration
[ganeti-local] / daemons / ganeti-noded
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Ganeti node daemon"""
23
24 # functions in this module need to have a given name structure, so:
25 # pylint: disable-msg=C0103
26
27 import os
28 import sys
29 import traceback
30 import errno
31
32 from optparse import OptionParser
33
34 from ganeti import backend
35 from ganeti import logger
36 from ganeti import constants
37 from ganeti import objects
38 from ganeti import errors
39 from ganeti import ssconf
40 from ganeti import utils
41
42 from twisted.spread import pb
43 from twisted.internet import reactor
44 from twisted.cred import checkers, portal
45 from OpenSSL import SSL
46
47
48 class ServerContextFactory:
49   """SSL context factory class that uses a given certificate.
50
51   """
52   @staticmethod
53   def getContext():
54     """Return a customized context.
55
56     The context will be set to use our certificate.
57
58     """
59     ctx = SSL.Context(SSL.TLSv1_METHOD)
60     ctx.use_certificate_file(constants.SSL_CERT_FILE)
61     ctx.use_privatekey_file(constants.SSL_CERT_FILE)
62     return ctx
63
64
65 class ServerObject(pb.Avatar):
66   """The server implementation.
67
68   This class holds all methods exposed over the RPC interface.
69
70   """
71   def __init__(self, name):
72     self.name = name
73
74   def perspectiveMessageReceived(self, broker, message, args, kw):
75     """Custom message dispatching function.
76
77     This function overrides the pb.Avatar function in order to provide
78     a simple form of exception passing (as text only).
79
80     """
81     args = broker.unserialize(args, self)
82     kw = broker.unserialize(kw, self)
83     method = getattr(self, "perspective_%s" % message)
84     tb = None
85     state = None
86     try:
87       state = method(*args, **kw)
88     except:
89       tb = traceback.format_exc()
90
91     return broker.serialize((tb, state), self, method, args, kw)
92
93   # the new block devices  --------------------------
94
95   @staticmethod
96   def perspective_blockdev_create(params):
97     """Create a block device.
98
99     """
100     bdev_s, size, owner, on_primary, info = params
101     bdev = objects.Disk.FromDict(bdev_s)
102     if bdev is None:
103       raise ValueError("can't unserialize data!")
104     return backend.CreateBlockDevice(bdev, size, owner, on_primary, info)
105
106   @staticmethod
107   def perspective_blockdev_remove(params):
108     """Remove a block device.
109
110     """
111     bdev_s = params[0]
112     bdev = objects.Disk.FromDict(bdev_s)
113     return backend.RemoveBlockDevice(bdev)
114
115   @staticmethod
116   def perspective_blockdev_rename(params):
117     """Remove a block device.
118
119     """
120     devlist = [(objects.Disk.FromDict(ds), uid) for ds, uid in params]
121     return backend.RenameBlockDevices(devlist)
122
123   @staticmethod
124   def perspective_blockdev_assemble(params):
125     """Assemble a block device.
126
127     """
128     bdev_s, owner, on_primary = params
129     bdev = objects.Disk.FromDict(bdev_s)
130     if bdev is None:
131       raise ValueError("can't unserialize data!")
132     return backend.AssembleBlockDevice(bdev, owner, on_primary)
133
134   @staticmethod
135   def perspective_blockdev_shutdown(params):
136     """Shutdown a block device.
137
138     """
139     bdev_s = params[0]
140     bdev = objects.Disk.FromDict(bdev_s)
141     if bdev is None:
142       raise ValueError("can't unserialize data!")
143     return backend.ShutdownBlockDevice(bdev)
144
145   @staticmethod
146   def perspective_blockdev_addchildren(params):
147     """Add a child to a mirror device.
148
149     Note: this is only valid for mirror devices. It's the caller's duty
150     to send a correct disk, otherwise we raise an error.
151
152     """
153     bdev_s, ndev_s = params
154     bdev = objects.Disk.FromDict(bdev_s)
155     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
156     if bdev is None or ndevs.count(None) > 0:
157       raise ValueError("can't unserialize data!")
158     return backend.MirrorAddChildren(bdev, ndevs)
159
160   @staticmethod
161   def perspective_blockdev_removechildren(params):
162     """Remove a child from a mirror device.
163
164     This is only valid for mirror devices, of course. It's the callers
165     duty to send a correct disk, otherwise we raise an error.
166
167     """
168     bdev_s, ndev_s = params
169     bdev = objects.Disk.FromDict(bdev_s)
170     ndevs = [objects.Disk.FromDict(disk_s) for disk_s in ndev_s]
171     if bdev is None or ndevs.count(None) > 0:
172       raise ValueError("can't unserialize data!")
173     return backend.MirrorRemoveChildren(bdev, ndevs)
174
175   @staticmethod
176   def perspective_blockdev_getmirrorstatus(params):
177     """Return the mirror status for a list of disks.
178
179     """
180     disks = [objects.Disk.FromDict(dsk_s)
181             for dsk_s in params]
182     return backend.GetMirrorStatus(disks)
183
184   @staticmethod
185   def perspective_blockdev_find(params):
186     """Expose the FindBlockDevice functionality for a disk.
187
188     This will try to find but not activate a disk.
189
190     """
191     disk = objects.Disk.FromDict(params[0])
192     return backend.FindBlockDevice(disk)
193
194   @staticmethod
195   def perspective_blockdev_snapshot(params):
196     """Create a snapshot device.
197
198     Note that this is only valid for LVM disks, if we get passed
199     something else we raise an exception. The snapshot device can be
200     remove by calling the generic block device remove call.
201
202     """
203     cfbd = objects.Disk.FromDict(params[0])
204     return backend.SnapshotBlockDevice(cfbd)
205
206   @staticmethod
207   def perspective_blockdev_grow(params):
208     """Grow a stack of devices.
209
210     """
211     cfbd = objects.Disk.FromDict(params[0])
212     amount = params[1]
213     return backend.GrowBlockDevice(cfbd, amount)
214
215   @staticmethod
216   def perspective_blockdev_close(params):
217     """Closes the given block devices.
218
219     """
220     disks = [objects.Disk.FromDict(cf) for cf in params[1]]
221     return backend.CloseBlockDevices(params[0], disks)
222
223   # blockdev/drbd specific methods ----------
224
225   @staticmethod
226   def perspective_drbd_reconfig_net(params):
227     """Re-configures the network connection of drbd disks.
228
229     Note that this is only valid for drbd disks, so the members of the
230     disk list must all be drbd devices.
231
232     """
233     instance_name, disks, nodes_ip, multimaster = params
234     disks = [objects.Disk.FromDict(cf) for cf in disks]
235     return backend.DrbdReconfigNet(instance_name, disks, nodes_ip, multimaster)
236
237   # export/import  --------------------------
238
239   @staticmethod
240   def perspective_snapshot_export(params):
241     """Export a given snapshot.
242
243     """
244     disk = objects.Disk.FromDict(params[0])
245     dest_node = params[1]
246     instance = objects.Instance.FromDict(params[2])
247     return backend.ExportSnapshot(disk, dest_node, instance)
248
249   @staticmethod
250   def perspective_finalize_export(params):
251     """Expose the finalize export functionality.
252
253     """
254     instance = objects.Instance.FromDict(params[0])
255     snap_disks = [objects.Disk.FromDict(str_data)
256                   for str_data in params[1]]
257     return backend.FinalizeExport(instance, snap_disks)
258
259   @staticmethod
260   def perspective_export_info(params):
261     """Query information about an existing export on this node.
262
263     The given path may not contain an export, in which case we return
264     None.
265
266     """
267     path = params[0]
268     einfo = backend.ExportInfo(path)
269     if einfo is None:
270       return einfo
271     return einfo.Dumps()
272
273   @staticmethod
274   def perspective_export_list(params):
275     """List the available exports on this node.
276
277     Note that as opposed to export_info, which may query data about an
278     export in any path, this only queries the standard Ganeti path
279     (constants.EXPORT_DIR).
280
281     """
282     return backend.ListExports()
283
284   @staticmethod
285   def perspective_export_remove(params):
286     """Remove an export.
287
288     """
289     export = params[0]
290     return backend.RemoveExport(export)
291
292   # volume  --------------------------
293
294   @staticmethod
295   def perspective_volume_list(params):
296     """Query the list of logical volumes in a given volume group.
297
298     """
299     vgname = params[0]
300     return backend.GetVolumeList(vgname)
301
302   @staticmethod
303   def perspective_vg_list(params):
304     """Query the list of volume groups.
305
306     """
307     return backend.ListVolumeGroups()
308
309   # bridge  --------------------------
310
311   @staticmethod
312   def perspective_bridges_exist(params):
313     """Check if all bridges given exist on this node.
314
315     """
316     bridges_list = params[0]
317     return backend.BridgesExist(bridges_list)
318
319   # instance  --------------------------
320
321   @staticmethod
322   def perspective_instance_os_add(params):
323     """Install an OS on a given instance.
324
325     """
326     inst_s, os_disk, swap_disk = params
327     inst = objects.Instance.FromDict(inst_s)
328     return backend.AddOSToInstance(inst, os_disk, swap_disk)
329
330   @staticmethod
331   def perspective_instance_run_rename(params):
332     """Runs the OS rename script for an instance.
333
334     """
335     inst_s, old_name, os_disk, swap_disk = params
336     inst = objects.Instance.FromDict(inst_s)
337     return backend.RunRenameInstance(inst, old_name, os_disk, swap_disk)
338
339   @staticmethod
340   def perspective_instance_os_import(params):
341     """Run the import function of an OS onto a given instance.
342
343     """
344     inst_s, os_disk, swap_disk, src_node, src_image = params
345     inst = objects.Instance.FromDict(inst_s)
346     return backend.ImportOSIntoInstance(inst, os_disk, swap_disk,
347                                         src_node, src_image)
348
349   @staticmethod
350   def perspective_instance_shutdown(params):
351     """Shutdown an instance.
352
353     """
354     instance = objects.Instance.FromDict(params[0])
355     return backend.ShutdownInstance(instance)
356
357   @staticmethod
358   def perspective_instance_start(params):
359     """Start an instance.
360
361     """
362     instance = objects.Instance.FromDict(params[0])
363     extra_args = params[1]
364     return backend.StartInstance(instance, extra_args)
365
366   @staticmethod
367   def perspective_instance_migrate(params):
368     """Migrates an instance.
369
370     """
371     instance, target, live = params
372     return backend.MigrateInstance(instance, target, live)
373
374   @staticmethod
375   def perspective_instance_reboot(params):
376     """Reboot an instance.
377
378     """
379     instance = objects.Instance.FromDict(params[0])
380     reboot_type = params[1]
381     extra_args = params[2]
382     return backend.RebootInstance(instance, reboot_type, extra_args)
383
384   @staticmethod
385   def perspective_instance_info(params):
386     """Query instance information.
387
388     """
389     return backend.GetInstanceInfo(params[0])
390
391   @staticmethod
392   def perspective_all_instances_info(params):
393     """Query information about all instances.
394
395     """
396     return backend.GetAllInstancesInfo()
397
398   @staticmethod
399   def perspective_instance_list(params):
400     """Query the list of running instances.
401
402     """
403     return backend.GetInstanceList()
404
405   # node --------------------------
406
407   @staticmethod
408   def perspective_node_tcp_ping(params):
409     """Do a TcpPing on the remote node.
410
411     """
412     return utils.TcpPing(params[1], params[2], timeout=params[3],
413                          live_port_needed=params[4], source=params[0])
414
415   @staticmethod
416   def perspective_node_info(params):
417     """Query node information.
418
419     """
420     vgname = params[0]
421     return backend.GetNodeInfo(vgname)
422
423   @staticmethod
424   def perspective_node_add(params):
425     """Complete the registration of this node in the cluster.
426
427     """
428     return backend.AddNode(params[0], params[1], params[2],
429                            params[3], params[4], params[5])
430
431   @staticmethod
432   def perspective_node_verify(params):
433     """Run a verify sequence on this node.
434
435     """
436     return backend.VerifyNode(params[0])
437
438   @staticmethod
439   def perspective_node_start_master(params):
440     """Promote this node to master status.
441
442     """
443     return backend.StartMaster()
444
445   @staticmethod
446   def perspective_node_stop_master(params):
447     """Demote this node from master status.
448
449     """
450     return backend.StopMaster()
451
452   @staticmethod
453   def perspective_node_leave_cluster(params):
454     """Cleanup after leaving a cluster.
455
456     """
457     return backend.LeaveCluster()
458
459   @staticmethod
460   def perspective_node_volumes(params):
461     """Query the list of all logical volume groups.
462
463     """
464     return backend.NodeVolumes()
465
466   # cluster --------------------------
467
468   @staticmethod
469   def perspective_version(params):
470     """Query version information.
471
472     """
473     return constants.PROTOCOL_VERSION
474
475   @staticmethod
476   def perspective_upload_file(params):
477     """Upload a file.
478
479     Note that the backend implementation imposes strict rules on which
480     files are accepted.
481
482     """
483     return backend.UploadFile(*params)
484
485
486   # os -----------------------
487
488   @staticmethod
489   def perspective_os_diagnose(params):
490     """Query detailed information about existing OSes.
491
492     """
493     return [os.ToDict() for os in backend.DiagnoseOS()]
494
495   @staticmethod
496   def perspective_os_get(params):
497     """Query information about a given OS.
498
499     """
500     name = params[0]
501     try:
502       os_obj = backend.OSFromDisk(name)
503     except errors.InvalidOS, err:
504       os_obj = objects.OS.FromInvalidOS(err)
505     return os_obj.ToDict()
506
507   # hooks -----------------------
508
509   @staticmethod
510   def perspective_hooks_runner(params):
511     """Run hook scripts.
512
513     """
514     hpath, phase, env = params
515     hr = backend.HooksRunner()
516     return hr.RunHooks(hpath, phase, env)
517
518   # iallocator -----------------
519
520   @staticmethod
521   def perspective_iallocator_runner(params):
522     """Run an iallocator script.
523
524     """
525     name, idata = params
526     iar = backend.IAllocatorRunner()
527     return iar.Run(name, idata)
528
529   # test -----------------------
530
531   @staticmethod
532   def perspective_test_delay(params):
533     """Run test delay.
534
535     """
536     duration = params[0]
537     return utils.TestDelay(duration)
538
539
540 class MyRealm:
541   """Simple realm that forwards all requests to a ServerObject.
542
543   """
544   __implements__ = portal.IRealm
545
546   def requestAvatar(self, avatarId, mind, *interfaces):
547     """Return an avatar based on our ServerObject class.
548
549     """
550     if pb.IPerspective not in interfaces:
551       raise NotImplementedError
552     return pb.IPerspective, ServerObject(avatarId), lambda:None
553
554
555 def ParseOptions():
556   """Parse the command line options.
557
558   Returns:
559     (options, args) as from OptionParser.parse_args()
560
561   """
562   parser = OptionParser(description="Ganeti node daemon",
563                         usage="%prog [-f] [-d]",
564                         version="%%prog (ganeti) %s" %
565                         constants.RELEASE_VERSION)
566
567   parser.add_option("-f", "--foreground", dest="fork",
568                     help="Don't detach from the current terminal",
569                     default=True, action="store_false")
570   parser.add_option("-d", "--debug", dest="debug",
571                     help="Enable some debug messages",
572                     default=False, action="store_true")
573   options, args = parser.parse_args()
574   return options, args
575
576
577 def main():
578   """Main function for the node daemon.
579
580   """
581   options, args = ParseOptions()
582   utils.debug = options.debug
583   for fname in (constants.SSL_CERT_FILE,):
584     if not os.path.isfile(fname):
585       print "config %s not there, will not run." % fname
586       sys.exit(5)
587
588   try:
589     ss = ssconf.SimpleStore()
590     port = ss.GetNodeDaemonPort()
591     pwdata = ss.GetNodeDaemonPassword()
592   except errors.ConfigurationError, err:
593     print "Cluster configuration incomplete: '%s'" % str(err)
594     sys.exit(5)
595
596   # create the various SUB_RUN_DIRS, if not existing, so that we handle the
597   # situation where RUN_DIR is tmpfs
598   for dir_name in constants.SUB_RUN_DIRS:
599     if not os.path.exists(dir_name):
600       try:
601         os.mkdir(dir_name, 0755)
602       except EnvironmentError, err:
603         if err.errno != errno.EEXIST:
604           print ("Node setup wrong, cannot create directory %s: %s" %
605                  (dir_name, err))
606           sys.exit(5)
607     if not os.path.isdir(dir_name):
608       print ("Node setup wrong, %s is not a directory" % dir_name)
609       sys.exit(5)
610
611   # become a daemon
612   if options.fork:
613     utils.Daemonize(logfile=constants.LOG_NODESERVER)
614
615   logger.SetupLogging(twisted_workaround=True, debug=options.debug,
616                       program="ganeti-noded")
617
618   p = portal.Portal(MyRealm())
619   p.registerChecker(
620     checkers.InMemoryUsernamePasswordDatabaseDontUse(master_node=pwdata))
621   reactor.listenSSL(port, pb.PBServerFactory(p), ServerContextFactory())
622   reactor.run()
623
624
625 if __name__ == '__main__':
626   main()