Add a generic write file function
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script to show add a new node to the cluster
23
24 """
25
26 # pylint: disable-msg=C0103
27
28 import os
29
30 from twisted.internet.pollreactor import PollReactor
31
32 class ReReactor(PollReactor):
33   """A re-startable Reactor implementation.
34
35   """
36   def run(self, installSignalHandlers=1):
37     """Custom run method.
38
39     This is customized run that, before calling Reactor.run, will
40     reinstall the shutdown events and re-create the threadpool in case
41     these are not present (as will happen on the second run of the
42     reactor).
43
44     """
45     if not 'shutdown' in self._eventTriggers:
46       # the shutdown queue has been killed, we are most probably
47       # at the second run, thus recreate the queue
48       self.addSystemEventTrigger('during', 'shutdown', self.crash)
49       self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll)
50     if self.threadpool is not None and self.threadpool.joined == 1:
51       # in case the threadpool has been stopped, re-start it
52       # and add a trigger to stop it at reactor shutdown
53       self.threadpool.start()
54       self.addSystemEventTrigger('during', 'shutdown', self.threadpool.stop)
55
56     return PollReactor.run(self, installSignalHandlers)
57
58
59 import twisted.internet.main
60 twisted.internet.main.installReactor(ReReactor())
61
62 from twisted.spread import pb
63 from twisted.internet import reactor
64 from twisted.cred import credentials
65 from OpenSSL import SSL, crypto
66
67 from ganeti import logger
68 from ganeti import utils
69 from ganeti import errors
70 from ganeti import constants
71 from ganeti import objects
72 from ganeti import ssconf
73
74 class NodeController:
75   """Node-handling class.
76
77   For each node that we speak with, we create an instance of this
78   class, so that we have a safe place to store the details of this
79   individual call.
80
81   """
82   def __init__(self, parent, node):
83     self.parent = parent
84     self.node = node
85
86   def _check_end(self):
87     """Stop the reactor if we got all the results.
88
89     """
90     if len(self.parent.results) == len(self.parent.nc):
91       reactor.stop()
92
93   def cb_call(self, obj):
94     """Callback for successful connect.
95
96     If the connect and login sequence succeeded, we proceed with
97     making the actual call.
98
99     """
100     deferred = obj.callRemote(self.parent.procedure, self.parent.args)
101     deferred.addCallbacks(self.cb_done, self.cb_err2)
102
103   def cb_done(self, result):
104     """Callback for successful call.
105
106     When we receive the result from a call, we check if it was an
107     error and if so we raise a generic RemoteError (we can't pass yet
108     the actual exception over). If there was no error, we store the
109     result.
110
111     """
112     tb, self.parent.results[self.node] = result
113     self._check_end()
114     if tb:
115       raise errors.RemoteError("Remote procedure error calling %s on %s:"
116                                "\n%s" % (self.parent.procedure,
117                                          self.node,
118                                          tb))
119
120   def cb_err1(self, reason):
121     """Error callback for unsuccessful connect.
122
123     """
124     logger.Error("caller_connect: could not connect to remote host %s,"
125                  " reason %s" % (self.node, reason))
126     self.parent.results[self.node] = False
127     self._check_end()
128
129   def cb_err2(self, reason):
130     """Error callback for unsuccessful call.
131
132     This is when the call didn't return anything, not even an error,
133     or when it time out, etc.
134
135     """
136     logger.Error("caller_call: could not call %s on node %s,"
137                  " reason %s" % (self.parent.procedure, self.node, reason))
138     self.parent.results[self.node] = False
139     self._check_end()
140
141
142 class MirrorContextFactory:
143   """Certificate verifier factory.
144
145   This factory creates contexts that verify if the remote end has a
146   specific certificate (i.e. our own certificate).
147
148   The checks we do are that the PEM dump of the certificate is the
149   same as our own and (somewhat redundantly) that the SHA checksum is
150   the same.
151
152   """
153   isClient = 1
154
155   def __init__(self):
156     try:
157       fd = open(constants.SSL_CERT_FILE, 'r')
158       try:
159         data = fd.read(16384)
160       finally:
161         fd.close()
162     except EnvironmentError, err:
163       raise errors.ConfigurationError("missing SSL certificate: %s" %
164                                       str(err))
165     self.mycert = crypto.load_certificate(crypto.FILETYPE_PEM, data)
166     self.mypem = crypto.dump_certificate(crypto.FILETYPE_PEM, self.mycert)
167     self.mydigest = self.mycert.digest('SHA')
168
169   def verifier(self, conn, x509, errno, err_depth, retcode):
170     """Certificate verify method.
171
172     """
173     if self.mydigest != x509.digest('SHA'):
174       return False
175     if crypto.dump_certificate(crypto.FILETYPE_PEM, x509) != self.mypem:
176       return False
177     return True
178
179   def getContext(self):
180     """Context generator.
181
182     """
183     context = SSL.Context(SSL.TLSv1_METHOD)
184     context.set_verify(SSL.VERIFY_PEER, self.verifier)
185     return context
186
187 class Client:
188   """RPC Client class.
189
190   This class, given a (remote) ethod name, a list of parameters and a
191   list of nodes, will contact (in parallel) all nodes, and return a
192   dict of results (key: node name, value: result).
193
194   One current bug is that generic failure is still signalled by
195   'False' result, which is not good. This overloading of values can
196   cause bugs.
197
198   """
199   result_set = False
200   result = False
201   allresult = []
202
203   def __init__(self, procedure, args):
204     ss = ssconf.SimpleStore()
205     self.port = ss.GetNodeDaemonPort()
206     self.nodepw = ss.GetNodeDaemonPassword()
207     self.nc = {}
208     self.results = {}
209     self.procedure = procedure
210     self.args = args
211
212   #--- generic connector -------------
213
214   def connect_list(self, node_list):
215     """Add a list of nodes to the target nodes.
216
217     """
218     for node in node_list:
219       self.connect(node)
220
221   def connect(self, connect_node):
222     """Add a node to the target list.
223
224     """
225     factory = pb.PBClientFactory()
226     self.nc[connect_node] = nc = NodeController(self, connect_node)
227     reactor.connectSSL(connect_node, self.port, factory,
228                        MirrorContextFactory())
229     #d = factory.getRootObject()
230     d = factory.login(credentials.UsernamePassword("master_node", self.nodepw))
231     d.addCallbacks(nc.cb_call, nc.cb_err1)
232
233   def getresult(self):
234     """Return the results of the call.
235
236     """
237     return self.results
238
239   def run(self):
240     """Wrapper over reactor.run().
241
242     This function simply calls reactor.run() if we have any requests
243     queued, otherwise it does nothing.
244
245     """
246     if self.nc:
247       reactor.run()
248
249
250 def call_volume_list(node_list, vg_name):
251   """Gets the logical volumes present in a given volume group.
252
253   This is a multi-node call.
254
255   """
256   c = Client("volume_list", [vg_name])
257   c.connect_list(node_list)
258   c.run()
259   return c.getresult()
260
261
262 def call_vg_list(node_list):
263   """Gets the volume group list.
264
265   This is a multi-node call.
266
267   """
268   c = Client("vg_list", [])
269   c.connect_list(node_list)
270   c.run()
271   return c.getresult()
272
273
274 def call_bridges_exist(node, bridges_list):
275   """Checks if a node has all the bridges given.
276
277   This method checks if all bridges given in the bridges_list are
278   present on the remote node, so that an instance that uses interfaces
279   on those bridges can be started.
280
281   This is a single-node call.
282
283   """
284   c = Client("bridges_exist", [bridges_list])
285   c.connect(node)
286   c.run()
287   return c.getresult().get(node, False)
288
289
290 def call_instance_start(node, instance, extra_args):
291   """Stars an instance.
292
293   This is a single-node call.
294
295   """
296   c = Client("instance_start", [instance.ToDict(), extra_args])
297   c.connect(node)
298   c.run()
299   return c.getresult().get(node, False)
300
301
302 def call_instance_shutdown(node, instance):
303   """Stops an instance.
304
305   This is a single-node call.
306
307   """
308   c = Client("instance_shutdown", [instance.ToDict()])
309   c.connect(node)
310   c.run()
311   return c.getresult().get(node, False)
312
313
314 def call_instance_os_add(node, inst, osdev, swapdev):
315   """Installs an OS on the given instance.
316
317   This is a single-node call.
318
319   """
320   params = [inst.ToDict(), osdev, swapdev]
321   c = Client("instance_os_add", params)
322   c.connect(node)
323   c.run()
324   return c.getresult().get(node, False)
325
326
327 def call_instance_run_rename(node, inst, old_name, osdev, swapdev):
328   """Run the OS rename script for an instance.
329
330   This is a single-node call.
331
332   """
333   params = [inst.ToDict(), old_name, osdev, swapdev]
334   c = Client("instance_run_rename", params)
335   c.connect(node)
336   c.run()
337   return c.getresult().get(node, False)
338
339
340 def call_instance_info(node, instance):
341   """Returns information about a single instance.
342
343   This is a single-node call.
344
345   """
346   c = Client("instance_info", [instance])
347   c.connect(node)
348   c.run()
349   return c.getresult().get(node, False)
350
351
352 def call_all_instances_info(node_list):
353   """Returns information about all instances on a given node.
354
355   This is a single-node call.
356
357   """
358   c = Client("all_instances_info", [])
359   c.connect_list(node_list)
360   c.run()
361   return c.getresult()
362
363
364 def call_instance_list(node_list):
365   """Returns the list of running instances on a given node.
366
367   This is a single-node call.
368
369   """
370   c = Client("instance_list", [])
371   c.connect_list(node_list)
372   c.run()
373   return c.getresult()
374
375
376 def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed):
377   """Do a TcpPing on the remote node
378
379   This is a single-node call.
380   """
381   c = Client("node_tcp_ping", [source, target, port, timeout,
382                                live_port_needed])
383   c.connect(node)
384   c.run()
385   return c.getresult().get(node, False)
386
387
388 def call_node_info(node_list, vg_name):
389   """Return node information.
390
391   This will return memory information and volume group size and free
392   space.
393
394   This is a multi-node call.
395
396   """
397   c = Client("node_info", [vg_name])
398   c.connect_list(node_list)
399   c.run()
400   retux = c.getresult()
401
402   for node_name in retux:
403     ret = retux.get(node_name, False)
404     if type(ret) != dict:
405       logger.Error("could not connect to node %s" % (node_name))
406       ret = {}
407
408     utils.CheckDict(ret,
409                     { 'memory_total' : '-',
410                       'memory_dom0' : '-',
411                       'memory_free' : '-',
412                       'vg_size' : 'node_unreachable',
413                       'vg_free' : '-' },
414                     "call_node_info",
415                     )
416   return retux
417
418
419 def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
420   """Add a node to the cluster.
421
422   This is a single-node call.
423
424   """
425   params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
426   c = Client("node_add", params)
427   c.connect(node)
428   c.run()
429   return c.getresult().get(node, False)
430
431
432 def call_node_verify(node_list, checkdict):
433   """Request verification of given parameters.
434
435   This is a multi-node call.
436
437   """
438   c = Client("node_verify", [checkdict])
439   c.connect_list(node_list)
440   c.run()
441   return c.getresult()
442
443
444 def call_node_start_master(node):
445   """Tells a node to activate itself as a master.
446
447   This is a single-node call.
448
449   """
450   c = Client("node_start_master", [])
451   c.connect(node)
452   c.run()
453   return c.getresult().get(node, False)
454
455
456 def call_node_stop_master(node):
457   """Tells a node to demote itself from master status.
458
459   This is a single-node call.
460
461   """
462   c = Client("node_stop_master", [])
463   c.connect(node)
464   c.run()
465   return c.getresult().get(node, False)
466
467
468 def call_version(node_list):
469   """Query node version.
470
471   This is a multi-node call.
472
473   """
474   c = Client("version", [])
475   c.connect_list(node_list)
476   c.run()
477   return c.getresult()
478
479
480 def call_blockdev_create(node, bdev, size, on_primary, info):
481   """Request creation of a given block device.
482
483   This is a single-node call.
484
485   """
486   params = [bdev.ToDict(), size, on_primary, info]
487   c = Client("blockdev_create", params)
488   c.connect(node)
489   c.run()
490   return c.getresult().get(node, False)
491
492
493 def call_blockdev_remove(node, bdev):
494   """Request removal of a given block device.
495
496   This is a single-node call.
497
498   """
499   c = Client("blockdev_remove", [bdev.ToDict()])
500   c.connect(node)
501   c.run()
502   return c.getresult().get(node, False)
503
504
505 def call_blockdev_assemble(node, disk, on_primary):
506   """Request assembling of a given block device.
507
508   This is a single-node call.
509
510   """
511   params = [disk.ToDict(), on_primary]
512   c = Client("blockdev_assemble", params)
513   c.connect(node)
514   c.run()
515   return c.getresult().get(node, False)
516
517
518 def call_blockdev_shutdown(node, disk):
519   """Request shutdown of a given block device.
520
521   This is a single-node call.
522
523   """
524   c = Client("blockdev_shutdown", [disk.ToDict()])
525   c.connect(node)
526   c.run()
527   return c.getresult().get(node, False)
528
529
530 def call_blockdev_addchild(node, bdev, ndev):
531   """Request adding a new child to a (mirroring) device.
532
533   This is a single-node call.
534
535   """
536   params = [bdev.ToDict(), ndev.ToDict()]
537   c = Client("blockdev_addchild", params)
538   c.connect(node)
539   c.run()
540   return c.getresult().get(node, False)
541
542
543 def call_blockdev_removechild(node, bdev, ndev):
544   """Request removing a new child from a (mirroring) device.
545
546   This is a single-node call.
547
548   """
549   params = [bdev.ToDict(), ndev.ToDict()]
550   c = Client("blockdev_removechild", params)
551   c.connect(node)
552   c.run()
553   return c.getresult().get(node, False)
554
555
556 def call_blockdev_getmirrorstatus(node, disks):
557   """Request status of a (mirroring) device.
558
559   This is a single-node call.
560
561   """
562   params = [dsk.ToDict() for dsk in disks]
563   c = Client("blockdev_getmirrorstatus", params)
564   c.connect(node)
565   c.run()
566   return c.getresult().get(node, False)
567
568
569 def call_blockdev_find(node, disk):
570   """Request identification of a given block device.
571
572   This is a single-node call.
573
574   """
575   c = Client("blockdev_find", [disk.ToDict()])
576   c.connect(node)
577   c.run()
578   return c.getresult().get(node, False)
579
580
581 def call_upload_file(node_list, file_name):
582   """Upload a file.
583
584   The node will refuse the operation in case the file is not on the
585   approved file list.
586
587   This is a multi-node call.
588
589   """
590   fh = file(file_name)
591   try:
592     data = fh.read()
593   finally:
594     fh.close()
595   st = os.stat(file_name)
596   params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
597             st.st_atime, st.st_mtime]
598   c = Client("upload_file", params)
599   c.connect_list(node_list)
600   c.run()
601   return c.getresult()
602
603
604 def call_os_diagnose(node_list):
605   """Request a diagnose of OS definitions.
606
607   This is a multi-node call.
608
609   """
610   c = Client("os_diagnose", [])
611   c.connect_list(node_list)
612   c.run()
613   result = c.getresult()
614   new_result = {}
615   for node_name in result:
616     nr = []
617     if result[node_name]:
618       for data in result[node_name]:
619         if data:
620           if isinstance(data, dict):
621             nr.append(objects.OS.FromDict(data))
622           elif isinstance(data, tuple) and len(data) == 3:
623             nr.append(errors.InvalidOS(data[0], data[1], data[2]))
624           else:
625             raise errors.ProgrammerError("Invalid data from"
626                                          " xcserver.os_diagnose")
627     new_result[node_name] = nr
628   return new_result
629
630
631 def call_os_get(node_list, name):
632   """Returns an OS definition.
633
634   This is a multi-node call.
635
636   """
637   c = Client("os_get", [name])
638   c.connect_list(node_list)
639   c.run()
640   result = c.getresult()
641   new_result = {}
642   for node_name in result:
643     data = result[node_name]
644     if isinstance(data, dict):
645       new_result[node_name] = objects.OS.FromDict(data)
646     elif isinstance(data, tuple) and len(data) == 3:
647       new_result[node_name] = errors.InvalidOS(data[0], data[1], data[2])
648     else:
649       new_result[node_name] = data
650   return new_result
651
652
653 def call_hooks_runner(node_list, hpath, phase, env):
654   """Call the hooks runner.
655
656   Args:
657     - op: the OpCode instance
658     - env: a dictionary with the environment
659
660   This is a multi-node call.
661
662   """
663   params = [hpath, phase, env]
664   c = Client("hooks_runner", params)
665   c.connect_list(node_list)
666   c.run()
667   result = c.getresult()
668   return result
669
670
671 def call_blockdev_snapshot(node, cf_bdev):
672   """Request a snapshot of the given block device.
673
674   This is a single-node call.
675
676   """
677   c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
678   c.connect(node)
679   c.run()
680   return c.getresult().get(node, False)
681
682
683 def call_snapshot_export(node, snap_bdev, dest_node, instance):
684   """Request the export of a given snapshot.
685
686   This is a single-node call.
687
688   """
689   params = [snap_bdev.ToDict(), dest_node, instance.ToDict()]
690   c = Client("snapshot_export", params)
691   c.connect(node)
692   c.run()
693   return c.getresult().get(node, False)
694
695
696 def call_finalize_export(node, instance, snap_disks):
697   """Request the completion of an export operation.
698
699   This writes the export config file, etc.
700
701   This is a single-node call.
702
703   """
704   flat_disks = []
705   for disk in snap_disks:
706     flat_disks.append(disk.ToDict())
707   params = [instance.ToDict(), flat_disks]
708   c = Client("finalize_export", params)
709   c.connect(node)
710   c.run()
711   return c.getresult().get(node, False)
712
713
714 def call_export_info(node, path):
715   """Queries the export information in a given path.
716
717   This is a single-node call.
718
719   """
720   c = Client("export_info", [path])
721   c.connect(node)
722   c.run()
723   result = c.getresult().get(node, False)
724   if not result:
725     return result
726   return objects.SerializableConfigParser.Loads(result)
727
728
729 def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image):
730   """Request the import of a backup into an instance.
731
732   This is a single-node call.
733
734   """
735   params = [inst.ToDict(), osdev, swapdev, src_node, src_image]
736   c = Client("instance_os_import", params)
737   c.connect(node)
738   c.run()
739   return c.getresult().get(node, False)
740
741
742 def call_export_list(node_list):
743   """Gets the stored exports list.
744
745   This is a multi-node call.
746
747   """
748   c = Client("export_list", [])
749   c.connect_list(node_list)
750   c.run()
751   result = c.getresult()
752   return result
753
754
755 def call_export_remove(node, export):
756   """Requests removal of a given export.
757
758   This is a single-node call.
759
760   """
761   c = Client("export_remove", [export])
762   c.connect(node)
763   c.run()
764   return c.getresult().get(node, False)
765
766
767 def call_node_leave_cluster(node):
768   """Requests a node to clean the cluster information it has.
769
770   This will remove the configuration information from the ganeti data
771   dir.
772
773   This is a single-node call.
774
775   """
776   c = Client("node_leave_cluster", [])
777   c.connect(node)
778   c.run()
779   return c.getresult().get(node, False)
780
781
782 def call_node_volumes(node_list):
783   """Gets all volumes on node(s).
784
785   This is a multi-node call.
786
787   """
788   c = Client("node_volumes", [])
789   c.connect_list(node_list)
790   c.run()
791   return c.getresult()