bb880dcb9d871e7f7efef0f5b153db64379155e4
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script to show add a new node to the cluster
23
24 """
25
26 # pylint: disable-msg=C0103,R0201,R0904
27 # C0103: Invalid name, since call_ are not valid
28 # R0201: Method could be a function, we keep all rpcs instance methods
29 # as not to change them back and forth between static/instance methods
30 # if they need to start using instance attributes
31 # R0904: Too many public methods
32
33 import os
34 import socket
35 import httplib
36
37 import simplejson
38
39 from ganeti import logger
40 from ganeti import utils
41 from ganeti import objects
42
43
44 class NodeController:
45   """Node-handling class.
46
47   For each node that we speak with, we create an instance of this
48   class, so that we have a safe place to store the details of this
49   individual call.
50
51   """
52   def __init__(self, parent, node):
53     self.parent = parent
54     self.node = node
55     self.failed = False
56
57     self.http_conn = hc = httplib.HTTPConnection(node, self.parent.port)
58     try:
59       hc.connect()
60       hc.putrequest('PUT', "/%s" % self.parent.procedure,
61                     skip_accept_encoding=True)
62       hc.putheader('Content-Length', str(len(parent.body)))
63       hc.endheaders()
64       hc.send(parent.body)
65     except socket.error, err:
66       logger.Error("Error connecting to %s: %s" % (node, str(err)))
67       self.failed = True
68
69   def get_response(self):
70     """Try to process the response from the node.
71
72     """
73     if self.failed:
74       # we already failed in connect
75       return False
76     resp = self.http_conn.getresponse()
77     if resp.status != 200:
78       return False
79     try:
80       length = int(resp.getheader('Content-Length', '0'))
81     except ValueError:
82       return False
83     if not length:
84       logger.Error("Zero-length reply from %s" % self.node)
85       return False
86     payload = resp.read(length)
87     unload = simplejson.loads(payload)
88     return unload
89
90
91 class Client:
92   """RPC Client class.
93
94   This class, given a (remote) method name, a list of parameters and a
95   list of nodes, will contact (in parallel) all nodes, and return a
96   dict of results (key: node name, value: result).
97
98   One current bug is that generic failure is still signalled by
99   'False' result, which is not good. This overloading of values can
100   cause bugs.
101
102   """
103   result_set = False
104   result = False
105   allresult = []
106
107   def __init__(self, procedure, args):
108     self.port = utils.GetNodeDaemonPort()
109     self.nodepw = utils.GetNodeDaemonPassword()
110     self.nc = {}
111     self.results = {}
112     self.procedure = procedure
113     self.args = args
114     self.body = simplejson.dumps(args)
115
116   #--- generic connector -------------
117
118   def connect_list(self, node_list):
119     """Add a list of nodes to the target nodes.
120
121     """
122     for node in node_list:
123       self.connect(node)
124
125   def connect(self, connect_node):
126     """Add a node to the target list.
127
128     """
129     self.nc[connect_node] = nc = NodeController(self, connect_node)
130
131   def getresult(self):
132     """Return the results of the call.
133
134     """
135     return self.results
136
137   def run(self):
138     """Wrapper over reactor.run().
139
140     This function simply calls reactor.run() if we have any requests
141     queued, otherwise it does nothing.
142
143     """
144     for node, nc in self.nc.items():
145       self.results[node] = nc.get_response()
146
147
148 class RpcRunner(object):
149   """RPC runner class"""
150
151   def __init__(self, cfg):
152     """Initialized the rpc runner.
153
154     @type cfg:  C{config.ConfigWriter}
155     @param cfg: the configuration object that will be used to get data
156                 about the cluster
157
158     """
159     self._cfg = cfg
160
161   def call_volume_list(self, node_list, vg_name):
162     """Gets the logical volumes present in a given volume group.
163
164     This is a multi-node call.
165
166     """
167     c = Client("volume_list", [vg_name])
168     c.connect_list(node_list)
169     c.run()
170     return c.getresult()
171
172   def call_vg_list(self, node_list):
173     """Gets the volume group list.
174
175     This is a multi-node call.
176
177     """
178     c = Client("vg_list", [])
179     c.connect_list(node_list)
180     c.run()
181     return c.getresult()
182
183
184   def call_bridges_exist(self, node, bridges_list):
185     """Checks if a node has all the bridges given.
186
187     This method checks if all bridges given in the bridges_list are
188     present on the remote node, so that an instance that uses interfaces
189     on those bridges can be started.
190
191     This is a single-node call.
192
193     """
194     c = Client("bridges_exist", [bridges_list])
195     c.connect(node)
196     c.run()
197     return c.getresult().get(node, False)
198
199
200   def call_instance_start(self, node, instance, extra_args):
201     """Starts an instance.
202
203     This is a single-node call.
204
205     """
206     c = Client("instance_start", [instance.ToDict(), extra_args])
207     c.connect(node)
208     c.run()
209     return c.getresult().get(node, False)
210
211
212   def call_instance_shutdown(self, node, instance):
213     """Stops an instance.
214
215     This is a single-node call.
216
217     """
218     c = Client("instance_shutdown", [instance.ToDict()])
219     c.connect(node)
220     c.run()
221     return c.getresult().get(node, False)
222
223
224   def call_instance_migrate(self, node, instance, target, live):
225     """Migrate an instance.
226
227     This is a single-node call.
228
229     @type node: string
230     @param node: the node on which the instance is currently running
231     @type instance: C{objects.Instance}
232     @param instance: the instance definition
233     @type target: string
234     @param target: the target node name
235     @type live: boolean
236     @param live: whether the migration should be done live or not (the
237         interpretation of this parameter is left to the hypervisor)
238
239     """
240     c = Client("instance_migrate", [instance.ToDict(), target, live])
241     c.connect(node)
242     c.run()
243     return c.getresult().get(node, False)
244
245
246   def call_instance_reboot(self, node, instance, reboot_type, extra_args):
247     """Reboots an instance.
248
249     This is a single-node call.
250
251     """
252     c = Client("instance_reboot", [instance.ToDict(), reboot_type, extra_args])
253     c.connect(node)
254     c.run()
255     return c.getresult().get(node, False)
256
257
258   def call_instance_os_add(self, node, inst, osdev, swapdev):
259     """Installs an OS on the given instance.
260
261     This is a single-node call.
262
263     """
264     params = [inst.ToDict(), osdev, swapdev]
265     c = Client("instance_os_add", params)
266     c.connect(node)
267     c.run()
268     return c.getresult().get(node, False)
269
270
271   def call_instance_run_rename(self, node, inst, old_name, osdev, swapdev):
272     """Run the OS rename script for an instance.
273
274     This is a single-node call.
275
276     """
277     params = [inst.ToDict(), old_name, osdev, swapdev]
278     c = Client("instance_run_rename", params)
279     c.connect(node)
280     c.run()
281     return c.getresult().get(node, False)
282
283
284   def call_instance_info(self, node, instance, hname):
285     """Returns information about a single instance.
286
287     This is a single-node call.
288
289     @type node_list: list
290     @param node_list: the list of nodes to query
291     @type instance: string
292     @param instance: the instance name
293     @type hname: string
294     @param hname: the hypervisor type of the instance
295
296     """
297     c = Client("instance_info", [instance])
298     c.connect(node)
299     c.run()
300     return c.getresult().get(node, False)
301
302
303   def call_all_instances_info(self, node_list, hypervisor_list):
304     """Returns information about all instances on the given nodes.
305
306     This is a multi-node call.
307
308     @type node_list: list
309     @param node_list: the list of nodes to query
310     @type hypervisor_list: list
311     @param hypervisor_list: the hypervisors to query for instances
312
313     """
314     c = Client("all_instances_info", [hypervisor_list])
315     c.connect_list(node_list)
316     c.run()
317     return c.getresult()
318
319
320   def call_instance_list(self, node_list, hypervisor_list):
321     """Returns the list of running instances on a given node.
322
323     This is a multi-node call.
324
325     @type node_list: list
326     @param node_list: the list of nodes to query
327     @type hypervisor_list: list
328     @param hypervisor_list: the hypervisors to query for instances
329
330     """
331     c = Client("instance_list", [hypervisor_list])
332     c.connect_list(node_list)
333     c.run()
334     return c.getresult()
335
336
337   def call_node_tcp_ping(self, node, source, target, port, timeout,
338                          live_port_needed):
339     """Do a TcpPing on the remote node
340
341     This is a single-node call.
342     """
343     c = Client("node_tcp_ping", [source, target, port, timeout,
344                                  live_port_needed])
345     c.connect(node)
346     c.run()
347     return c.getresult().get(node, False)
348
349
350   def call_node_info(self, node_list, vg_name, hypervisor_type):
351     """Return node information.
352
353     This will return memory information and volume group size and free
354     space.
355
356     This is a multi-node call.
357
358     @type node_list: list
359     @param node_list: the list of nodes to query
360     @type vgname: C{string}
361     @param vgname: the name of the volume group to ask for disk space
362         information
363     @type hypervisor_type: C{str}
364     @param hypervisor_type: the name of the hypervisor to ask for
365         memory information
366
367     """
368     c = Client("node_info", [vg_name, hypervisor_type])
369     c.connect_list(node_list)
370     c.run()
371     retux = c.getresult()
372
373     for node_name in retux:
374       ret = retux.get(node_name, False)
375       if type(ret) != dict:
376         logger.Error("could not connect to node %s" % (node_name))
377         ret = {}
378
379       utils.CheckDict(ret,
380                       { 'memory_total' : '-',
381                         'memory_dom0' : '-',
382                         'memory_free' : '-',
383                         'vg_size' : 'node_unreachable',
384                         'vg_free' : '-' },
385                       "call_node_info",
386                       )
387     return retux
388
389
390   def call_node_add(self, node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
391     """Add a node to the cluster.
392
393     This is a single-node call.
394
395     """
396     params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
397     c = Client("node_add", params)
398     c.connect(node)
399     c.run()
400     return c.getresult().get(node, False)
401
402
403   def call_node_verify(self, node_list, checkdict, cluster_name):
404     """Request verification of given parameters.
405
406     This is a multi-node call.
407
408     """
409     c = Client("node_verify", [checkdict, cluster_name])
410     c.connect_list(node_list)
411     c.run()
412     return c.getresult()
413
414
415   @staticmethod
416   def call_node_start_master(node, start_daemons):
417     """Tells a node to activate itself as a master.
418
419     This is a single-node call.
420
421     """
422     c = Client("node_start_master", [start_daemons])
423     c.connect(node)
424     c.run()
425     return c.getresult().get(node, False)
426
427
428   @staticmethod
429   def call_node_stop_master(node, stop_daemons):
430     """Tells a node to demote itself from master status.
431
432     This is a single-node call.
433
434     """
435     c = Client("node_stop_master", [stop_daemons])
436     c.connect(node)
437     c.run()
438     return c.getresult().get(node, False)
439
440
441   @staticmethod
442   def call_master_info(node_list):
443     """Query master info.
444
445     This is a multi-node call.
446
447     """
448     # TODO: should this method query down nodes?
449     c = Client("master_info", [])
450     c.connect_list(node_list)
451     c.run()
452     return c.getresult()
453
454
455   def call_version(self, node_list):
456     """Query node version.
457
458     This is a multi-node call.
459
460     """
461     c = Client("version", [])
462     c.connect_list(node_list)
463     c.run()
464     return c.getresult()
465
466
467   def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
468     """Request creation of a given block device.
469
470     This is a single-node call.
471
472     """
473     params = [bdev.ToDict(), size, owner, on_primary, info]
474     c = Client("blockdev_create", params)
475     c.connect(node)
476     c.run()
477     return c.getresult().get(node, False)
478
479
480   def call_blockdev_remove(self, node, bdev):
481     """Request removal of a given block device.
482
483     This is a single-node call.
484
485     """
486     c = Client("blockdev_remove", [bdev.ToDict()])
487     c.connect(node)
488     c.run()
489     return c.getresult().get(node, False)
490
491
492   def call_blockdev_rename(self, node, devlist):
493     """Request rename of the given block devices.
494
495     This is a single-node call.
496
497     """
498     params = [(d.ToDict(), uid) for d, uid in devlist]
499     c = Client("blockdev_rename", params)
500     c.connect(node)
501     c.run()
502     return c.getresult().get(node, False)
503
504
505   def call_blockdev_assemble(self, node, disk, owner, on_primary):
506     """Request assembling of a given block device.
507
508     This is a single-node call.
509
510     """
511     params = [disk.ToDict(), owner, on_primary]
512     c = Client("blockdev_assemble", params)
513     c.connect(node)
514     c.run()
515     return c.getresult().get(node, False)
516
517
518   def call_blockdev_shutdown(self, node, disk):
519     """Request shutdown of a given block device.
520
521     This is a single-node call.
522
523     """
524     c = Client("blockdev_shutdown", [disk.ToDict()])
525     c.connect(node)
526     c.run()
527     return c.getresult().get(node, False)
528
529
530   def call_blockdev_addchildren(self, node, bdev, ndevs):
531     """Request adding a list of children to a (mirroring) device.
532
533     This is a single-node call.
534
535     """
536     params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
537     c = Client("blockdev_addchildren", params)
538     c.connect(node)
539     c.run()
540     return c.getresult().get(node, False)
541
542
543   def call_blockdev_removechildren(self, node, bdev, ndevs):
544     """Request removing a list of children from a (mirroring) device.
545
546     This is a single-node call.
547
548     """
549     params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
550     c = Client("blockdev_removechildren", params)
551     c.connect(node)
552     c.run()
553     return c.getresult().get(node, False)
554
555
556   def call_blockdev_getmirrorstatus(self, node, disks):
557     """Request status of a (mirroring) device.
558
559     This is a single-node call.
560
561     """
562     params = [dsk.ToDict() for dsk in disks]
563     c = Client("blockdev_getmirrorstatus", params)
564     c.connect(node)
565     c.run()
566     return c.getresult().get(node, False)
567
568
569   def call_blockdev_find(self, node, disk):
570     """Request identification of a given block device.
571
572     This is a single-node call.
573
574     """
575     c = Client("blockdev_find", [disk.ToDict()])
576     c.connect(node)
577     c.run()
578     return c.getresult().get(node, False)
579
580
581   def call_blockdev_close(self, node, disks):
582     """Closes the given block devices.
583
584     This is a single-node call.
585
586     """
587     params = [cf.ToDict() for cf in disks]
588     c = Client("blockdev_close", params)
589     c.connect(node)
590     c.run()
591     return c.getresult().get(node, False)
592
593
594   @staticmethod
595   def call_upload_file(node_list, file_name):
596     """Upload a file.
597
598     The node will refuse the operation in case the file is not on the
599     approved file list.
600
601     This is a multi-node call.
602
603     """
604     fh = file(file_name)
605     try:
606       data = fh.read()
607     finally:
608       fh.close()
609     st = os.stat(file_name)
610     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
611               st.st_atime, st.st_mtime]
612     c = Client("upload_file", params)
613     c.connect_list(node_list)
614     c.run()
615     return c.getresult()
616
617   @staticmethod
618   def call_upload_file(node_list, file_name):
619     """Upload a file.
620
621     The node will refuse the operation in case the file is not on the
622     approved file list.
623
624     This is a multi-node call.
625
626     """
627     fh = file(file_name)
628     try:
629       data = fh.read()
630     finally:
631       fh.close()
632     st = os.stat(file_name)
633     params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
634               st.st_atime, st.st_mtime]
635     c = Client("upload_file", params)
636     c.connect_list(node_list)
637     c.run()
638     return c.getresult()
639
640   def call_os_diagnose(self, node_list):
641     """Request a diagnose of OS definitions.
642
643     This is a multi-node call.
644
645     """
646     c = Client("os_diagnose", [])
647     c.connect_list(node_list)
648     c.run()
649     result = c.getresult()
650     new_result = {}
651     for node_name in result:
652       if result[node_name]:
653         nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
654       else:
655         nr = []
656       new_result[node_name] = nr
657     return new_result
658
659
660   def call_os_get(self, node, name):
661     """Returns an OS definition.
662
663     This is a single-node call.
664
665     """
666     c = Client("os_get", [name])
667     c.connect(node)
668     c.run()
669     result = c.getresult().get(node, False)
670     if isinstance(result, dict):
671       return objects.OS.FromDict(result)
672     else:
673       return result
674
675
676   def call_hooks_runner(self, node_list, hpath, phase, env):
677     """Call the hooks runner.
678
679     Args:
680       - op: the OpCode instance
681       - env: a dictionary with the environment
682
683     This is a multi-node call.
684
685     """
686     params = [hpath, phase, env]
687     c = Client("hooks_runner", params)
688     c.connect_list(node_list)
689     c.run()
690     result = c.getresult()
691     return result
692
693
694   def call_iallocator_runner(self, node, name, idata):
695     """Call an iallocator on a remote node
696
697     Args:
698       - name: the iallocator name
699       - input: the json-encoded input string
700
701     This is a single-node call.
702
703     """
704     params = [name, idata]
705     c = Client("iallocator_runner", params)
706     c.connect(node)
707     c.run()
708     result = c.getresult().get(node, False)
709     return result
710
711
712   def call_blockdev_grow(self, node, cf_bdev, amount):
713     """Request a snapshot of the given block device.
714
715     This is a single-node call.
716
717     """
718     c = Client("blockdev_grow", [cf_bdev.ToDict(), amount])
719     c.connect(node)
720     c.run()
721     return c.getresult().get(node, False)
722
723
724   def call_blockdev_snapshot(self, node, cf_bdev):
725     """Request a snapshot of the given block device.
726
727     This is a single-node call.
728
729     """
730     c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
731     c.connect(node)
732     c.run()
733     return c.getresult().get(node, False)
734
735
736   def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
737                            cluster_name):
738     """Request the export of a given snapshot.
739
740     This is a single-node call.
741
742     """
743     params = [snap_bdev.ToDict(), dest_node, instance.ToDict(), cluster_name]
744     c = Client("snapshot_export", params)
745     c.connect(node)
746     c.run()
747     return c.getresult().get(node, False)
748
749
750   def call_finalize_export(self, node, instance, snap_disks):
751     """Request the completion of an export operation.
752
753     This writes the export config file, etc.
754
755     This is a single-node call.
756
757     """
758     flat_disks = []
759     for disk in snap_disks:
760       flat_disks.append(disk.ToDict())
761     params = [instance.ToDict(), flat_disks]
762     c = Client("finalize_export", params)
763     c.connect(node)
764     c.run()
765     return c.getresult().get(node, False)
766
767
768   def call_export_info(self, node, path):
769     """Queries the export information in a given path.
770
771     This is a single-node call.
772
773     """
774     c = Client("export_info", [path])
775     c.connect(node)
776     c.run()
777     result = c.getresult().get(node, False)
778     if not result:
779       return result
780     return objects.SerializableConfigParser.Loads(str(result))
781
782
783   def call_instance_os_import(self, node, inst, osdev, swapdev,
784                               src_node, src_image, cluster_name):
785     """Request the import of a backup into an instance.
786
787     This is a single-node call.
788
789     """
790     params = [inst.ToDict(), osdev, swapdev, src_node, src_image, cluster_name]
791     c = Client("instance_os_import", params)
792     c.connect(node)
793     c.run()
794     return c.getresult().get(node, False)
795
796
797   def call_export_list(self, node_list):
798     """Gets the stored exports list.
799
800     This is a multi-node call.
801
802     """
803     c = Client("export_list", [])
804     c.connect_list(node_list)
805     c.run()
806     result = c.getresult()
807     return result
808
809
810   def call_export_remove(self, node, export):
811     """Requests removal of a given export.
812
813     This is a single-node call.
814
815     """
816     c = Client("export_remove", [export])
817     c.connect(node)
818     c.run()
819     return c.getresult().get(node, False)
820
821
822   def call_node_leave_cluster(self, node):
823     """Requests a node to clean the cluster information it has.
824
825     This will remove the configuration information from the ganeti data
826     dir.
827
828     This is a single-node call.
829
830     """
831     c = Client("node_leave_cluster", [])
832     c.connect(node)
833     c.run()
834     return c.getresult().get(node, False)
835
836
837   def call_node_volumes(self, node_list):
838     """Gets all volumes on node(s).
839
840     This is a multi-node call.
841
842     """
843     c = Client("node_volumes", [])
844     c.connect_list(node_list)
845     c.run()
846     return c.getresult()
847
848
849   def call_test_delay(self, node_list, duration):
850     """Sleep for a fixed time on given node(s).
851
852     This is a multi-node call.
853
854     """
855     c = Client("test_delay", [duration])
856     c.connect_list(node_list)
857     c.run()
858     return c.getresult()
859
860
861   def call_file_storage_dir_create(self, node, file_storage_dir):
862     """Create the given file storage directory.
863
864     This is a single-node call.
865
866     """
867     c = Client("file_storage_dir_create", [file_storage_dir])
868     c.connect(node)
869     c.run()
870     return c.getresult().get(node, False)
871
872
873   def call_file_storage_dir_remove(self, node, file_storage_dir):
874     """Remove the given file storage directory.
875
876     This is a single-node call.
877
878     """
879     c = Client("file_storage_dir_remove", [file_storage_dir])
880     c.connect(node)
881     c.run()
882     return c.getresult().get(node, False)
883
884
885   def call_file_storage_dir_rename(self, node, old_file_storage_dir,
886                                    new_file_storage_dir):
887     """Rename file storage directory.
888
889     This is a single-node call.
890
891     """
892     c = Client("file_storage_dir_rename",
893                [old_file_storage_dir, new_file_storage_dir])
894     c.connect(node)
895     c.run()
896     return c.getresult().get(node, False)
897
898
899   @staticmethod
900   def call_jobqueue_update(node_list, file_name, content):
901     """Update job queue.
902
903     This is a multi-node call.
904
905     """
906     c = Client("jobqueue_update", [file_name, content])
907     c.connect_list(node_list)
908     c.run()
909     result = c.getresult()
910     return result
911
912
913   @staticmethod
914   def call_jobqueue_purge(node):
915     """Purge job queue.
916
917     This is a single-node call.
918
919     """
920     c = Client("jobqueue_purge", [])
921     c.connect(node)
922     c.run()
923     return c.getresult().get(node, False)
924
925
926   @staticmethod
927   def call_jobqueue_rename(node_list, old, new):
928     """Rename a job queue file.
929
930     This is a multi-node call.
931
932     """
933     c = Client("jobqueue_rename", [old, new])
934     c.connect_list(node_list)
935     c.run()
936     result = c.getresult()
937     return result