Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 70552c46

History | View | Annotate | Download (20.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking:
25
There's a single, large lock in the JobQueue class. It's used by all other
26
classes in this module.
27

28
"""
29

    
30
import os
31
import logging
32
import threading
33
import errno
34
import re
35
import time
36

    
37
from ganeti import constants
38
from ganeti import serializer
39
from ganeti import workerpool
40
from ganeti import opcodes
41
from ganeti import errors
42
from ganeti import mcpu
43
from ganeti import utils
44
from ganeti import jstore
45
from ganeti import rpc
46

    
47

    
48
JOBQUEUE_THREADS = 5
49

    
50

    
51
def TimeStampNow():
  """Return the current wall-clock time split by utils.SplitTime.

  """
  now = time.time()
  return utils.SplitTime(now)
53

    
54

    
55
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (log_serial, timestamp, level, message).

  """
  def __init__(self, op):
    # Wrap a fresh opcode; it starts queued, with no result, log or timestamps
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Rebuild a _QueuedOpCode from its serialized state dict.

    """
    instance = cls.__new__(cls)
    instance.input = opcodes.OpCode.LoadOpCode(state["input"])
    instance.status = state["status"]
    instance.result = state["result"]
    instance.log = state["log"]
    # Older serialized jobs may lack the timestamp fields
    instance.start_timestamp = state.get("start_timestamp", None)
    instance.end_timestamp = state.get("end_timestamp", None)
    return instance

  def Serialize(self):
    """Return a JSON-serializable dict describing this opcode.

    """
    data = {}
    data["input"] = self.input.__getstate__()
    data["status"] = self.status
    data["result"] = self.result
    data["log"] = self.log
    data["start_timestamp"] = self.start_timestamp
    data["end_timestamp"] = self.end_timestamp
    return data
90

    
91

    
92
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must be taken
  care of by users of this class.

  """
  def __init__(self, queue, job_id, ops):
    """Initialize the job from a non-empty list of opcodes.

    """
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a job from its serialized state.

    The job's log serial is recomputed as the highest log entry serial
    found in any of its opcodes.

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Return a JSON-serializable dict describing the job.

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      }

  def CalcStatus(self):
    """Compute the overall job status from the per-opcode statuses.

    An error or cancellation in any opcode fails/cancels the whole job;
    the job is successful only if all opcodes succeeded.

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        # Fixed: this previously assigned OP_STATUS_CANCELED (an opcode
        # status) to the job status; the job-level constant is meant here
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def GetLogEntries(self, newer_than):
    """Return all log entries with a serial greater than newer_than.

    If newer_than is None, all entries are returned.

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      # Fixed: filter on the normalized 'serial' instead of the raw
      # 'newer_than', which relied on Python 2's None-vs-int comparison
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
178

    
179

    
180
class _JobQueueWorker(workerpool.BaseWorker):
181
  def RunTask(self, job):
182
    """Job executor.
183

184
    This functions processes a job. It is closely tied to the _QueuedJob and
185
    _QueuedOpCode classes.
186

187
    """
188
    logging.debug("Worker %s processing job %s",
189
                  self.worker_id, job.id)
190
    proc = mcpu.Processor(self.pool.queue.context)
191
    queue = job.queue
192
    try:
193
      try:
194
        count = len(job.ops)
195
        for idx, op in enumerate(job.ops):
196
          try:
197
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
198

    
199
            queue.acquire()
200
            try:
201
              job.run_op_index = idx
202
              op.status = constants.OP_STATUS_RUNNING
203
              op.result = None
204
              op.start_timestamp = TimeStampNow()
205
              queue.UpdateJobUnlocked(job)
206

    
207
              input_opcode = op.input
208
            finally:
209
              queue.release()
210

    
211
            def _Log(*args):
212
              """Append a log entry.
213

214
              """
215
              assert len(args) < 3
216

    
217
              if len(args) == 1:
218
                log_type = constants.ELOG_MESSAGE
219
                log_msg = args[0]
220
              else:
221
                log_type, log_msg = args
222

    
223
              # The time is split to make serialization easier and not lose
224
              # precision.
225
              timestamp = utils.SplitTime(time.time())
226

    
227
              queue.acquire()
228
              try:
229
                job.log_serial += 1
230
                op.log.append((job.log_serial, timestamp, log_type, log_msg))
231

    
232
                job.change.notifyAll()
233
              finally:
234
                queue.release()
235

    
236
            # Make sure not to hold lock while _Log is called
237
            result = proc.ExecOpCode(input_opcode, _Log)
238

    
239
            queue.acquire()
240
            try:
241
              op.status = constants.OP_STATUS_SUCCESS
242
              op.result = result
243
              op.end_timestamp = TimeStampNow()
244
              queue.UpdateJobUnlocked(job)
245
            finally:
246
              queue.release()
247

    
248
            logging.debug("Op %s/%s: Successfully finished %s",
249
                          idx + 1, count, op)
250
          except Exception, err:
251
            queue.acquire()
252
            try:
253
              try:
254
                op.status = constants.OP_STATUS_ERROR
255
                op.result = str(err)
256
                op.end_timestamp = TimeStampNow()
257
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
258
              finally:
259
                queue.UpdateJobUnlocked(job)
260
            finally:
261
              queue.release()
262
            raise
263

    
264
      except errors.GenericError, err:
265
        logging.exception("Ganeti exception")
266
      except:
267
        logging.exception("Unhandled exception")
268
    finally:
269
      queue.acquire()
270
      try:
271
        try:
272
          job.run_op_idx = -1
273
          queue.UpdateJobUnlocked(job)
274
        finally:
275
          job_id = job.id
276
          status = job.CalcStatus()
277
      finally:
278
        queue.release()
279
      logging.debug("Worker %s finished job %s, status = %s",
280
                    self.worker_id, job_id, status)
281

    
282

    
283
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Pool of JOBQUEUE_THREADS workers processing jobs for one queue.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS, _JobQueueWorker)
    # Back-reference used by workers to reach the queue's context
    self.queue = queue
288

    
289

    
290
class JobQueue(object):
291
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
292

    
293
  def _RequireOpenQueue(fn):
294
    """Decorator for "public" functions.
295

296
    This function should be used for all "public" functions. That is, functions
297
    usually called from other classes.
298

299
    Important: Use this decorator only after utils.LockedMethod!
300

301
    Example:
302
      @utils.LockedMethod
303
      @_RequireOpenQueue
304
      def Example(self):
305
        pass
306

307
    """
308
    def wrapper(self, *args, **kwargs):
309
      assert self._queue_lock is not None, "Queue should be open"
310
      return fn(self, *args, **kwargs)
311
    return wrapper
312

    
313
  def __init__(self, context):
314
    self.context = context
315
    self._memcache = {}
316
    self._my_hostname = utils.HostInfo().name
317

    
318
    # Locking
319
    self._lock = threading.Lock()
320
    self.acquire = self._lock.acquire
321
    self.release = self._lock.release
322

    
323
    # Initialize
324
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
325

    
326
    # Read serial file
327
    self._last_serial = jstore.ReadSerial()
328
    assert self._last_serial is not None, ("Serial file was modified between"
329
                                           " check in jstore and here")
330

    
331
    # Get initial list of nodes
332
    self._nodes = set(self.context.cfg.GetNodeList())
333

    
334
    # Remove master node
335
    try:
336
      self._nodes.remove(self._my_hostname)
337
    except ValueError:
338
      pass
339

    
340
    # TODO: Check consistency across nodes
341

    
342
    # Setup worker pool
343
    self._wpool = _JobQueueWorkerPool(self)
344

    
345
    # We need to lock here because WorkerPool.AddTask() may start a job while
346
    # we're still doing our work.
347
    self.acquire()
348
    try:
349
      for job in self._GetJobsUnlocked(None):
350
        status = job.CalcStatus()
351

    
352
        if status in (constants.JOB_STATUS_QUEUED, ):
353
          self._wpool.AddTask(job)
354

    
355
        elif status in (constants.JOB_STATUS_RUNNING, ):
356
          logging.warning("Unfinished job %s found: %s", job.id, job)
357
          try:
358
            for op in job.ops:
359
              op.status = constants.OP_STATUS_ERROR
360
              op.result = "Unclean master daemon shutdown"
361
          finally:
362
            self.UpdateJobUnlocked(job)
363
    finally:
364
      self.release()
365

    
366
  @utils.LockedMethod
367
  @_RequireOpenQueue
368
  def AddNode(self, node_name):
369
    assert node_name != self._my_hostname
370

    
371
    # Clean queue directory on added node
372
    rpc.call_jobqueue_purge(node_name)
373

    
374
    # Upload the whole queue excluding archived jobs
375
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
376

    
377
    # Upload current serial file
378
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
379

    
380
    for file_name in files:
381
      # Read file content
382
      fd = open(file_name, "r")
383
      try:
384
        content = fd.read()
385
      finally:
386
        fd.close()
387

    
388
      result = rpc.call_jobqueue_update([node_name], file_name, content)
389
      if not result[node_name]:
390
        logging.error("Failed to upload %s to %s", file_name, node_name)
391

    
392
    self._nodes.add(node_name)
393

    
394
  @utils.LockedMethod
395
  @_RequireOpenQueue
396
  def RemoveNode(self, node_name):
397
    try:
398
      # The queue is removed by the "leave node" RPC call.
399
      self._nodes.remove(node_name)
400
    except KeyError:
401
      pass
402

    
403
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
404
    """Writes a file locally and then replicates it to all nodes.
405

406
    """
407
    utils.WriteFile(file_name, data=data)
408

    
409
    failed_nodes = 0
410
    result = rpc.call_jobqueue_update(self._nodes, file_name, data)
411
    for node in self._nodes:
412
      if not result[node]:
413
        failed_nodes += 1
414
        logging.error("Copy of job queue file to node %s failed", node)
415

    
416
    # TODO: check failed_nodes
417

    
418
  def _RenameFileUnlocked(self, old, new):
419
    os.rename(old, new)
420

    
421
    result = rpc.call_jobqueue_rename(self._nodes, old, new)
422
    for node in self._nodes:
423
      if not result[node]:
424
        logging.error("Moving %s to %s failed on %s", old, new, node)
425

    
426
    # TODO: check failed nodes
427

    
428
  def _FormatJobID(self, job_id):
429
    if not isinstance(job_id, (int, long)):
430
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
431
    if job_id < 0:
432
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
433

    
434
    return str(job_id)
435

    
436
  def _NewSerialUnlocked(self):
437
    """Generates a new job identifier.
438

439
    Job identifiers are unique during the lifetime of a cluster.
440

441
    Returns: A string representing the job identifier.
442

443
    """
444
    # New number
445
    serial = self._last_serial + 1
446

    
447
    # Write to file
448
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
449
                                        "%s\n" % serial)
450

    
451
    # Keep it only if we were able to write the file
452
    self._last_serial = serial
453

    
454
    return self._FormatJobID(serial)
455

    
456
  @staticmethod
457
  def _GetJobPath(job_id):
458
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
459

    
460
  @staticmethod
461
  def _GetArchivedJobPath(job_id):
462
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
463

    
464
  @classmethod
465
  def _ExtractJobID(cls, name):
466
    m = cls._RE_JOB_FILE.match(name)
467
    if m:
468
      return m.group(1)
469
    else:
470
      return None
471

    
472
  def _GetJobIDsUnlocked(self, archived=False):
473
    """Return all known job IDs.
474

475
    If the parameter archived is True, archived jobs IDs will be
476
    included. Currently this argument is unused.
477

478
    The method only looks at disk because it's a requirement that all
479
    jobs are present on disk (so in the _memcache we don't have any
480
    extra IDs).
481

482
    """
483
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
484
    jlist.sort()
485
    return jlist
486

    
487
  def _ListJobFiles(self):
488
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
489
            if self._RE_JOB_FILE.match(name)]
490

    
491
  def _LoadJobUnlocked(self, job_id):
492
    if job_id in self._memcache:
493
      logging.debug("Found job %s in memcache", job_id)
494
      return self._memcache[job_id]
495

    
496
    filepath = self._GetJobPath(job_id)
497
    logging.debug("Loading job from %s", filepath)
498
    try:
499
      fd = open(filepath, "r")
500
    except IOError, err:
501
      if err.errno in (errno.ENOENT, ):
502
        return None
503
      raise
504
    try:
505
      data = serializer.LoadJson(fd.read())
506
    finally:
507
      fd.close()
508

    
509
    job = _QueuedJob.Restore(self, data)
510
    self._memcache[job_id] = job
511
    logging.debug("Added job %s to the cache", job_id)
512
    return job
513

    
514
  def _GetJobsUnlocked(self, job_ids):
515
    if not job_ids:
516
      job_ids = self._GetJobIDsUnlocked()
517

    
518
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
519

    
520
  @utils.LockedMethod
521
  @_RequireOpenQueue
522
  def SubmitJob(self, ops):
523
    """Create and store a new job.
524

525
    This enters the job into our job queue and also puts it on the new
526
    queue, in order for it to be picked up by the queue processors.
527

528
    @type ops: list
529
    @param ops: The list of OpCodes that will become the new job.
530

531
    """
532
    # Get job identifier
533
    job_id = self._NewSerialUnlocked()
534
    job = _QueuedJob(self, job_id, ops)
535

    
536
    # Write to disk
537
    self.UpdateJobUnlocked(job)
538

    
539
    logging.debug("Added new job %s to the cache", job_id)
540
    self._memcache[job_id] = job
541

    
542
    # Add to worker pool
543
    self._wpool.AddTask(job)
544

    
545
    return job.id
546

    
547
  @_RequireOpenQueue
548
  def UpdateJobUnlocked(self, job):
549
    filename = self._GetJobPath(job.id)
550
    data = serializer.DumpJson(job.Serialize(), indent=False)
551
    logging.debug("Writing job %s to %s", job.id, filename)
552
    self._WriteAndReplicateFileUnlocked(filename, data)
553
    self._CleanCacheUnlocked([job.id])
554

    
555
    # Notify waiters about potential changes
556
    job.change.notifyAll()
557

    
558
  def _CleanCacheUnlocked(self, exclude):
559
    """Clean the memory cache.
560

561
    The exceptions argument contains job IDs that should not be
562
    cleaned.
563

564
    """
565
    assert isinstance(exclude, list)
566

    
567
    for job in self._memcache.values():
568
      if job.id in exclude:
569
        continue
570
      if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,
571
                                  constants.JOB_STATUS_RUNNING):
572
        logging.debug("Cleaning job %s from the cache", job.id)
573
        try:
574
          del self._memcache[job.id]
575
        except KeyError:
576
          pass
577

    
578
  @utils.LockedMethod
579
  @_RequireOpenQueue
580
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial):
581
    """Waits for changes in a job.
582

583
    @type job_id: string
584
    @param job_id: Job identifier
585
    @type fields: list of strings
586
    @param fields: Which fields to check for changes
587
    @type prev_job_info: list or None
588
    @param prev_job_info: Last job information returned
589
    @type prev_log_serial: int
590
    @param prev_log_serial: Last job message serial number
591

592
    """
593
    logging.debug("Waiting for changes in job %s", job_id)
594

    
595
    while True:
596
      job = self._LoadJobUnlocked(job_id)
597
      if not job:
598
        logging.debug("Job %s not found", job_id)
599
        new_state = None
600
        break
601

    
602
      status = job.CalcStatus()
603
      job_info = self._GetJobInfoUnlocked(job, fields)
604
      log_entries = job.GetLogEntries(prev_log_serial)
605

    
606
      # Serializing and deserializing data can cause type changes (e.g. from
607
      # tuple to list) or precision loss. We're doing it here so that we get
608
      # the same modifications as the data received from the client. Without
609
      # this, the comparison afterwards might fail without the data being
610
      # significantly different.
611
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
612
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
613

    
614
      if status not in (constants.JOB_STATUS_QUEUED,
615
                        constants.JOB_STATUS_RUNNING):
616
        # Don't even try to wait if the job is no longer running, there will be
617
        # no changes.
618
        break
619

    
620
      if (prev_job_info != job_info or
621
          (log_entries and prev_log_serial != log_entries[0][0])):
622
        break
623

    
624
      logging.debug("Waiting again")
625

    
626
      # Release the queue lock while waiting
627
      job.change.wait()
628

    
629
    logging.debug("Job %s changed", job_id)
630

    
631
    return (job_info, log_entries)
632

    
633
  @utils.LockedMethod
634
  @_RequireOpenQueue
635
  def CancelJob(self, job_id):
636
    """Cancels a job.
637

638
    @type job_id: string
639
    @param job_id: Job ID of job to be cancelled.
640

641
    """
642
    logging.debug("Cancelling job %s", job_id)
643

    
644
    job = self._LoadJobUnlocked(job_id)
645
    if not job:
646
      logging.debug("Job %s not found", job_id)
647
      return
648

    
649
    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
650
      logging.debug("Job %s is no longer in the queue", job.id)
651
      return
652

    
653
    try:
654
      for op in job.ops:
655
        op.status = constants.OP_STATUS_ERROR
656
        op.result = "Job cancelled by request"
657
    finally:
658
      self.UpdateJobUnlocked(job)
659

    
660
  @utils.LockedMethod
661
  @_RequireOpenQueue
662
  def ArchiveJob(self, job_id):
663
    """Archives a job.
664

665
    @type job_id: string
666
    @param job_id: Job ID of job to be archived.
667

668
    """
669
    logging.debug("Archiving job %s", job_id)
670

    
671
    job = self._LoadJobUnlocked(job_id)
672
    if not job:
673
      logging.debug("Job %s not found", job_id)
674
      return
675

    
676
    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
677
                                constants.JOB_STATUS_SUCCESS,
678
                                constants.JOB_STATUS_ERROR):
679
      logging.debug("Job %s is not yet done", job.id)
680
      return
681

    
682
    try:
683
      old = self._GetJobPath(job.id)
684
      new = self._GetArchivedJobPath(job.id)
685

    
686
      self._RenameFileUnlocked(old, new)
687

    
688
      logging.debug("Successfully archived job %s", job.id)
689
    finally:
690
      # Cleaning the cache because we don't know what os.rename actually did
691
      # and to be on the safe side.
692
      self._CleanCacheUnlocked([])
693

    
694
  def _GetJobInfoUnlocked(self, job, fields):
695
    row = []
696
    for fname in fields:
697
      if fname == "id":
698
        row.append(job.id)
699
      elif fname == "status":
700
        row.append(job.CalcStatus())
701
      elif fname == "ops":
702
        row.append([op.input.__getstate__() for op in job.ops])
703
      elif fname == "opresult":
704
        row.append([op.result for op in job.ops])
705
      elif fname == "opstatus":
706
        row.append([op.status for op in job.ops])
707
      else:
708
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
709
    return row
710

    
711
  @utils.LockedMethod
712
  @_RequireOpenQueue
713
  def QueryJobs(self, job_ids, fields):
714
    """Returns a list of jobs in queue.
715

716
    Args:
717
    - job_ids: Sequence of job identifiers or None for all
718
    - fields: Names of fields to return
719

720
    """
721
    jobs = []
722

    
723
    for job in self._GetJobsUnlocked(job_ids):
724
      if job is None:
725
        jobs.append(None)
726
      else:
727
        jobs.append(self._GetJobInfoUnlocked(job, fields))
728

    
729
    return jobs
730

    
731
  @utils.LockedMethod
732
  @_RequireOpenQueue
733
  def Shutdown(self):
734
    """Stops the job queue.
735

736
    """
737
    self._wpool.TerminateWorkers()
738

    
739
    self._queue_lock.Close()
740
    self._queue_lock = None