Revision db04ce5d

b/lib/bootstrap.py
222 222

  
223 223
  """
224 224
  def _CheckNodeDaemon():
225
    result = rpc.RpcRunner.call_version([node_name])[node_name]
225
    result = rpc.BootstrapRunner().call_version([node_name])[node_name]
226 226
    if result.fail_msg:
227 227
      raise utils.RetryAgain()
228 228

  
......
565 565
  """
566 566
  cfg = config.ConfigWriter()
567 567
  modify_ssh_setup = cfg.GetClusterInfo().modify_ssh_setup
568
  result = rpc.RpcRunner.call_node_stop_master(master)
568
  result = rpc.BootstrapRunner().call_node_stop_master(master)
569 569
  msg = result.fail_msg
570 570
  if msg:
571 571
    logging.warning("Could not disable the master role: %s", msg)
572
  result = rpc.RpcRunner.call_node_leave_cluster(master, modify_ssh_setup)
572
  result = rpc.BootstrapRunner().call_node_leave_cluster(master,
573
                                                         modify_ssh_setup)
573 574
  msg = result.fail_msg
574 575
  if msg:
575 576
    logging.warning("Could not shutdown the node daemon and cleanup"
......
697 698

  
698 699
  logging.info("Stopping the master daemon on node %s", old_master)
699 700

  
700
  result = rpc.RpcRunner.call_node_stop_master(old_master)
701
  result = rpc.BootstrapRunner().call_node_stop_master(old_master)
701 702
  msg = result.fail_msg
702 703
  if msg:
703 704
    logging.error("Could not disable the master role on the old master"
......
726 727

  
727 728
  logging.info("Starting the master daemons on the new master")
728 729

  
729
  result = rpc.RpcRunner.call_node_start_master_daemons(new_master, no_voting)
730
  result = rpc.BootstrapRunner().call_node_start_master_daemons(new_master,
731
                                                                no_voting)
730 732
  msg = result.fail_msg
731 733
  if msg:
732 734
    logging.error("Could not start the master role on the new master"
......
782 784
  if not node_list:
783 785
    # no nodes left (eventually after removing myself)
784 786
    return []
785
  results = rpc.RpcRunner.call_master_info(node_list)
787
  results = rpc.BootstrapRunner().call_master_info(node_list)
786 788
  if not isinstance(results, dict):
787 789
    # this should not happen (unless internal error in rpc)
788 790
    logging.critical("Can't complete rpc call, aborting master startup")
b/lib/build/rpc_definitions.py
371 371
      ("rename", None, None),
372 372
      ], None, "Rename job queue file"),
373 373
    ],
374
  "RpcClientBootstrap": [
375
    ("node_start_master_daemons", SINGLE, TMO_FAST, [
376
      ("no_voting", None, None),
377
      ], None, "Starts master daemons on a node"),
378
    ("node_activate_master_ip", SINGLE, TMO_FAST, [], None,
379
     "Activates master IP on a node"),
380
    ("node_stop_master", SINGLE, TMO_FAST, [], None,
381
     "Deactivates master IP and stops master daemons on a node"),
382
    ("node_deactivate_master_ip", SINGLE, TMO_FAST, [], None,
383
     "Deactivates master IP on a node"),
384
    ("node_change_master_netmask", SINGLE, TMO_FAST, [
385
      ("netmask", None, None),
386
      ], None, "Change master IP netmask"),
387
    ("node_leave_cluster", SINGLE, TMO_NORMAL, [
388
      ("modify_ssh_setup", None, None),
389
      ], None, "Requests a node to clean the cluster information it has"),
390
    ("master_info", MULTI, TMO_URGENT, [], None, "Query master info"),
391
    ("version", MULTI, TMO_URGENT, [], None, "Query node version"),
392
    ],
374 393
  }
b/lib/rpc.py
437 437
    return self._CombineResults(results, requests, procedure)
438 438

  
439 439

  
440
class RpcRunner(_generated_rpc.RpcClientDefault):
440
class RpcRunner(_generated_rpc.RpcClientDefault,
441
                _generated_rpc.RpcClientBootstrap):
441 442
  """RPC runner class.
442 443

  
443 444
  """
......
448 449
    @param context: Ganeti context
449 450

  
450 451
    """
452
    # Pylint doesn't recognize multiple inheritance properly, see
453
    # <http://www.logilab.org/ticket/36586> and
454
    # <http://www.logilab.org/ticket/35642>
455
    # pylint: disable=W0233
456
    _generated_rpc.RpcClientBootstrap.__init__(self)
451 457
    _generated_rpc.RpcClientDefault.__init__(self)
452 458

  
453 459
    self._cfg = context.cfg
......
647 653
                                 reinstall, debug])
648 654

  
649 655
  @classmethod
650
  @_RpcTimeout(_TMO_FAST)
651
  def call_node_start_master_daemons(cls, node, no_voting):
652
    """Starts master daemons on a node.
653

  
654
    This is a single-node call.
655

  
656
    """
657
    return cls._StaticSingleNodeCall(node, "node_start_master_daemons",
658
                                     [no_voting])
659

  
660
  @classmethod
661
  @_RpcTimeout(_TMO_FAST)
662
  def call_node_activate_master_ip(cls, node):
663
    """Activates master IP on a node.
664

  
665
    This is a single-node call.
666

  
667
    """
668
    return cls._StaticSingleNodeCall(node, "node_activate_master_ip", [])
669

  
670
  @classmethod
671
  @_RpcTimeout(_TMO_FAST)
672
  def call_node_stop_master(cls, node):
673
    """Deactivates master IP and stops master daemons on a node.
674

  
675
    This is a single-node call.
676

  
677
    """
678
    return cls._StaticSingleNodeCall(node, "node_stop_master", [])
679

  
680
  @classmethod
681
  @_RpcTimeout(_TMO_FAST)
682
  def call_node_deactivate_master_ip(cls, node):
683
    """Deactivates master IP on a node.
684

  
685
    This is a single-node call.
686

  
687
    """
688
    return cls._StaticSingleNodeCall(node, "node_deactivate_master_ip", [])
689

  
690
  @classmethod
691
  @_RpcTimeout(_TMO_FAST)
692
  def call_node_change_master_netmask(cls, node, netmask):
693
    """Change master IP netmask.
694

  
695
    This is a single-node call.
696

  
697
    """
698
    return cls._StaticSingleNodeCall(node, "node_change_master_netmask",
699
                  [netmask])
700

  
701
  @classmethod
702
  @_RpcTimeout(_TMO_URGENT)
703
  def call_master_info(cls, node_list):
704
    """Query master info.
705

  
706
    This is a multi-node call.
707

  
708
    """
709
    # TODO: should this method query down nodes?
710
    return cls._StaticMultiNodeCall(node_list, "master_info", [])
711

  
712
  @classmethod
713
  @_RpcTimeout(_TMO_URGENT)
714
  def call_version(cls, node_list):
715
    """Query node version.
716

  
717
    This is a multi-node call.
718

  
719
    """
720
    return cls._StaticMultiNodeCall(node_list, "version", [])
721

  
722
  @classmethod
723 656
  @_RpcTimeout(_TMO_NORMAL)
724 657
  def call_upload_file(cls, node_list, file_name, address_list=None):
725 658
    """Upload a file.
......
757 690
    """
758 691
    return cls._StaticMultiNodeCall(node_list, "write_ssconf_files", [values])
759 692

  
760
  @classmethod
761
  @_RpcTimeout(_TMO_NORMAL)
762
  def call_node_leave_cluster(cls, node, modify_ssh_setup):
763
    """Requests a node to clean the cluster information it has.
764

  
765
    This will remove the configuration information from the ganeti data
766
    dir.
767

  
768
    This is a single-node call.
769

  
770
    """
771
    return cls._StaticSingleNodeCall(node, "node_leave_cluster",
772
                                     [modify_ssh_setup])
773

  
774 693
  def call_test_delay(self, node_list, duration, read_timeout=None):
775 694
    """Sleep for a fixed time on given node(s).
776 695

  
......
830 749
    body = serializer.DumpJson(args, indent=False)
831 750

  
832 751
    return self._proc(node_list, procedure, body, read_timeout=timeout)
752

  
753

  
754
class BootstrapRunner(_generated_rpc.RpcClientBootstrap):
755
  """RPC wrappers for bootstrapping.
756

  
757
  """
758
  def __init__(self):
759
    """Initializes this class.
760

  
761
    """
762
    _generated_rpc.RpcClientBootstrap.__init__(self)
763

  
764
    self._proc = _RpcProcessor(_SsconfResolver,
765
                               netutils.GetDaemonPort(constants.NODED))
766

  
767
  def _Call(self, node_list, procedure, timeout, args):
768
    """Entry point for automatically generated RPC wrappers.
769

  
770
    """
771
    body = serializer.DumpJson(args, indent=False)
772

  
773
    return self._proc(node_list, procedure, body, read_timeout=timeout)
b/lib/server/masterd.py
532 532
def ActivateMasterIP():
533 533
  # activate ip
534 534
  master_node = ssconf.SimpleStore().GetMasterNode()
535
  result = rpc.RpcRunner.call_node_activate_master_ip(master_node)
535
  result = rpc.BootstrapRunner().call_node_activate_master_ip(master_node)
536 536
  msg = result.fail_msg
537 537
  if msg:
538 538
    logging.error("Can't activate master IP address: %s", msg)
b/test/ganeti.rpc_unittest.py
73 73
    resolver = rpc._StaticResolver(["127.0.0.1"])
74 74
    http_proc = _FakeRequestProcessor(self._GetVersionResponse)
75 75
    proc = rpc._RpcProcessor(resolver, 24094)
76
    result = proc(["localhost"], "version", None, _req_process_fn=http_proc)
76
    result = proc(["localhost"], "version", None, _req_process_fn=http_proc,
77
                  read_timeout=60)
77 78
    self.assertEqual(result.keys(), ["localhost"])
78 79
    lhresp = result["localhost"]
79 80
    self.assertFalse(lhresp.offline)
......
113 114
    resolver = rpc._StaticResolver([rpc._OFFLINE])
114 115
    http_proc = _FakeRequestProcessor(NotImplemented)
115 116
    proc = rpc._RpcProcessor(resolver, 30668)
116
    result = proc(["n17296"], "version", None, _req_process_fn=http_proc)
117
    result = proc(["n17296"], "version", None, _req_process_fn=http_proc,
118
                  read_timeout=60)
117 119
    self.assertEqual(result.keys(), ["n17296"])
118 120
    lhresp = result["n17296"]
119 121
    self.assertTrue(lhresp.offline)
......
143 145
    resolver = rpc._StaticResolver(nodes)
144 146
    http_proc = _FakeRequestProcessor(self._GetMultiVersionResponse)
145 147
    proc = rpc._RpcProcessor(resolver, 23245)
146
    result = proc(nodes, "version", None, _req_process_fn=http_proc)
148
    result = proc(nodes, "version", None, _req_process_fn=http_proc,
149
                  read_timeout=60)
147 150
    self.assertEqual(sorted(result.keys()), sorted(nodes))
148 151

  
149 152
    for name in nodes:
......
171 174
        _FakeRequestProcessor(compat.partial(self._GetVersionResponseFail,
172 175
                                             errinfo))
173 176
      result = proc(["aef9ur4i.example.com"], "version", None,
174
                    _req_process_fn=http_proc)
177
                    _req_process_fn=http_proc, read_timeout=60)
175 178
      self.assertEqual(result.keys(), ["aef9ur4i.example.com"])
176 179
      lhresp = result["aef9ur4i.example.com"]
177 180
      self.assertFalse(lhresp.offline)
......
263 266
    for fn in [self._GetInvalidResponseA, self._GetInvalidResponseB]:
264 267
      http_proc = _FakeRequestProcessor(fn)
265 268
      result = proc(["oqo7lanhly.example.com"], "version", None,
266
                    _req_process_fn=http_proc)
269
                    _req_process_fn=http_proc, read_timeout=60)
267 270
      self.assertEqual(result.keys(), ["oqo7lanhly.example.com"])
268 271
      lhresp = result["oqo7lanhly.example.com"]
269 272
      self.assertFalse(lhresp.offline)

Also available in: Unified diff