Revision d7fd1f28

b/lib/jqueue.py
703 703
    self._CheckRpcResult(result, self._nodes,
704 704
                         "Updating %s" % file_name)
705 705

  
706
  def _RenameFileUnlocked(self, old, new):
706
  def _RenameFilesUnlocked(self, rename):
707 707
    """Renames a file locally and then replicate the change.
708 708

  
709 709
    This function will rename a file in the local queue directory
710 710
    and then replicate this rename to all the other nodes we have.
711 711

  
712
    @type old: str
713
    @param old: the current name of the file
714
    @type new: str
715
    @param new: the new name of the file
712
    @type rename: list of (old, new)
713
    @param rename: List containing tuples mapping old to new names
716 714

  
717 715
    """
718
    utils.RenameFile(old, new, mkdir=True)
716
    for old, new in rename:
717
      utils.RenameFile(old, new, mkdir=True)
719 718

  
720
    names, addrs = self._GetNodeIp()
721
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new)
722
    self._CheckRpcResult(result, self._nodes,
723
                         "Moving %s to %s" % (old, new))
719
      names, addrs = self._GetNodeIp()
720
      result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new)
721
      self._CheckRpcResult(result, self._nodes,
722
                           "Moving %s to %s" % (old, new))
724 723

  
725 724
  def _FormatJobID(self, job_id):
726 725
    """Convert a job ID to string format.
......
886 885
      else:
887 886
        # non-archived case
888 887
        logging.exception("Can't parse job %s, will archive.", job_id)
889
        self._RenameFileUnlocked(filepath, new_path)
888
        self._RenameFilesUnlocked([(filepath, new_path)])
890 889
      return None
891 890

  
892 891
    self._memcache[job_id] = job
......
1123 1122
      self.UpdateJobUnlocked(job)
1124 1123

  
1125 1124
  @_RequireOpenQueue
1126
  def _ArchiveJobUnlocked(self, job):
1127
    """Archives a job.
1125
  def _ArchiveJobsUnlocked(self, jobs):
1126
    """Archives jobs.
1128 1127

  
1129
    @type job: L{_QueuedJob}
1130
    @param job: Job object
1131
    @rtype bool
1132
    @return Whether job was archived
1128
    @type jobs: list of L{_QueuedJob}
1129
    @param job: Job objects
1130
    @rtype: int
1131
    @return: Number of archived jobs
1133 1132

  
1134 1133
    """
1135
    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1136
                                constants.JOB_STATUS_SUCCESS,
1137
                                constants.JOB_STATUS_ERROR):
1138
      logging.debug("Job %s is not yet done", job.id)
1139
      return False
1134
    archive_jobs = []
1135
    rename_files = []
1136
    for job in jobs:
1137
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1138
                                  constants.JOB_STATUS_SUCCESS,
1139
                                  constants.JOB_STATUS_ERROR):
1140
        logging.debug("Job %s is not yet done", job.id)
1141
        continue
1140 1142

  
1141
    old = self._GetJobPath(job.id)
1142
    new = self._GetArchivedJobPath(job.id)
1143
      archive_jobs.append(job)
1143 1144

  
1144
    self._RenameFileUnlocked(old, new)
1145
      old = self._GetJobPath(job.id)
1146
      new = self._GetArchivedJobPath(job.id)
1147
      rename_files.append((old, new))
1145 1148

  
1146
    logging.debug("Successfully archived job %s", job.id)
1149
    # TODO: What if 1..n files fail to rename?
1150
    self._RenameFilesUnlocked(rename_files)
1147 1151

  
1148
    return True
1152
    logging.debug("Successfully archived job(s) %s",
1153
                  ", ".join(job.id for job in archive_jobs))
1154

  
1155
    return len(archive_jobs)
1149 1156

  
1150 1157
  @utils.LockedMethod
1151 1158
  @_RequireOpenQueue
......
1167 1174
      logging.debug("Job %s not found", job_id)
1168 1175
      return False
1169 1176

  
1170
    return self._ArchiveJobUnlocked(job)
1177
    return self._ArchiveJobUnlocked([job]) == 1
1171 1178

  
1172 1179
  @utils.LockedMethod
1173 1180
  @_RequireOpenQueue
......
1191 1198
    last_touched = 0
1192 1199

  
1193 1200
    all_job_ids = self._GetJobIDsUnlocked(archived=False)
1201
    pending = []
1194 1202
    for idx, job_id in enumerate(all_job_ids):
1195 1203
      last_touched = idx
1196 1204

  
1205
      # Not optimal because jobs could be pending
1206
      # TODO: Measure average duration for job archival and take number of
1207
      # pending jobs into account.
1197 1208
      if time.time() > end_time:
1198 1209
        break
1199 1210

  
......
1209 1220
          job_age = job.end_timestamp
1210 1221

  
1211 1222
        if age == -1 or now - job_age[0] > age:
1212
          archived = self._ArchiveJobUnlocked(job)
1213
          if archived:
1214
            archived_count += 1
1215
            continue
1223
          pending.append(job)
1224

  
1225
          # Archive 10 jobs at a time
1226
          if len(pending) >= 10:
1227
            archived_count += self._ArchiveJobsUnlocked(pending)
1228
            pending = []
1216 1229

  
1230
    if pending:
1231
      archived_count += self._ArchiveJobsUnlocked(pending)
1217 1232

  
1218 1233
    return (archived_count, len(all_job_ids) - last_touched - 1)
1219 1234

  

Also available in: Unified diff