Revision fb1ffbca

b/lib/build/rpc_definitions.py
361 361
  "RpcClientDefault": (_IMPEXP_CALLS + _X509_CALLS + _OS_CALLS + _NODE_CALLS +
362 362
    _FILE_STORAGE_CALLS + _MISC_CALLS + _INSTANCE_CALLS + _BLOCKDEV_CALLS +
363 363
    _STORAGE_CALLS),
364
  "RpcClientJobQueue": [
365
    ("jobqueue_update", MULTI, TMO_URGENT, [
366
      ("file_name", None, None),
367
      ("content", "self._Compress(%s)", None),
368
      ], None, "Update job queue file"),
369
    ("jobqueue_purge", SINGLE, TMO_NORMAL, [], None, "Purge job queue"),
370
    ("jobqueue_rename", MULTI, TMO_URGENT, [
371
      ("rename", None, None),
372
      ], None, "Rename job queue file"),
373
    ],
364 374
  }
b/lib/jqueue.py
1611 1611

  
1612 1612
    logging.info("Job queue inspection finished")
1613 1613

  
1614
  def _GetRpc(self, address_list):
1615
    """Gets RPC runner with context.
1616

  
1617
    """
1618
    return rpc.JobQueueRunner(self.context, address_list)
1619

  
1614 1620
  @locking.ssynchronized(_LOCK)
1615 1621
  @_RequireOpenQueue
1616 1622
  def AddNode(self, node):
......
1624 1630
    assert node_name != self._my_hostname
1625 1631

  
1626 1632
    # Clean queue directory on added node
1627
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
1633
    result = self._GetRpc(None).call_jobqueue_purge(node_name)
1628 1634
    msg = result.fail_msg
1629 1635
    if msg:
1630 1636
      logging.warning("Cannot cleanup queue directory on node %s: %s",
......
1642 1648
    # Upload current serial file
1643 1649
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
1644 1650

  
1651
    # Static address list
1652
    addrs = [node.primary_ip]
1653

  
1645 1654
    for file_name in files:
1646 1655
      # Read file content
1647 1656
      content = utils.ReadFile(file_name)
1648 1657

  
1649
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
1650
                                                  [node.primary_ip],
1651
                                                  file_name, content)
1658
      result = self._GetRpc(addrs).call_jobqueue_update([node_name], file_name,
1659
                                                        content)
1652 1660
      msg = result[node_name].fail_msg
1653 1661
      if msg:
1654 1662
        logging.error("Failed to upload file %s to node %s: %s",
......
1732 1740

  
1733 1741
    if replicate:
1734 1742
      names, addrs = self._GetNodeIp()
1735
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
1743
      result = self._GetRpc(addrs).call_jobqueue_update(names, file_name, data)
1736 1744
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
1737 1745

  
1738 1746
  def _RenameFilesUnlocked(self, rename):
......
1751 1759

  
1752 1760
    # ... and on all nodes
1753 1761
    names, addrs = self._GetNodeIp()
1754
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
1762
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1755 1763
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1756 1764

  
1757 1765
  @staticmethod
b/lib/rpc.py
781 781
    return self.call_test_delay(node_list, duration,
782 782
                                read_timeout=int(duration + 5))
783 783

  
784
  @classmethod
785
  @_RpcTimeout(_TMO_URGENT)
786
  def call_jobqueue_update(cls, node_list, address_list, file_name, content):
787
    """Update job queue.
788

  
789
    This is a multi-node call.
790

  
791
    """
792
    return cls._StaticMultiNodeCall(node_list, "jobqueue_update",
793
                                    [file_name, _Compress(content)],
794
                                    address_list=address_list)
795

  
796
  @classmethod
797
  @_RpcTimeout(_TMO_NORMAL)
798
  def call_jobqueue_purge(cls, node):
799
    """Purge job queue.
800

  
801
    This is a single-node call.
802

  
803
    """
804
    return cls._StaticSingleNodeCall(node, "jobqueue_purge", [])
805

  
806
  @classmethod
807
  @_RpcTimeout(_TMO_URGENT)
808
  def call_jobqueue_rename(cls, node_list, address_list, rename):
809
    """Rename a job queue file.
810

  
811
    This is a multi-node call.
812

  
813
    """
814
    return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename,
815
                                    address_list=address_list)
816

  
817 784
  @_RpcTimeout(_TMO_NORMAL)
818 785
  def call_hypervisor_validate_params(self, node_list, hvname, hvparams):
819 786
    """Validate the hypervisor params.
......
832 799
    hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
833 800
    return self._MultiNodeCall(node_list, "hypervisor_validate_params",
834 801
                               [hvname, hv_full])
802

  
803

  
804
class JobQueueRunner(_generated_rpc.RpcClientJobQueue):
805
  """RPC wrappers for job queue.
806

  
807
  """
808
  _Compress = staticmethod(_Compress)
809

  
810
  def __init__(self, context, address_list):
811
    """Initializes this class.
812

  
813
    """
814
    _generated_rpc.RpcClientJobQueue.__init__(self)
815

  
816
    if address_list is None:
817
      resolver = _SsconfResolver
818
    else:
819
      # Caller provided an address list
820
      resolver = _StaticResolver(address_list)
821

  
822
    self._proc = _RpcProcessor(resolver,
823
                               netutils.GetDaemonPort(constants.NODED),
824
                               lock_monitor_cb=context.glm.AddToLockMonitor)
825

  
826
  def _Call(self, node_list, procedure, timeout, args):
827
    """Entry point for automatically generated RPC wrappers.
828

  
829
    """
830
    body = serializer.DumpJson(args, indent=False)
831

  
832
    return self._proc(node_list, procedure, body, read_timeout=timeout)
b/lib/server/noded.py
914 914

  
915 915
    """
916 916
    # TODO: What if a file fails to rename?
917
    return [backend.JobQueueRename(old, new) for old, new in params]
917
    return [backend.JobQueueRename(old, new) for old, new in params[0]]
918 918

  
919 919
  # hypervisor ---------------
920 920

  

Also available in: Unified diff