rpc directory functions for file backend
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script to show add a new node to the cluster
23
24 """
25
26 # pylint: disable-msg=C0103
27
28 import os
29 import socket
30 import httplib
31
32 import simplejson
33
34 from ganeti import logger
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import constants
38 from ganeti import objects
39 from ganeti import ssconf
40
41
42 class NodeController:
43   """Node-handling class.
44
45   For each node that we speak with, we create an instance of this
46   class, so that we have a safe place to store the details of this
47   individual call.
48
49   """
50   def __init__(self, parent, node):
51     self.parent = parent
52     self.node = node
53     self.failed = False
54
55     self.http_conn = hc = httplib.HTTPConnection(node, self.parent.port)
56     try:
57       hc.connect()
58       hc.putrequest('PUT', "/%s" % self.parent.procedure,
59                     skip_accept_encoding=True)
60       hc.putheader('Content-Length', str(len(parent.body)))
61       hc.endheaders()
62       hc.send(parent.body)
63     except socket.error, err:
64       logger.Error("Error connecting to %s: %s" % (node, str(err)))
65       self.failed = True
66
67   def get_response(self):
68     """Try to process the response from the node.
69
70     """
71     if self.failed:
72       # we already failed in connect
73       return False
74     resp = self.http_conn.getresponse()
75     if resp.status != 200:
76       return False
77     try:
78       length = int(resp.getheader('Content-Length', '0'))
79     except ValueError:
80       return False
81     if not length:
82       logger.Error("Zero-length reply from %s" % self.node)
83       return False
84     payload = resp.read(length)
85     unload = simplejson.loads(payload)
86     return unload
87
88
89 class Client:
90   """RPC Client class.
91
92   This class, given a (remote) method name, a list of parameters and a
93   list of nodes, will contact (in parallel) all nodes, and return a
94   dict of results (key: node name, value: result).
95
96   One current bug is that generic failure is still signalled by
97   'False' result, which is not good. This overloading of values can
98   cause bugs.
99
100   """
101   result_set = False
102   result = False
103   allresult = []
104
105   def __init__(self, procedure, args):
106     ss = ssconf.SimpleStore()
107     self.port = ss.GetNodeDaemonPort()
108     self.nodepw = ss.GetNodeDaemonPassword()
109     self.nc = {}
110     self.results = {}
111     self.procedure = procedure
112     self.args = args
113     self.body = simplejson.dumps(args)
114
115   #--- generic connector -------------
116
117   def connect_list(self, node_list):
118     """Add a list of nodes to the target nodes.
119
120     """
121     for node in node_list:
122       self.connect(node)
123
124   def connect(self, connect_node):
125     """Add a node to the target list.
126
127     """
128     self.nc[connect_node] = nc = NodeController(self, connect_node)
129
130   def getresult(self):
131     """Return the results of the call.
132
133     """
134     return self.results
135
136   def run(self):
137     """Wrapper over reactor.run().
138
139     This function simply calls reactor.run() if we have any requests
140     queued, otherwise it does nothing.
141
142     """
143     for node, nc in self.nc.items():
144       self.results[node] = nc.get_response()
145
146
147 def call_volume_list(node_list, vg_name):
148   """Gets the logical volumes present in a given volume group.
149
150   This is a multi-node call.
151
152   """
153   c = Client("volume_list", [vg_name])
154   c.connect_list(node_list)
155   c.run()
156   return c.getresult()
157
158
159 def call_vg_list(node_list):
160   """Gets the volume group list.
161
162   This is a multi-node call.
163
164   """
165   c = Client("vg_list", [])
166   c.connect_list(node_list)
167   c.run()
168   return c.getresult()
169
170
171 def call_bridges_exist(node, bridges_list):
172   """Checks if a node has all the bridges given.
173
174   This method checks if all bridges given in the bridges_list are
175   present on the remote node, so that an instance that uses interfaces
176   on those bridges can be started.
177
178   This is a single-node call.
179
180   """
181   c = Client("bridges_exist", [bridges_list])
182   c.connect(node)
183   c.run()
184   return c.getresult().get(node, False)
185
186
187 def call_instance_start(node, instance, extra_args):
188   """Starts an instance.
189
190   This is a single-node call.
191
192   """
193   c = Client("instance_start", [instance.ToDict(), extra_args])
194   c.connect(node)
195   c.run()
196   return c.getresult().get(node, False)
197
198
199 def call_instance_shutdown(node, instance):
200   """Stops an instance.
201
202   This is a single-node call.
203
204   """
205   c = Client("instance_shutdown", [instance.ToDict()])
206   c.connect(node)
207   c.run()
208   return c.getresult().get(node, False)
209
210
211 def call_instance_reboot(node, instance, reboot_type, extra_args):
212   """Reboots an instance.
213
214   This is a single-node call.
215
216   """
217   c = Client("instance_reboot", [instance.ToDict(), reboot_type, extra_args])
218   c.connect(node)
219   c.run()
220   return c.getresult().get(node, False)
221
222
223 def call_instance_os_add(node, inst, osdev, swapdev):
224   """Installs an OS on the given instance.
225
226   This is a single-node call.
227
228   """
229   params = [inst.ToDict(), osdev, swapdev]
230   c = Client("instance_os_add", params)
231   c.connect(node)
232   c.run()
233   return c.getresult().get(node, False)
234
235
236 def call_instance_run_rename(node, inst, old_name, osdev, swapdev):
237   """Run the OS rename script for an instance.
238
239   This is a single-node call.
240
241   """
242   params = [inst.ToDict(), old_name, osdev, swapdev]
243   c = Client("instance_run_rename", params)
244   c.connect(node)
245   c.run()
246   return c.getresult().get(node, False)
247
248
249 def call_instance_info(node, instance):
250   """Returns information about a single instance.
251
252   This is a single-node call.
253
254   """
255   c = Client("instance_info", [instance])
256   c.connect(node)
257   c.run()
258   return c.getresult().get(node, False)
259
260
261 def call_all_instances_info(node_list):
262   """Returns information about all instances on a given node.
263
264   This is a single-node call.
265
266   """
267   c = Client("all_instances_info", [])
268   c.connect_list(node_list)
269   c.run()
270   return c.getresult()
271
272
273 def call_instance_list(node_list):
274   """Returns the list of running instances on a given node.
275
276   This is a single-node call.
277
278   """
279   c = Client("instance_list", [])
280   c.connect_list(node_list)
281   c.run()
282   return c.getresult()
283
284
285 def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed):
286   """Do a TcpPing on the remote node
287
288   This is a single-node call.
289   """
290   c = Client("node_tcp_ping", [source, target, port, timeout,
291                                live_port_needed])
292   c.connect(node)
293   c.run()
294   return c.getresult().get(node, False)
295
296
297 def call_node_info(node_list, vg_name):
298   """Return node information.
299
300   This will return memory information and volume group size and free
301   space.
302
303   This is a multi-node call.
304
305   """
306   c = Client("node_info", [vg_name])
307   c.connect_list(node_list)
308   c.run()
309   retux = c.getresult()
310
311   for node_name in retux:
312     ret = retux.get(node_name, False)
313     if type(ret) != dict:
314       logger.Error("could not connect to node %s" % (node_name))
315       ret = {}
316
317     utils.CheckDict(ret,
318                     { 'memory_total' : '-',
319                       'memory_dom0' : '-',
320                       'memory_free' : '-',
321                       'vg_size' : 'node_unreachable',
322                       'vg_free' : '-' },
323                     "call_node_info",
324                     )
325   return retux
326
327
328 def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
329   """Add a node to the cluster.
330
331   This is a single-node call.
332
333   """
334   params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
335   c = Client("node_add", params)
336   c.connect(node)
337   c.run()
338   return c.getresult().get(node, False)
339
340
341 def call_node_verify(node_list, checkdict):
342   """Request verification of given parameters.
343
344   This is a multi-node call.
345
346   """
347   c = Client("node_verify", [checkdict])
348   c.connect_list(node_list)
349   c.run()
350   return c.getresult()
351
352
353 def call_node_start_master(node):
354   """Tells a node to activate itself as a master.
355
356   This is a single-node call.
357
358   """
359   c = Client("node_start_master", [])
360   c.connect(node)
361   c.run()
362   return c.getresult().get(node, False)
363
364
365 def call_node_stop_master(node):
366   """Tells a node to demote itself from master status.
367
368   This is a single-node call.
369
370   """
371   c = Client("node_stop_master", [])
372   c.connect(node)
373   c.run()
374   return c.getresult().get(node, False)
375
376
377 def call_version(node_list):
378   """Query node version.
379
380   This is a multi-node call.
381
382   """
383   c = Client("version", [])
384   c.connect_list(node_list)
385   c.run()
386   return c.getresult()
387
388
389 def call_blockdev_create(node, bdev, size, owner, on_primary, info):
390   """Request creation of a given block device.
391
392   This is a single-node call.
393
394   """
395   params = [bdev.ToDict(), size, owner, on_primary, info]
396   c = Client("blockdev_create", params)
397   c.connect(node)
398   c.run()
399   return c.getresult().get(node, False)
400
401
402 def call_blockdev_remove(node, bdev):
403   """Request removal of a given block device.
404
405   This is a single-node call.
406
407   """
408   c = Client("blockdev_remove", [bdev.ToDict()])
409   c.connect(node)
410   c.run()
411   return c.getresult().get(node, False)
412
413
414 def call_blockdev_rename(node, devlist):
415   """Request rename of the given block devices.
416
417   This is a single-node call.
418
419   """
420   params = [(d.ToDict(), uid) for d, uid in devlist]
421   c = Client("blockdev_rename", params)
422   c.connect(node)
423   c.run()
424   return c.getresult().get(node, False)
425
426
427 def call_blockdev_assemble(node, disk, owner, on_primary):
428   """Request assembling of a given block device.
429
430   This is a single-node call.
431
432   """
433   params = [disk.ToDict(), owner, on_primary]
434   c = Client("blockdev_assemble", params)
435   c.connect(node)
436   c.run()
437   return c.getresult().get(node, False)
438
439
440 def call_blockdev_shutdown(node, disk):
441   """Request shutdown of a given block device.
442
443   This is a single-node call.
444
445   """
446   c = Client("blockdev_shutdown", [disk.ToDict()])
447   c.connect(node)
448   c.run()
449   return c.getresult().get(node, False)
450
451
452 def call_blockdev_addchildren(node, bdev, ndevs):
453   """Request adding a list of children to a (mirroring) device.
454
455   This is a single-node call.
456
457   """
458   params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
459   c = Client("blockdev_addchildren", params)
460   c.connect(node)
461   c.run()
462   return c.getresult().get(node, False)
463
464
465 def call_blockdev_removechildren(node, bdev, ndevs):
466   """Request removing a list of children from a (mirroring) device.
467
468   This is a single-node call.
469
470   """
471   params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
472   c = Client("blockdev_removechildren", params)
473   c.connect(node)
474   c.run()
475   return c.getresult().get(node, False)
476
477
478 def call_blockdev_getmirrorstatus(node, disks):
479   """Request status of a (mirroring) device.
480
481   This is a single-node call.
482
483   """
484   params = [dsk.ToDict() for dsk in disks]
485   c = Client("blockdev_getmirrorstatus", params)
486   c.connect(node)
487   c.run()
488   return c.getresult().get(node, False)
489
490
491 def call_blockdev_find(node, disk):
492   """Request identification of a given block device.
493
494   This is a single-node call.
495
496   """
497   c = Client("blockdev_find", [disk.ToDict()])
498   c.connect(node)
499   c.run()
500   return c.getresult().get(node, False)
501
502
503 def call_upload_file(node_list, file_name):
504   """Upload a file.
505
506   The node will refuse the operation in case the file is not on the
507   approved file list.
508
509   This is a multi-node call.
510
511   """
512   fh = file(file_name)
513   try:
514     data = fh.read()
515   finally:
516     fh.close()
517   st = os.stat(file_name)
518   params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
519             st.st_atime, st.st_mtime]
520   c = Client("upload_file", params)
521   c.connect_list(node_list)
522   c.run()
523   return c.getresult()
524
525
526 def call_os_diagnose(node_list):
527   """Request a diagnose of OS definitions.
528
529   This is a multi-node call.
530
531   """
532   c = Client("os_diagnose", [])
533   c.connect_list(node_list)
534   c.run()
535   result = c.getresult()
536   new_result = {}
537   for node_name in result:
538     if result[node_name]:
539       nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
540     else:
541       nr = []
542     new_result[node_name] = nr
543   return new_result
544
545
546 def call_os_get(node, name):
547   """Returns an OS definition.
548
549   This is a single-node call.
550
551   """
552   c = Client("os_get", [name])
553   c.connect(node)
554   c.run()
555   result = c.getresult().get(node, False)
556   if isinstance(result, dict):
557     return objects.OS.FromDict(result)
558   else:
559     return result
560
561
562 def call_hooks_runner(node_list, hpath, phase, env):
563   """Call the hooks runner.
564
565   Args:
566     - op: the OpCode instance
567     - env: a dictionary with the environment
568
569   This is a multi-node call.
570
571   """
572   params = [hpath, phase, env]
573   c = Client("hooks_runner", params)
574   c.connect_list(node_list)
575   c.run()
576   result = c.getresult()
577   return result
578
579
580 def call_blockdev_snapshot(node, cf_bdev):
581   """Request a snapshot of the given block device.
582
583   This is a single-node call.
584
585   """
586   c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
587   c.connect(node)
588   c.run()
589   return c.getresult().get(node, False)
590
591
592 def call_snapshot_export(node, snap_bdev, dest_node, instance):
593   """Request the export of a given snapshot.
594
595   This is a single-node call.
596
597   """
598   params = [snap_bdev.ToDict(), dest_node, instance.ToDict()]
599   c = Client("snapshot_export", params)
600   c.connect(node)
601   c.run()
602   return c.getresult().get(node, False)
603
604
605 def call_finalize_export(node, instance, snap_disks):
606   """Request the completion of an export operation.
607
608   This writes the export config file, etc.
609
610   This is a single-node call.
611
612   """
613   flat_disks = []
614   for disk in snap_disks:
615     flat_disks.append(disk.ToDict())
616   params = [instance.ToDict(), flat_disks]
617   c = Client("finalize_export", params)
618   c.connect(node)
619   c.run()
620   return c.getresult().get(node, False)
621
622
623 def call_export_info(node, path):
624   """Queries the export information in a given path.
625
626   This is a single-node call.
627
628   """
629   c = Client("export_info", [path])
630   c.connect(node)
631   c.run()
632   result = c.getresult().get(node, False)
633   if not result:
634     return result
635   return objects.SerializableConfigParser.Loads(str(result))
636
637
638 def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image):
639   """Request the import of a backup into an instance.
640
641   This is a single-node call.
642
643   """
644   params = [inst.ToDict(), osdev, swapdev, src_node, src_image]
645   c = Client("instance_os_import", params)
646   c.connect(node)
647   c.run()
648   return c.getresult().get(node, False)
649
650
651 def call_export_list(node_list):
652   """Gets the stored exports list.
653
654   This is a multi-node call.
655
656   """
657   c = Client("export_list", [])
658   c.connect_list(node_list)
659   c.run()
660   result = c.getresult()
661   return result
662
663
664 def call_export_remove(node, export):
665   """Requests removal of a given export.
666
667   This is a single-node call.
668
669   """
670   c = Client("export_remove", [export])
671   c.connect(node)
672   c.run()
673   return c.getresult().get(node, False)
674
675
676 def call_node_leave_cluster(node):
677   """Requests a node to clean the cluster information it has.
678
679   This will remove the configuration information from the ganeti data
680   dir.
681
682   This is a single-node call.
683
684   """
685   c = Client("node_leave_cluster", [])
686   c.connect(node)
687   c.run()
688   return c.getresult().get(node, False)
689
690
691 def call_node_volumes(node_list):
692   """Gets all volumes on node(s).
693
694   This is a multi-node call.
695
696   """
697   c = Client("node_volumes", [])
698   c.connect_list(node_list)
699   c.run()
700   return c.getresult()
701
702
703 def call_test_delay(node_list, duration):
704   """Sleep for a fixed time on given node(s).
705
706   This is a multi-node call.
707
708   """
709   c = Client("test_delay", [duration])
710   c.connect_list(node_list)
711   c.run()
712   return c.getresult()
713
714
715 def call_file_storage_dir_create(node, file_storage_dir):
716   """Create the given file storage directory.
717
718   This is a single-node call.
719
720   """
721   c = Client("file_storage_dir_create", [file_storage_dir])
722   c.connect(node)
723   c.run()
724   return c.getresult().get(node, False)
725
726
727 def call_file_storage_dir_remove(node, file_storage_dir):
728   """Remove the given file storage directory.
729
730   This is a single-node call.
731
732   """
733   c = Client("file_storage_dir_remove", [file_storage_dir])
734   c.connect(node)
735   c.run()
736   return c.getresult().get(node, False)
737
738
739 def call_file_storage_dir_rename(node, old_file_storage_dir,
740                                  new_file_storage_dir):
741   """Rename file storage directory.
742
743   This is a single-node call.
744
745   """
746   c = Client("file_storage_dir_rename",
747              [old_file_storage_dir, new_file_storage_dir])
748   c.connect(node)
749   c.run()
750   return c.getresult().get(node, False)
751