Revision 56d8ff91

b/daemons/ganeti-masterd
214 214
      ops = [opcodes.OpCode.LoadOpCode(state) for state in args]
215 215
      return queue.SubmitJob(ops)
216 216

  
217
    if method == luxi.REQ_SUBMIT_MANY_JOBS:
218
      logging.info("Received multiple jobs")
219
      jobs = []
220
      for ops in args:
221
        jobs.append([opcodes.OpCode.LoadOpCode(state) for state in ops])
222
      return queue.SubmitManyJobs(jobs)
223

  
217 224
    elif method == luxi.REQ_CANCEL_JOB:
218 225
      job_id = args
219 226
      logging.info("Received job cancel request for %s", job_id)
b/lib/jqueue.py
961 961
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
962 962
    return True
963 963

  
964
  @utils.LockedMethod
965 964
  @_RequireOpenQueue
966
  def SubmitJob(self, ops):
965
  def _SubmitJobUnlocked(self, ops):
967 966
    """Create and store a new job.
968 967

  
969 968
    This enters the job into our job queue and also puts it on the new
......
977 976

  
978 977
    """
979 978
    if self._IsQueueMarkedDrain():
980
      raise errors.JobQueueDrainError()
979
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
981 980

  
982 981
    # Check job queue size
983 982
    size = len(self._ListJobFiles())
......
1005 1004

  
1006 1005
    return job.id
1007 1006

  
1007
  @utils.LockedMethod
1008
  @_RequireOpenQueue
1009
  def SubmitJob(self, ops):
1010
    """Create and store a new job.
1011

  
1012
    @see: L{_SubmitJobUnlocked}
1013

  
1014
    """
1015
    return self._SubmitJobUnlocked(ops)
1016

  
1017
  @utils.LockedMethod
1018
  @_RequireOpenQueue
1019
  def SubmitManyJobs(self, jobs):
1020
    """Create and store multiple jobs.
1021

  
1022
    @see: L{_SubmitJobUnlocked}
1023

  
1024
    """
1025
    results = []
1026
    for ops in jobs:
1027
      try:
1028
        data = self._SubmitJobUnlocked(ops)
1029
        status = True
1030
      except errors.GenericError, err:
1031
        data = str(err)
1032
        status = False
1033
      results.append((status, data))
1034

  
1035
    return results
1036

  
1037

  
1008 1038
  @_RequireOpenQueue
1009 1039
  def UpdateJobUnlocked(self, job):
1010 1040
    """Update a job's on disk storage.
b/lib/luxi.py
45 45
KEY_RESULT = "result"
46 46

  
47 47
REQ_SUBMIT_JOB = "SubmitJob"
48
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
48 49
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
49 50
REQ_CANCEL_JOB = "CancelJob"
50 51
REQ_ARCHIVE_JOB = "ArchiveJob"
......
342 343
    ops_state = map(lambda op: op.__getstate__(), ops)
343 344
    return self.CallMethod(REQ_SUBMIT_JOB, ops_state)
344 345

  
346
  def SubmitManyJobs(self, jobs):
347
    jobs_state = []
348
    for ops in jobs:
349
      jobs_state.append([op.__getstate__() for op in ops])
350
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, jobs_state)
351

  
345 352
  def CancelJob(self, job_id):
346 353
    return self.CallMethod(REQ_CANCEL_JOB, job_id)
347 354

  

Also available in: Unified diff