Revision 4c36bdf5 lib/jqueue.py
b/lib/jqueue.py | ||
---|---|---|
822 | 822 |
addr_list = [self._nodes[name] for name in name_list] |
823 | 823 |
return name_list, addr_list |
824 | 824 |
|
825 |
def _WriteAndReplicateFileUnlocked(self, file_name, data):
|
|
825 |
def _UpdateJobQueueFile(self, file_name, data, replicate):
|
|
826 | 826 |
"""Writes a file locally and then replicates it to all nodes. |
827 | 827 |
|
828 | 828 |
This function will replace the contents of a file on the local |
... | ... | |
832 | 832 |
@param file_name: the path of the file to be replicated |
833 | 833 |
@type data: str |
834 | 834 |
@param data: the new contents of the file |
835 |
@type replicate: boolean |
|
836 |
@param replicate: whether to spread the changes to the remote nodes |
|
835 | 837 |
|
836 | 838 |
""" |
837 | 839 |
utils.WriteFile(file_name, data=data) |
838 | 840 |
|
839 |
names, addrs = self._GetNodeIp()
|
|
840 |
result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
|
|
841 |
self._CheckRpcResult(result, self._nodes,
|
|
842 |
"Updating %s" % file_name)
|
|
841 |
if replicate:
|
|
842 |
names, addrs = self._GetNodeIp()
|
|
843 |
result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
|
|
844 |
self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
|
|
843 | 845 |
|
844 | 846 |
def _RenameFilesUnlocked(self, rename): |
845 | 847 |
"""Renames a file locally and then replicate the change. |
... | ... | |
909 | 911 |
serial = self._last_serial + count |
910 | 912 |
|
911 | 913 |
# Write to file |
912 |
self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
|
|
913 |
"%s\n" % serial)
|
|
914 |
self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
|
|
915 |
"%s\n" % serial, True)
|
|
914 | 916 |
|
915 | 917 |
result = [self._FormatJobID(v) |
916 | 918 |
for v in range(self._last_serial, serial + 1)] |
... | ... | |
1149 | 1151 |
return results |
1150 | 1152 |
|
1151 | 1153 |
@_RequireOpenQueue |
1152 |
def UpdateJobUnlocked(self, job): |
|
1154 |
def UpdateJobUnlocked(self, job, replicate=True):
|
|
1153 | 1155 |
"""Update a job's on disk storage. |
1154 | 1156 |
|
1155 | 1157 |
After a job has been modified, this function needs to be called in |
... | ... | |
1158 | 1160 |
|
1159 | 1161 |
@type job: L{_QueuedJob} |
1160 | 1162 |
@param job: the changed job |
1163 |
@type replicate: boolean |
|
1164 |
@param replicate: whether to replicate the change to remote nodes |
|
1161 | 1165 |
|
1162 | 1166 |
""" |
1163 | 1167 |
filename = self._GetJobPath(job.id) |
1164 | 1168 |
data = serializer.DumpJson(job.Serialize(), indent=False) |
1165 | 1169 |
logging.debug("Writing job %s to %s", job.id, filename) |
1166 |
self._WriteAndReplicateFileUnlocked(filename, data)
|
|
1170 |
self._UpdateJobQueueFile(filename, data, replicate)
|
|
1167 | 1171 |
|
1168 | 1172 |
# Notify waiters about potential changes |
1169 | 1173 |
job.change.notifyAll() |
Also available in: Unified diff