Fix pylint-detected issues
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script to show add a new node to the cluster
23
24 """
25
26 # pylint: disable-msg=C0103
27
28 import os
29 import socket
30 import httplib
31
32 import simplejson
33
34 from ganeti import logger
35 from ganeti import utils
36 from ganeti import objects
37 from ganeti import ssconf
38
39
40 class NodeController:
41   """Node-handling class.
42
43   For each node that we speak with, we create an instance of this
44   class, so that we have a safe place to store the details of this
45   individual call.
46
47   """
48   def __init__(self, parent, node):
49     self.parent = parent
50     self.node = node
51     self.failed = False
52
53     self.http_conn = hc = httplib.HTTPConnection(node, self.parent.port)
54     try:
55       hc.connect()
56       hc.putrequest('PUT', "/%s" % self.parent.procedure,
57                     skip_accept_encoding=True)
58       hc.putheader('Content-Length', str(len(parent.body)))
59       hc.endheaders()
60       hc.send(parent.body)
61     except socket.error, err:
62       logger.Error("Error connecting to %s: %s" % (node, str(err)))
63       self.failed = True
64
65   def get_response(self):
66     """Try to process the response from the node.
67
68     """
69     if self.failed:
70       # we already failed in connect
71       return False
72     resp = self.http_conn.getresponse()
73     if resp.status != 200:
74       return False
75     try:
76       length = int(resp.getheader('Content-Length', '0'))
77     except ValueError:
78       return False
79     if not length:
80       logger.Error("Zero-length reply from %s" % self.node)
81       return False
82     payload = resp.read(length)
83     unload = simplejson.loads(payload)
84     return unload
85
86
87 class Client:
88   """RPC Client class.
89
90   This class, given a (remote) method name, a list of parameters and a
91   list of nodes, will contact (in parallel) all nodes, and return a
92   dict of results (key: node name, value: result).
93
94   One current bug is that generic failure is still signalled by
95   'False' result, which is not good. This overloading of values can
96   cause bugs.
97
98   """
99   result_set = False
100   result = False
101   allresult = []
102
103   def __init__(self, procedure, args):
104     ss = ssconf.SimpleStore()
105     self.port = ss.GetNodeDaemonPort()
106     self.nodepw = ss.GetNodeDaemonPassword()
107     self.nc = {}
108     self.results = {}
109     self.procedure = procedure
110     self.args = args
111     self.body = simplejson.dumps(args)
112
113   #--- generic connector -------------
114
115   def connect_list(self, node_list):
116     """Add a list of nodes to the target nodes.
117
118     """
119     for node in node_list:
120       self.connect(node)
121
122   def connect(self, connect_node):
123     """Add a node to the target list.
124
125     """
126     self.nc[connect_node] = nc = NodeController(self, connect_node)
127
128   def getresult(self):
129     """Return the results of the call.
130
131     """
132     return self.results
133
134   def run(self):
135     """Wrapper over reactor.run().
136
137     This function simply calls reactor.run() if we have any requests
138     queued, otherwise it does nothing.
139
140     """
141     for node, nc in self.nc.items():
142       self.results[node] = nc.get_response()
143
144
145 def call_volume_list(node_list, vg_name):
146   """Gets the logical volumes present in a given volume group.
147
148   This is a multi-node call.
149
150   """
151   c = Client("volume_list", [vg_name])
152   c.connect_list(node_list)
153   c.run()
154   return c.getresult()
155
156
157 def call_vg_list(node_list):
158   """Gets the volume group list.
159
160   This is a multi-node call.
161
162   """
163   c = Client("vg_list", [])
164   c.connect_list(node_list)
165   c.run()
166   return c.getresult()
167
168
169 def call_bridges_exist(node, bridges_list):
170   """Checks if a node has all the bridges given.
171
172   This method checks if all bridges given in the bridges_list are
173   present on the remote node, so that an instance that uses interfaces
174   on those bridges can be started.
175
176   This is a single-node call.
177
178   """
179   c = Client("bridges_exist", [bridges_list])
180   c.connect(node)
181   c.run()
182   return c.getresult().get(node, False)
183
184
185 def call_instance_start(node, instance, extra_args):
186   """Starts an instance.
187
188   This is a single-node call.
189
190   """
191   c = Client("instance_start", [instance.ToDict(), extra_args])
192   c.connect(node)
193   c.run()
194   return c.getresult().get(node, False)
195
196
197 def call_instance_shutdown(node, instance):
198   """Stops an instance.
199
200   This is a single-node call.
201
202   """
203   c = Client("instance_shutdown", [instance.ToDict()])
204   c.connect(node)
205   c.run()
206   return c.getresult().get(node, False)
207
208
209 def call_instance_migrate(node, instance, target, live):
210   """Migrate an instance.
211
212   This is a single-node call.
213
214   """
215   c = Client("instance_migrate", [instance.name, target, live])
216   c.connect(node)
217   c.run()
218   return c.getresult().get(node, False)
219
220
221 def call_instance_reboot(node, instance, reboot_type, extra_args):
222   """Reboots an instance.
223
224   This is a single-node call.
225
226   """
227   c = Client("instance_reboot", [instance.ToDict(), reboot_type, extra_args])
228   c.connect(node)
229   c.run()
230   return c.getresult().get(node, False)
231
232
233 def call_instance_os_add(node, inst, osdev, swapdev):
234   """Installs an OS on the given instance.
235
236   This is a single-node call.
237
238   """
239   params = [inst.ToDict(), osdev, swapdev]
240   c = Client("instance_os_add", params)
241   c.connect(node)
242   c.run()
243   return c.getresult().get(node, False)
244
245
246 def call_instance_run_rename(node, inst, old_name, osdev, swapdev):
247   """Run the OS rename script for an instance.
248
249   This is a single-node call.
250
251   """
252   params = [inst.ToDict(), old_name, osdev, swapdev]
253   c = Client("instance_run_rename", params)
254   c.connect(node)
255   c.run()
256   return c.getresult().get(node, False)
257
258
259 def call_instance_info(node, instance):
260   """Returns information about a single instance.
261
262   This is a single-node call.
263
264   """
265   c = Client("instance_info", [instance])
266   c.connect(node)
267   c.run()
268   return c.getresult().get(node, False)
269
270
271 def call_all_instances_info(node_list):
272   """Returns information about all instances on a given node.
273
274   This is a single-node call.
275
276   """
277   c = Client("all_instances_info", [])
278   c.connect_list(node_list)
279   c.run()
280   return c.getresult()
281
282
283 def call_instance_list(node_list):
284   """Returns the list of running instances on a given node.
285
286   This is a single-node call.
287
288   """
289   c = Client("instance_list", [])
290   c.connect_list(node_list)
291   c.run()
292   return c.getresult()
293
294
295 def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed):
296   """Do a TcpPing on the remote node
297
298   This is a single-node call.
299   """
300   c = Client("node_tcp_ping", [source, target, port, timeout,
301                                live_port_needed])
302   c.connect(node)
303   c.run()
304   return c.getresult().get(node, False)
305
306
307 def call_node_info(node_list, vg_name):
308   """Return node information.
309
310   This will return memory information and volume group size and free
311   space.
312
313   This is a multi-node call.
314
315   """
316   c = Client("node_info", [vg_name])
317   c.connect_list(node_list)
318   c.run()
319   retux = c.getresult()
320
321   for node_name in retux:
322     ret = retux.get(node_name, False)
323     if type(ret) != dict:
324       logger.Error("could not connect to node %s" % (node_name))
325       ret = {}
326
327     utils.CheckDict(ret,
328                     { 'memory_total' : '-',
329                       'memory_dom0' : '-',
330                       'memory_free' : '-',
331                       'vg_size' : 'node_unreachable',
332                       'vg_free' : '-' },
333                     "call_node_info",
334                     )
335   return retux
336
337
338 def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
339   """Add a node to the cluster.
340
341   This is a single-node call.
342
343   """
344   params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
345   c = Client("node_add", params)
346   c.connect(node)
347   c.run()
348   return c.getresult().get(node, False)
349
350
351 def call_node_verify(node_list, checkdict):
352   """Request verification of given parameters.
353
354   This is a multi-node call.
355
356   """
357   c = Client("node_verify", [checkdict])
358   c.connect_list(node_list)
359   c.run()
360   return c.getresult()
361
362
363 def call_node_start_master(node, start_daemons):
364   """Tells a node to activate itself as a master.
365
366   This is a single-node call.
367
368   """
369   c = Client("node_start_master", [start_daemons])
370   c.connect(node)
371   c.run()
372   return c.getresult().get(node, False)
373
374
375 def call_node_stop_master(node, stop_daemons):
376   """Tells a node to demote itself from master status.
377
378   This is a single-node call.
379
380   """
381   c = Client("node_stop_master", [stop_daemons])
382   c.connect(node)
383   c.run()
384   return c.getresult().get(node, False)
385
386
387 def call_version(node_list):
388   """Query node version.
389
390   This is a multi-node call.
391
392   """
393   c = Client("version", [])
394   c.connect_list(node_list)
395   c.run()
396   return c.getresult()
397
398
399 def call_blockdev_create(node, bdev, size, owner, on_primary, info):
400   """Request creation of a given block device.
401
402   This is a single-node call.
403
404   """
405   params = [bdev.ToDict(), size, owner, on_primary, info]
406   c = Client("blockdev_create", params)
407   c.connect(node)
408   c.run()
409   return c.getresult().get(node, False)
410
411
412 def call_blockdev_remove(node, bdev):
413   """Request removal of a given block device.
414
415   This is a single-node call.
416
417   """
418   c = Client("blockdev_remove", [bdev.ToDict()])
419   c.connect(node)
420   c.run()
421   return c.getresult().get(node, False)
422
423
424 def call_blockdev_rename(node, devlist):
425   """Request rename of the given block devices.
426
427   This is a single-node call.
428
429   """
430   params = [(d.ToDict(), uid) for d, uid in devlist]
431   c = Client("blockdev_rename", params)
432   c.connect(node)
433   c.run()
434   return c.getresult().get(node, False)
435
436
437 def call_blockdev_assemble(node, disk, owner, on_primary):
438   """Request assembling of a given block device.
439
440   This is a single-node call.
441
442   """
443   params = [disk.ToDict(), owner, on_primary]
444   c = Client("blockdev_assemble", params)
445   c.connect(node)
446   c.run()
447   return c.getresult().get(node, False)
448
449
450 def call_blockdev_shutdown(node, disk):
451   """Request shutdown of a given block device.
452
453   This is a single-node call.
454
455   """
456   c = Client("blockdev_shutdown", [disk.ToDict()])
457   c.connect(node)
458   c.run()
459   return c.getresult().get(node, False)
460
461
462 def call_blockdev_addchildren(node, bdev, ndevs):
463   """Request adding a list of children to a (mirroring) device.
464
465   This is a single-node call.
466
467   """
468   params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
469   c = Client("blockdev_addchildren", params)
470   c.connect(node)
471   c.run()
472   return c.getresult().get(node, False)
473
474
475 def call_blockdev_removechildren(node, bdev, ndevs):
476   """Request removing a list of children from a (mirroring) device.
477
478   This is a single-node call.
479
480   """
481   params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
482   c = Client("blockdev_removechildren", params)
483   c.connect(node)
484   c.run()
485   return c.getresult().get(node, False)
486
487
488 def call_blockdev_getmirrorstatus(node, disks):
489   """Request status of a (mirroring) device.
490
491   This is a single-node call.
492
493   """
494   params = [dsk.ToDict() for dsk in disks]
495   c = Client("blockdev_getmirrorstatus", params)
496   c.connect(node)
497   c.run()
498   return c.getresult().get(node, False)
499
500
501 def call_blockdev_find(node, disk):
502   """Request identification of a given block device.
503
504   This is a single-node call.
505
506   """
507   c = Client("blockdev_find", [disk.ToDict()])
508   c.connect(node)
509   c.run()
510   return c.getresult().get(node, False)
511
512
513 def call_blockdev_close(node, disks):
514   """Closes the given block devices.
515
516   This is a single-node call.
517
518   """
519   params = [cf.ToDict() for cf in disks]
520   c = Client("blockdev_close", params)
521   c.connect(node)
522   c.run()
523   return c.getresult().get(node, False)
524
525
526 def call_upload_file(node_list, file_name):
527   """Upload a file.
528
529   The node will refuse the operation in case the file is not on the
530   approved file list.
531
532   This is a multi-node call.
533
534   """
535   fh = file(file_name)
536   try:
537     data = fh.read()
538   finally:
539     fh.close()
540   st = os.stat(file_name)
541   params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
542             st.st_atime, st.st_mtime]
543   c = Client("upload_file", params)
544   c.connect_list(node_list)
545   c.run()
546   return c.getresult()
547
548
549 def call_os_diagnose(node_list):
550   """Request a diagnose of OS definitions.
551
552   This is a multi-node call.
553
554   """
555   c = Client("os_diagnose", [])
556   c.connect_list(node_list)
557   c.run()
558   result = c.getresult()
559   new_result = {}
560   for node_name in result:
561     if result[node_name]:
562       nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
563     else:
564       nr = []
565     new_result[node_name] = nr
566   return new_result
567
568
569 def call_os_get(node, name):
570   """Returns an OS definition.
571
572   This is a single-node call.
573
574   """
575   c = Client("os_get", [name])
576   c.connect(node)
577   c.run()
578   result = c.getresult().get(node, False)
579   if isinstance(result, dict):
580     return objects.OS.FromDict(result)
581   else:
582     return result
583
584
585 def call_hooks_runner(node_list, hpath, phase, env):
586   """Call the hooks runner.
587
588   Args:
589     - op: the OpCode instance
590     - env: a dictionary with the environment
591
592   This is a multi-node call.
593
594   """
595   params = [hpath, phase, env]
596   c = Client("hooks_runner", params)
597   c.connect_list(node_list)
598   c.run()
599   result = c.getresult()
600   return result
601
602
603 def call_iallocator_runner(node, name, idata):
604   """Call an iallocator on a remote node
605
606   Args:
607     - name: the iallocator name
608     - input: the json-encoded input string
609
610   This is a single-node call.
611
612   """
613   params = [name, idata]
614   c = Client("iallocator_runner", params)
615   c.connect(node)
616   c.run()
617   result = c.getresult().get(node, False)
618   return result
619
620
621 def call_blockdev_grow(node, cf_bdev, amount):
622   """Request a snapshot of the given block device.
623
624   This is a single-node call.
625
626   """
627   c = Client("blockdev_grow", [cf_bdev.ToDict(), amount])
628   c.connect(node)
629   c.run()
630   return c.getresult().get(node, False)
631
632
633 def call_blockdev_snapshot(node, cf_bdev):
634   """Request a snapshot of the given block device.
635
636   This is a single-node call.
637
638   """
639   c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
640   c.connect(node)
641   c.run()
642   return c.getresult().get(node, False)
643
644
645 def call_snapshot_export(node, snap_bdev, dest_node, instance):
646   """Request the export of a given snapshot.
647
648   This is a single-node call.
649
650   """
651   params = [snap_bdev.ToDict(), dest_node, instance.ToDict()]
652   c = Client("snapshot_export", params)
653   c.connect(node)
654   c.run()
655   return c.getresult().get(node, False)
656
657
658 def call_finalize_export(node, instance, snap_disks):
659   """Request the completion of an export operation.
660
661   This writes the export config file, etc.
662
663   This is a single-node call.
664
665   """
666   flat_disks = []
667   for disk in snap_disks:
668     flat_disks.append(disk.ToDict())
669   params = [instance.ToDict(), flat_disks]
670   c = Client("finalize_export", params)
671   c.connect(node)
672   c.run()
673   return c.getresult().get(node, False)
674
675
676 def call_export_info(node, path):
677   """Queries the export information in a given path.
678
679   This is a single-node call.
680
681   """
682   c = Client("export_info", [path])
683   c.connect(node)
684   c.run()
685   result = c.getresult().get(node, False)
686   if not result:
687     return result
688   return objects.SerializableConfigParser.Loads(str(result))
689
690
691 def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image):
692   """Request the import of a backup into an instance.
693
694   This is a single-node call.
695
696   """
697   params = [inst.ToDict(), osdev, swapdev, src_node, src_image]
698   c = Client("instance_os_import", params)
699   c.connect(node)
700   c.run()
701   return c.getresult().get(node, False)
702
703
704 def call_export_list(node_list):
705   """Gets the stored exports list.
706
707   This is a multi-node call.
708
709   """
710   c = Client("export_list", [])
711   c.connect_list(node_list)
712   c.run()
713   result = c.getresult()
714   return result
715
716
717 def call_export_remove(node, export):
718   """Requests removal of a given export.
719
720   This is a single-node call.
721
722   """
723   c = Client("export_remove", [export])
724   c.connect(node)
725   c.run()
726   return c.getresult().get(node, False)
727
728
729 def call_node_leave_cluster(node):
730   """Requests a node to clean the cluster information it has.
731
732   This will remove the configuration information from the ganeti data
733   dir.
734
735   This is a single-node call.
736
737   """
738   c = Client("node_leave_cluster", [])
739   c.connect(node)
740   c.run()
741   return c.getresult().get(node, False)
742
743
744 def call_node_volumes(node_list):
745   """Gets all volumes on node(s).
746
747   This is a multi-node call.
748
749   """
750   c = Client("node_volumes", [])
751   c.connect_list(node_list)
752   c.run()
753   return c.getresult()
754
755
756 def call_test_delay(node_list, duration):
757   """Sleep for a fixed time on given node(s).
758
759   This is a multi-node call.
760
761   """
762   c = Client("test_delay", [duration])
763   c.connect_list(node_list)
764   c.run()
765   return c.getresult()
766
767
768 def call_file_storage_dir_create(node, file_storage_dir):
769   """Create the given file storage directory.
770
771   This is a single-node call.
772
773   """
774   c = Client("file_storage_dir_create", [file_storage_dir])
775   c.connect(node)
776   c.run()
777   return c.getresult().get(node, False)
778
779
780 def call_file_storage_dir_remove(node, file_storage_dir):
781   """Remove the given file storage directory.
782
783   This is a single-node call.
784
785   """
786   c = Client("file_storage_dir_remove", [file_storage_dir])
787   c.connect(node)
788   c.run()
789   return c.getresult().get(node, False)
790
791
792 def call_file_storage_dir_rename(node, old_file_storage_dir,
793                                  new_file_storage_dir):
794   """Rename file storage directory.
795
796   This is a single-node call.
797
798   """
799   c = Client("file_storage_dir_rename",
800              [old_file_storage_dir, new_file_storage_dir])
801   c.connect(node)
802   c.run()
803   return c.getresult().get(node, False)