#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling."""

import os
import logging
import threading
import errno
import re
import time

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils


JOBQUEUE_THREADS = 5


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  Access is synchronized by the '_lock' attribute.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (timestamp, level, message).

  """
  def __init__(self, op):
    self.__Setup(op, constants.OP_STATUS_QUEUED, None, [])

  def __Setup(self, input_, status, result, log):
    self._lock = threading.Lock()
    self.input = input_
    self.status = status
    self.result = result
    self.log = log

  @classmethod
  def Restore(cls, state):
    obj = object.__new__(cls)
    obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]),
                state["status"], state["result"], state["log"])
    return obj

  @utils.LockedMethod
  def Serialize(self):
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      }

  @utils.LockedMethod
  def GetInput(self):
    """Returns the original opcode.

    """
    return self.input

  @utils.LockedMethod
  def SetStatus(self, status, result):
    """Update the opcode status and result.

    """
    self.status = status
    self.result = result

  @utils.LockedMethod
  def GetStatus(self):
    """Get the opcode status.

    """
    return self.status

  @utils.LockedMethod
  def GetResult(self):
    """Get the opcode result.

    """
    return self.result

  @utils.LockedMethod
  def Log(self, *args):
    """Append a log entry.

    Accepts either a single message or a (log_type, message) pair.

    """
    assert len(args) in (1, 2)

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      log_type, log_msg = args
    self.log.append((time.time(), log_type, log_msg))

  @utils.LockedMethod
  def RetrieveLog(self, start_at=0):
    """Retrieve (a part of) the execution log.

    """
    return self.log[start_at:]
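
# Illustrative sketch (not part of the original module): a _QueuedOpCode is
# persisted as a plain dict and rebuilt from it, which is how the queue
# stores opcodes on disk as JSON. The opcode class used here,
# opcodes.OpTestDelay, serves purely as an example:
#
#   op = _QueuedOpCode(opcodes.OpTestDelay(duration=10))
#   state = op.Serialize()
#   restored = _QueuedOpCode.Restore(state)
#   assert restored.GetStatus() == constants.OP_STATUS_QUEUED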


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs.

  """
  def __init__(self, storage, job_id, ops):
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops], -1)

  def __Setup(self, storage, job_id, ops, run_op_index):
    self._lock = threading.Lock()
    self.storage = storage
    self.id = job_id
    self._ops = ops
    self.run_op_index = run_op_index

  @classmethod
  def Restore(cls, storage, state):
    obj = object.__new__(cls)
    op_list = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
    obj.__Setup(storage, state["id"], op_list, state["run_op_index"])
    return obj

  def Serialize(self):
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self._ops],
      "run_op_index": self.run_op_index,
      }

  def SetUnclean(self, msg):
    try:
      for op in self._ops:
        op.SetStatus(constants.OP_STATUS_ERROR, msg)
    finally:
      self.storage.UpdateJob(self)

  def GetStatus(self):
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self._ops:
      op_status = op.GetStatus()
      if op_status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op_status == constants.OP_STATUS_QUEUED:
        pass
      elif op_status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op_status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
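
  # How the job status is derived from its opcodes (informal summary of the
  # loop above, not part of the original module):
  #
  #   all opcodes SUCCESS                  -> JOB_STATUS_SUCCESS
  #   any opcode ERROR                     -> JOB_STATUS_ERROR (wins; loop breaks)
  #   any opcode RUNNING, none ERROR       -> JOB_STATUS_RUNNING
  #   otherwise (only QUEUED/SUCCESS seen) -> JOB_STATUS_QUEUED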

  @utils.LockedMethod
  def GetRunOpIndex(self):
    return self.run_op_index

  def Run(self, proc):
    """Job executor.

    This function processes this job in the context of the given
    processor instance.

    Args:
    - proc: Ganeti Processor to run the job with

    """
    try:
      count = len(self._ops)
      for idx, op in enumerate(self._ops):
        try:
          logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

          self._lock.acquire()
          try:
            self.run_op_index = idx
          finally:
            self._lock.release()

          op.SetStatus(constants.OP_STATUS_RUNNING, None)
          self.storage.UpdateJob(self)

          result = proc.ExecOpCode(op.input, op.Log)

          op.SetStatus(constants.OP_STATUS_SUCCESS, result)
          self.storage.UpdateJob(self)
          logging.debug("Op %s/%s: Successfully finished %s",
                        idx + 1, count, op)
        except Exception, err:
          try:
            op.SetStatus(constants.OP_STATUS_ERROR, str(err))
            logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
          finally:
            self.storage.UpdateJob(self)
          raise

    except errors.GenericError:
      logging.exception("ganeti exception")
    except Exception:
      logging.exception("unhandled exception")
    except:
      logging.exception("unhandled unknown exception")


class _JobQueueWorker(workerpool.BaseWorker):
  def RunTask(self, job):
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    # TODO: feedback function
    proc = mcpu.Processor(self.pool.context)
    try:
      job.Run(proc)
    finally:
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job.id, job.GetStatus())


class _JobQueueWorkerPool(workerpool.WorkerPool):
  def __init__(self, context):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.context = context
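
# Execution flow sketch (not part of the original module): the queue hands a
# _QueuedJob to the pool, one of the JOBQUEUE_THREADS workers picks it up,
# and its RunTask creates a processor and drives the job. Assuming an
# existing mcpu-compatible context object:
#
#   pool = _JobQueueWorkerPool(context)
#   pool.AddTask(job)        # job.Run(proc) executes on a worker thread
#   pool.TerminateWorkers()  # stop all workers (see JobQueue.Shutdown)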


class JobStorage(object):
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self):
    self._lock = threading.Lock()
    self._memcache = {}

    # Make sure our directory exists
    try:
      os.mkdir(constants.QUEUE_DIR, 0700)
    except OSError, err:
      if err.errno not in (errno.EEXIST, ):
        raise

    # Get queue lock
    self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
    try:
      utils.LockFile(self.lock_fd)
    except:
      self.lock_fd.close()
      raise

    # Read version
    try:
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
    except IOError, err:
      if err.errno not in (errno.ENOENT, ):
        raise

      # Setup a new queue
      self._InitQueueUnlocked()

      # Try to open again
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")

    try:
      # Try to read version
      version = int(version_fd.read(128))

      # Verify version
      if version != constants.JOB_QUEUE_VERSION:
        raise errors.JobQueueError("Found version %s, expected %s" %
                                   (version, constants.JOB_QUEUE_VERSION))
    finally:
      version_fd.close()

    serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
    try:
      # Read last serial
      self._last_serial = int(serial_fd.read(1024).strip())
    finally:
      serial_fd.close()
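
  # On-disk layout sketch (informal summary derived from the constants used
  # above; the concrete paths live in ganeti.constants):
  #
  #   constants.QUEUE_DIR/
  #     JOB_QUEUE_VERSION_FILE - single integer, must match JOB_QUEUE_VERSION
  #     JOB_QUEUE_SERIAL_FILE  - last used job id, bumped by _NewSerialUnlocked
  #     JOB_QUEUE_LOCK_FILE    - held via utils.LockFile while the queue is open
  #     job-<id>               - one JSON file per job (see _UpdateJobUnlocked)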

  def Close(self):
    assert self.lock_fd, "Queue should be open"

    self.lock_fd.close()
    self.lock_fd = None

  def _InitQueueUnlocked(self):
    assert self.lock_fd, "Queue should be open"

    utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
                    data="%s\n" % constants.JOB_QUEUE_VERSION)
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                    data="%s\n" % 0)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A number representing the job identifier.

    """
    assert self.lock_fd, "Queue should be open"

    # New number
    serial = self._last_serial + 1

    # Write to file
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                    data="%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return serial

  def _GetJobPath(self, job_id):
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    jfiles = self._ListJobFiles()
    jlist = [int(m.group(1)) for m in
             [self._RE_JOB_FILE.match(name) for name in jfiles]]
    jlist.sort()
    return jlist
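
  # Illustrative example (not part of the original module): job files are
  # named after their serial, so for the files ["job-1", "job-12", "job-3"]
  # the regexp-and-int conversion above yields the numerically sorted ID
  # list [1, 3, 12] rather than the lexicographic 1, 12, 3 order.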

  def _ListJobFiles(self):
    assert self.lock_fd, "Queue should be open"

    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    assert self.lock_fd, "Queue should be open"

    if job_id in self._memcache:
      logging.debug("Found job %d in memcache", job_id)
      return self._memcache[job_id]

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
    logging.debug("Added job %d to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @utils.LockedMethod
  def GetJobs(self, job_ids):
    return self._GetJobsUnlocked(job_ids)

  @utils.LockedMethod
  def AddJob(self, ops):
    assert self.lock_fd, "Queue should be open"

    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self._UpdateJobUnlocked(job)

    logging.debug("Added new job %d to the cache", job_id)
    self._memcache[job_id] = job

    return job

  def _UpdateJobUnlocked(self, job):
    assert self.lock_fd, "Queue should be open"

    filename = self._GetJobPath(job.id)
    logging.debug("Writing job %s to %s", job.id, filename)
    utils.WriteFile(filename,
                    data=serializer.DumpJson(job.Serialize(), indent=False))
    self._CleanCacheUnlocked(exceptions=[job.id])

  def _CleanCacheUnlocked(self, exceptions=None):
    """Clean the memory cache.

    The exceptions argument contains job IDs that should not be
    cleaned.

    """
    if exceptions is None:
      exceptions = []
    for job in self._memcache.values():
      if job.id in exceptions:
        continue
      if job.GetStatus() not in (constants.JOB_STATUS_QUEUED,
                                 constants.JOB_STATUS_RUNNING):
        logging.debug("Cleaning job %d from the cache", job.id)
        try:
          del self._memcache[job.id]
        except KeyError:
          pass

  @utils.LockedMethod
  def UpdateJob(self, job):
    return self._UpdateJobUnlocked(job)

  def ArchiveJob(self, job_id):
    raise NotImplementedError()


class JobQueue(object):
  """The job queue.

  """
  def __init__(self, context):
    self._lock = threading.Lock()
    self._jobs = JobStorage()
    self._wpool = _JobQueueWorkerPool(context)

    for job in self._jobs.GetJobs(None):
      status = job.GetStatus()
      if status in (constants.JOB_STATUS_QUEUED, ):
        self._wpool.AddTask(job)

      elif status in (constants.JOB_STATUS_RUNNING, ):
        logging.warning("Unfinished job %s found: %s", job.id, job)
        job.SetUnclean("Unclean master daemon shutdown")

  @utils.LockedMethod
  def SubmitJob(self, ops):
    """Add a new job to the queue.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    Args:
    - ops: Sequence of opcodes

    """
    job = self._jobs.AddJob(ops)

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id
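
  # Usage sketch (not part of the original module; assumes an mcpu-compatible
  # context and uses opcodes.OpTestDelay purely as an example opcode):
  #
  #   queue = JobQueue(context)
  #   job_id = queue.SubmitJob([opcodes.OpTestDelay(duration=10)])
  #   queue.QueryJobs([job_id], ["id", "status"])
  #   # -> [[job_id, constants.JOB_STATUS_QUEUED]] until a worker picks it up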

  def ArchiveJob(self, job_id):
    raise NotImplementedError()

  def CancelJob(self, job_id):
    raise NotImplementedError()

  def _GetJobInfo(self, job, fields):
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.GetStatus())
      elif fname == "ops":
        row.append([op.GetInput().__getstate__() for op in job._ops])
      elif fname == "opresult":
        row.append([op.GetResult() for op in job._ops])
      elif fname == "opstatus":
        row.append([op.GetStatus() for op in job._ops])
      elif fname == "ticker":
        ji = job.GetRunOpIndex()
        if ji < 0:
          lmsg = None
        else:
          lmsg = job._ops[ji].RetrieveLog(-1)
          # message might be empty here
          if lmsg:
            lmsg = lmsg[0]
          else:
            lmsg = None
        row.append(lmsg)
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    self._lock.acquire()
    try:
      jobs = []

      for job in self._jobs.GetJobs(job_ids):
        if job is None:
          jobs.append(None)
        else:
          jobs.append(self._GetJobInfo(job, fields))

      return jobs
    finally:
      self._lock.release()
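
  # Supported query fields, as dispatched in _GetJobInfo above (informal
  # summary, not part of the original module):
  #
  #   "id"       - the job serial
  #   "status"   - aggregated job status, see _QueuedJob.GetStatus
  #   "ops"      - the input opcodes, as state dicts
  #   "opresult" - per-opcode results
  #   "opstatus" - per-opcode statuses
  #   "ticker"   - last log entry of the currently running opcode, or None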

  @utils.LockedMethod
  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()
    self._jobs.Close()