4 # Copyright (C) 2006, 2007 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
"""Client side of the RPC layer: run remote procedures on cluster nodes."""
26 # pylint: disable-msg=C0103
30 from twisted.internet.pollreactor import PollReactor
class ReReactor(PollReactor):
  """Poll-based reactor whose run() may be invoked more than once.

  """

  def run(self, installSignalHandlers=1):
    """Restore the per-run state, then delegate to PollReactor.run.

    When the 'shutdown' event triggers are gone or the threadpool has
    been joined (as will most probably be the case on a second run of
    the reactor), they are put back before the base class takes over.

    Args:
      - installSignalHandlers: passed unchanged to PollReactor.run

    Returns:
      whatever PollReactor.run returns

    """
    if 'shutdown' not in self._eventTriggers:
      # no shutdown triggers are registered any more: most probably this
      # is a second run, so register them again
      for handler in (self.crash, self.disconnectAll):
        self.addSystemEventTrigger('during', 'shutdown', handler)

    pool = self.threadpool
    if pool is not None and pool.joined == 1:
      # the pool was stopped: start it again and arrange for it to be
      # stopped at the next reactor shutdown
      pool.start()
      self.addSystemEventTrigger('during', 'shutdown', pool.stop)

    return PollReactor.run(self, installSignalHandlers)
# Install the restartable reactor as the global Twisted reactor. This is
# done before the "from twisted.internet import reactor" import below so
# that the name imported there refers to this ReReactor instance.
import twisted.internet.main
twisted.internet.main.installReactor(ReReactor())
62 from twisted.spread import pb
63 from twisted.internet import reactor
64 from twisted.cred import credentials
65 from OpenSSL import SSL, crypto
67 from ganeti import logger
68 from ganeti import utils
69 from ganeti import errors
70 from ganeti import constants
71 from ganeti import objects
72 from ganeti import ssconf
class NodeController:
  """Node-handling class.

  For each node that we speak with, we create an instance of this
  class, so that we have a safe place to store the details of this
  individual call. The outcome for the node is written into the
  parent's results dictionary, keyed by node name.

  """

  def __init__(self, parent, node):
    """Remember the owning client and the node we are responsible for.

    Args:
      - parent: the client object that issued the call; the callbacks
        use its procedure, args, nc and results attributes
      - node: the name of the node handled by this instance

    """
    self.parent = parent
    self.node = node

  def _check_end(self):
    """Stop the reactor if we got all the results.

    """
    # one entry in parent.nc per contacted node, one entry in
    # parent.results per node that has finished (successfully or not)
    if len(self.parent.results) == len(self.parent.nc):
      reactor.stop()

  def cb_call(self, obj):
    """Callback for successful connect.

    If the connect and login sequence succeeded, we proceed with
    making the actual call.

    Args:
      - obj: the remote reference on which callRemote is invoked

    """
    deferred = obj.callRemote(self.parent.procedure, self.parent.args)
    deferred.addCallbacks(self.cb_done, self.cb_err2)

  def cb_done(self, result):
    """Callback for successful call.

    The result is a (traceback, value) pair. The value is always
    stored as this node's result; if the traceback part is set we
    then raise a generic RemoteError (we can't pass yet the actual
    exception over).

    """
    tb, self.parent.results[self.node] = result
    # the result is recorded, so check for completion before raising
    self._check_end()
    if tb:
      raise errors.RemoteError("Remote procedure error calling %s on %s:"
                               "\n%s" % (self.parent.procedure,
                                         self.node,
                                         tb))

  def cb_err1(self, reason):
    """Error callback for unsuccessful connect.

    The failure is logged and the node's result is set to False.

    """
    logger.Error("caller_connect: could not connect to remote host %s,"
                 " reason %s" % (self.node, reason))
    self.parent.results[self.node] = False
    self._check_end()

  def cb_err2(self, reason):
    """Error callback for unsuccessful call.

    This is when the call didn't return anything, not even an error,
    or when it timed out, etc. The failure is logged and the node's
    result is set to False.

    """
    logger.Error("caller_call: could not call %s on node %s,"
                 " reason %s" % (self.parent.procedure, self.node, reason))
    self.parent.results[self.node] = False
    self._check_end()
142 class MirrorContextFactory:
143 """Certificate verifier factory.
145 This factory creates contexts that verify if the remote end has a
146 specific certificate (i.e. our own certificate).
148 The checks we do are that the PEM dump of the certificate is the
149 same as our own and (somewhat redundantly) that the SHA checksum is
157 fd = open(constants.SSL_CERT_FILE, 'r')
159 data = fd.read(16384)
162 except EnvironmentError, err:
163 raise errors.ConfigurationError("missing SSL certificate: %s" %
165 self.mycert = crypto.load_certificate(crypto.FILETYPE_PEM, data)
166 self.mypem = crypto.dump_certificate(crypto.FILETYPE_PEM, self.mycert)
167 self.mydigest = self.mycert.digest('SHA')
169 def verifier(self, conn, x509, errno, err_depth, retcode):
170 """Certificate verify method.
173 if self.mydigest != x509.digest('SHA'):
175 if crypto.dump_certificate(crypto.FILETYPE_PEM, x509) != self.mypem:
179 def getContext(self):
180 """Context generator.
183 context = SSL.Context(SSL.TLSv1_METHOD)
184 context.set_verify(SSL.VERIFY_PEER, self.verifier)
class Client:
  """RPC client class.

  This class, given a (remote) method name, a list of parameters and a
  list of nodes, will contact (in parallel) all nodes, and return a
  dict of results (key: node name, value: result).

  One current bug is that generic failure is still signalled by a
  'False' result, which is not good: this overloading of values means
  a failure can't be told apart from a procedure returning False.

  """

  def __init__(self, procedure, args):
    """Prepare a call of the given procedure.

    Args:
      - procedure: the name of the remote procedure
      - args: the list of arguments passed to the remote procedure

    """
    ss = ssconf.SimpleStore()
    self.port = ss.GetNodeDaemonPort()
    self.nodepw = ss.GetNodeDaemonPassword()
    # node name -> NodeController, one entry per contacted node
    self.nc = {}
    # node name -> result, filled in by the NodeController callbacks
    self.results = {}
    self.procedure = procedure
    self.args = args

  #--- generic connector -------------

  def connect_list(self, node_list):
    """Add a list of nodes to the target nodes.

    """
    for node in node_list:
      self.connect(node)

  def connect(self, connect_node):
    """Add a node to the target list.

    An SSL connection to the node is set up and a login with the node
    daemon password is queued; the node's NodeController receives the
    outcome of the login.

    """
    factory = pb.PBClientFactory()
    self.nc[connect_node] = nc = NodeController(self, connect_node)
    reactor.connectSSL(connect_node, self.port, factory,
                       MirrorContextFactory())
    #d = factory.getRootObject()
    d = factory.login(credentials.UsernamePassword("master_node", self.nodepw))
    d.addCallbacks(nc.cb_call, nc.cb_err1)

  def getresult(self):
    """Return the results of the call.

    Returns:
      the dict of results, keyed by node name

    """
    return self.results

  def run(self):
    """Wrapper over reactor.run().

    This function simply calls reactor.run() if we have any requests
    queued, otherwise it does nothing.

    """
    if self.nc:
      reactor.run()
def call_volume_list(node_list, vg_name):
  """Gets the logical volumes present in a given volume group.

  This is a multi-node call.

  Returns:
    the dict of results, keyed by node name

  """
  c = Client("volume_list", [vg_name])
  c.connect_list(node_list)
  c.run()
  return c.getresult()
def call_vg_list(node_list):
  """Gets the volume group list.

  This is a multi-node call.

  Returns:
    the dict of results, keyed by node name

  """
  c = Client("vg_list", [])
  c.connect_list(node_list)
  c.run()
  return c.getresult()
def call_bridges_exist(node, bridges_list):
  """Checks if a node has all the bridges given.

  This method checks if all bridges given in the bridges_list are
  present on the remote node, so that an instance that uses interfaces
  on those bridges can be started.

  This is a single-node call.

  Returns:
    the result recorded for the node, or False if there is none

  """
  c = Client("bridges_exist", [bridges_list])
  c.connect(node)
  c.run()
  return c.getresult().get(node, False)
def call_instance_start(node, instance, extra_args):
  """Starts an instance.

  The instance is sent over in its ToDict() form, followed by
  extra_args.

  This is a single-node call.

  Returns:
    the result recorded for the node, or False if there is none

  """
  c = Client("instance_start", [instance.ToDict(), extra_args])
  c.connect(node)
  c.run()
  return c.getresult().get(node, False)
def call_instance_shutdown(node, instance):
  """Stops an instance.

  The instance is sent over in its ToDict() form.

  This is a single-node call.

  Returns:
    the result recorded for the node, or False if there is none

  """
  c = Client("instance_shutdown", [instance.ToDict()])
  c.connect(node)
  c.run()
  return c.getresult().get(node, False)
def call_instance_os_add(node, inst, osdev, swapdev):
  """Installs an OS on the given instance.

  The instance is sent over in its ToDict() form, followed by osdev
  and swapdev.

  This is a single-node call.

  Returns:
    the result recorded for the node, or False if there is none

  """
  params = [inst.ToDict(), osdev, swapdev]
  c = Client("instance_os_add", params)
  c.connect(node)
  c.run()
  return c.getresult().get(node, False)
def call_instance_run_rename(node, inst, old_name, osdev, swapdev):
  """Run the OS rename script for an instance.

  The instance is sent over in its ToDict() form, followed by
  old_name, osdev and swapdev.

  This is a single-node call.

  Returns:
    the result recorded for the node, or False if there is none

  """
  params = [inst.ToDict(), old_name, osdev, swapdev]
  c = Client("instance_run_rename", params)
  c.connect(node)
  c.run()
  return c.getresult().get(node, False)
def call_instance_info(node, instance):
  """Returns information about a single instance.

  The instance argument is passed to the remote side as given.

  This is a single-node call.

  Returns:
    the result recorded for the node, or False if there is none

  """
  c = Client("instance_info", [instance])
  c.connect(node)
  c.run()
  return c.getresult().get(node, False)
def call_all_instances_info(node_list):
  """Returns information about all instances on the given nodes.

  This is a multi-node call.

  Returns:
    the dict of results, keyed by node name

  """
  c = Client("all_instances_info", [])
  c.connect_list(node_list)
  c.run()
  return c.getresult()
def call_instance_list(node_list):
  """Returns the list of running instances on the given nodes.

  This is a multi-node call.

  Returns:
    the dict of results, keyed by node name

  """
  c = Client("instance_list", [])
  c.connect_list(node_list)
  c.run()
  return c.getresult()
def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed):
  """Do a TcpPing on the remote node.

  All arguments after node are passed, in order, to the remote
  procedure.

  This is a single-node call.

  Returns:
    the result recorded for the node, or False if there is none

  """
  c = Client("node_tcp_ping", [source, target, port, timeout,
                               live_port_needed])
  c.connect(node)
  c.run()
  return c.getresult().get(node, False)
388 def call_node_info(node_list, vg_name):
389 """Return node information.
391 This will return memory information and volume group size and free
394 This is a multi-node call.
397 c = Client("node_info", [vg_name])
398 c.connect_list(node_list)
400 retux = c.getresult()
402 for node_name in retux:
403 ret = retux.get(node_name, False)
404 if type(ret) != dict:
405 logger.Error("could not connect to node %s" % (node_name))
409 { 'memory_total' : '-',
412 'vg_size' : 'node_unreachable',
def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
  """Add a node to the cluster.

  All arguments after node are passed, in order, to the remote
  procedure.

  This is a single-node call.

  Returns:
    the result recorded for the node, or False if there is none

  """
  params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
  c = Client("node_add", params)
  c.connect(node)
  c.run()
  return c.getresult().get(node, False)
def call_node_verify(node_list, checkdict):
  """Request verification of given parameters.

  This is a multi-node call.

  Returns:
    the dict of results, keyed by node name

  """
  c = Client("node_verify", [checkdict])
  c.connect_list(node_list)
  c.run()
  return c.getresult()
def call_node_start_master(node):
  """Tells a node to activate itself as a master.

  This is a single-node call.

  Returns:
    the result recorded for the node, or False if there is none

  """
  c = Client("node_start_master", [])
  c.connect(node)
  c.run()
  return c.getresult().get(node, False)
def call_node_stop_master(node):
  """Tells a node to demote itself from master status.

  This is a single-node call.

  Returns:
    the result recorded for the node, or False if there is none

  """
  c = Client("node_stop_master", [])
  c.connect(node)
  c.run()
  return c.getresult().get(node, False)
def call_version(node_list):
  """Query node version.

  This is a multi-node call.

  Returns:
    the dict of results, keyed by node name

  """
  c = Client("version", [])
  c.connect_list(node_list)
  c.run()
  return c.getresult()
def call_blockdev_create(node, bdev, size, on_primary, info):
  """Request creation of a given block device.

  The device is sent over in its ToDict() form, followed by size,
  on_primary and info.

  This is a single-node call.

  Returns:
    the result recorded for the node, or False if there is none

  """
  params = [bdev.ToDict(), size, on_primary, info]
  c = Client("blockdev_create", params)
  c.connect(node)
  c.run()
  return c.getresult().get(node, False)
def call_blockdev_remove(node, bdev):
  """Request removal of a given block device.

  The device is sent over in its ToDict() form.

  This is a single-node call.

  Returns:
    the result recorded for the node, or False if there is none

  """
  c = Client("blockdev_remove", [bdev.ToDict()])
  c.connect(node)
  c.run()
  return c.getresult().get(node, False)
def call_blockdev_assemble(node, disk, on_primary):
  """Request assembling of a given block device.

  The disk is sent over in its ToDict() form, followed by on_primary.

  This is a single-node call.

  Returns:
    the result recorded for the node, or False if there is none

  """
  params = [disk.ToDict(), on_primary]
  c = Client("blockdev_assemble", params)
  c.connect(node)
  c.run()
  return c.getresult().get(node, False)
def call_blockdev_shutdown(node, disk):
  """Request shutdown of a given block device.

  The disk is sent over in its ToDict() form.

  This is a single-node call.

  Returns:
    the result recorded for the node, or False if there is none

  """
  c = Client("blockdev_shutdown", [disk.ToDict()])
  c.connect(node)
  c.run()
  return c.getresult().get(node, False)
def call_blockdev_addchild(node, bdev, ndev):
  """Request adding a new child to a (mirroring) device.

  Both devices are sent over in their ToDict() form: first bdev, then
  the new child ndev.

  This is a single-node call.

  Returns:
    the result recorded for the node, or False if there is none

  """
  params = [bdev.ToDict(), ndev.ToDict()]
  c = Client("blockdev_addchild", params)
  c.connect(node)
  c.run()
  return c.getresult().get(node, False)
def call_blockdev_removechild(node, bdev, ndev):
  """Request removing a child from a (mirroring) device.

  Both devices are sent over in their ToDict() form: first bdev, then
  the child ndev.

  This is a single-node call.

  Returns:
    the result recorded for the node, or False if there is none

  """
  params = [bdev.ToDict(), ndev.ToDict()]
  c = Client("blockdev_removechild", params)
  c.connect(node)
  c.run()
  return c.getresult().get(node, False)
def call_blockdev_getmirrorstatus(node, disks):
  """Request status of a (mirroring) device.

  Every element of disks is sent over in its ToDict() form; the
  resulting list is itself the argument list of the remote procedure.

  This is a single-node call.

  Returns:
    the result recorded for the node, or False if there is none

  """
  params = [dsk.ToDict() for dsk in disks]
  c = Client("blockdev_getmirrorstatus", params)
  c.connect(node)
  c.run()
  return c.getresult().get(node, False)
def call_blockdev_find(node, disk):
  """Request identification of a given block device.

  The disk is sent over in its ToDict() form.

  This is a single-node call.

  Returns:
    the result recorded for the node, or False if there is none

  """
  c = Client("blockdev_find", [disk.ToDict()])
  c.connect(node)
  c.run()
  return c.getresult().get(node, False)
def call_upload_file(node_list, file_name):
  """Upload a file to the given nodes.

  The local file is read and its contents are sent together with its
  name, mode, owner, group and access/modification times (as reported
  by os.stat). Note that the node may refuse the operation.

  This is a multi-node call.

  Returns:
    the dict of results, keyed by node name

  """
  fh = open(file_name)
  try:
    data = fh.read()
  finally:
    fh.close()
  st = os.stat(file_name)
  params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
            st.st_atime, st.st_mtime]
  c = Client("upload_file", params)
  c.connect_list(node_list)
  c.run()
  return c.getresult()
def call_os_diagnose(node_list):
  """Request a diagnose of OS definitions.

  This is a multi-node call.

  Returns:
    a dict keyed by node name; each value is a list holding an
    objects.OS for every dict and an errors.InvalidOS for every
    3-element tuple sent back by that node (an empty list if the node
    returned nothing)

  Raises:
    errors.ProgrammerError: if a node sends back an entry of any
      other kind

  """
  c = Client("os_diagnose", [])
  c.connect_list(node_list)
  c.run()
  result = c.getresult()
  new_result = {}
  for node_name in result:
    nr = []
    # a failed call leaves a false value here: keep the empty list
    if result[node_name]:
      for data in result[node_name]:
        if not data:
          # NOTE(review): empty entries are skipped rather than reported
          # as invalid data - confirm this matches the node daemon
          continue
        if isinstance(data, dict):
          nr.append(objects.OS.FromDict(data))
        elif isinstance(data, tuple) and len(data) == 3:
          nr.append(errors.InvalidOS(data[0], data[1], data[2]))
        else:
          raise errors.ProgrammerError("Invalid data from"
                                       " xcserver.os_diagnose")
    new_result[node_name] = nr
  return new_result
def call_os_get(node_list, name):
  """Returns an OS definition.

  This is a multi-node call.

  Returns:
    a dict keyed by node name; a dict sent back by a node is turned
    into an objects.OS, a 3-element tuple into an errors.InvalidOS,
    and anything else (e.g. the False of a failed call) is passed
    through unchanged

  """
  c = Client("os_get", [name])
  c.connect_list(node_list)
  c.run()
  result = c.getresult()
  new_result = {}
  for node_name in result:
    data = result[node_name]
    if isinstance(data, dict):
      new_result[node_name] = objects.OS.FromDict(data)
    elif isinstance(data, tuple) and len(data) == 3:
      new_result[node_name] = errors.InvalidOS(data[0], data[1], data[2])
    else:
      new_result[node_name] = data
  return new_result
def call_hooks_runner(node_list, hpath, phase, env):
  """Call the hooks runner.

  Args:
    - node_list: the nodes on which to run the hooks
    - hpath: passed as first argument to the remote hooks runner
    - phase: passed as second argument to the remote hooks runner
    - env: a dictionary with the environment

  This is a multi-node call.

  Returns:
    the dict of results, keyed by node name

  """
  params = [hpath, phase, env]
  c = Client("hooks_runner", params)
  c.connect_list(node_list)
  c.run()
  result = c.getresult()
  return result
def call_blockdev_snapshot(node, cf_bdev):
  """Request a snapshot of the given block device.

  The device is sent over in its ToDict() form.

  This is a single-node call.

  Returns:
    the result recorded for the node, or False if there is none

  """
  c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
  c.connect(node)
  c.run()
  return c.getresult().get(node, False)
def call_snapshot_export(node, snap_bdev, dest_node, instance):
  """Request the export of a given snapshot.

  The snapshot device and the instance are sent over in their
  ToDict() form; dest_node is passed as given.

  This is a single-node call.

  Returns:
    the result recorded for the node, or False if there is none

  """
  params = [snap_bdev.ToDict(), dest_node, instance.ToDict()]
  c = Client("snapshot_export", params)
  c.connect(node)
  c.run()
  return c.getresult().get(node, False)
def call_finalize_export(node, instance, snap_disks):
  """Request the completion of an export operation.

  This writes the export config file, etc.

  The instance and every element of snap_disks are sent over in their
  ToDict() form.

  This is a single-node call.

  Returns:
    the result recorded for the node, or False if there is none

  """
  flat_disks = [disk.ToDict() for disk in snap_disks]
  params = [instance.ToDict(), flat_disks]
  c = Client("finalize_export", params)
  c.connect(node)
  c.run()
  return c.getresult().get(node, False)
def call_export_info(node, path):
  """Queries the export information in a given path.

  This is a single-node call.

  Returns:
    the node's answer loaded through
    objects.SerializableConfigParser.Loads, or the false value itself
    when the call failed or returned nothing

  """
  c = Client("export_info", [path])
  c.connect(node)
  c.run()
  result = c.getresult().get(node, False)
  if not result:
    # nothing to parse (e.g. the False of a failed call)
    return result
  return objects.SerializableConfigParser.Loads(result)
def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image):
  """Request the import of a backup into an instance.

  The instance is sent over in its ToDict() form, followed by osdev,
  swapdev, src_node and src_image.

  This is a single-node call.

  Returns:
    the result recorded for the node, or False if there is none

  """
  params = [inst.ToDict(), osdev, swapdev, src_node, src_image]
  c = Client("instance_os_import", params)
  c.connect(node)
  c.run()
  return c.getresult().get(node, False)
def call_export_list(node_list):
  """Gets the stored exports list.

  This is a multi-node call.

  Returns:
    the dict of results, keyed by node name

  """
  c = Client("export_list", [])
  c.connect_list(node_list)
  c.run()
  result = c.getresult()
  return result
def call_export_remove(node, export):
  """Requests removal of a given export.

  This is a single-node call.

  Returns:
    the result recorded for the node, or False if there is none

  """
  c = Client("export_remove", [export])
  c.connect(node)
  c.run()
  return c.getresult().get(node, False)
def call_node_leave_cluster(node):
  """Requests a node to clean the cluster information it has.

  This will remove the configuration information from the ganeti data
  kept on the node.

  This is a single-node call.

  Returns:
    the result recorded for the node, or False if there is none

  """
  c = Client("node_leave_cluster", [])
  c.connect(node)
  c.run()
  return c.getresult().get(node, False)
782 def call_node_volumes(node_list):
783 """Gets all volumes on node(s).
785 This is a multi-node call.
788 c = Client("node_volumes", [])
789 c.connect_list(node_list)