4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script to show add a new node to the cluster
26 # pylint: disable-msg=C0103
30 from twisted.internet.pollreactor import PollReactor
32 class ReReactor(PollReactor):
33 """A re-startable Reactor implementation.
36 def run(self, installSignalHandlers=1):
39 This is customized run that, before calling Reactor.run, will
40 reinstall the shutdown events and re-create the threadpool in case
41 these are not present (as will happen on the second run of the
45 if not 'shutdown' in self._eventTriggers:
46 # the shutdown queue has been killed, we are most probably
47 # at the second run, thus recreate the queue
48 self.addSystemEventTrigger('during', 'shutdown', self.crash)
49 self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll)
50 if self.threadpool is not None and self.threadpool.joined == 1:
51 # in case the threadpool has been stopped, re-start it
52 # and add a trigger to stop it at reactor shutdown
53 self.threadpool.start()
54 self.addSystemEventTrigger('during', 'shutdown', self.threadpool.stop)
56 return PollReactor.run(self, installSignalHandlers)
59 import twisted.internet.main
60 twisted.internet.main.installReactor(ReReactor())
62 from twisted.spread import pb
63 from twisted.internet import reactor
64 from twisted.cred import credentials
65 from OpenSSL import SSL, crypto
67 from ganeti import logger
68 from ganeti import utils
69 from ganeti import errors
70 from ganeti import constants
71 from ganeti import objects
72 from ganeti import ssconf
75 """Node-handling class.
77 For each node that we speak with, we create an instance of this
78 class, so that we have a safe place to store the details of this
82 def __init__(self, parent, node):
87 """Stop the reactor if we got all the results.
90 if len(self.parent.results) == len(self.parent.nc):
93 def cb_call(self, obj):
94 """Callback for successfull connect.
96 If the connect and login sequence succeeded, we proceed with
97 making the actual call.
100 deferred = obj.callRemote(self.parent.procedure, self.parent.args)
101 deferred.addCallbacks(self.cb_done, self.cb_err2)
103 def cb_done(self, result):
104 """Callback for successful call.
106 When we receive the result from a call, we check if it was an
107 error and if so we raise a generic RemoteError (we can't pass yet
108 the actual exception over). If there was no error, we store the
112 tb, self.parent.results[self.node] = result
115 raise errors.RemoteError("Remote procedure error calling %s on %s:"
116 "\n%s" % (self.parent.procedure,
120 def cb_err1(self, reason):
121 """Error callback for unsuccessful connect.
124 logger.Error("caller_connect: could not connect to remote host %s,"
125 " reason %s" % (self.node, reason))
126 self.parent.results[self.node] = False
129 def cb_err2(self, reason):
130 """Error callback for unsuccessful call.
132 This is when the call didn't return anything, not even an error,
133 or when it time out, etc.
136 logger.Error("caller_call: could not call %s on node %s,"
137 " reason %s" % (self.parent.procedure, self.node, reason))
138 self.parent.results[self.node] = False
142 class MirrorContextFactory:
143 """Certificate verifier factory.
145 This factory creates contexts that verify if the remote end has a
146 specific certificate (i.e. our own certificate).
148 The checks we do are that the PEM dump of the certificate is the
149 same as our own and (somewhat redundantly) that the SHA checksum is
157 fd = open(constants.SSL_CERT_FILE, 'r')
159 data = fd.read(16384)
162 except EnvironmentError, err:
163 raise errors.ConfigurationError("missing SSL certificate: %s" %
165 self.mycert = crypto.load_certificate(crypto.FILETYPE_PEM, data)
166 self.mypem = crypto.dump_certificate(crypto.FILETYPE_PEM, self.mycert)
167 self.mydigest = self.mycert.digest('SHA')
169 def verifier(self, conn, x509, errno, err_depth, retcode):
170 """Certificate verify method.
173 if self.mydigest != x509.digest('SHA'):
175 if crypto.dump_certificate(crypto.FILETYPE_PEM, x509) != self.mypem:
179 def getContext(self):
180 """Context generator.
183 context = SSL.Context(SSL.TLSv1_METHOD)
184 context.set_verify(SSL.VERIFY_PEER, self.verifier)
190 This class, given a (remote) ethod name, a list of parameters and a
191 list of nodes, will contact (in parallel) all nodes, and return a
192 dict of results (key: node name, value: result).
194 One current bug is that generic failure is still signalled by
195 'False' result, which is not good. This overloading of values can
203 def __init__(self, procedure, args):
204 ss = ssconf.SimpleStore()
205 self.port = ss.GetNodeDaemonPort()
206 self.nodepw = ss.GetNodeDaemonPassword()
209 self.procedure = procedure
212 #--- generic connector -------------
214 def connect_list(self, node_list):
215 """Add a list of nodes to the target nodes.
218 for node in node_list:
221 def connect(self, connect_node):
222 """Add a node to the target list.
225 factory = pb.PBClientFactory()
226 self.nc[connect_node] = nc = NodeController(self, connect_node)
227 reactor.connectSSL(connect_node, self.port, factory,
228 MirrorContextFactory())
229 #d = factory.getRootObject()
230 d = factory.login(credentials.UsernamePassword("master_node", self.nodepw))
231 d.addCallbacks(nc.cb_call, nc.cb_err1)
234 """Return the results of the call.
240 """Wrapper over reactor.run().
242 This function simply calls reactor.run() if we have any requests
243 queued, otherwise it does nothing.
250 def call_volume_list(node_list, vg_name):
251 """Gets the logical volumes present in a given volume group.
253 This is a multi-node call.
256 c = Client("volume_list", [vg_name])
257 c.connect_list(node_list)
262 def call_vg_list(node_list):
263 """Gets the volume group list.
265 This is a multi-node call.
268 c = Client("vg_list", [])
269 c.connect_list(node_list)
274 def call_bridges_exist(node, bridges_list):
275 """Checks if a node has all the bridges given.
277 This method checks if all bridges given in the bridges_list are
278 present on the remote node, so that an instance that uses interfaces
279 on those bridges can be started.
281 This is a single-node call.
284 c = Client("bridges_exist", [bridges_list])
287 return c.getresult().get(node, False)
290 def call_instance_start(node, instance, extra_args):
291 """Stars an instance.
293 This is a single-node call.
296 c = Client("instance_start", [instance.Dumps(), extra_args])
299 return c.getresult().get(node, False)
302 def call_instance_shutdown(node, instance):
303 """Stops an instance.
305 This is a single-node call.
308 c = Client("instance_shutdown", [instance.Dumps()])
311 return c.getresult().get(node, False)
314 def call_instance_os_add(node, inst, osdev, swapdev):
315 """Installs an OS on the given instance.
317 This is a single-node call.
320 params = [inst.Dumps(), osdev, swapdev]
321 c = Client("instance_os_add", params)
324 return c.getresult().get(node, False)
327 def call_instance_info(node, instance):
328 """Returns information about a single instance.
330 This is a single-node call.
333 c = Client("instance_info", [instance])
336 return c.getresult().get(node, False)
339 def call_all_instances_info(node_list):
340 """Returns information about all instances on a given node.
342 This is a single-node call.
345 c = Client("all_instances_info", [])
346 c.connect_list(node_list)
351 def call_instance_list(node_list):
352 """Returns the list of running instances on a given node.
354 This is a single-node call.
357 c = Client("instance_list", [])
358 c.connect_list(node_list)
363 def call_node_info(node_list, vg_name):
364 """Return node information.
366 This will return memory information and volume group size and free
369 This is a multi-node call.
372 c = Client("node_info", [vg_name])
373 c.connect_list(node_list)
375 retux = c.getresult()
377 for node_name in retux:
378 ret = retux.get(node_name, False)
379 if type(ret) != dict:
380 logger.Error("could not connect to node %s" % (node_name))
384 { 'memory_total' : '-',
387 'vg_size' : 'node_unreachable',
394 def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
395 """Add a node to the cluster.
397 This is a single-node call.
400 params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
401 c = Client("node_add", params)
404 return c.getresult().get(node, False)
407 def call_node_verify(node_list, checkdict):
408 """Request verification of given parameters.
410 This is a multi-node call.
413 c = Client("node_verify", [checkdict])
414 c.connect_list(node_list)
419 def call_node_start_master(node):
420 """Tells a node to activate itself as a master.
422 This is a single-node call.
425 c = Client("node_start_master", [])
428 return c.getresult().get(node, False)
431 def call_node_stop_master(node):
432 """Tells a node to demote itself from master status.
434 This is a single-node call.
437 c = Client("node_stop_master", [])
440 return c.getresult().get(node, False)
443 def call_version(node_list):
444 """Query node version.
446 This is a multi-node call.
449 c = Client("version", [])
450 c.connect_list(node_list)
455 def call_blockdev_create(node, bdev, size, on_primary, info):
456 """Request creation of a given block device.
458 This is a single-node call.
461 params = [bdev.Dumps(), size, on_primary, info]
462 c = Client("blockdev_create", params)
465 return c.getresult().get(node, False)
468 def call_blockdev_remove(node, bdev):
469 """Request removal of a given block device.
471 This is a single-node call.
474 c = Client("blockdev_remove", [bdev.Dumps()])
477 return c.getresult().get(node, False)
480 def call_blockdev_assemble(node, disk, on_primary):
481 """Request assembling of a given block device.
483 This is a single-node call.
486 params = [disk.Dumps(), on_primary]
487 c = Client("blockdev_assemble", params)
490 return c.getresult().get(node, False)
493 def call_blockdev_shutdown(node, disk):
494 """Request shutdown of a given block device.
496 This is a single-node call.
499 c = Client("blockdev_shutdown", [disk.Dumps()])
502 return c.getresult().get(node, False)
505 def call_blockdev_addchild(node, bdev, ndev):
506 """Request adding a new child to a (mirroring) device.
508 This is a single-node call.
511 params = [bdev.Dumps(), ndev.Dumps()]
512 c = Client("blockdev_addchild", params)
515 return c.getresult().get(node, False)
518 def call_blockdev_removechild(node, bdev, ndev):
519 """Request removing a new child from a (mirroring) device.
521 This is a single-node call.
524 params = [bdev.Dumps(), ndev.Dumps()]
525 c = Client("blockdev_removechild", params)
528 return c.getresult().get(node, False)
531 def call_blockdev_getmirrorstatus(node, disks):
532 """Request status of a (mirroring) device.
534 This is a single-node call.
537 params = [dsk.Dumps() for dsk in disks]
538 c = Client("blockdev_getmirrorstatus", params)
541 return c.getresult().get(node, False)
544 def call_blockdev_find(node, disk):
545 """Request identification of a given block device.
547 This is a single-node call.
550 c = Client("blockdev_find", [disk.Dumps()])
553 return c.getresult().get(node, False)
556 def call_upload_file(node_list, file_name):
559 The node will refuse the operation in case the file is not on the
562 This is a multi-node call.
570 st = os.stat(file_name)
571 params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
572 st.st_atime, st.st_mtime]
573 c = Client("upload_file", params)
574 c.connect_list(node_list)
579 def call_os_diagnose(node_list):
580 """Request a diagnose of OS definitions.
582 This is a multi-node call.
585 c = Client("os_diagnose", [])
586 c.connect_list(node_list)
588 result = c.getresult()
590 for node_name in result:
592 if result[node_name]:
593 for data in result[node_name]:
595 if isinstance(data, basestring):
596 nr.append(objects.ConfigObject.Loads(data))
597 elif isinstance(data, tuple) and len(data) == 2:
598 nr.append(errors.InvalidOS(data[0], data[1]))
600 raise errors.ProgrammerError("Invalid data from"
601 " xcserver.os_diagnose")
602 new_result[node_name] = nr
606 def call_os_get(node_list, name):
607 """Returns an OS definition.
609 This is a multi-node call.
612 c = Client("os_get", [name])
613 c.connect_list(node_list)
615 result = c.getresult()
617 for node_name in result:
618 data = result[node_name]
619 if isinstance(data, basestring):
620 new_result[node_name] = objects.ConfigObject.Loads(data)
621 elif isinstance(data, tuple) and len(data) == 2:
622 new_result[node_name] = errors.InvalidOS(data[0], data[1])
624 new_result[node_name] = data
628 def call_hooks_runner(node_list, hpath, phase, env):
629 """Call the hooks runner.
632 - op: the OpCode instance
633 - env: a dictionary with the environment
635 This is a multi-node call.
638 params = [hpath, phase, env]
639 c = Client("hooks_runner", params)
640 c.connect_list(node_list)
642 result = c.getresult()
646 def call_blockdev_snapshot(node, cf_bdev):
647 """Request a snapshot of the given block device.
649 This is a single-node call.
652 c = Client("blockdev_snapshot", [cf_bdev.Dumps()])
655 return c.getresult().get(node, False)
658 def call_snapshot_export(node, snap_bdev, dest_node, instance):
659 """Request the export of a given snapshot.
661 This is a single-node call.
664 params = [snap_bdev.Dumps(), dest_node, instance.Dumps()]
665 c = Client("snapshot_export", params)
668 return c.getresult().get(node, False)
671 def call_finalize_export(node, instance, snap_disks):
672 """Request the completion of an export operation.
674 This writes the export config file, etc.
676 This is a single-node call.
680 for disk in snap_disks:
681 flat_disks.append(disk.Dumps())
682 params = [instance.Dumps(), flat_disks]
683 c = Client("finalize_export", params)
686 return c.getresult().get(node, False)
689 def call_export_info(node, path):
690 """Queries the export information in a given path.
692 This is a single-node call.
695 c = Client("export_info", [path])
698 result = c.getresult().get(node, False)
701 return objects.SerializableConfigParser.Loads(result)
704 def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image):
705 """Request the import of a backup into an instance.
707 This is a single-node call.
710 params = [inst.Dumps(), osdev, swapdev, src_node, src_image]
711 c = Client("instance_os_import", params)
714 return c.getresult().get(node, False)
717 def call_export_list(node_list):
718 """Gets the stored exports list.
720 This is a multi-node call.
723 c = Client("export_list", [])
724 c.connect_list(node_list)
726 result = c.getresult()
730 def call_export_remove(node, export):
731 """Requests removal of a given export.
733 This is a single-node call.
736 c = Client("export_remove", [export])
739 return c.getresult().get(node, False)
742 def call_node_leave_cluster(node):
743 """Requests a node to clean the cluster information it has.
745 This will remove the configuration information from the ganeti data
748 This is a single-node call.
751 c = Client("node_leave_cluster", [])
754 return c.getresult().get(node, False)
757 def call_node_volumes(node_list):
758 """Gets all volumes on node(s).
760 This is a multi-node call.
763 c = Client("node_volumes", [])
764 c.connect_list(node_list)