Revision fb1ffbca
b/lib/build/rpc_definitions.py | ||
---|---|---|
361 | 361 |
"RpcClientDefault": (_IMPEXP_CALLS + _X509_CALLS + _OS_CALLS + _NODE_CALLS + |
362 | 362 |
_FILE_STORAGE_CALLS + _MISC_CALLS + _INSTANCE_CALLS + _BLOCKDEV_CALLS + |
363 | 363 |
_STORAGE_CALLS), |
364 |
"RpcClientJobQueue": [ |
|
365 |
("jobqueue_update", MULTI, TMO_URGENT, [ |
|
366 |
("file_name", None, None), |
|
367 |
("content", "self._Compress(%s)", None), |
|
368 |
], None, "Update job queue file"), |
|
369 |
("jobqueue_purge", SINGLE, TMO_NORMAL, [], None, "Purge job queue"), |
|
370 |
("jobqueue_rename", MULTI, TMO_URGENT, [ |
|
371 |
("rename", None, None), |
|
372 |
], None, "Rename job queue file"), |
|
373 |
], |
|
364 | 374 |
} |
b/lib/jqueue.py | ||
---|---|---|
1611 | 1611 |
|
1612 | 1612 |
logging.info("Job queue inspection finished") |
1613 | 1613 |
|
1614 |
def _GetRpc(self, address_list): |
|
1615 |
"""Gets RPC runner with context. |
|
1616 |
|
|
1617 |
""" |
|
1618 |
return rpc.JobQueueRunner(self.context, address_list) |
|
1619 |
|
|
1614 | 1620 |
@locking.ssynchronized(_LOCK) |
1615 | 1621 |
@_RequireOpenQueue |
1616 | 1622 |
def AddNode(self, node): |
... | ... | |
1624 | 1630 |
assert node_name != self._my_hostname |
1625 | 1631 |
|
1626 | 1632 |
# Clean queue directory on added node |
1627 |
result = rpc.RpcRunner.call_jobqueue_purge(node_name)
|
|
1633 |
result = self._GetRpc(None).call_jobqueue_purge(node_name)
|
|
1628 | 1634 |
msg = result.fail_msg |
1629 | 1635 |
if msg: |
1630 | 1636 |
logging.warning("Cannot cleanup queue directory on node %s: %s", |
... | ... | |
1642 | 1648 |
# Upload current serial file |
1643 | 1649 |
files.append(constants.JOB_QUEUE_SERIAL_FILE) |
1644 | 1650 |
|
1651 |
# Static address list |
|
1652 |
addrs = [node.primary_ip] |
|
1653 |
|
|
1645 | 1654 |
for file_name in files: |
1646 | 1655 |
# Read file content |
1647 | 1656 |
content = utils.ReadFile(file_name) |
1648 | 1657 |
|
1649 |
result = rpc.RpcRunner.call_jobqueue_update([node_name], |
|
1650 |
[node.primary_ip], |
|
1651 |
file_name, content) |
|
1658 |
result = self._GetRpc(addrs).call_jobqueue_update([node_name], file_name, |
|
1659 |
content) |
|
1652 | 1660 |
msg = result[node_name].fail_msg |
1653 | 1661 |
if msg: |
1654 | 1662 |
logging.error("Failed to upload file %s to node %s: %s", |
... | ... | |
1732 | 1740 |
|
1733 | 1741 |
if replicate: |
1734 | 1742 |
names, addrs = self._GetNodeIp() |
1735 |
result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
|
|
1743 |
result = self._GetRpc(addrs).call_jobqueue_update(names, file_name, data)
|
|
1736 | 1744 |
self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name) |
1737 | 1745 |
|
1738 | 1746 |
def _RenameFilesUnlocked(self, rename): |
... | ... | |
1751 | 1759 |
|
1752 | 1760 |
# ... and on all nodes |
1753 | 1761 |
names, addrs = self._GetNodeIp() |
1754 |
result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
|
|
1762 |
result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
|
|
1755 | 1763 |
self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename) |
1756 | 1764 |
|
1757 | 1765 |
@staticmethod |
b/lib/rpc.py | ||
---|---|---|
781 | 781 |
return self.call_test_delay(node_list, duration, |
782 | 782 |
read_timeout=int(duration + 5)) |
783 | 783 |
|
784 |
@classmethod |
|
785 |
@_RpcTimeout(_TMO_URGENT) |
|
786 |
def call_jobqueue_update(cls, node_list, address_list, file_name, content): |
|
787 |
"""Update job queue. |
|
788 |
|
|
789 |
This is a multi-node call. |
|
790 |
|
|
791 |
""" |
|
792 |
return cls._StaticMultiNodeCall(node_list, "jobqueue_update", |
|
793 |
[file_name, _Compress(content)], |
|
794 |
address_list=address_list) |
|
795 |
|
|
796 |
@classmethod |
|
797 |
@_RpcTimeout(_TMO_NORMAL) |
|
798 |
def call_jobqueue_purge(cls, node): |
|
799 |
"""Purge job queue. |
|
800 |
|
|
801 |
This is a single-node call. |
|
802 |
|
|
803 |
""" |
|
804 |
return cls._StaticSingleNodeCall(node, "jobqueue_purge", []) |
|
805 |
|
|
806 |
@classmethod |
|
807 |
@_RpcTimeout(_TMO_URGENT) |
|
808 |
def call_jobqueue_rename(cls, node_list, address_list, rename): |
|
809 |
"""Rename a job queue file. |
|
810 |
|
|
811 |
This is a multi-node call. |
|
812 |
|
|
813 |
""" |
|
814 |
return cls._StaticMultiNodeCall(node_list, "jobqueue_rename", rename, |
|
815 |
address_list=address_list) |
|
816 |
|
|
817 | 784 |
@_RpcTimeout(_TMO_NORMAL) |
818 | 785 |
def call_hypervisor_validate_params(self, node_list, hvname, hvparams): |
819 | 786 |
"""Validate the hypervisor params. |
... | ... | |
832 | 799 |
hv_full = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams) |
833 | 800 |
return self._MultiNodeCall(node_list, "hypervisor_validate_params", |
834 | 801 |
[hvname, hv_full]) |
802 |
|
|
803 |
|
|
804 |
class JobQueueRunner(_generated_rpc.RpcClientJobQueue): |
|
805 |
"""RPC wrappers for job queue. |
|
806 |
|
|
807 |
""" |
|
808 |
_Compress = staticmethod(_Compress) |
|
809 |
|
|
810 |
def __init__(self, context, address_list): |
|
811 |
"""Initializes this class. |
|
812 |
|
|
813 |
""" |
|
814 |
_generated_rpc.RpcClientJobQueue.__init__(self) |
|
815 |
|
|
816 |
if address_list is None: |
|
817 |
resolver = _SsconfResolver |
|
818 |
else: |
|
819 |
# Caller provided an address list |
|
820 |
resolver = _StaticResolver(address_list) |
|
821 |
|
|
822 |
self._proc = _RpcProcessor(resolver, |
|
823 |
netutils.GetDaemonPort(constants.NODED), |
|
824 |
lock_monitor_cb=context.glm.AddToLockMonitor) |
|
825 |
|
|
826 |
def _Call(self, node_list, procedure, timeout, args): |
|
827 |
"""Entry point for automatically generated RPC wrappers. |
|
828 |
|
|
829 |
""" |
|
830 |
body = serializer.DumpJson(args, indent=False) |
|
831 |
|
|
832 |
return self._proc(node_list, procedure, body, read_timeout=timeout) |
b/lib/server/noded.py | ||
---|---|---|
914 | 914 |
|
915 | 915 |
""" |
916 | 916 |
# TODO: What if a file fails to rename? |
917 |
return [backend.JobQueueRename(old, new) for old, new in params] |
|
917 |
return [backend.JobQueueRename(old, new) for old, new in params[0]]
|
|
918 | 918 |
|
919 | 919 |
# hypervisor --------------- |
920 | 920 |
|
Also available in: Unified diff