Revision 009e73d0 lib/jqueue.py

b/lib/jqueue.py
796 796
    """
797 797
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
798 798

  
799
  def _NewSerialUnlocked(self):
799
  def _NewSerialsUnlocked(self, count):
800 800
    """Generates a new job identifier.
801 801

  
802 802
    Job identifiers are unique during the lifetime of a cluster.
803 803

  
804
    @type count: integer
805
    @param count: how many serials to return
804 806
    @rtype: str
805 807
    @return: a string representing the job identifier.
806 808

  
807 809
    """
810
    assert count > 0
808 811
    # New number
809
    serial = self._last_serial + 1
812
    serial = self._last_serial + count
810 813

  
811 814
    # Write to file
812 815
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
813 816
                                        "%s\n" % serial)
814 817

  
818
    result = [self._FormatJobID(v)
819
              for v in range(self._last_serial, serial + 1)]
815 820
    # Keep it only if we were able to write the file
816 821
    self._last_serial = serial
817 822

  
818
    return self._FormatJobID(serial)
823
    return result
819 824

  
820 825
  @staticmethod
821 826
  def _GetJobPath(job_id):
......
981 986
    return True
982 987

  
983 988
  @_RequireOpenQueue
984
  def _SubmitJobUnlocked(self, ops):
989
  def _SubmitJobUnlocked(self, job_id, ops):
985 990
    """Create and store a new job.
986 991

  
987 992
    This enters the job into our job queue and also puts it on the new
988 993
    queue, in order for it to be picked up by the queue processors.
989 994

  
995
    @type job_id: job ID
996
    @param jod_id: the job ID for the new job
990 997
    @type ops: list
991 998
    @param ops: The list of OpCodes that will become the new job.
992 999
    @rtype: job ID
......
1008 1015
    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1009 1016
      raise errors.JobQueueFull()
1010 1017

  
1011
    # Get job identifier
1012
    job_id = self._NewSerialUnlocked()
1013 1018
    job = _QueuedJob(self, job_id, ops)
1014 1019

  
1015 1020
    # Write to disk
......
1031 1036
    @see: L{_SubmitJobUnlocked}
1032 1037

  
1033 1038
    """
1034
    return self._SubmitJobUnlocked(ops)
1039
    job_id = self._NewSerialsUnlocked(1)[0]
1040
    return self._SubmitJobUnlocked(job_id, ops)
1035 1041

  
1036 1042
  @utils.LockedMethod
1037 1043
  @_RequireOpenQueue
......
1042 1048

  
1043 1049
    """
1044 1050
    results = []
1045
    for ops in jobs:
1051
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1052
    for job_id, ops in zip(all_job_ids, jobs):
1046 1053
      try:
1047
        data = self._SubmitJobUnlocked(ops)
1054
        data = self._SubmitJobUnlocked(job_id, ops)
1048 1055
        status = True
1049 1056
      except errors.GenericError, err:
1050 1057
        data = str(err)

Also available in: Unified diff