Revision 704b51ff

b/lib/jqueue.py
48 48
from ganeti import serializer
49 49
from ganeti import workerpool
50 50
from ganeti import locking
51
from ganeti import luxi
51 52
from ganeti import opcodes
52 53
from ganeti import opcodes_base
53 54
from ganeti import errors
......
1937 1938
    result = self._GetRpc(addrs).call_jobqueue_rename(names, rename)
1938 1939
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
1939 1940

  
1940
  def _NewSerialsUnlocked(self, count):
1941
    """Generates a new job identifier.
1942

  
1943
    Job identifiers are unique during the lifetime of a cluster.
1944

  
1945
    @type count: integer
1946
    @param count: how many serials to return
1947
    @rtype: list of int
1948
    @return: a list of job identifiers.
1949

  
1950
    """
1951
    assert ht.TNonNegativeInt(count)
1952

  
1953
    # New number
1954
    serial = self._last_serial + count
1955

  
1956
    # Write to file
1957
    self._UpdateJobQueueFile(pathutils.JOB_QUEUE_SERIAL_FILE,
1958
                             "%s\n" % serial, True)
1959

  
1960
    result = [jstore.FormatJobID(v)
1961
              for v in range(self._last_serial + 1, serial + 1)]
1962

  
1963
    # Keep it only if we were able to write the file
1964
    self._last_serial = serial
1965

  
1966
    assert len(result) == count
1967

  
1968
    return result
1969

  
1970 1941
  @staticmethod
1971 1942
  def _GetJobPath(job_id):
1972 1943
    """Returns the job file for a given job id.
......
2174 2145

  
2175 2146
    return True
2176 2147

  
2177
  @_RequireOpenQueue
2178
  def _SubmitJobUnlocked(self, job_id, ops):
2179
    """Create and store a new job.
2180

  
2181
    This enters the job into our job queue and also puts it on the new
2182
    queue, in order for it to be picked up by the queue processors.
2183

  
2184
    @type job_id: job ID
2185
    @param job_id: the job ID for the new job
2186
    @type ops: list
2187
    @param ops: The list of OpCodes that will become the new job.
2188
    @rtype: L{_QueuedJob}
2189
    @return: the job object to be queued
2190
    @raise errors.JobQueueFull: if the job queue has too many jobs in it
2191
    @raise errors.GenericError: If an opcode is not valid
2192

  
2193
    """
2194
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
2195
      raise errors.JobQueueFull()
2196

  
2197
    job = _QueuedJob(self, job_id, ops, True)
2198

  
2199
    for idx, op in enumerate(job.ops):
2200
      # Check priority
2201
      if op.priority not in constants.OP_PRIO_SUBMIT_VALID:
2202
        allowed = utils.CommaJoin(constants.OP_PRIO_SUBMIT_VALID)
2203
        raise errors.GenericError("Opcode %s has invalid priority %s, allowed"
2204
                                  " are %s" % (idx, op.priority, allowed))
2205

  
2206
      # Check job dependencies
2207
      dependencies = getattr(op.input, opcodes_base.DEPEND_ATTR, None)
2208
      if not opcodes_base.TNoRelativeJobDependencies(dependencies):
2209
        raise errors.GenericError("Opcode %s has invalid dependencies, must"
2210
                                  " match %s: %s" %
2211
                                  (idx, opcodes_base.TNoRelativeJobDependencies,
2212
                                   dependencies))
2213

  
2214
    # Write to disk
2215
    self.UpdateJobUnlocked(job)
2216

  
2217
    self._queue_size += 1
2218

  
2219
    logging.debug("Adding new job %s to the cache", job_id)
2220
    self._memcache[job_id] = job
2221

  
2222
    return job
2223

  
2224
  @locking.ssynchronized(_LOCK)
2225
  @_RequireOpenQueue
2226
  @_RequireNonDrainedQueue
2227
  def SubmitJob(self, ops):
2148
  @classmethod
2149
  def SubmitJob(cls, ops):
2228 2150
    """Create and store a new job.
2229 2151

  
2230
    @see: L{_SubmitJobUnlocked}
2231

  
2232 2152
    """
2233
    (job_id, ) = self._NewSerialsUnlocked(1)
2234
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
2235
    return job_id
2153
    return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitJob(ops)
2236 2154

  
2237
  @locking.ssynchronized(_LOCK)
2238
  @_RequireOpenQueue
2239
  def SubmitJobToDrainedQueue(self, ops):
2155
  @classmethod
2156
  def SubmitJobToDrainedQueue(cls, ops):
2240 2157
    """Forcefully create and store a new job.
2241 2158

  
2242 2159
    Do so, even if the job queue is drained.
2243
    @see: L{_SubmitJobUnlocked}
2244 2160

  
2245 2161
    """
2246
    (job_id, ) = self._NewSerialsUnlocked(1)
2247
    self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)])
2248
    return job_id
2162
    return luxi.Client(address=pathutils.QUERY_SOCKET)\
2163
        .SubmitJobToDrainedQueue(ops)
2249 2164

  
2250
  @locking.ssynchronized(_LOCK)
2251
  @_RequireOpenQueue
2252
  @_RequireNonDrainedQueue
2253
  def SubmitManyJobs(self, jobs):
2165
  @classmethod
2166
  def SubmitManyJobs(cls, jobs):
2254 2167
    """Create and store multiple jobs.
2255 2168

  
2256
    @see: L{_SubmitJobUnlocked}
2257

  
2258 2169
    """
2259
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
2260

  
2261
    (results, added_jobs) = \
2262
      self._SubmitManyJobsUnlocked(jobs, all_job_ids, [])
2263

  
2264
    self._EnqueueJobsUnlocked(added_jobs)
2265

  
2266
    return results
2170
    return luxi.Client(address=pathutils.QUERY_SOCKET).SubmitManyJobs(jobs)
2267 2171

  
2268 2172
  @staticmethod
2269 2173
  def _FormatSubmitError(msg, ops):
......
2304 2208

  
2305 2209
    return (True, result)
2306 2210

  
2307
  def _SubmitManyJobsUnlocked(self, jobs, job_ids, previous_job_ids):
2308
    """Create and store multiple jobs.
2309

  
2310
    @see: L{_SubmitJobUnlocked}
2311

  
2312
    """
2313
    results = []
2314
    added_jobs = []
2315

  
2316
    def resolve_fn(job_idx, reljobid):
2317
      assert reljobid < 0
2318
      return (previous_job_ids + job_ids[:job_idx])[reljobid]
2319

  
2320
    for (idx, (job_id, ops)) in enumerate(zip(job_ids, jobs)):
2321
      for op in ops:
2322
        if getattr(op, opcodes_base.DEPEND_ATTR, None):
2323
          (status, data) = \
2324
            self._ResolveJobDependencies(compat.partial(resolve_fn, idx),
2325
                                         op.depends)
2326
          if not status:
2327
            # Abort resolving dependencies
2328
            assert ht.TNonEmptyString(data), "No error message"
2329
            break
2330
          # Use resolved dependencies
2331
          op.depends = data
2332
      else:
2333
        try:
2334
          job = self._SubmitJobUnlocked(job_id, ops)
2335
        except errors.GenericError, err:
2336
          status = False
2337
          data = self._FormatSubmitError(str(err), ops)
2338
        else:
2339
          status = True
2340
          data = job_id
2341
          added_jobs.append(job)
2342

  
2343
      results.append((status, data))
2344

  
2345
    return (results, added_jobs)
2346

  
2347 2211
  @locking.ssynchronized(_LOCK)
2348 2212
  def _EnqueueJobs(self, jobs):
2349 2213
    """Helper function to add jobs to worker pool's queue.

Also available in: Unified diff