#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking:
There's a single, large lock in the JobQueue class. It's used by all other
classes in this module.

"""

import os
import logging
import threading
import errno
import re
import time

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 5


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (log_serial, timestamp, level, message).

  """
  def __init__(self, op):
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []

  @classmethod
  def Restore(cls, state):
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    return obj

  def Serialize(self):
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      }
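
  # Illustrative sketch (not part of the original code): the expected
  # Serialize()/Restore() round-trip for a queued opcode. The opcode class
  # used here, opcodes.OpTestDelay, is only an assumed example.
  #
  #   op = _QueuedOpCode(opcodes.OpTestDelay(duration=10))
  #   state = op.Serialize()
  #   # "state" is a plain dict, roughly:
  #   #   {"input": {...}, "status": constants.OP_STATUS_QUEUED,
  #   #    "result": None, "log": []}
  #   restored = _QueuedOpCode.Restore(state)
  #   assert restored.status == constants.OP_STATUS_QUEUED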


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must be taken
  care of by users of this class.

  """
  def __init__(self, queue, job_id, ops):
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      }

  def CalcStatus(self):
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def GetLogEntries(self, newer_than):
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
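
  # Note on CalcStatus() above: the job status is derived from the individual
  # opcode statuses. An errored or cancelled opcode decides the job status
  # immediately, a running opcode marks the job as running, and only when
  # every opcode has succeeded is the job reported as successful. Expected
  # mapping, as a minimal sketch (opcode statuses -> job status):
  #
  #   [SUCCESS, SUCCESS]         -> JOB_STATUS_SUCCESS
  #   [SUCCESS, RUNNING, QUEUED] -> JOB_STATUS_RUNNING
  #   [SUCCESS, ERROR, QUEUED]   -> JOB_STATUS_ERROR
  #   [QUEUED, QUEUED]           -> JOB_STATUS_QUEUED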


class _JobQueueWorker(workerpool.BaseWorker):
  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              job.run_op_index = idx
              op.status = constants.OP_STATUS_RUNNING
              op.result = None
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            result = proc.ExecOpCode(input_opcode, _Log)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue
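
  # The pool runs JOBQUEUE_THREADS worker threads; every _QueuedJob handed to
  # AddTask() is eventually picked up by one of them and passed to
  # _JobQueueWorker.RunTask() above.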


class JobQueue(object):
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all "public" functions. That is, functions
    usually called from other classes.

    Important: Use this decorator only after utils.LockedMethod!

    Example:
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    self.context = context
    self._memcache = {}
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = set(self.context.cfg.GetNodeList())

    # Remove master node
    try:
      self._nodes.remove(self._my_hostname)
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      for job in self._GetJobsUnlocked(None):
        status = job.CalcStatus()

        if status in (constants.JOB_STATUS_QUEUED, ):
          self._wpool.AddTask(job)

        elif status in (constants.JOB_STATUS_RUNNING, ):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)
    finally:
      self.release()

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node_name):
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.call_jobqueue_purge(node_name)

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.call_jobqueue_update([node_name], file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes.add(node_name)

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    try:
      # The queue is removed by the "leave node" RPC call.
      self._nodes.remove(node_name)
    except KeyError:
      pass

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    """
    utils.WriteFile(file_name, data=data)

    failed_nodes = 0
    result = rpc.call_jobqueue_update(self._nodes, file_name, data)
    for node in self._nodes:
      if not result[node]:
        failed_nodes += 1
        logging.error("Copy of job queue file to node %s failed", node)

    # TODO: check failed_nodes

  def _RenameFileUnlocked(self, old, new):
    os.rename(old, new)

    result = rpc.call_jobqueue_rename(self._nodes, old, new)
    for node in self._nodes:
      if not result[node]:
        logging.error("Moving %s to %s failed on %s", old, new, node)

    # TODO: check failed nodes

  def _FormatJobID(self, job_id):
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)

  @staticmethod
  def _GetJobPath(job_id):
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)

  @classmethod
  def _ExtractJobID(cls, name):
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist.sort()
    return jlist

  def _ListJobFiles(self):
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    if job_id in self._memcache:
      logging.debug("Found job %s in memcache", job_id)
      return self._memcache[job_id]

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.

    """
    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Added new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id
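
  # Illustrative sketch (not part of the original code): a caller with a
  # queue instance submits work roughly like this; OpTestDelay is only an
  # assumed example opcode.
  #
  #   job_id = queue.SubmitJob([opcodes.OpTestDelay(duration=10)])
  #   print queue.QueryJobs([job_id], ["id", "status"])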

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)
    self._CleanCacheUnlocked([job.id])

    # Notify waiters about potential changes
    job.change.notifyAll()

  def _CleanCacheUnlocked(self, exclude):
    """Clean the memory cache.

    The exclude argument contains job IDs that should not be
    cleaned.

    """
    assert isinstance(exclude, list)

    for job in self._memcache.values():
      if job.id in exclude:
        continue
      if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,
                                  constants.JOB_STATUS_RUNNING):
        logging.debug("Cleaning job %s from the cache", job.id)
        try:
          del self._memcache[job.id]
        except KeyError:
          pass

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number

    """
    logging.debug("Waiting for changes in job %s", job_id)

    while True:
      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        new_state = None
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait()

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)
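
  # Illustrative sketch (not part of the original code): clients are expected
  # to call WaitForJobChanges in a loop, feeding back the previously returned
  # job info and the serial of the last log entry seen, roughly:
  #
  #   prev_info = None
  #   prev_serial = -1
  #   while True:
  #     info, log = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                         prev_serial)
  #     for entry in log:
  #       prev_serial = max(prev_serial, entry[0])
  #     prev_info = info
  #     if info[0] not in (constants.JOB_STATUS_QUEUED,
  #                        constants.JOB_STATUS_RUNNING):
  #       break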

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    @type job_id: string
    @param job_id: Job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job cancelled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.debug("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return

    try:
      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)

      self._RenameFileUnlocked(old, new)

      logging.debug("Successfully archived job %s", job.id)
    finally:
      # Cleaning the cache because we don't know what os.rename actually did
      # and to be on the safe side.
      self._CleanCacheUnlocked([])

  def _GetJobInfoUnlocked(self, job, fields):
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None
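
# Illustrative sketch (not part of the original module) of the intended
# JobQueue lifecycle in the master daemon, assuming a suitable "context"
# object is available:
#
#   queue = JobQueue(context)
#   job_id = queue.SubmitJob([opcodes.OpTestDelay(duration=10)])
#   ...
#   queue.Shutdown()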