Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 85f03e0d

History | View | Annotate | Download (17.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling."""
23

    
24
import os
25
import logging
26
import threading
27
import errno
28
import re
29
import time
30

    
31
from ganeti import constants
32
from ganeti import serializer
33
from ganeti import workerpool
34
from ganeti import opcodes
35
from ganeti import errors
36
from ganeti import mcpu
37
from ganeti import utils
38
from ganeti import rpc
39

    
40

    
41
JOBQUEUE_THREADS = 5
42

    
43

    
44
class _QueuedOpCode(object):
45
  """Encasulates an opcode object.
46

47
  Access is synchronized by the '_lock' attribute.
48

49
  The 'log' attribute holds the execution log and consists of tuples
50
  of the form (timestamp, level, message).
51

52
  """
53
  def __new__(cls, *args, **kwargs):
54
    obj = object.__new__(cls, *args, **kwargs)
55
    # Create a special lock for logging
56
    obj._log_lock = threading.Lock()
57
    return obj
58

    
59
  def __init__(self, op):
60
    self.input = op
61
    self.status = constants.OP_STATUS_QUEUED
62
    self.result = None
63
    self.log = []
64

    
65
  @classmethod
66
  def Restore(cls, state):
67
    obj = _QueuedOpCode.__new__(cls)
68
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
69
    obj.status = state["status"]
70
    obj.result = state["result"]
71
    obj.log = state["log"]
72
    return obj
73

    
74
  def Serialize(self):
75
    self._log_lock.acquire()
76
    try:
77
      return {
78
        "input": self.input.__getstate__(),
79
        "status": self.status,
80
        "result": self.result,
81
        "log": self.log,
82
        }
83
    finally:
84
      self._log_lock.release()
85

    
86
  def Log(self, *args):
87
    """Append a log entry.
88

89
    """
90
    assert len(args) < 3
91

    
92
    if len(args) == 1:
93
      log_type = constants.ELOG_MESSAGE
94
      log_msg = args[0]
95
    else:
96
      log_type, log_msg = args
97

    
98
    self._log_lock.acquire()
99
    try:
100
      self.log.append((time.time(), log_type, log_msg))
101
    finally:
102
      self._log_lock.release()
103

    
104
  def RetrieveLog(self, start_at=0):
105
    """Retrieve (a part of) the execution log.
106

107
    """
108
    self._log_lock.acquire()
109
    try:
110
      return self.log[start_at:]
111
    finally:
112
      self._log_lock.release()
113

    
114

    
115
class _QueuedJob(object):
116
  """In-memory job representation.
117

118
  This is what we use to track the user-submitted jobs.
119

120
  """
121
  def __init__(self, queue, job_id, ops):
122
    if not ops:
123
      # TODO
124
      raise Exception("No opcodes")
125

    
126
    self.queue = queue
127
    self.id = job_id
128
    self.ops = [_QueuedOpCode(op) for op in ops]
129
    self.run_op_index = -1
130

    
131
  @classmethod
132
  def Restore(cls, queue, state):
133
    obj = _QueuedJob.__new__(cls)
134
    obj.queue = queue
135
    obj.id = state["id"]
136
    obj.ops = [_QueuedOpCode.Restore(op_state) for op_state in state["ops"]]
137
    obj.run_op_index = state["run_op_index"]
138
    return obj
139

    
140
  def Serialize(self):
141
    return {
142
      "id": self.id,
143
      "ops": [op.Serialize() for op in self.ops],
144
      "run_op_index": self.run_op_index,
145
      }
146

    
147
  def CalcStatus(self):
148
    status = constants.JOB_STATUS_QUEUED
149

    
150
    all_success = True
151
    for op in self.ops:
152
      if op.status == constants.OP_STATUS_SUCCESS:
153
        continue
154

    
155
      all_success = False
156

    
157
      if op.status == constants.OP_STATUS_QUEUED:
158
        pass
159
      elif op.status == constants.OP_STATUS_RUNNING:
160
        status = constants.JOB_STATUS_RUNNING
161
      elif op.status == constants.OP_STATUS_ERROR:
162
        status = constants.JOB_STATUS_ERROR
163
        # The whole job fails if one opcode failed
164
        break
165
      elif op.status == constants.OP_STATUS_CANCELED:
166
        status = constants.OP_STATUS_CANCELED
167
        break
168

    
169
    if all_success:
170
      status = constants.JOB_STATUS_SUCCESS
171

    
172
    return status
173

    
174

    
175
class _JobQueueWorker(workerpool.BaseWorker):
176
  def RunTask(self, job):
177
    """Job executor.
178

179
    This functions processes a job.
180

181
    """
182
    logging.debug("Worker %s processing job %s",
183
                  self.worker_id, job.id)
184
    proc = mcpu.Processor(self.pool.context)
185
    queue = job.queue
186
    try:
187
      try:
188
        count = len(job.ops)
189
        for idx, op in enumerate(job.ops):
190
          try:
191
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
192

    
193
            queue.acquire()
194
            try:
195
              job.run_op_index = idx
196
              op.status = constants.OP_STATUS_RUNNING
197
              op.result = None
198
              queue.UpdateJobUnlocked(job)
199

    
200
              input = op.input
201
            finally:
202
              queue.release()
203

    
204
            result = proc.ExecOpCode(input, op.Log)
205

    
206
            queue.acquire()
207
            try:
208
              op.status = constants.OP_STATUS_SUCCESS
209
              op.result = result
210
              queue.UpdateJobUnlocked(job)
211
            finally:
212
              queue.release()
213

    
214
            logging.debug("Op %s/%s: Successfully finished %s",
215
                          idx + 1, count, op)
216
          except Exception, err:
217
            queue.acquire()
218
            try:
219
              try:
220
                op.status = constants.OP_STATUS_ERROR
221
                op.result = str(err)
222
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
223
              finally:
224
                queue.UpdateJobUnlocked(job)
225
            finally:
226
              queue.release()
227
            raise
228

    
229
      except errors.GenericError, err:
230
        logging.exception("Ganeti exception")
231
      except:
232
        logging.exception("Unhandled exception")
233
    finally:
234
      queue.acquire()
235
      try:
236
        job_id = job.id
237
        status = job.CalcStatus()
238
      finally:
239
        queue.release()
240
      logging.debug("Worker %s finished job %s, status = %s",
241
                    self.worker_id, job_id, status)
242

    
243

    
244
class _JobQueueWorkerPool(workerpool.WorkerPool):
245
  def __init__(self, context):
246
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
247
                                              _JobQueueWorker)
248
    self.context = context
249

    
250

    
251
class JobQueue(object):
252
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
253

    
254
  def __init__(self, context):
255
    self._memcache = {}
256
    self._my_hostname = utils.HostInfo().name
257

    
258
    # Locking
259
    self._lock = threading.Lock()
260
    self.acquire = self._lock.acquire
261
    self.release = self._lock.release
262

    
263
    # Make sure our directories exists
264
    for path in (constants.QUEUE_DIR, constants.JOB_QUEUE_ARCHIVE_DIR):
265
      try:
266
        os.mkdir(path, 0700)
267
      except OSError, err:
268
        if err.errno not in (errno.EEXIST, ):
269
          raise
270

    
271
    # Get queue lock
272
    self.lock_fd = open(constants.JOB_QUEUE_LOCK_FILE, "w")
273
    try:
274
      utils.LockFile(self.lock_fd)
275
    except:
276
      self.lock_fd.close()
277
      raise
278

    
279
    # Read version
280
    try:
281
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
282
    except IOError, err:
283
      if err.errno not in (errno.ENOENT, ):
284
        raise
285

    
286
      # Setup a new queue
287
      self._InitQueueUnlocked()
288

    
289
      # Try to open again
290
      version_fd = open(constants.JOB_QUEUE_VERSION_FILE, "r")
291

    
292
    try:
293
      # Try to read version
294
      version = int(version_fd.read(128))
295

    
296
      # Verify version
297
      if version != constants.JOB_QUEUE_VERSION:
298
        raise errors.JobQueueError("Found version %s, expected %s",
299
                                   version, constants.JOB_QUEUE_VERSION)
300
    finally:
301
      version_fd.close()
302

    
303
    self._last_serial = self._ReadSerial()
304
    if self._last_serial is None:
305
      raise errors.ConfigurationError("Can't read/parse the job queue serial"
306
                                      " file")
307

    
308
    # Setup worker pool
309
    self._wpool = _JobQueueWorkerPool(context)
310

    
311
    # We need to lock here because WorkerPool.AddTask() may start a job while
312
    # we're still doing our work.
313
    self.acquire()
314
    try:
315
      for job in self._GetJobsUnlocked(None):
316
        status = job.CalcStatus()
317

    
318
        if status in (constants.JOB_STATUS_QUEUED, ):
319
          self._wpool.AddTask(job)
320

    
321
        elif status in (constants.JOB_STATUS_RUNNING, ):
322
          logging.warning("Unfinished job %s found: %s", job.id, job)
323
          try:
324
            for op in job.ops:
325
              op.status = constants.OP_STATUS_ERROR
326
              op.result = "Unclean master daemon shutdown"
327
          finally:
328
            self.UpdateJobUnlocked(job)
329
    finally:
330
      self.release()
331

    
332
  @staticmethod
333
  def _ReadSerial():
334
    """Try to read the job serial file.
335

336
    @rtype: None or int
337
    @return: If the serial can be read, then it is returned. Otherwise None
338
             is returned.
339

340
    """
341
    try:
342
      serial_fd = open(constants.JOB_QUEUE_SERIAL_FILE, "r")
343
      try:
344
        # Read last serial
345
        serial = int(serial_fd.read(1024).strip())
346
      finally:
347
        serial_fd.close()
348
    except (ValueError, EnvironmentError):
349
      serial = None
350

    
351
    return serial
352

    
353
  def _InitQueueUnlocked(self):
354
    assert self.lock_fd, "Queue should be open"
355

    
356
    utils.WriteFile(constants.JOB_QUEUE_VERSION_FILE,
357
                    data="%s\n" % constants.JOB_QUEUE_VERSION)
358
    if self._ReadSerial() is None:
359
      utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
360
                      data="%s\n" % 0)
361

    
362
  def _FormatJobID(self, job_id):
363
    if not isinstance(job_id, (int, long)):
364
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
365
    if job_id < 0:
366
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
367

    
368
    return str(job_id)
369

    
370
  def _NewSerialUnlocked(self, nodes):
371
    """Generates a new job identifier.
372

373
    Job identifiers are unique during the lifetime of a cluster.
374

375
    Returns: A string representing the job identifier.
376

377
    """
378
    assert self.lock_fd, "Queue should be open"
379

    
380
    # New number
381
    serial = self._last_serial + 1
382

    
383
    # Write to file
384
    utils.WriteFile(constants.JOB_QUEUE_SERIAL_FILE,
385
                    data="%s\n" % serial)
386

    
387
    # Keep it only if we were able to write the file
388
    self._last_serial = serial
389

    
390
    # Distribute the serial to the other nodes
391
    try:
392
      nodes.remove(self._my_hostname)
393
    except ValueError:
394
      pass
395

    
396
    result = rpc.call_upload_file(nodes, constants.JOB_QUEUE_SERIAL_FILE)
397
    for node in nodes:
398
      if not result[node]:
399
        logging.error("copy of job queue file to node %s failed", node)
400

    
401
    return self._FormatJobID(serial)
402

    
403
  @staticmethod
404
  def _GetJobPath(job_id):
405
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
406

    
407
  @staticmethod
408
  def _GetArchivedJobPath(job_id):
409
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
410

    
411
  @classmethod
412
  def _ExtractJobID(cls, name):
413
    m = cls._RE_JOB_FILE.match(name)
414
    if m:
415
      return m.group(1)
416
    else:
417
      return None
418

    
419
  def _GetJobIDsUnlocked(self, archived=False):
420
    """Return all known job IDs.
421

422
    If the parameter archived is True, archived jobs IDs will be
423
    included. Currently this argument is unused.
424

425
    The method only looks at disk because it's a requirement that all
426
    jobs are present on disk (so in the _memcache we don't have any
427
    extra IDs).
428

429
    """
430
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
431
    jlist.sort()
432
    return jlist
433

    
434
  def _ListJobFiles(self):
435
    assert self.lock_fd, "Queue should be open"
436

    
437
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
438
            if self._RE_JOB_FILE.match(name)]
439

    
440
  def _LoadJobUnlocked(self, job_id):
441
    assert self.lock_fd, "Queue should be open"
442

    
443
    if job_id in self._memcache:
444
      logging.debug("Found job %s in memcache", job_id)
445
      return self._memcache[job_id]
446

    
447
    filepath = self._GetJobPath(job_id)
448
    logging.debug("Loading job from %s", filepath)
449
    try:
450
      fd = open(filepath, "r")
451
    except IOError, err:
452
      if err.errno in (errno.ENOENT, ):
453
        return None
454
      raise
455
    try:
456
      data = serializer.LoadJson(fd.read())
457
    finally:
458
      fd.close()
459

    
460
    job = _QueuedJob.Restore(self, data)
461
    self._memcache[job_id] = job
462
    logging.debug("Added job %s to the cache", job_id)
463
    return job
464

    
465
  def _GetJobsUnlocked(self, job_ids):
466
    if not job_ids:
467
      job_ids = self._GetJobIDsUnlocked()
468

    
469
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
470

    
471
  @utils.LockedMethod
472
  def SubmitJob(self, ops, nodes):
473
    """Create and store a new job.
474

475
    This enters the job into our job queue and also puts it on the new
476
    queue, in order for it to be picked up by the queue processors.
477

478
    @type ops: list
479
    @param ops: The list of OpCodes that will become the new job.
480
    @type nodes: list
481
    @param nodes: The list of nodes to which the new job serial will be
482
                  distributed.
483

484
    """
485
    assert self.lock_fd, "Queue should be open"
486

    
487
    # Get job identifier
488
    job_id = self._NewSerialUnlocked(nodes)
489
    job = _QueuedJob(self, job_id, ops)
490

    
491
    # Write to disk
492
    self.UpdateJobUnlocked(job)
493

    
494
    logging.debug("Added new job %s to the cache", job_id)
495
    self._memcache[job_id] = job
496

    
497
    # Add to worker pool
498
    self._wpool.AddTask(job)
499

    
500
    return job.id
501

    
502
  def UpdateJobUnlocked(self, job):
503
    assert self.lock_fd, "Queue should be open"
504

    
505
    filename = self._GetJobPath(job.id)
506
    logging.debug("Writing job %s to %s", job.id, filename)
507
    utils.WriteFile(filename,
508
                    data=serializer.DumpJson(job.Serialize(), indent=False))
509
    self._CleanCacheUnlocked([job.id])
510

    
511
  def _CleanCacheUnlocked(self, exclude):
512
    """Clean the memory cache.
513

514
    The exceptions argument contains job IDs that should not be
515
    cleaned.
516

517
    """
518
    assert isinstance(exclude, list)
519

    
520
    for job in self._memcache.values():
521
      if job.id in exclude:
522
        continue
523
      if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,
524
                                  constants.JOB_STATUS_RUNNING):
525
        logging.debug("Cleaning job %s from the cache", job.id)
526
        try:
527
          del self._memcache[job.id]
528
        except KeyError:
529
          pass
530

    
531
  @utils.LockedMethod
532
  def CancelJob(self, job_id):
533
    """Cancels a job.
534

535
    @type job_id: string
536
    @param job_id: Job ID of job to be cancelled.
537

538
    """
539
    logging.debug("Cancelling job %s", job_id)
540

    
541
    job = self._LoadJobUnlocked(job_id)
542
    if not job:
543
      logging.debug("Job %s not found", job_id)
544
      return
545

    
546
    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
547
      logging.debug("Job %s is no longer in the queue", job.id)
548
      return
549

    
550
    try:
551
      for op in job.ops:
552
        op.status = constants.OP_STATUS_ERROR
553
        op.result = "Job cancelled by request"
554
    finally:
555
      self.UpdateJobUnlocked(job)
556

    
557
  @utils.LockedMethod
558
  def ArchiveJob(self, job_id):
559
    """Archives a job.
560

561
    @type job_id: string
562
    @param job_id: Job ID of job to be archived.
563

564
    """
565
    logging.debug("Archiving job %s", job_id)
566

    
567
    job = self._LoadJobUnlocked(job_id)
568
    if not job:
569
      logging.debug("Job %s not found", job_id)
570
      return
571

    
572
    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
573
                                constants.JOB_STATUS_SUCCESS,
574
                                constants.JOB_STATUS_ERROR):
575
      logging.debug("Job %s is not yet done", job.id)
576
      return
577

    
578
    try:
579
      old = self._GetJobPath(job.id)
580
      new = self._GetArchivedJobPath(job.id)
581

    
582
      os.rename(old, new)
583

    
584
      logging.debug("Successfully archived job %s", job.id)
585
    finally:
586
      # Cleaning the cache because we don't know what os.rename actually did
587
      # and to be on the safe side.
588
      self._CleanCacheUnlocked([])
589

    
590
  def _GetJobInfoUnlocked(self, job, fields):
591
    row = []
592
    for fname in fields:
593
      if fname == "id":
594
        row.append(job.id)
595
      elif fname == "status":
596
        row.append(job.CalcStatus())
597
      elif fname == "ops":
598
        row.append([op.input.__getstate__() for op in job.ops])
599
      elif fname == "opresult":
600
        row.append([op.result for op in job.ops])
601
      elif fname == "opstatus":
602
        row.append([op.status for op in job.ops])
603
      elif fname == "ticker":
604
        ji = job.run_op_index
605
        if ji < 0:
606
          lmsg = None
607
        else:
608
          lmsg = job.ops[ji].RetrieveLog(-1)
609
          # message might be empty here
610
          if lmsg:
611
            lmsg = lmsg[0]
612
          else:
613
            lmsg = None
614
        row.append(lmsg)
615
      else:
616
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
617
    return row
618

    
619
  @utils.LockedMethod
620
  def QueryJobs(self, job_ids, fields):
621
    """Returns a list of jobs in queue.
622

623
    Args:
624
    - job_ids: Sequence of job identifiers or None for all
625
    - fields: Names of fields to return
626

627
    """
628
    jobs = []
629

    
630
    for job in self._GetJobsUnlocked(job_ids):
631
      if job is None:
632
        jobs.append(None)
633
      else:
634
        jobs.append(self._GetJobInfoUnlocked(job, fields))
635

    
636
    return jobs
637

    
638
  @utils.LockedMethod
639
  def Shutdown(self):
640
    """Stops the job queue.
641

642
    """
643
    assert self.lock_fd, "Queue should be open"
644

    
645
    self._wpool.TerminateWorkers()
646

    
647
    self.lock_fd.close()
648
    self.lock_fd = None