Revision 2971c913

b/daemons/ganeti-masterd
213 213
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
214 214
      return queue.SubmitJob(ops)
215 215

  
216
    if method == luxi.REQ_SUBMIT_MANY_JOBS:
217
      logging.info("Received multiple jobs")
218
      jobs = []
219
      for ops in args:
220
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
221
      return queue.SubmitManyJobs(jobs)
222

  
216 223
    elif method == luxi.REQ_CANCEL_JOB:
217 224
      job_id = args
218 225
      logging.info("Received job cancel request for %s", job_id)
b/lib/jqueue.py
943 943
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
944 944
    return True
945 945

  
946
  @utils.LockedMethod
947 946
  @_RequireOpenQueue
948
  def SubmitJob(self, ops):
947
  def _SubmitJobUnlocked(self, ops):
949 948
    """Create and store a new job.
950 949

  
951 950
    This enters the job into our job queue and also puts it on the new
......
959 958

  
960 959
    """
961 960
    if self._IsQueueMarkedDrain():
962
      raise errors.JobQueueDrainError()
961
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
963 962

  
964 963
    # Check job queue size
965 964
    size = len(self._ListJobFiles())
......
987 986

  
988 987
    return job.id
989 988

  
989
  @utils.LockedMethod
990
  @_RequireOpenQueue
991
  def SubmitJob(self, ops):
992
    """Create and store a new job.
993

  
994
    @see: L{_SubmitJobUnlocked}
995

  
996
    """
997
    return self._SubmitJobUnlocked(ops)
998

  
999
  @utils.LockedMethod
1000
  @_RequireOpenQueue
1001
  def SubmitManyJobs(self, jobs):
1002
    """Create and store multiple jobs.
1003

  
1004
    @see: L{_SubmitJobUnlocked}
1005

  
1006
    """
1007
    results = []
1008
    for ops in jobs:
1009
      try:
1010
        data = self._SubmitJobUnlocked(ops)
1011
        status = True
1012
      except errors.GenericError, err:
1013
        data = str(err)
1014
        status = False
1015
      results.append((status, data))
1016

  
1017
    return results
1018

  
1019

  
990 1020
  @_RequireOpenQueue
991 1021
  def UpdateJobUnlocked(self, job):
992 1022
    """Update a job's on disk storage.
b/lib/luxi.py
45 45
KEY_RESULT = "result"
46 46

  
47 47
REQ_SUBMIT_JOB = "SubmitJob"
48
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
48 49
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
49 50
REQ_CANCEL_JOB = "CancelJob"
50 51
REQ_ARCHIVE_JOB = "ArchiveJob"
......
335 336
    ops_state = map(lambda op: op.__getstate__(), ops)
336 337
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
337 338

  
339
  def SubmitManyJobs(self, jobs):
340
    jobs_state = []
341
    for ops in jobs:
342
      jobs_state.append([op.__getstate__() for op in ops])
343
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
344

  
338 345
  def CancelJob(self, job_id):
339 346
    return self.CallMethod(REQ_CANCEL_JOB, job_id)
340 347

  

Also available in: Unified diff