Ship (and display) path for InvalidOS errors too.
[ganeti-local] / lib / rpc.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script to show add a new node to the cluster
23
24 """
25
26 # pylint: disable-msg=C0103
27
28 import os
29
30 from twisted.internet.pollreactor import PollReactor
31
32 class ReReactor(PollReactor):
33   """A re-startable Reactor implementation.
34
35   """
36   def run(self, installSignalHandlers=1):
37     """Custom run method.
38
39     This is customized run that, before calling Reactor.run, will
40     reinstall the shutdown events and re-create the threadpool in case
41     these are not present (as will happen on the second run of the
42     reactor).
43
44     """
45     if not 'shutdown' in self._eventTriggers:
46       # the shutdown queue has been killed, we are most probably
47       # at the second run, thus recreate the queue
48       self.addSystemEventTrigger('during', 'shutdown', self.crash)
49       self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll)
50     if self.threadpool is not None and self.threadpool.joined == 1:
51       # in case the threadpool has been stopped, re-start it
52       # and add a trigger to stop it at reactor shutdown
53       self.threadpool.start()
54       self.addSystemEventTrigger('during', 'shutdown', self.threadpool.stop)
55
56     return PollReactor.run(self, installSignalHandlers)
57
58
59 import twisted.internet.main
60 twisted.internet.main.installReactor(ReReactor())
61
62 from twisted.spread import pb
63 from twisted.internet import reactor
64 from twisted.cred import credentials
65 from OpenSSL import SSL, crypto
66
67 from ganeti import logger
68 from ganeti import utils
69 from ganeti import errors
70 from ganeti import constants
71 from ganeti import objects
72 from ganeti import ssconf
73
74 class NodeController:
75   """Node-handling class.
76
77   For each node that we speak with, we create an instance of this
78   class, so that we have a safe place to store the details of this
79   individual call.
80
81   """
82   def __init__(self, parent, node):
83     self.parent = parent
84     self.node = node
85
86   def _check_end(self):
87     """Stop the reactor if we got all the results.
88
89     """
90     if len(self.parent.results) == len(self.parent.nc):
91       reactor.stop()
92
93   def cb_call(self, obj):
94     """Callback for successful connect.
95
96     If the connect and login sequence succeeded, we proceed with
97     making the actual call.
98
99     """
100     deferred = obj.callRemote(self.parent.procedure, self.parent.args)
101     deferred.addCallbacks(self.cb_done, self.cb_err2)
102
103   def cb_done(self, result):
104     """Callback for successful call.
105
106     When we receive the result from a call, we check if it was an
107     error and if so we raise a generic RemoteError (we can't pass yet
108     the actual exception over). If there was no error, we store the
109     result.
110
111     """
112     tb, self.parent.results[self.node] = result
113     self._check_end()
114     if tb:
115       raise errors.RemoteError("Remote procedure error calling %s on %s:"
116                                "\n%s" % (self.parent.procedure,
117                                          self.node,
118                                          tb))
119
120   def cb_err1(self, reason):
121     """Error callback for unsuccessful connect.
122
123     """
124     logger.Error("caller_connect: could not connect to remote host %s,"
125                  " reason %s" % (self.node, reason))
126     self.parent.results[self.node] = False
127     self._check_end()
128
129   def cb_err2(self, reason):
130     """Error callback for unsuccessful call.
131
132     This is when the call didn't return anything, not even an error,
133     or when it time out, etc.
134
135     """
136     logger.Error("caller_call: could not call %s on node %s,"
137                  " reason %s" % (self.parent.procedure, self.node, reason))
138     self.parent.results[self.node] = False
139     self._check_end()
140
141
142 class MirrorContextFactory:
143   """Certificate verifier factory.
144
145   This factory creates contexts that verify if the remote end has a
146   specific certificate (i.e. our own certificate).
147
148   The checks we do are that the PEM dump of the certificate is the
149   same as our own and (somewhat redundantly) that the SHA checksum is
150   the same.
151
152   """
153   isClient = 1
154
155   def __init__(self):
156     try:
157       fd = open(constants.SSL_CERT_FILE, 'r')
158       try:
159         data = fd.read(16384)
160       finally:
161         fd.close()
162     except EnvironmentError, err:
163       raise errors.ConfigurationError("missing SSL certificate: %s" %
164                                       str(err))
165     self.mycert = crypto.load_certificate(crypto.FILETYPE_PEM, data)
166     self.mypem = crypto.dump_certificate(crypto.FILETYPE_PEM, self.mycert)
167     self.mydigest = self.mycert.digest('SHA')
168
169   def verifier(self, conn, x509, errno, err_depth, retcode):
170     """Certificate verify method.
171
172     """
173     if self.mydigest != x509.digest('SHA'):
174       return False
175     if crypto.dump_certificate(crypto.FILETYPE_PEM, x509) != self.mypem:
176       return False
177     return True
178
179   def getContext(self):
180     """Context generator.
181
182     """
183     context = SSL.Context(SSL.TLSv1_METHOD)
184     context.set_verify(SSL.VERIFY_PEER, self.verifier)
185     return context
186
187 class Client:
188   """RPC Client class.
189
190   This class, given a (remote) ethod name, a list of parameters and a
191   list of nodes, will contact (in parallel) all nodes, and return a
192   dict of results (key: node name, value: result).
193
194   One current bug is that generic failure is still signalled by
195   'False' result, which is not good. This overloading of values can
196   cause bugs.
197
198   """
199   result_set = False
200   result = False
201   allresult = []
202
203   def __init__(self, procedure, args):
204     ss = ssconf.SimpleStore()
205     self.port = ss.GetNodeDaemonPort()
206     self.nodepw = ss.GetNodeDaemonPassword()
207     self.nc = {}
208     self.results = {}
209     self.procedure = procedure
210     self.args = args
211
212   #--- generic connector -------------
213
214   def connect_list(self, node_list):
215     """Add a list of nodes to the target nodes.
216
217     """
218     for node in node_list:
219       self.connect(node)
220
221   def connect(self, connect_node):
222     """Add a node to the target list.
223
224     """
225     factory = pb.PBClientFactory()
226     self.nc[connect_node] = nc = NodeController(self, connect_node)
227     reactor.connectSSL(connect_node, self.port, factory,
228                        MirrorContextFactory())
229     #d = factory.getRootObject()
230     d = factory.login(credentials.UsernamePassword("master_node", self.nodepw))
231     d.addCallbacks(nc.cb_call, nc.cb_err1)
232
233   def getresult(self):
234     """Return the results of the call.
235
236     """
237     return self.results
238
239   def run(self):
240     """Wrapper over reactor.run().
241
242     This function simply calls reactor.run() if we have any requests
243     queued, otherwise it does nothing.
244
245     """
246     if self.nc:
247       reactor.run()
248
249
250 def call_volume_list(node_list, vg_name):
251   """Gets the logical volumes present in a given volume group.
252
253   This is a multi-node call.
254
255   """
256   c = Client("volume_list", [vg_name])
257   c.connect_list(node_list)
258   c.run()
259   return c.getresult()
260
261
262 def call_vg_list(node_list):
263   """Gets the volume group list.
264
265   This is a multi-node call.
266
267   """
268   c = Client("vg_list", [])
269   c.connect_list(node_list)
270   c.run()
271   return c.getresult()
272
273
274 def call_bridges_exist(node, bridges_list):
275   """Checks if a node has all the bridges given.
276
277   This method checks if all bridges given in the bridges_list are
278   present on the remote node, so that an instance that uses interfaces
279   on those bridges can be started.
280
281   This is a single-node call.
282
283   """
284   c = Client("bridges_exist", [bridges_list])
285   c.connect(node)
286   c.run()
287   return c.getresult().get(node, False)
288
289
290 def call_instance_start(node, instance, extra_args):
291   """Stars an instance.
292
293   This is a single-node call.
294
295   """
296   c = Client("instance_start", [instance.ToDict(), extra_args])
297   c.connect(node)
298   c.run()
299   return c.getresult().get(node, False)
300
301
302 def call_instance_shutdown(node, instance):
303   """Stops an instance.
304
305   This is a single-node call.
306
307   """
308   c = Client("instance_shutdown", [instance.ToDict()])
309   c.connect(node)
310   c.run()
311   return c.getresult().get(node, False)
312
313
314 def call_instance_os_add(node, inst, osdev, swapdev):
315   """Installs an OS on the given instance.
316
317   This is a single-node call.
318
319   """
320   params = [inst.ToDict(), osdev, swapdev]
321   c = Client("instance_os_add", params)
322   c.connect(node)
323   c.run()
324   return c.getresult().get(node, False)
325
326
327 def call_instance_run_rename(node, inst, old_name, osdev, swapdev):
328   """Run the OS rename script for an instance.
329
330   This is a single-node call.
331
332   """
333   params = [inst.ToDict(), old_name, osdev, swapdev]
334   c = Client("instance_run_rename", params)
335   c.connect(node)
336   c.run()
337   return c.getresult().get(node, False)
338
339
340 def call_instance_info(node, instance):
341   """Returns information about a single instance.
342
343   This is a single-node call.
344
345   """
346   c = Client("instance_info", [instance])
347   c.connect(node)
348   c.run()
349   return c.getresult().get(node, False)
350
351
352 def call_all_instances_info(node_list):
353   """Returns information about all instances on a given node.
354
355   This is a single-node call.
356
357   """
358   c = Client("all_instances_info", [])
359   c.connect_list(node_list)
360   c.run()
361   return c.getresult()
362
363
364 def call_instance_list(node_list):
365   """Returns the list of running instances on a given node.
366
367   This is a single-node call.
368
369   """
370   c = Client("instance_list", [])
371   c.connect_list(node_list)
372   c.run()
373   return c.getresult()
374
375
376 def call_node_info(node_list, vg_name):
377   """Return node information.
378
379   This will return memory information and volume group size and free
380   space.
381
382   This is a multi-node call.
383
384   """
385   c = Client("node_info", [vg_name])
386   c.connect_list(node_list)
387   c.run()
388   retux = c.getresult()
389
390   for node_name in retux:
391     ret = retux.get(node_name, False)
392     if type(ret) != dict:
393       logger.Error("could not connect to node %s" % (node_name))
394       ret = {}
395
396     utils.CheckDict(ret,
397                     { 'memory_total' : '-',
398                       'memory_dom0' : '-',
399                       'memory_free' : '-',
400                       'vg_size' : 'node_unreachable',
401                       'vg_free' : '-' },
402                     "call_node_info",
403                     )
404   return retux
405
406
407 def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
408   """Add a node to the cluster.
409
410   This is a single-node call.
411
412   """
413   params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
414   c = Client("node_add", params)
415   c.connect(node)
416   c.run()
417   return c.getresult().get(node, False)
418
419
420 def call_node_verify(node_list, checkdict):
421   """Request verification of given parameters.
422
423   This is a multi-node call.
424
425   """
426   c = Client("node_verify", [checkdict])
427   c.connect_list(node_list)
428   c.run()
429   return c.getresult()
430
431
432 def call_node_start_master(node):
433   """Tells a node to activate itself as a master.
434
435   This is a single-node call.
436
437   """
438   c = Client("node_start_master", [])
439   c.connect(node)
440   c.run()
441   return c.getresult().get(node, False)
442
443
444 def call_node_stop_master(node):
445   """Tells a node to demote itself from master status.
446
447   This is a single-node call.
448
449   """
450   c = Client("node_stop_master", [])
451   c.connect(node)
452   c.run()
453   return c.getresult().get(node, False)
454
455
456 def call_version(node_list):
457   """Query node version.
458
459   This is a multi-node call.
460
461   """
462   c = Client("version", [])
463   c.connect_list(node_list)
464   c.run()
465   return c.getresult()
466
467
468 def call_blockdev_create(node, bdev, size, on_primary, info):
469   """Request creation of a given block device.
470
471   This is a single-node call.
472
473   """
474   params = [bdev.ToDict(), size, on_primary, info]
475   c = Client("blockdev_create", params)
476   c.connect(node)
477   c.run()
478   return c.getresult().get(node, False)
479
480
481 def call_blockdev_remove(node, bdev):
482   """Request removal of a given block device.
483
484   This is a single-node call.
485
486   """
487   c = Client("blockdev_remove", [bdev.ToDict()])
488   c.connect(node)
489   c.run()
490   return c.getresult().get(node, False)
491
492
493 def call_blockdev_assemble(node, disk, on_primary):
494   """Request assembling of a given block device.
495
496   This is a single-node call.
497
498   """
499   params = [disk.ToDict(), on_primary]
500   c = Client("blockdev_assemble", params)
501   c.connect(node)
502   c.run()
503   return c.getresult().get(node, False)
504
505
506 def call_blockdev_shutdown(node, disk):
507   """Request shutdown of a given block device.
508
509   This is a single-node call.
510
511   """
512   c = Client("blockdev_shutdown", [disk.ToDict()])
513   c.connect(node)
514   c.run()
515   return c.getresult().get(node, False)
516
517
518 def call_blockdev_addchild(node, bdev, ndev):
519   """Request adding a new child to a (mirroring) device.
520
521   This is a single-node call.
522
523   """
524   params = [bdev.ToDict(), ndev.ToDict()]
525   c = Client("blockdev_addchild", params)
526   c.connect(node)
527   c.run()
528   return c.getresult().get(node, False)
529
530
531 def call_blockdev_removechild(node, bdev, ndev):
532   """Request removing a new child from a (mirroring) device.
533
534   This is a single-node call.
535
536   """
537   params = [bdev.ToDict(), ndev.ToDict()]
538   c = Client("blockdev_removechild", params)
539   c.connect(node)
540   c.run()
541   return c.getresult().get(node, False)
542
543
544 def call_blockdev_getmirrorstatus(node, disks):
545   """Request status of a (mirroring) device.
546
547   This is a single-node call.
548
549   """
550   params = [dsk.ToDict() for dsk in disks]
551   c = Client("blockdev_getmirrorstatus", params)
552   c.connect(node)
553   c.run()
554   return c.getresult().get(node, False)
555
556
557 def call_blockdev_find(node, disk):
558   """Request identification of a given block device.
559
560   This is a single-node call.
561
562   """
563   c = Client("blockdev_find", [disk.ToDict()])
564   c.connect(node)
565   c.run()
566   return c.getresult().get(node, False)
567
568
569 def call_upload_file(node_list, file_name):
570   """Upload a file.
571
572   The node will refuse the operation in case the file is not on the
573   approved file list.
574
575   This is a multi-node call.
576
577   """
578   fh = file(file_name)
579   try:
580     data = fh.read()
581   finally:
582     fh.close()
583   st = os.stat(file_name)
584   params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
585             st.st_atime, st.st_mtime]
586   c = Client("upload_file", params)
587   c.connect_list(node_list)
588   c.run()
589   return c.getresult()
590
591
592 def call_os_diagnose(node_list):
593   """Request a diagnose of OS definitions.
594
595   This is a multi-node call.
596
597   """
598   c = Client("os_diagnose", [])
599   c.connect_list(node_list)
600   c.run()
601   result = c.getresult()
602   new_result = {}
603   for node_name in result:
604     nr = []
605     if result[node_name]:
606       for data in result[node_name]:
607         if data:
608           if isinstance(data, dict):
609             nr.append(objects.OS.FromDict(data))
610           elif isinstance(data, tuple) and len(data) == 3:
611             nr.append(errors.InvalidOS(data[0], data[1], data[2]))
612           else:
613             raise errors.ProgrammerError("Invalid data from"
614                                          " xcserver.os_diagnose")
615     new_result[node_name] = nr
616   return new_result
617
618
619 def call_os_get(node_list, name):
620   """Returns an OS definition.
621
622   This is a multi-node call.
623
624   """
625   c = Client("os_get", [name])
626   c.connect_list(node_list)
627   c.run()
628   result = c.getresult()
629   new_result = {}
630   for node_name in result:
631     data = result[node_name]
632     if isinstance(data, dict):
633       new_result[node_name] = objects.OS.FromDict(data)
634     elif isinstance(data, tuple) and len(data) == 3:
635       new_result[node_name] = errors.InvalidOS(data[0], data[1], data[2])
636     else:
637       new_result[node_name] = data
638   return new_result
639
640
641 def call_hooks_runner(node_list, hpath, phase, env):
642   """Call the hooks runner.
643
644   Args:
645     - op: the OpCode instance
646     - env: a dictionary with the environment
647
648   This is a multi-node call.
649
650   """
651   params = [hpath, phase, env]
652   c = Client("hooks_runner", params)
653   c.connect_list(node_list)
654   c.run()
655   result = c.getresult()
656   return result
657
658
659 def call_blockdev_snapshot(node, cf_bdev):
660   """Request a snapshot of the given block device.
661
662   This is a single-node call.
663
664   """
665   c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
666   c.connect(node)
667   c.run()
668   return c.getresult().get(node, False)
669
670
671 def call_snapshot_export(node, snap_bdev, dest_node, instance):
672   """Request the export of a given snapshot.
673
674   This is a single-node call.
675
676   """
677   params = [snap_bdev.ToDict(), dest_node, instance.ToDict()]
678   c = Client("snapshot_export", params)
679   c.connect(node)
680   c.run()
681   return c.getresult().get(node, False)
682
683
684 def call_finalize_export(node, instance, snap_disks):
685   """Request the completion of an export operation.
686
687   This writes the export config file, etc.
688
689   This is a single-node call.
690
691   """
692   flat_disks = []
693   for disk in snap_disks:
694     flat_disks.append(disk.ToDict())
695   params = [instance.ToDict(), flat_disks]
696   c = Client("finalize_export", params)
697   c.connect(node)
698   c.run()
699   return c.getresult().get(node, False)
700
701
702 def call_export_info(node, path):
703   """Queries the export information in a given path.
704
705   This is a single-node call.
706
707   """
708   c = Client("export_info", [path])
709   c.connect(node)
710   c.run()
711   result = c.getresult().get(node, False)
712   if not result:
713     return result
714   return objects.SerializableConfigParser.Loads(result)
715
716
717 def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image):
718   """Request the import of a backup into an instance.
719
720   This is a single-node call.
721
722   """
723   params = [inst.ToDict(), osdev, swapdev, src_node, src_image]
724   c = Client("instance_os_import", params)
725   c.connect(node)
726   c.run()
727   return c.getresult().get(node, False)
728
729
730 def call_export_list(node_list):
731   """Gets the stored exports list.
732
733   This is a multi-node call.
734
735   """
736   c = Client("export_list", [])
737   c.connect_list(node_list)
738   c.run()
739   result = c.getresult()
740   return result
741
742
743 def call_export_remove(node, export):
744   """Requests removal of a given export.
745
746   This is a single-node call.
747
748   """
749   c = Client("export_remove", [export])
750   c.connect(node)
751   c.run()
752   return c.getresult().get(node, False)
753
754
755 def call_node_leave_cluster(node):
756   """Requests a node to clean the cluster information it has.
757
758   This will remove the configuration information from the ganeti data
759   dir.
760
761   This is a single-node call.
762
763   """
764   c = Client("node_leave_cluster", [])
765   c.connect(node)
766   c.run()
767   return c.getresult().get(node, False)
768
769
770 def call_node_volumes(node_list):
771   """Gets all volumes on node(s).
772
773   This is a multi-node call.
774
775   """
776   c = Client("node_volumes", [])
777   c.connect_list(node_list)
778   c.run()
779   return c.getresult()