Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 6c5a7090

History | View | Annotate | Download (19.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking:
25
There's a single, large lock in the JobQueue class. It's used by all other
26
classes in this module.
27

28
"""
29

    
30
import os
31
import logging
32
import threading
33
import errno
34
import re
35
import time
36

    
37
from ganeti import constants
38
from ganeti import serializer
39
from ganeti import workerpool
40
from ganeti import opcodes
41
from ganeti import errors
42
from ganeti import mcpu
43
from ganeti import utils
44
from ganeti import jstore
45
from ganeti import rpc
46

    
47

    
48
# Passed by _JobQueueWorkerPool as the first argument of the worker pool
# constructor; presumably the number of worker threads processing jobs
# concurrently (confirm against workerpool.WorkerPool).
JOBQUEUE_THREADS = 5
49

    
50

    
51
class _QueuedOpCode(object):
52
  """Encasulates an opcode object.
53

54
  The 'log' attribute holds the execution log and consists of tuples
55
  of the form (log_serial, timestamp, level, message).
56

57
  """
58
  def __init__(self, op):
59
    self.input = op
60
    self.status = constants.OP_STATUS_QUEUED
61
    self.result = None
62
    self.log = []
63

    
64
  @classmethod
65
  def Restore(cls, state):
66
    obj = _QueuedOpCode.__new__(cls)
67
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
68
    obj.status = state["status"]
69
    obj.result = state["result"]
70
    obj.log = state["log"]
71
    return obj
72

    
73
  def Serialize(self):
74
    return {
75
      "input": self.input.__getstate__(),
76
      "status": self.status,
77
      "result": self.result,
78
      "log": self.log,
79
      }
80

    
81

    
82
class _QueuedJob(object):
83
  """In-memory job representation.
84

85
  This is what we use to track the user-submitted jobs. Locking must be taken
86
  care of by users of this class.
87

88
  """
89
  def __init__(self, queue, job_id, ops):
90
    if not ops:
91
      # TODO
92
      raise Exception("No opcodes")
93

    
94
    self.queue = queue
95
    self.id = job_id
96
    self.ops = [_QueuedOpCode(op) for op in ops]
97
    self.run_op_index = -1
98
    self.log_serial = 0
99

    
100
    # Condition to wait for changes
101
    self.change = threading.Condition(self.queue._lock)
102

    
103
  @classmethod
104
  def Restore(cls, queue, state):
105
    obj = _QueuedJob.__new__(cls)
106
    obj.queue = queue
107
    obj.id = state["id"]
108
    obj.run_op_index = state["run_op_index"]
109

    
110
    obj.ops = []
111
    obj.log_serial = 0
112
    for op_state in state["ops"]:
113
      op = _QueuedOpCode.Restore(op_state)
114
      for log_entry in op.log:
115
        obj.log_serial = max(obj.log_serial, log_entry[0])
116
      obj.ops.append(op)
117

    
118
    # Condition to wait for changes
119
    obj.change = threading.Condition(obj.queue._lock)
120

    
121
    return obj
122

    
123
  def Serialize(self):
124
    return {
125
      "id": self.id,
126
      "ops": [op.Serialize() for op in self.ops],
127
      "run_op_index": self.run_op_index,
128
      }
129

    
130
  def CalcStatus(self):
131
    status = constants.JOB_STATUS_QUEUED
132

    
133
    all_success = True
134
    for op in self.ops:
135
      if op.status == constants.OP_STATUS_SUCCESS:
136
        continue
137

    
138
      all_success = False
139

    
140
      if op.status == constants.OP_STATUS_QUEUED:
141
        pass
142
      elif op.status == constants.OP_STATUS_RUNNING:
143
        status = constants.JOB_STATUS_RUNNING
144
      elif op.status == constants.OP_STATUS_ERROR:
145
        status = constants.JOB_STATUS_ERROR
146
        # The whole job fails if one opcode failed
147
        break
148
      elif op.status == constants.OP_STATUS_CANCELED:
149
        status = constants.OP_STATUS_CANCELED
150
        break
151

    
152
    if all_success:
153
      status = constants.JOB_STATUS_SUCCESS
154

    
155
    return status
156

    
157
  def GetLogEntries(self, newer_than):
158
    if newer_than is None:
159
      serial = -1
160
    else:
161
      serial = newer_than
162

    
163
    entries = []
164
    for op in self.ops:
165
      entries.extend(filter(lambda entry: entry[0] > newer_than, op.log))
166

    
167
    return entries
168

    
169

    
170
class _JobQueueWorker(workerpool.BaseWorker):
171
  def RunTask(self, job):
172
    """Job executor.
173

174
    This functions processes a job. It is closely tied to the _QueuedJob and
175
    _QueuedOpCode classes.
176

177
    """
178
    logging.debug("Worker %s processing job %s",
179
                  self.worker_id, job.id)
180
    proc = mcpu.Processor(self.pool.queue.context)
181
    queue = job.queue
182
    try:
183
      try:
184
        count = len(job.ops)
185
        for idx, op in enumerate(job.ops):
186
          try:
187
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
188

    
189
            queue.acquire()
190
            try:
191
              job.run_op_index = idx
192
              op.status = constants.OP_STATUS_RUNNING
193
              op.result = None
194
              queue.UpdateJobUnlocked(job)
195

    
196
              input_opcode = op.input
197
            finally:
198
              queue.release()
199

    
200
            def _Log(*args):
201
              """Append a log entry.
202

203
              """
204
              assert len(args) < 3
205

    
206
              if len(args) == 1:
207
                log_type = constants.ELOG_MESSAGE
208
                log_msg = args[0]
209
              else:
210
                log_type, log_msg = args
211

    
212
              # The time is split to make serialization easier and not lose
213
              # precision.
214
              timestamp = utils.SplitTime(time.time())
215

    
216
              queue.acquire()
217
              try:
218
                job.log_serial += 1
219
                op.log.append((job.log_serial, timestamp, log_type, log_msg))
220

    
221
                job.change.notifyAll()
222
              finally:
223
                queue.release()
224

    
225
            # Make sure not to hold lock while _Log is called
226
            result = proc.ExecOpCode(input_opcode, _Log)
227

    
228
            queue.acquire()
229
            try:
230
              op.status = constants.OP_STATUS_SUCCESS
231
              op.result = result
232
              queue.UpdateJobUnlocked(job)
233
            finally:
234
              queue.release()
235

    
236
            logging.debug("Op %s/%s: Successfully finished %s",
237
                          idx + 1, count, op)
238
          except Exception, err:
239
            queue.acquire()
240
            try:
241
              try:
242
                op.status = constants.OP_STATUS_ERROR
243
                op.result = str(err)
244
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
245
              finally:
246
                queue.UpdateJobUnlocked(job)
247
            finally:
248
              queue.release()
249
            raise
250

    
251
      except errors.GenericError, err:
252
        logging.exception("Ganeti exception")
253
      except:
254
        logging.exception("Unhandled exception")
255
    finally:
256
      queue.acquire()
257
      try:
258
        job_id = job.id
259
        status = job.CalcStatus()
260
      finally:
261
        queue.release()
262
      logging.debug("Worker %s finished job %s, status = %s",
263
                    self.worker_id, job_id, status)
264

    
265

    
266
class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Worker pool running _JobQueueWorker workers for one job queue."""
  def __init__(self, queue):
    """Sets up the pool and remembers the queue it works for.

    @param queue: the job queue owning this pool; workers reach it through
        the pool's C{queue} attribute

    """
    pool_size = JOBQUEUE_THREADS
    worker_class = _JobQueueWorker
    super(_JobQueueWorkerPool, self).__init__(pool_size, worker_class)
    self.queue = queue
271

    
272

    
273
class JobQueue(object):
274
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
275

    
276
  def _RequireOpenQueue(fn):
277
    """Decorator for "public" functions.
278

279
    This function should be used for all "public" functions. That is, functions
280
    usually called from other classes.
281

282
    Important: Use this decorator only after utils.LockedMethod!
283

284
    Example:
285
      @utils.LockedMethod
286
      @_RequireOpenQueue
287
      def Example(self):
288
        pass
289

290
    """
291
    def wrapper(self, *args, **kwargs):
292
      assert self._queue_lock is not None, "Queue should be open"
293
      return fn(self, *args, **kwargs)
294
    return wrapper
295

    
296
  def __init__(self, context):
297
    self.context = context
298
    self._memcache = {}
299
    self._my_hostname = utils.HostInfo().name
300

    
301
    # Locking
302
    self._lock = threading.Lock()
303
    self.acquire = self._lock.acquire
304
    self.release = self._lock.release
305

    
306
    # Initialize
307
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
308

    
309
    # Read serial file
310
    self._last_serial = jstore.ReadSerial()
311
    assert self._last_serial is not None, ("Serial file was modified between"
312
                                           " check in jstore and here")
313

    
314
    # Get initial list of nodes
315
    self._nodes = set(self.context.cfg.GetNodeList())
316

    
317
    # Remove master node
318
    try:
319
      self._nodes.remove(self._my_hostname)
320
    except ValueError:
321
      pass
322

    
323
    # TODO: Check consistency across nodes
324

    
325
    # Setup worker pool
326
    self._wpool = _JobQueueWorkerPool(self)
327

    
328
    # We need to lock here because WorkerPool.AddTask() may start a job while
329
    # we're still doing our work.
330
    self.acquire()
331
    try:
332
      for job in self._GetJobsUnlocked(None):
333
        status = job.CalcStatus()
334

    
335
        if status in (constants.JOB_STATUS_QUEUED, ):
336
          self._wpool.AddTask(job)
337

    
338
        elif status in (constants.JOB_STATUS_RUNNING, ):
339
          logging.warning("Unfinished job %s found: %s", job.id, job)
340
          try:
341
            for op in job.ops:
342
              op.status = constants.OP_STATUS_ERROR
343
              op.result = "Unclean master daemon shutdown"
344
          finally:
345
            self.UpdateJobUnlocked(job)
346
    finally:
347
      self.release()
348

    
349
  @utils.LockedMethod
350
  @_RequireOpenQueue
351
  def AddNode(self, node_name):
352
    assert node_name != self._my_hostname
353

    
354
    # Clean queue directory on added node
355
    rpc.call_jobqueue_purge(node_name)
356

    
357
    # Upload the whole queue excluding archived jobs
358
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
359

    
360
    # Upload current serial file
361
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
362

    
363
    for file_name in files:
364
      # Read file content
365
      fd = open(file_name, "r")
366
      try:
367
        content = fd.read()
368
      finally:
369
        fd.close()
370

    
371
      result = rpc.call_jobqueue_update([node_name], file_name, content)
372
      if not result[node_name]:
373
        logging.error("Failed to upload %s to %s", file_name, node_name)
374

    
375
    self._nodes.add(node_name)
376

    
377
  @utils.LockedMethod
378
  @_RequireOpenQueue
379
  def RemoveNode(self, node_name):
380
    try:
381
      # The queue is removed by the "leave node" RPC call.
382
      self._nodes.remove(node_name)
383
    except KeyError:
384
      pass
385

    
386
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
387
    """Writes a file locally and then replicates it to all nodes.
388

389
    """
390
    utils.WriteFile(file_name, data=data)
391

    
392
    failed_nodes = 0
393
    result = rpc.call_jobqueue_update(self._nodes, file_name, data)
394
    for node in self._nodes:
395
      if not result[node]:
396
        failed_nodes += 1
397
        logging.error("Copy of job queue file to node %s failed", node)
398

    
399
    # TODO: check failed_nodes
400

    
401
  def _RenameFileUnlocked(self, old, new):
402
    os.rename(old, new)
403

    
404
    result = rpc.call_jobqueue_rename(self._nodes, old, new)
405
    for node in self._nodes:
406
      if not result[node]:
407
        logging.error("Moving %s to %s failed on %s", old, new, node)
408

    
409
    # TODO: check failed nodes
410

    
411
  def _FormatJobID(self, job_id):
412
    if not isinstance(job_id, (int, long)):
413
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
414
    if job_id < 0:
415
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
416

    
417
    return str(job_id)
418

    
419
  def _NewSerialUnlocked(self):
420
    """Generates a new job identifier.
421

422
    Job identifiers are unique during the lifetime of a cluster.
423

424
    Returns: A string representing the job identifier.
425

426
    """
427
    # New number
428
    serial = self._last_serial + 1
429

    
430
    # Write to file
431
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
432
                                        "%s\n" % serial)
433

    
434
    # Keep it only if we were able to write the file
435
    self._last_serial = serial
436

    
437
    return self._FormatJobID(serial)
438

    
439
  @staticmethod
440
  def _GetJobPath(job_id):
441
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
442

    
443
  @staticmethod
444
  def _GetArchivedJobPath(job_id):
445
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
446

    
447
  @classmethod
448
  def _ExtractJobID(cls, name):
449
    m = cls._RE_JOB_FILE.match(name)
450
    if m:
451
      return m.group(1)
452
    else:
453
      return None
454

    
455
  def _GetJobIDsUnlocked(self, archived=False):
456
    """Return all known job IDs.
457

458
    If the parameter archived is True, archived jobs IDs will be
459
    included. Currently this argument is unused.
460

461
    The method only looks at disk because it's a requirement that all
462
    jobs are present on disk (so in the _memcache we don't have any
463
    extra IDs).
464

465
    """
466
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
467
    jlist.sort()
468
    return jlist
469

    
470
  def _ListJobFiles(self):
471
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
472
            if self._RE_JOB_FILE.match(name)]
473

    
474
  def _LoadJobUnlocked(self, job_id):
475
    if job_id in self._memcache:
476
      logging.debug("Found job %s in memcache", job_id)
477
      return self._memcache[job_id]
478

    
479
    filepath = self._GetJobPath(job_id)
480
    logging.debug("Loading job from %s", filepath)
481
    try:
482
      fd = open(filepath, "r")
483
    except IOError, err:
484
      if err.errno in (errno.ENOENT, ):
485
        return None
486
      raise
487
    try:
488
      data = serializer.LoadJson(fd.read())
489
    finally:
490
      fd.close()
491

    
492
    job = _QueuedJob.Restore(self, data)
493
    self._memcache[job_id] = job
494
    logging.debug("Added job %s to the cache", job_id)
495
    return job
496

    
497
  def _GetJobsUnlocked(self, job_ids):
498
    if not job_ids:
499
      job_ids = self._GetJobIDsUnlocked()
500

    
501
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
502

    
503
  @utils.LockedMethod
504
  @_RequireOpenQueue
505
  def SubmitJob(self, ops):
506
    """Create and store a new job.
507

508
    This enters the job into our job queue and also puts it on the new
509
    queue, in order for it to be picked up by the queue processors.
510

511
    @type ops: list
512
    @param ops: The list of OpCodes that will become the new job.
513

514
    """
515
    # Get job identifier
516
    job_id = self._NewSerialUnlocked()
517
    job = _QueuedJob(self, job_id, ops)
518

    
519
    # Write to disk
520
    self.UpdateJobUnlocked(job)
521

    
522
    logging.debug("Added new job %s to the cache", job_id)
523
    self._memcache[job_id] = job
524

    
525
    # Add to worker pool
526
    self._wpool.AddTask(job)
527

    
528
    return job.id
529

    
530
  @_RequireOpenQueue
531
  def UpdateJobUnlocked(self, job):
532
    filename = self._GetJobPath(job.id)
533
    data = serializer.DumpJson(job.Serialize(), indent=False)
534
    logging.debug("Writing job %s to %s", job.id, filename)
535
    self._WriteAndReplicateFileUnlocked(filename, data)
536
    self._CleanCacheUnlocked([job.id])
537

    
538
    # Notify waiters about potential changes
539
    job.change.notifyAll()
540

    
541
  def _CleanCacheUnlocked(self, exclude):
542
    """Clean the memory cache.
543

544
    The exceptions argument contains job IDs that should not be
545
    cleaned.
546

547
    """
548
    assert isinstance(exclude, list)
549

    
550
    for job in self._memcache.values():
551
      if job.id in exclude:
552
        continue
553
      if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,
554
                                  constants.JOB_STATUS_RUNNING):
555
        logging.debug("Cleaning job %s from the cache", job.id)
556
        try:
557
          del self._memcache[job.id]
558
        except KeyError:
559
          pass
560

    
561
  @utils.LockedMethod
562
  @_RequireOpenQueue
563
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial):
564
    """Waits for changes in a job.
565

566
    @type job_id: string
567
    @param job_id: Job identifier
568
    @type fields: list of strings
569
    @param fields: Which fields to check for changes
570
    @type prev_job_info: list or None
571
    @param prev_job_info: Last job information returned
572
    @type prev_log_serial: int
573
    @param prev_log_serial: Last job message serial number
574

575
    """
576
    logging.debug("Waiting for changes in job %s", job_id)
577

    
578
    while True:
579
      job = self._LoadJobUnlocked(job_id)
580
      if not job:
581
        logging.debug("Job %s not found", job_id)
582
        new_state = None
583
        break
584

    
585
      status = job.CalcStatus()
586
      job_info = self._GetJobInfoUnlocked(job, fields)
587
      log_entries = job.GetLogEntries(prev_log_serial)
588

    
589
      # Serializing and deserializing data can cause type changes (e.g. from
590
      # tuple to list) or precision loss. We're doing it here so that we get
591
      # the same modifications as the data received from the client. Without
592
      # this, the comparison afterwards might fail without the data being
593
      # significantly different.
594
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
595
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
596

    
597
      if status not in (constants.JOB_STATUS_QUEUED,
598
                        constants.JOB_STATUS_RUNNING):
599
        # Don't even try to wait if the job is no longer running, there will be
600
        # no changes.
601
        break
602

    
603
      if (prev_job_info != job_info or
604
          (log_entries and prev_log_serial != log_entries[0][0])):
605
        break
606

    
607
      logging.debug("Waiting again")
608

    
609
      # Release the queue lock while waiting
610
      job.change.wait()
611

    
612
    logging.debug("Job %s changed", job_id)
613

    
614
    return (job_info, log_entries)
615

    
616
  @utils.LockedMethod
617
  @_RequireOpenQueue
618
  def CancelJob(self, job_id):
619
    """Cancels a job.
620

621
    @type job_id: string
622
    @param job_id: Job ID of job to be cancelled.
623

624
    """
625
    logging.debug("Cancelling job %s", job_id)
626

    
627
    job = self._LoadJobUnlocked(job_id)
628
    if not job:
629
      logging.debug("Job %s not found", job_id)
630
      return
631

    
632
    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
633
      logging.debug("Job %s is no longer in the queue", job.id)
634
      return
635

    
636
    try:
637
      for op in job.ops:
638
        op.status = constants.OP_STATUS_ERROR
639
        op.result = "Job cancelled by request"
640
    finally:
641
      self.UpdateJobUnlocked(job)
642

    
643
  @utils.LockedMethod
644
  @_RequireOpenQueue
645
  def ArchiveJob(self, job_id):
646
    """Archives a job.
647

648
    @type job_id: string
649
    @param job_id: Job ID of job to be archived.
650

651
    """
652
    logging.debug("Archiving job %s", job_id)
653

    
654
    job = self._LoadJobUnlocked(job_id)
655
    if not job:
656
      logging.debug("Job %s not found", job_id)
657
      return
658

    
659
    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
660
                                constants.JOB_STATUS_SUCCESS,
661
                                constants.JOB_STATUS_ERROR):
662
      logging.debug("Job %s is not yet done", job.id)
663
      return
664

    
665
    try:
666
      old = self._GetJobPath(job.id)
667
      new = self._GetArchivedJobPath(job.id)
668

    
669
      self._RenameFileUnlocked(old, new)
670

    
671
      logging.debug("Successfully archived job %s", job.id)
672
    finally:
673
      # Cleaning the cache because we don't know what os.rename actually did
674
      # and to be on the safe side.
675
      self._CleanCacheUnlocked([])
676

    
677
  def _GetJobInfoUnlocked(self, job, fields):
678
    row = []
679
    for fname in fields:
680
      if fname == "id":
681
        row.append(job.id)
682
      elif fname == "status":
683
        row.append(job.CalcStatus())
684
      elif fname == "ops":
685
        row.append([op.input.__getstate__() for op in job.ops])
686
      elif fname == "opresult":
687
        row.append([op.result for op in job.ops])
688
      elif fname == "opstatus":
689
        row.append([op.status for op in job.ops])
690
      else:
691
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
692
    return row
693

    
694
  @utils.LockedMethod
695
  @_RequireOpenQueue
696
  def QueryJobs(self, job_ids, fields):
697
    """Returns a list of jobs in queue.
698

699
    Args:
700
    - job_ids: Sequence of job identifiers or None for all
701
    - fields: Names of fields to return
702

703
    """
704
    jobs = []
705

    
706
    for job in self._GetJobsUnlocked(job_ids):
707
      if job is None:
708
        jobs.append(None)
709
      else:
710
        jobs.append(self._GetJobInfoUnlocked(job, fields))
711

    
712
    return jobs
713

    
714
  @utils.LockedMethod
715
  @_RequireOpenQueue
716
  def Shutdown(self):
717
    """Stops the job queue.
718

719
    """
720
    self._wpool.TerminateWorkers()
721

    
722
    self._queue_lock.Close()
723
    self._queue_lock = None