Revision ecfe9491 lib/rpc.py

b/lib/rpc.py
39 39

  
40 40
from ganeti import utils
41 41
from ganeti import objects
42

  
43

  
44
class NodeController:
  """Node-handling class.

  For each node that we speak with, we create an instance of this
  class, so that we have a safe place to store the details of this
  individual call.

  """
  def __init__(self, parent, node, address=None):
    """Constructor for the node controller.

    The connection is opened and the request is sent right away; the
    reply is collected later via L{GetResponse}.

    @type parent: L{Client}
    @param parent: the C{Client} instance which holds global parameters for
        the call
    @type node: str
    @param node: the name of the node we connect to; it is used for error
        messages and in case the address parameter is not passed
    @type address: str
    @keyword address: the node's address, in case we know it, so that we
        don't need to resolve it; testing shows that httplib has high
        overhead in resolving addresses (even when specified in /etc/hosts)

    """
    self.parent = parent
    self.node = node
    if address is None:
      address = node
    self.failed = False

    self.http_conn = hc = httplib.HTTPConnection(address, parent.port)
    try:
      hc.connect()
      hc.putrequest('PUT', "/%s" % parent.procedure,
                    skip_accept_encoding=True)
      hc.putheader('Content-Length', parent.body_length)
      hc.endheaders()
      hc.send(parent.body)
    except socket.error:
      logging.exception("Error connecting to node %s", node)
      self.failed = True
      # Nothing more will be read from this connection, release the socket
      hc.close()

  def GetResponse(self):
    """Try to process the response from the node.

    The connection is always closed once the reply has been read (or
    reading it has failed), so this is meant to be called once per
    instance.

    @return: the decoded reply body, or C{False} if the connection failed,
        the node answered with a non-200 status, the reply was empty or
        the reply could not be read or decoded

    """
    if self.failed:
      # we already failed in connect
      return False

    # Nested try blocks (instead of try/except/finally) keep this valid on
    # old Python 2 versions
    try:
      try:
        resp = self.http_conn.getresponse()
        if resp.status != 200:
          logging.error("Unexpected HTTP status %s from node %s",
                        resp.status, self.node)
          return False
        try:
          length = int(resp.getheader('Content-Length', '0'))
        except ValueError:
          logging.error("Invalid Content-Length in reply from node %s",
                        self.node)
          return False
        if not length:
          logging.error("Zero-length reply from node %s", self.node)
          return False
        payload = resp.read(length)
      except (socket.error, httplib.HTTPException):
        # A broken connection to one node must not abort the whole call
        logging.exception("Error reading response from node %s", self.node)
        return False
    finally:
      self.http_conn.close()

    try:
      return simplejson.loads(payload)
    except ValueError:
      logging.exception("Undecodable reply from node %s", self.node)
      return False
42
from ganeti import http
105 43

  
106 44

  
107 45
class Client:
......
115 53
  'False' result, which is not good. This overloading of values can
116 54
  cause bugs.
117 55

  
118
  @var body_length: cached string value of the length of the body (so that
119
      individual C{NodeController} instances don't have to recompute it)
120

  
121 56
  """
122
  result_set = False
123
  result = False
124
  allresult = []
125

  
126 57
  def __init__(self, procedure, args):
127
    self.port = utils.GetNodeDaemonPort()
128
    self.nodepw = utils.GetNodeDaemonPassword()
129
    self.nc = {}
130
    self.results = {}
131 58
    self.procedure = procedure
132 59
    self.args = args
133 60
    self.body = simplejson.dumps(args)
134
    self.body_length = str(len(self.body))
135 61

  
136
  #--- generic connector -------------
62
    self.port = utils.GetNodeDaemonPort()
63
    self.nodepw = utils.GetNodeDaemonPassword()
64
    self.nc = {}
137 65

  
138 66
  def ConnectList(self, node_list, address_list=None):
139 67
    """Add a list of nodes to the target nodes.
......
162 90
    @keyword address: the node address, if known
163 91

  
164 92
    """
165
    self.nc[name] = NodeController(self, name, address)
93
    if address is None:
94
      address = name
95

  
96
    self.nc[name] = http.HttpClientRequest(address, self.port, http.HTTP_PUT,
97
                                           "/%s" % self.procedure,
98
                                           post_data=self.body)
166 99

  
167 100
  def GetResults(self):
168
    """Return the results of the call.
101
    """Call nodes and return results.
102

  
103
    @rtype: dict
104
    @returns: dict of node name to RPC result (False on failure)
169 105

  
170 106
    """
171
    return self.results
107
    # TODO: Shared and reused manager
108
    mgr = http.HttpClientManager()
109
    try:
110
      mgr.ExecRequests(self.nc.values())
111
    finally:
112
      mgr.Shutdown()
172 113

  
173
  def Run(self):
174
    """Gather results from the node controllers.
114
    results = {}
175 115

  
176
    This function simply calls GetResponse() for each of our node
177
    controllers.
116
    for name, req in self.nc.iteritems():
117
      if req.success and req.resp_status == http.HTTP_OK:
118
        results[name] = simplejson.loads(req.resp_body)
119
        continue
178 120

  
179
    """
180
    for node, nc in self.nc.items():
181
      self.results[node] = nc.GetResponse()
121
      if req.error:
122
        msg = req.error
123
      else:
124
        msg = req.resp_body
125

  
126
      logging.error("RPC error from node %s: %s", name, msg)
127
      results[name] = False
128

  
129
    return results
182 130

  
183 131

  
184 132
class RpcRunner(object):
......
256 204
    """
257 205
    c = Client("volume_list", [vg_name])
258 206
    self._ConnectList(c, node_list)
259
    c.Run()
260 207
    return c.GetResults()
261 208

  
262 209
  def call_vg_list(self, node_list):
......
267 214
    """
268 215
    c = Client("vg_list", [])
269 216
    self._ConnectList(c, node_list)
270
    c.Run()
271 217
    return c.GetResults()
272 218

  
273 219
  def call_bridges_exist(self, node, bridges_list):
......
282 228
    """
283 229
    c = Client("bridges_exist", [bridges_list])
284 230
    self._ConnectNode(c, node)
285
    c.Run()
286 231
    return c.GetResults().get(node, False)
287 232

  
288 233
  def call_instance_start(self, node, instance, extra_args):
......
293 238
    """
294 239
    c = Client("instance_start", [self._InstDict(instance), extra_args])
295 240
    self._ConnectNode(c, node)
296
    c.Run()
297 241
    return c.GetResults().get(node, False)
298 242

  
299 243
  def call_instance_shutdown(self, node, instance):
......
304 248
    """
305 249
    c = Client("instance_shutdown", [self._InstDict(instance)])
306 250
    self._ConnectNode(c, node)
307
    c.Run()
308 251
    return c.GetResults().get(node, False)
309 252

  
310 253
  def call_instance_migrate(self, node, instance, target, live):
......
325 268
    """
326 269
    c = Client("instance_migrate", [self._InstDict(instance), target, live])
327 270
    self._ConnectNode(c, node)
328
    c.Run()
329 271
    return c.GetResults().get(node, False)
330 272

  
331 273
  def call_instance_reboot(self, node, instance, reboot_type, extra_args):
......
337 279
    c = Client("instance_reboot", [self._InstDict(instance),
338 280
                                   reboot_type, extra_args])
339 281
    self._ConnectNode(c, node)
340
    c.Run()
341 282
    return c.GetResults().get(node, False)
342 283

  
343 284
  def call_instance_os_add(self, node, inst):
......
349 290
    params = [self._InstDict(inst)]
350 291
    c = Client("instance_os_add", params)
351 292
    self._ConnectNode(c, node)
352
    c.Run()
353 293
    return c.GetResults().get(node, False)
354 294

  
355 295
  def call_instance_run_rename(self, node, inst, old_name):
......
361 301
    params = [self._InstDict(inst), old_name]
362 302
    c = Client("instance_run_rename", params)
363 303
    self._ConnectNode(c, node)
364
    c.Run()
365 304
    return c.GetResults().get(node, False)
366 305

  
367 306
  def call_instance_info(self, node, instance, hname):
......
379 318
    """
380 319
    c = Client("instance_info", [instance, hname])
381 320
    self._ConnectNode(c, node)
382
    c.Run()
383 321
    return c.GetResults().get(node, False)
384 322

  
385 323
  def call_all_instances_info(self, node_list, hypervisor_list):
......
395 333
    """
396 334
    c = Client("all_instances_info", [hypervisor_list])
397 335
    self._ConnectList(c, node_list)
398
    c.Run()
399 336
    return c.GetResults()
400 337

  
401 338
  def call_instance_list(self, node_list, hypervisor_list):
......
411 348
    """
412 349
    c = Client("instance_list", [hypervisor_list])
413 350
    self._ConnectList(c, node_list)
414
    c.Run()
415 351
    return c.GetResults()
416 352

  
417 353
  def call_node_tcp_ping(self, node, source, target, port, timeout,
......
424 360
    c = Client("node_tcp_ping", [source, target, port, timeout,
425 361
                                 live_port_needed])
426 362
    self._ConnectNode(c, node)
427
    c.Run()
428 363
    return c.GetResults().get(node, False)
429 364

  
430 365
  def call_node_has_ip_address(self, node, address):
......
435 370
    """
436 371
    c = Client("node_has_ip_address", [address])
437 372
    self._ConnectNode(c, node)
438
    c.Run()
439 373
    return c.GetResults().get(node, False)
440 374

  
441 375
  def call_node_info(self, node_list, vg_name, hypervisor_type):
......
458 392
    """
459 393
    c = Client("node_info", [vg_name, hypervisor_type])
460 394
    self._ConnectList(c, node_list)
461
    c.Run()
462 395
    retux = c.GetResults()
463 396

  
464 397
    for node_name in retux:
......
486 419
    params = [dsa, dsapub, rsa, rsapub, ssh, sshpub]
487 420
    c = Client("node_add", params)
488 421
    self._ConnectNode(c, node)
489
    c.Run()
490 422
    return c.GetResults().get(node, False)
491 423

  
492 424
  def call_node_verify(self, node_list, checkdict, cluster_name):
......
497 429
    """
498 430
    c = Client("node_verify", [checkdict, cluster_name])
499 431
    self._ConnectList(c, node_list)
500
    c.Run()
501 432
    return c.GetResults()
502 433

  
503 434
  @staticmethod
......
509 440
    """
510 441
    c = Client("node_start_master", [start_daemons])
511 442
    c.ConnectNode(node)
512
    c.Run()
513 443
    return c.GetResults().get(node, False)
514 444

  
515 445
  @staticmethod
......
521 451
    """
522 452
    c = Client("node_stop_master", [stop_daemons])
523 453
    c.ConnectNode(node)
524
    c.Run()
525 454
    return c.GetResults().get(node, False)
526 455

  
527 456
  @staticmethod
......
534 463
    # TODO: should this method query down nodes?
535 464
    c = Client("master_info", [])
536 465
    c.ConnectList(node_list)
537
    c.Run()
538 466
    return c.GetResults()
539 467

  
540 468
  def call_version(self, node_list):
......
545 473
    """
546 474
    c = Client("version", [])
547 475
    self._ConnectList(c, node_list)
548
    c.Run()
549 476
    return c.GetResults()
550 477

  
551 478
  def call_blockdev_create(self, node, bdev, size, owner, on_primary, info):
......
557 484
    params = [bdev.ToDict(), size, owner, on_primary, info]
558 485
    c = Client("blockdev_create", params)
559 486
    self._ConnectNode(c, node)
560
    c.Run()
561 487
    return c.GetResults().get(node, False)
562 488

  
563 489
  def call_blockdev_remove(self, node, bdev):
......
568 494
    """
569 495
    c = Client("blockdev_remove", [bdev.ToDict()])
570 496
    self._ConnectNode(c, node)
571
    c.Run()
572 497
    return c.GetResults().get(node, False)
573 498

  
574 499
  def call_blockdev_rename(self, node, devlist):
......
580 505
    params = [(d.ToDict(), uid) for d, uid in devlist]
581 506
    c = Client("blockdev_rename", params)
582 507
    self._ConnectNode(c, node)
583
    c.Run()
584 508
    return c.GetResults().get(node, False)
585 509

  
586 510
  def call_blockdev_assemble(self, node, disk, owner, on_primary):
......
592 516
    params = [disk.ToDict(), owner, on_primary]
593 517
    c = Client("blockdev_assemble", params)
594 518
    self._ConnectNode(c, node)
595
    c.Run()
596 519
    return c.GetResults().get(node, False)
597 520

  
598 521
  def call_blockdev_shutdown(self, node, disk):
......
603 526
    """
604 527
    c = Client("blockdev_shutdown", [disk.ToDict()])
605 528
    self._ConnectNode(c, node)
606
    c.Run()
607 529
    return c.GetResults().get(node, False)
608 530

  
609 531
  def call_blockdev_addchildren(self, node, bdev, ndevs):
......
615 537
    params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
616 538
    c = Client("blockdev_addchildren", params)
617 539
    self._ConnectNode(c, node)
618
    c.Run()
619 540
    return c.GetResults().get(node, False)
620 541

  
621 542
  def call_blockdev_removechildren(self, node, bdev, ndevs):
......
627 548
    params = [bdev.ToDict(), [disk.ToDict() for disk in ndevs]]
628 549
    c = Client("blockdev_removechildren", params)
629 550
    self._ConnectNode(c, node)
630
    c.Run()
631 551
    return c.GetResults().get(node, False)
632 552

  
633 553
  def call_blockdev_getmirrorstatus(self, node, disks):
......
639 559
    params = [dsk.ToDict() for dsk in disks]
640 560
    c = Client("blockdev_getmirrorstatus", params)
641 561
    self._ConnectNode(c, node)
642
    c.Run()
643 562
    return c.GetResults().get(node, False)
644 563

  
645 564
  def call_blockdev_find(self, node, disk):
......
650 569
    """
651 570
    c = Client("blockdev_find", [disk.ToDict()])
652 571
    self._ConnectNode(c, node)
653
    c.Run()
654 572
    return c.GetResults().get(node, False)
655 573

  
656 574
  def call_blockdev_close(self, node, disks):
......
662 580
    params = [cf.ToDict() for cf in disks]
663 581
    c = Client("blockdev_close", params)
664 582
    self._ConnectNode(c, node)
665
    c.Run()
666 583
    return c.GetResults().get(node, False)
667 584

  
668 585
  @staticmethod
......
693 610
              st.st_atime, st.st_mtime]
694 611
    c = Client("upload_file", params)
695 612
    c.ConnectList(node_list, address_list=address_list)
696
    c.Run()
697 613
    return c.GetResults()
698 614

  
699 615
  def call_os_diagnose(self, node_list):
......
704 620
    """
705 621
    c = Client("os_diagnose", [])
706 622
    self._ConnectList(c, node_list)
707
    c.Run()
708 623
    result = c.GetResults()
709 624
    new_result = {}
710 625
    for node_name in result:
......
723 638
    """
724 639
    c = Client("os_get", [name])
725 640
    self._ConnectNode(c, node)
726
    c.Run()
727 641
    result = c.GetResults().get(node, False)
728 642
    if isinstance(result, dict):
729 643
      return objects.OS.FromDict(result)
......
743 657
    params = [hpath, phase, env]
744 658
    c = Client("hooks_runner", params)
745 659
    self._ConnectList(c, node_list)
746
    c.Run()
747 660
    result = c.GetResults()
748 661
    return result
749 662

  
......
760 673
    params = [name, idata]
761 674
    c = Client("iallocator_runner", params)
762 675
    self._ConnectNode(c, node)
763
    c.Run()
764 676
    result = c.GetResults().get(node, False)
765 677
    return result
766 678

  
......
772 684
    """
773 685
    c = Client("blockdev_grow", [cf_bdev.ToDict(), amount])
774 686
    self._ConnectNode(c, node)
775
    c.Run()
776 687
    return c.GetResults().get(node, False)
777 688

  
778 689
  def call_blockdev_snapshot(self, node, cf_bdev):
......
783 694
    """
784 695
    c = Client("blockdev_snapshot", [cf_bdev.ToDict()])
785 696
    self._ConnectNode(c, node)
786
    c.Run()
787 697
    return c.GetResults().get(node, False)
788 698

  
789 699
  def call_snapshot_export(self, node, snap_bdev, dest_node, instance,
......
797 707
              self._InstDict(instance), cluster_name, idx]
798 708
    c = Client("snapshot_export", params)
799 709
    self._ConnectNode(c, node)
800
    c.Run()
801 710
    return c.GetResults().get(node, False)
802 711

  
803 712
  def call_finalize_export(self, node, instance, snap_disks):
......
814 723
    params = [self._InstDict(instance), flat_disks]
815 724
    c = Client("finalize_export", params)
816 725
    self._ConnectNode(c, node)
817
    c.Run()
818 726
    return c.GetResults().get(node, False)
819 727

  
820 728
  def call_export_info(self, node, path):
......
825 733
    """
826 734
    c = Client("export_info", [path])
827 735
    self._ConnectNode(c, node)
828
    c.Run()
829 736
    result = c.GetResults().get(node, False)
830 737
    if not result:
831 738
      return result
......
841 748
    params = [self._InstDict(inst), src_node, src_images, cluster_name]
842 749
    c = Client("instance_os_import", params)
843 750
    self._ConnectNode(c, node)
844
    c.Run()
845 751
    return c.GetResults().get(node, False)
846 752

  
847 753
  def call_export_list(self, node_list):
......
852 758
    """
853 759
    c = Client("export_list", [])
854 760
    self._ConnectList(c, node_list)
855
    c.Run()
856 761
    result = c.GetResults()
857 762
    return result
858 763

  
......
864 769
    """
865 770
    c = Client("export_remove", [export])
866 771
    self._ConnectNode(c, node)
867
    c.Run()
868 772
    return c.GetResults().get(node, False)
869 773

  
870 774
  @staticmethod
......
879 783
    """
880 784
    c = Client("node_leave_cluster", [])
881 785
    c.ConnectNode(node)
882
    c.Run()
883 786
    return c.GetResults().get(node, False)
884 787

  
885 788
  def call_node_volumes(self, node_list):
......
890 793
    """
891 794
    c = Client("node_volumes", [])
892 795
    self._ConnectList(c, node_list)
893
    c.Run()
894 796
    return c.GetResults()
895 797

  
896 798
  def call_test_delay(self, node_list, duration):
......
901 803
    """
902 804
    c = Client("test_delay", [duration])
903 805
    self._ConnectList(c, node_list)
904
    c.Run()
905 806
    return c.GetResults()
906 807

  
907 808
  def call_file_storage_dir_create(self, node, file_storage_dir):
......
912 813
    """
913 814
    c = Client("file_storage_dir_create", [file_storage_dir])
914 815
    self._ConnectNode(c, node)
915
    c.Run()
916 816
    return c.GetResults().get(node, False)
917 817

  
918 818
  def call_file_storage_dir_remove(self, node, file_storage_dir):
......
923 823
    """
924 824
    c = Client("file_storage_dir_remove", [file_storage_dir])
925 825
    self._ConnectNode(c, node)
926
    c.Run()
927 826
    return c.GetResults().get(node, False)
928 827

  
929 828
  def call_file_storage_dir_rename(self, node, old_file_storage_dir,
......
936 835
    c = Client("file_storage_dir_rename",
937 836
               [old_file_storage_dir, new_file_storage_dir])
938 837
    self._ConnectNode(c, node)
939
    c.Run()
940 838
    return c.GetResults().get(node, False)
941 839

  
942 840
  @staticmethod
......
948 846
    """
949 847
    c = Client("jobqueue_update", [file_name, content])
950 848
    c.ConnectList(node_list, address_list=address_list)
951
    c.Run()
952 849
    result = c.GetResults()
953 850
    return result
954 851

  
......
961 858
    """
962 859
    c = Client("jobqueue_purge", [])
963 860
    c.ConnectNode(node)
964
    c.Run()
965 861
    return c.GetResults().get(node, False)
966 862

  
967 863
  @staticmethod
......
973 869
    """
974 870
    c = Client("jobqueue_rename", [old, new])
975 871
    c.ConnectList(node_list, address_list=address_list)
976
    c.Run()
977 872
    result = c.GetResults()
978 873
    return result
979 874

  
......
992 887
    """
993 888
    c = Client("jobqueue_set_drain", [drain_flag])
994 889
    c.ConnectList(node_list)
995
    c.Run()
996 890
    result = c.GetResults()
997 891
    return result
998 892

  
......
1014 908
    hv_full = cluster.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
1015 909
    c = Client("hypervisor_validate_params", [hvname, hv_full])
1016 910
    self._ConnectList(c, node_list)
1017
    c.Run()
1018 911
    result = c.GetResults()
1019 912
    return result

Also available in: Unified diff