lib/jqueue.py @ c4beba1c

#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling."""

import os
import logging
import threading
import errno
import re
import time

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils


# Number of worker threads used by the job queue's worker pool
JOBQUEUE_THREADS = 5


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  Access is synchronized by the '_lock' attribute.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (timestamp, level, message).

  """
  def __init__(self, op):
    self.__Setup(op, constants.OP_STATUS_QUEUED, None, [])

  def __Setup(self, input_, status, result, log):
    """Initialize all attributes; shared by __init__ and Restore."""
    self._lock = threading.Lock()
    self.input = input_
    self.status = status
    self.result = result
    self.log = log

  @classmethod
  def Restore(cls, state):
    """Restore an opcode from its serialized form (see Serialize)."""
    obj = object.__new__(cls)
    obj.__Setup(opcodes.OpCode.LoadOpCode(state["input"]),
                state["status"], state["result"], state["log"])
    return obj

  @utils.LockedMethod
  def Serialize(self):
    """Return a JSON-serializable dictionary describing this opcode."""
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      }

  @utils.LockedMethod
  def GetInput(self):
    """Returns the original opcode.

    """
    return self.input

  @utils.LockedMethod
  def SetStatus(self, status, result):
    """Update the opcode status and result.

    """
    self.status = status
    self.result = result

  @utils.LockedMethod
  def GetStatus(self):
    """Get the opcode status.

    """
    return self.status

  @utils.LockedMethod
  def GetResult(self):
    """Get the opcode result.

    """
    return self.result

  @utils.LockedMethod
  def Log(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      log_type, log_msg = args
    self.log.append((time.time(), log_type, log_msg))

  @utils.LockedMethod
  def RetrieveLog(self, start_at=0):
    """Retrieve (a part of) the execution log.

    """
    return self.log[start_at:]
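# Illustration (not part of the original module): a _QueuedOpCode serialized
# via Serialize() is a plain dictionary, for example (values hypothetical):
#
#   {"input": <whatever the opcode's __getstate__() returns>,
#    "status": constants.OP_STATUS_QUEUED,
#    "result": None,
#    "log": [(1199145600.0, constants.ELOG_MESSAGE, "starting")]}
#
# Restore() is the inverse: it rebuilds the opcode with
# opcodes.OpCode.LoadOpCode() and re-attaches status, result and log.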


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs.

  """
  def __init__(self, storage, job_id, ops):
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.__Setup(storage, job_id, [_QueuedOpCode(op) for op in ops], -1)

  def __Setup(self, storage, job_id, ops, run_op_index):
    """Initialize all attributes; shared by __init__ and Restore."""
    self._lock = threading.Lock()
    self.storage = storage
    self.id = job_id
    self._ops = ops
    self.run_op_index = run_op_index

  @classmethod
  def Restore(cls, storage, state):
    """Restore a job from its serialized form (see Serialize)."""
    obj = object.__new__(cls)
    op_list = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
    obj.__Setup(storage, state["id"], op_list, state["run_op_index"])
    return obj

  def Serialize(self):
    """Return a JSON-serializable dictionary describing this job."""
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self._ops],
      "run_op_index": self.run_op_index,
      }

  def SetUnclean(self, msg):
    """Mark all opcodes as failed and write the job back to disk."""
    try:
      for op in self._ops:
        op.SetStatus(constants.OP_STATUS_ERROR, msg)
    finally:
      self.storage.UpdateJob(self)

  def GetStatus(self):
    """Derive the overall job status from the status of its opcodes."""
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self._ops:
      op_status = op.GetStatus()
      if op_status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op_status == constants.OP_STATUS_QUEUED:
        pass
      elif op_status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op_status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
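  # Illustration (not part of the original module): how per-opcode statuses
  # map to the overall job status in GetStatus():
  #
  #   [SUCCESS, SUCCESS, SUCCESS]  -> JOB_STATUS_SUCCESS
  #   [SUCCESS, RUNNING, QUEUED]   -> JOB_STATUS_RUNNING
  #   [SUCCESS, ERROR, QUEUED]     -> JOB_STATUS_ERROR (first error wins)
  #   [QUEUED, QUEUED]             -> JOB_STATUS_QUEUED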

  @utils.LockedMethod
  def GetRunOpIndex(self):
    """Return the index of the currently or most recently run opcode."""
    return self.run_op_index

  def Run(self, proc):
    """Job executor.

    This function processes this job in the context of the given
    processor instance.

    Args:
    - proc: Ganeti Processor to run the job with

    """
    try:
      count = len(self._ops)
      for idx, op in enumerate(self._ops):
        try:
          logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

          self._lock.acquire()
          try:
            self.run_op_index = idx
          finally:
            self._lock.release()

          op.SetStatus(constants.OP_STATUS_RUNNING, None)
          self.storage.UpdateJob(self)

          result = proc.ExecOpCode(op.input, op.Log)

          op.SetStatus(constants.OP_STATUS_SUCCESS, result)
          self.storage.UpdateJob(self)
          logging.debug("Op %s/%s: Successfully finished %s",
                        idx + 1, count, op)
        except Exception, err:
          try:
            op.SetStatus(constants.OP_STATUS_ERROR, str(err))
            logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
          finally:
            self.storage.UpdateJob(self)
          raise

    except errors.GenericError, err:
      # logging.exception logs at ERROR level and includes the traceback
      logging.exception("ganeti exception")
    except Exception, err:
      logging.exception("unhandled exception")
    except:
      logging.exception("unhandled unknown exception")


class _JobQueueWorker(workerpool.BaseWorker):
  def RunTask(self, job):
    """Run one job (a _QueuedJob) handed to this worker by the pool."""
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    # TODO: feedback function
    proc = mcpu.Processor(self.pool.context)
    try:
      job.Run(proc)
    finally:
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job.id, job.GetStatus())


class _JobQueueWorkerPool(workerpool.WorkerPool):
  def __init__(self, context):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.context = context
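# Illustration (not part of the original module): JOBQUEUE_THREADS
# _JobQueueWorker instances share one pool; JobQueue.SubmitJob() hands a
# _QueuedJob to the pool via AddTask(), and an idle worker eventually picks
# it up and executes it through RunTask() above.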


class JobStorage(object):
  """On-disk storage for queued jobs, with an in-memory cache."""
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self):
    self._lock = threading.Lock()
    self._memcache = {}

    # Make sure our directory exists
    try:
      os.mkdir(constants.QUEUE_DIR, 0700)
    except OSError, err:
      if err.errno not in (errno.EEXIST, ):
        raise

    # Get queue lock
    self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
    try:
      utils.LockFile(self.lock_fd)
    except:
      self.lock_fd.close()
      raise

    # Read version
    try:
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
    except IOError, err:
      if err.errno not in (errno.ENOENT, ):
        raise

      # Setup a new queue
      self._InitQueueUnlocked()

      # Try to open again
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")

    try:
      # Try to read version
      version = int(version_fd.read(128))

      # Verify version
      if version != constants.JOB_QUEUE_VERSION:
        raise errors.JobQueueError("Found version %s, expected %s" %
                                   (version, constants.JOB_QUEUE_VERSION))
    finally:
      version_fd.close()

    self._last_serial = self._ReadSerial()
    if self._last_serial is None:
      raise errors.ConfigurationError("Can't read/parse the job queue serial"
                                      " file")

  @staticmethod
  def _ReadSerial():
    """Try to read the job serial file.

    @rtype: None or int
    @return: If the serial can be read, then it is returned. Otherwise None
             is returned.

    """
    try:
      serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
      try:
        # Read last serial
        serial = int(serial_fd.read(1024).strip())
      finally:
        serial_fd.close()
    except (ValueError, EnvironmentError):
      serial = None

    return serial
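  # Illustration (not part of the original module): the on-disk state this
  # class manages (actual paths come from ganeti.constants; sample contents
  # are hypothetical):
  #
  #   constants.JOB_QUEUE_VERSION_FILE  - queue format version, e.g. "1\n"
  #   constants.JOB_QUEUE_SERIAL_FILE   - last serial handed out, e.g. "42\n"
  #   constants.JOB_QUEUE_LOCK_FILE     - kept open and locked while the
  #                                       queue is open
  #   constants.QUEUE_DIR/job-<id>      - one JSON-serialized _QueuedJob each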

  def Close(self):
    """Close the queue and its lock file."""
    assert self.lock_fd, "Queue should be open"

    self.lock_fd.close()
    self.lock_fd = None

  def _InitQueueUnlocked(self):
    """Write the queue version file and, if missing, the serial file."""
    assert self.lock_fd, "Queue should be open"

    utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
                    data="%s\n" % constants.JOB_QUEUE_VERSION)
    if self._ReadSerial() is None:
      utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                      data="%s\n" % 0)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A new serial number to be used as the job identifier.

    """
    assert self.lock_fd, "Queue should be open"

    # New number
    serial = self._last_serial + 1

    # Write to file
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
                    data="%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return serial
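  # Illustration (not part of the original module): if the serial file holds
  # "42\n", _NewSerialUnlocked() rewrites it as "43\n", remembers 43 in
  # self._last_serial and returns 43, which AddJob() then uses as the new
  # job's id ("job-43" on disk).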

  def _GetJobPath(self, job_id):
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    jfiles = self._ListJobFiles()
    jlist = [int(m.group(1)) for m in
             [self._RE_JOB_FILE.match(name) for name in jfiles]]
    jlist.sort()
    return jlist

  def _ListJobFiles(self):
    """Return the names of all job files in the queue directory."""
    assert self.lock_fd, "Queue should be open"

    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Load a job from the memory cache or from disk.

    Returns None if the job file does not exist.

    """
    assert self.lock_fd, "Queue should be open"

    if job_id in self._memcache:
      logging.debug("Found job %d in memcache", job_id)
      return self._memcache[job_id]

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
    logging.debug("Added job %d to the cache", job_id)
    return job
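  # Illustration (not part of the original module): with hypothetical entries
  # "job-1", "job-17" and "foo.bak" in constants.QUEUE_DIR, _ListJobFiles()
  # keeps only the two names matching _RE_JOB_FILE and _GetJobIDsUnlocked()
  # returns the sorted integer list [1, 17].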

  def _GetJobsUnlocked(self, job_ids):
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @utils.LockedMethod
  def GetJobs(self, job_ids):
    return self._GetJobsUnlocked(job_ids)

  @utils.LockedMethod
  def AddJob(self, ops):
    """Create a job from the opcodes, write it to disk and return it."""
    assert self.lock_fd, "Queue should be open"

    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self._UpdateJobUnlocked(job)

    logging.debug("Added new job %d to the cache", job_id)
    self._memcache[job_id] = job

    return job

  def _UpdateJobUnlocked(self, job):
    assert self.lock_fd, "Queue should be open"

    filename = self._GetJobPath(job.id)
    logging.debug("Writing job %s to %s", job.id, filename)
    utils.WriteFile(filename,
                    data=serializer.DumpJson(job.Serialize(), indent=False))
    self._CleanCacheUnlocked([job.id])

  def _CleanCacheUnlocked(self, exclude):
    """Clean the memory cache.

    The exclude argument contains job IDs that should not be
    cleaned.

    """
    assert isinstance(exclude, list)
    for job in self._memcache.values():
      if job.id in exclude:
        continue
      if job.GetStatus() not in (constants.JOB_STATUS_QUEUED,
                                 constants.JOB_STATUS_RUNNING):
        logging.debug("Cleaning job %d from the cache", job.id)
        try:
          del self._memcache[job.id]
        except KeyError:
          pass
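  # Illustration (not part of the original module): if _memcache holds job 3
  # (still queued) and job 4 (finished), _CleanCacheUnlocked([3]) drops job 4
  # from the cache (it can be reloaded from disk on demand) and keeps job 3.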

  @utils.LockedMethod
  def UpdateJob(self, job):
    return self._UpdateJobUnlocked(job)

  def ArchiveJob(self, job_id):
    raise NotImplementedError()


class JobQueue(object):
  """The job queue.

  """
  def __init__(self, context):
    self._lock = threading.Lock()
    self._jobs = JobStorage()
    self._wpool = _JobQueueWorkerPool(context)

    # Re-add queued jobs to the worker pool and mark jobs that were
    # running during an unclean shutdown as failed.
    for job in self._jobs.GetJobs(None):
      status = job.GetStatus()
      if status in (constants.JOB_STATUS_QUEUED, ):
        self._wpool.AddTask(job)

      elif status in (constants.JOB_STATUS_RUNNING, ):
        logging.warning("Unfinished job %s found: %s", job.id, job)
        job.SetUnclean("Unclean master daemon shutdown")

  @utils.LockedMethod
  def SubmitJob(self, ops):
    """Add a new job to the queue.

    This enters the job into our job queue and also adds it to the
    worker pool, in order for it to be picked up by the queue
    processors.

    Args:
    - ops: Sequence of opcodes

    """
    job = self._jobs.AddJob(ops)

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id
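  # Illustration (not part of the original module): a caller holding a
  # JobQueue instance and a list of opcode objects (ganeti.opcodes.OpCode
  # subclasses) would submit and poll roughly like this:
  #
  #   job_id = queue.SubmitJob(op_list)
  #   rows = queue.QueryJobs([job_id], ["id", "status", "opresult"])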

  def ArchiveJob(self, job_id):
    raise NotImplementedError()

  def CancelJob(self, job_id):
    raise NotImplementedError()

  def _GetJobInfo(self, job, fields):
    """Compute the values of the requested fields for a single job."""
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.GetStatus())
      elif fname == "ops":
        row.append([op.GetInput().__getstate__() for op in job._ops])
      elif fname == "opresult":
        row.append([op.GetResult() for op in job._ops])
      elif fname == "opstatus":
        row.append([op.GetStatus() for op in job._ops])
      elif fname == "ticker":
        ji = job.GetRunOpIndex()
        if ji < 0:
          lmsg = None
        else:
          lmsg = job._ops[ji].RetrieveLog(-1)
          # message might be empty here
          if lmsg:
            lmsg = lmsg[0]
          else:
            lmsg = None
        row.append(lmsg)
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row
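  # Illustration (not part of the original module): querying a hypothetical
  # running job 17 for ["id", "status", "ticker"] yields a row such as
  #
  #   [17, constants.JOB_STATUS_RUNNING,
  #    (1199145600.0, constants.ELOG_MESSAGE, "some progress message")]
  #
  # where the last element is the most recent log entry of the opcode that
  # is currently running (or None if nothing has been logged yet).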

  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    self._lock.acquire()
    try:
      jobs = []

      for job in self._jobs.GetJobs(job_ids):
        if job is None:
          jobs.append(None)
        else:
          jobs.append(self._GetJobInfo(job, fields))

      return jobs
    finally:
      self._lock.release()

  @utils.LockedMethod
  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()
    self._jobs.Close()
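# Illustration (not part of the original module): the typical lifecycle as
# driven by the code above (the context object is whatever mcpu.Processor
# expects; its construction happens outside this module):
#
#   queue = JobQueue(context)              # open storage, start the workers
#   job_id = queue.SubmitJob(op_list)      # persist job, hand it to a worker
#   queue.QueryJobs([job_id], ["status"])  # poll until finished
#   queue.Shutdown()                       # stop workers, close the storage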