Revision 4c36bdf5 lib/jqueue.py

b/lib/jqueue.py
822 822
    addr_list = [self._nodes[name] for name in name_list]
823 823
    return name_list, addr_list
824 824

  
825
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
825
  def _UpdateJobQueueFile(self, file_name, data, replicate):
826 826
    """Writes a file locally and then replicates it to all nodes.
827 827

  
828 828
    This function will replace the contents of a file on the local
......
832 832
    @param file_name: the path of the file to be replicated
833 833
    @type data: str
834 834
    @param data: the new contents of the file
835
    @type replicate: boolean
836
    @param replicate: whether to spread the changes to the remote nodes
835 837

  
836 838
    """
837 839
    utils.WriteFile(file_name, data=data)
838 840

  
839
    names, addrs = self._GetNodeIp()
840
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
841
    self._CheckRpcResult(result, self._nodes,
842
                         "Updating %s" % file_name)
841
    if replicate:
842
      names, addrs = self._GetNodeIp()
843
      result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
844
      self._CheckRpcResult(result, self._nodes, "Updating %s" % file_name)
843 845

  
844 846
  def _RenameFilesUnlocked(self, rename):
845 847
    """Renames a file locally and then replicate the change.
......
909 911
    serial = self._last_serial + count
910 912

  
911 913
    # Write to file
912
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
913
                                        "%s\n" % serial)
914
    self._UpdateJobQueueFile(constants.JOB_QUEUE_SERIAL_FILE,
915
                             "%s\n" % serial, True)
914 916

  
915 917
    result = [self._FormatJobID(v)
916 918
              for v in range(self._last_serial, serial + 1)]
......
1149 1151
    return results
1150 1152

  
1151 1153
  @_RequireOpenQueue
1152
  def UpdateJobUnlocked(self, job):
1154
  def UpdateJobUnlocked(self, job, replicate=True):
1153 1155
    """Update a job's on disk storage.
1154 1156

  
1155 1157
    After a job has been modified, this function needs to be called in
......
1158 1160

  
1159 1161
    @type job: L{_QueuedJob}
1160 1162
    @param job: the changed job
1163
    @type replicate: boolean
1164
    @param replicate: whether to replicate the change to remote nodes
1161 1165

  
1162 1166
    """
1163 1167
    filename = self._GetJobPath(job.id)
1164 1168
    data = serializer.DumpJson(job.Serialize(), indent=False)
1165 1169
    logging.debug("Writing job %s to %s", job.id, filename)
1166
    self._WriteAndReplicateFileUnlocked(filename, data)
1170
    self._UpdateJobQueueFile(filename, data, replicate)
1167 1171

  
1168 1172
    # Notify waiters about potential changes
1169 1173
    job.change.notifyAll()

Also available in: Unified diff