LockSet: make acquire() fail faster on wrong locks
[ganeti-local] / lib / rpc.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script to show add a new node to the cluster
23
24 """
25
26 # pylint: disable-msg=C0103
27
28 import os
29 import socket
30 import httplib
31
32 import simplejson
33
34 from ganeti import logger
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import constants
38 from ganeti import objects
39 from ganeti import ssconf
40
41 class NodeController:
42   """Node-handling class.
43
44   For each node that we speak with, we create an instance of this
45   class, so that we have a safe place to store the details of this
46   individual call.
47
48   """
49   def __init__(self, parent, node):
50     self.parent = parent
51     self.node = node
52     self.failed = False
53
54     self.http_conn = hc = httplib.HTTPConnection(node, self.parent.port)
55     try:
56       hc.connect()
57       hc.putrequest('PUT', "/%s" % self.parent.procedure,
58                     skip_accept_encoding=True)
59       hc.putheader('Content-Length', str(len(parent.body)))
60       hc.endheaders()
61       hc.send(parent.body)
62     except socket.error, err:
63       logger.Error("Error connecting to %s: %s" % (node, str(err)))
64       self.failed = True
65
66   def get_response(self):
67     """Try to process the response from the node.
68
69     """
70     if self.failed:
71       # we already failed in connect
72       return False
73     resp = self.http_conn.getresponse()
74     if resp.status != 200:
75       return False
76     try:
77       length = int(resp.getheader('Content-Length', '0'))
78     except ValueError:
79       return False
80     if not length:
81       logger.Error("Zero-length reply from %s" % self.node)
82       return False
83     payload = resp.read(length)
84     unload = simplejson.loads(payload)
85     return unload
86
87
88 class Client:
89   """RPC Client class.
90
91   This class, given a (remote) method name, a list of parameters and a
92   list of nodes, will contact (in parallel) all nodes, and return a
93   dict of results (key: node name, value: result).
94
95   One current bug is that generic failure is still signalled by
96   'False' result, which is not good. This overloading of values can
97   cause bugs.
98
99   """
100   result_set = False
101   result = False
102   allresult = []
103
104   def __init__(self, procedure, args):
105     ss = ssconf.SimpleStore()
106     self.port = ss.GetNodeDaemonPort()
107     self.nodepw = ss.GetNodeDaemonPassword()
108     self.nc = {}
109     self.results = {}
110     self.procedure = procedure
111     self.args = args
112     self.body = simplejson.dumps(args)
113
114   #--- generic connector -------------
115
116   def connect_list(self, node_list):
117     """Add a list of nodes to the target nodes.
118
119     """
120     for node in node_list:
121       self.connect(node)
122
123   def connect(self, connect_node):
124     """Add a node to the target list.
125
126     """
127     self.nc[connect_node] = nc = NodeController(self, connect_node)
128
129   def getresult(self):
130     """Return the results of the call.
131
132     """
133     return self.results
134
135   def run(self):
136     """Wrapper over reactor.run().
137
138     This function simply calls reactor.run() if we have any requests
139     queued, otherwise it does nothing.
140
141     """
142     for node, nc in self.nc.items():
143       self.results[node] = nc.get_response()
144
145
146 def call_volume_list(node_list, vg_name):
147   """Gets the logical volumes present in a given volume group.
148
149   This is a multi-node call.
150
151   """
152   c = Client("volume_list", [vg_name])
153   c.connect_list(node_list)
154   c.run()
155   return c.getresult()
156
157
158 def call_vg_list(node_list):
159   """Gets the volume group list.
160
161   This is a multi-node call.
162
163   """
164   c = Client("vg_list", [])
165   c.connect_list(node_list)
166   c.run()
167   return c.getresult()
168
169
170 def call_bridges_exist(node, bridges_list):
171   """Checks if a node has all the bridges given.
172
173   This method checks if all bridges given in the bridges_list are
174   present on the remote node, so that an instance that uses interfaces
175   on those bridges can be started.
176
177   This is a single-node call.
178
179   """
180   c = Client("bridges_exist", [bridges_list])
181   c.connect(node)
182   c.run()
183   return c.getresult().get(node, False)
184
185
186 def call_instance_start(node, instance, extra_args):
187   """Starts an instance.
188
189   This is a single-node call.
190
191   """
192   c = Client("instance_start", [instance.ToDict(), extra_args])
193   c.connect(node)
194   c.run()
195   return c.getresult().get(node, False)
196
197
198 def call_instance_shutdown(node, instance):
199   """Stops an instance.
200
201   This is a single-node call.
202
203   """
204   c = Client("instance_shutdown", [instance.ToDict()])
205   c.connect(node)
206   c.run()
207   return c.getresult().get(node, False)
208
209
210 def call_instance_reboot(node, instance, reboot_type, extra_args):
211   """Reboots an instance.
212
213   This is a single-node call.
214
215   """
216   c = Client("instance_reboot", [instance.ToDict(), reboot_type, extra_args])
217   c.connect(node)
218   c.run()
219   return c.getresult().get(node, False)
220
221
222 def call_instance_os_add(node, inst, osdev, swapdev):
223   """Installs an OS on the given instance.
224
225   This is a single-node call.
226
227   """
228   params = [inst.ToDict(), osdev, swapdev]
229   c = Client("instance_os_add", params)
230   c.connect(node)
231   c.run()
232   return c.getresult().get(node, False)
233
234
235 def call_instance_run_rename(node, inst, old_name, osdev, swapdev):
236   """Run the OS rename script for an instance.
237
238   This is a single-node call.
239
240   """
241   params = [inst.ToDict(), old_name, osdev, swapdev]
242   c = Client("instance_run_rename", params)
243   c.connect(node)
244   c.run()
245   return c.getresult().get(node, False)
246
247
248 def call_instance_info(node, instance):
249   """Returns information about a single instance.
250
251   This is a single-node call.
252
253   """
254   c = Client("instance_info", [instance])
255   c.connect(node)
256   c.run()
257   return c.getresult().get(node, False)
258
259
260 def call_all_instances_info(node_list):
261   """Returns information about all instances on a given node.
262
263   This is a single-node call.
264
265   """
266   c = Client("all_instances_info", [])
267   c.connect_list(node_list)
268   c.run()
269   return c.getresult()
270
271
272 def call_instance_list(node_list):
273   """Returns the list of running instances on a given node.
274
275   This is a single-node call.
276
277   """
278   c = Client("instance_list", [])
279   c.connect_list(node_list)
280   c.run()
281   return c.getresult()
282
283
284 def call_node_tcp_ping(node, source, target, port, timeout, live_port_needed):
285   """Do a TcpPing on the remote node
286
287   This is a single-node call.
288   """
289   c = Client("node_tcp_ping", [source, target, port, timeout,
290                                live_port_needed])
291   c.connect(node)
292   c.run()
293   return c.getresult().get(node, False)
294
295
296 def call_node_info(node_list, vg_name):
297   """Return node information.
298
299   This will return memory information and volume group size and free
300   space.
301
302   This is a multi-node call.
303
304   """
305   c = Client("node_info", [vg_name])
306   c.connect_list(node_list)
307   c.run()
308   retux = c.getresult()
309
310   for node_name in retux:
311     ret = retux.get(node_name, False)
312     if type(ret) != dict:
313       logger.Error("could not connect to node %s" % (node_name))
314       ret = {}
315
316     utils.CheckDict(ret,
317                     { 'memory_total' : '-',
318                       'memory_dom0' : '-',
319                       'memory_free' : '-',
320                       'vg_size' : 'node_unreachable',
321                       'vg_free' : '-' },
322                     "call_node_info",
323                     )
324   return retux
325
326
327 def call_node_add(node, dsa, dsapub, rsa, rsapub, ssh, sshpub):
328   """Add a node to the cluster.
329
330   This is a single-node call.
331
332   """
333   params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
334   c = Client("node_add", params)
335   c.connect(node)
336   c.run()
337   return c.getresult().get(node, False)
338
339
340 def call_node_verify(node_list, checkdict):
341   """Request verification of given parameters.
342
343   This is a multi-node call.
344
345   """
346   c = Client("node_verify", [checkdict])
347   c.connect_list(node_list)
348   c.run()
349   return c.getresult()
350
351
352 def call_node_start_master(node):
353   """Tells a node to activate itself as a master.
354
355   This is a single-node call.
356
357   """
358   c = Client("node_start_master", [])
359   c.connect(node)
360   c.run()
361   return c.getresult().get(node, False)
362
363
364 def call_node_stop_master(node):
365   """Tells a node to demote itself from master status.
366
367   This is a single-node call.
368
369   """
370   c = Client("node_stop_master", [])
371   c.connect(node)
372   c.run()
373   return c.getresult().get(node, False)
374
375
376 def call_version(node_list):
377   """Query node version.
378
379   This is a multi-node call.
380
381   """
382   c = Client("version", [])
383   c.connect_list(node_list)
384   c.run()
385   return c.getresult()
386
387
388 def call_blockdev_create(node, bdev, size, owner, on_primary, info):
389   """Request creation of a given block device.
390
391   This is a single-node call.
392
393   """
394   params = [bdev.ToDict(), size, owner, on_primary, info]
395   c = Client("blockdev_create", params)
396   c.connect(node)
397   c.run()
398   return c.getresult().get(node, False)
399
400
401 def call_blockdev_remove(node, bdev):
402   """Request removal of a given block device.
403
404   This is a single-node call.
405
406   """
407   c = Client("blockdev_remove", [bdev.ToDict()])
408   c.connect(node)
409   c.run()
410   return c.getresult().get(node, False)
411
412
413 def call_blockdev_rename(node, devlist):
414   """Request rename of the given block devices.
415
416   This is a single-node call.
417
418   """
419   params = [(d.ToDict(), uid) for d, uid in devlist]
420   c = Client("blockdev_rename", params)
421   c.connect(node)
422   c.run()
423   return c.getresult().get(node, False)
424
425
426 def call_blockdev_assemble(node, disk, owner, on_primary):
427   """Request assembling of a given block device.
428
429   This is a single-node call.
430
431   """
432   params = [disk.ToDict(), owner, on_primary]
433   c = Client("blockdev_assemble", params)
434   c.connect(node)
435   c.run()
436   return c.getresult().get(node, False)
437
438
439 def call_blockdev_shutdown(node, disk):
440   """Request shutdown of a given block device.
441
442   This is a single-node call.
443
444   """
445   c = Client("blockdev_shutdown", [disk.ToDict()])
446   c.connect(node)
447   c.run()
448   return c.getresult().get(node, False)
449
450
451 def call_blockdev_addchildren(node, bdev, ndevs):
452   """Request adding a list of children to a (mirroring) device.
453
454   This is a single-node call.
455
456   """
457   params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
458   c = Client("blockdev_addchildren", params)
459   c.connect(node)
460   c.run()
461   return c.getresult().get(node, False)
462
463
464 def call_blockdev_removechildren(node, bdev, ndevs):
465   """Request removing a list of children from a (mirroring) device.
466
467   This is a single-node call.
468
469   """
470   params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
471   c = Client("blockdev_removechildren", params)
472   c.connect(node)
473   c.run()
474   return c.getresult().get(node, False)
475
476
477 def call_blockdev_getmirrorstatus(node, disks):
478   """Request status of a (mirroring) device.
479
480   This is a single-node call.
481
482   """
483   params = [dsk.ToDict() for dsk in disks]
484   c = Client("blockdev_getmirrorstatus", params)
485   c.connect(node)
486   c.run()
487   return c.getresult().get(node, False)
488
489
490 def call_blockdev_find(node, disk):
491   """Request identification of a given block device.
492
493   This is a single-node call.
494
495   """
496   c = Client("blockdev_find", [disk.ToDict()])
497   c.connect(node)
498   c.run()
499   return c.getresult().get(node, False)
500
501
502 def call_upload_file(node_list, file_name):
503   """Upload a file.
504
505   The node will refuse the operation in case the file is not on the
506   approved file list.
507
508   This is a multi-node call.
509
510   """
511   fh = file(file_name)
512   try:
513     data = fh.read()
514   finally:
515     fh.close()
516   st = os.stat(file_name)
517   params = [file_name, data, st.st_mode, st.st_uid, st.st_gid,
518             st.st_atime, st.st_mtime]
519   c = Client("upload_file", params)
520   c.connect_list(node_list)
521   c.run()
522   return c.getresult()
523
524
525 def call_os_diagnose(node_list):
526   """Request a diagnose of OS definitions.
527
528   This is a multi-node call.
529
530   """
531   c = Client("os_diagnose", [])
532   c.connect_list(node_list)
533   c.run()
534   result = c.getresult()
535   new_result = {}
536   for node_name in result:
537     if result[node_name]:
538       nr = [objects.OS.FromDict(oss) for oss in result[node_name]]
539     else:
540       nr = []
541     new_result[node_name] = nr
542   return new_result
543
544
545 def call_os_get(node, name):
546   """Returns an OS definition.
547
548   This is a single-node call.
549
550   """
551   c = Client("os_get", [name])
552   c.connect(node)
553   c.run()
554   result = c.getresult().get(node, False)
555   if isinstance(result, dict):
556     return objects.OS.FromDict(result)
557   else:
558     return result
559
560
561 def call_hooks_runner(node_list, hpath, phase, env):
562   """Call the hooks runner.
563
564   Args:
565     - op: the OpCode instance
566     - env: a dictionary with the environment
567
568   This is a multi-node call.
569
570   """
571   params = [hpath, phase, env]
572   c = Client("hooks_runner", params)
573   c.connect_list(node_list)
574   c.run()
575   result = c.getresult()
576   return result
577
578
579 def call_blockdev_snapshot(node, cf_bdev):
580   """Request a snapshot of the given block device.
581
582   This is a single-node call.
583
584   """
585   c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
586   c.connect(node)
587   c.run()
588   return c.getresult().get(node, False)
589
590
591 def call_snapshot_export(node, snap_bdev, dest_node, instance):
592   """Request the export of a given snapshot.
593
594   This is a single-node call.
595
596   """
597   params = [snap_bdev.ToDict(), dest_node, instance.ToDict()]
598   c = Client("snapshot_export", params)
599   c.connect(node)
600   c.run()
601   return c.getresult().get(node, False)
602
603
604 def call_finalize_export(node, instance, snap_disks):
605   """Request the completion of an export operation.
606
607   This writes the export config file, etc.
608
609   This is a single-node call.
610
611   """
612   flat_disks = []
613   for disk in snap_disks:
614     flat_disks.append(disk.ToDict())
615   params = [instance.ToDict(), flat_disks]
616   c = Client("finalize_export", params)
617   c.connect(node)
618   c.run()
619   return c.getresult().get(node, False)
620
621
622 def call_export_info(node, path):
623   """Queries the export information in a given path.
624
625   This is a single-node call.
626
627   """
628   c = Client("export_info", [path])
629   c.connect(node)
630   c.run()
631   result = c.getresult().get(node, False)
632   if not result:
633     return result
634   return objects.SerializableConfigParser.Loads(result)
635
636
637 def call_instance_os_import(node, inst, osdev, swapdev, src_node, src_image):
638   """Request the import of a backup into an instance.
639
640   This is a single-node call.
641
642   """
643   params = [inst.ToDict(), osdev, swapdev, src_node, src_image]
644   c = Client("instance_os_import", params)
645   c.connect(node)
646   c.run()
647   return c.getresult().get(node, False)
648
649
650 def call_export_list(node_list):
651   """Gets the stored exports list.
652
653   This is a multi-node call.
654
655   """
656   c = Client("export_list", [])
657   c.connect_list(node_list)
658   c.run()
659   result = c.getresult()
660   return result
661
662
663 def call_export_remove(node, export):
664   """Requests removal of a given export.
665
666   This is a single-node call.
667
668   """
669   c = Client("export_remove", [export])
670   c.connect(node)
671   c.run()
672   return c.getresult().get(node, False)
673
674
675 def call_node_leave_cluster(node):
676   """Requests a node to clean the cluster information it has.
677
678   This will remove the configuration information from the ganeti data
679   dir.
680
681   This is a single-node call.
682
683   """
684   c = Client("node_leave_cluster", [])
685   c.connect(node)
686   c.run()
687   return c.getresult().get(node, False)
688
689
690 def call_node_volumes(node_list):
691   """Gets all volumes on node(s).
692
693   This is a multi-node call.
694
695   """
696   c = Client("node_volumes", [])
697   c.connect_list(node_list)
698   c.run()
699   return c.getresult()
700
701
702 def call_test_delay(node_list, duration):
703   """Sleep for a fixed time on given node(s).
704
705   This is a multi-node call.
706
707   """
708   c = Client("test_delay", [duration])
709   c.connect_list(node_list)
710   c.run()
711   return c.getresult()