Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 60dd1473

History | View | Annotate | Download (20.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking:
25
There's a single, large lock in the JobQueue class. It's used by all other
26
classes in this module.
27

28
"""
29

    
30
import os
31
import logging
32
import threading
33
import errno
34
import re
35
import time
36
import weakref
37

    
38
from ganeti import constants
39
from ganeti import serializer
40
from ganeti import workerpool
41
from ganeti import opcodes
42
from ganeti import errors
43
from ganeti import mcpu
44
from ganeti import utils
45
from ganeti import jstore
46
from ganeti import rpc
47

    
48

    
49
JOBQUEUE_THREADS = 5
50

    
51

    
52
def TimeStampNow():
53
  return utils.SplitTime(time.time())
54

    
55

    
56
class _QueuedOpCode(object):
57
  """Encasulates an opcode object.
58

59
  The 'log' attribute holds the execution log and consists of tuples
60
  of the form (log_serial, timestamp, level, message).
61

62
  """
63
  def __init__(self, op):
64
    self.input = op
65
    self.status = constants.OP_STATUS_QUEUED
66
    self.result = None
67
    self.log = []
68
    self.start_timestamp = None
69
    self.end_timestamp = None
70

    
71
  @classmethod
72
  def Restore(cls, state):
73
    obj = _QueuedOpCode.__new__(cls)
74
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
75
    obj.status = state["status"]
76
    obj.result = state["result"]
77
    obj.log = state["log"]
78
    obj.start_timestamp = state.get("start_timestamp", None)
79
    obj.end_timestamp = state.get("end_timestamp", None)
80
    return obj
81

    
82
  def Serialize(self):
83
    return {
84
      "input": self.input.__getstate__(),
85
      "status": self.status,
86
      "result": self.result,
87
      "log": self.log,
88
      "start_timestamp": self.start_timestamp,
89
      "end_timestamp": self.end_timestamp,
90
      }
91

    
92

    
93
class _QueuedJob(object):
94
  """In-memory job representation.
95

96
  This is what we use to track the user-submitted jobs. Locking must be taken
97
  care of by users of this class.
98

99
  """
100
  def __init__(self, queue, job_id, ops):
101
    if not ops:
102
      # TODO
103
      raise Exception("No opcodes")
104

    
105
    self.queue = queue
106
    self.id = job_id
107
    self.ops = [_QueuedOpCode(op) for op in ops]
108
    self.run_op_index = -1
109
    self.log_serial = 0
110

    
111
    # Condition to wait for changes
112
    self.change = threading.Condition(self.queue._lock)
113

    
114
  @classmethod
115
  def Restore(cls, queue, state):
116
    obj = _QueuedJob.__new__(cls)
117
    obj.queue = queue
118
    obj.id = state["id"]
119
    obj.run_op_index = state["run_op_index"]
120

    
121
    obj.ops = []
122
    obj.log_serial = 0
123
    for op_state in state["ops"]:
124
      op = _QueuedOpCode.Restore(op_state)
125
      for log_entry in op.log:
126
        obj.log_serial = max(obj.log_serial, log_entry[0])
127
      obj.ops.append(op)
128

    
129
    # Condition to wait for changes
130
    obj.change = threading.Condition(obj.queue._lock)
131

    
132
    return obj
133

    
134
  def Serialize(self):
135
    return {
136
      "id": self.id,
137
      "ops": [op.Serialize() for op in self.ops],
138
      "run_op_index": self.run_op_index,
139
      }
140

    
141
  def CalcStatus(self):
142
    status = constants.JOB_STATUS_QUEUED
143

    
144
    all_success = True
145
    for op in self.ops:
146
      if op.status == constants.OP_STATUS_SUCCESS:
147
        continue
148

    
149
      all_success = False
150

    
151
      if op.status == constants.OP_STATUS_QUEUED:
152
        pass
153
      elif op.status == constants.OP_STATUS_RUNNING:
154
        status = constants.JOB_STATUS_RUNNING
155
      elif op.status == constants.OP_STATUS_ERROR:
156
        status = constants.JOB_STATUS_ERROR
157
        # The whole job fails if one opcode failed
158
        break
159
      elif op.status == constants.OP_STATUS_CANCELED:
160
        status = constants.OP_STATUS_CANCELED
161
        break
162

    
163
    if all_success:
164
      status = constants.JOB_STATUS_SUCCESS
165

    
166
    return status
167

    
168
  def GetLogEntries(self, newer_than):
169
    if newer_than is None:
170
      serial = -1
171
    else:
172
      serial = newer_than
173

    
174
    entries = []
175
    for op in self.ops:
176
      entries.extend(filter(lambda entry: entry[0] > newer_than, op.log))
177

    
178
    return entries
179

    
180

    
181
class _JobQueueWorker(workerpool.BaseWorker):
182
  def RunTask(self, job):
183
    """Job executor.
184

185
    This functions processes a job. It is closely tied to the _QueuedJob and
186
    _QueuedOpCode classes.
187

188
    """
189
    logging.debug("Worker %s processing job %s",
190
                  self.worker_id, job.id)
191
    proc = mcpu.Processor(self.pool.queue.context)
192
    queue = job.queue
193
    try:
194
      try:
195
        count = len(job.ops)
196
        for idx, op in enumerate(job.ops):
197
          try:
198
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)
199

    
200
            queue.acquire()
201
            try:
202
              job.run_op_index = idx
203
              op.status = constants.OP_STATUS_RUNNING
204
              op.result = None
205
              op.start_timestamp = TimeStampNow()
206
              queue.UpdateJobUnlocked(job)
207

    
208
              input_opcode = op.input
209
            finally:
210
              queue.release()
211

    
212
            def _Log(*args):
213
              """Append a log entry.
214

215
              """
216
              assert len(args) < 3
217

    
218
              if len(args) == 1:
219
                log_type = constants.ELOG_MESSAGE
220
                log_msg = args[0]
221
              else:
222
                log_type, log_msg = args
223

    
224
              # The time is split to make serialization easier and not lose
225
              # precision.
226
              timestamp = utils.SplitTime(time.time())
227

    
228
              queue.acquire()
229
              try:
230
                job.log_serial += 1
231
                op.log.append((job.log_serial, timestamp, log_type, log_msg))
232

    
233
                job.change.notifyAll()
234
              finally:
235
                queue.release()
236

    
237
            # Make sure not to hold lock while _Log is called
238
            result = proc.ExecOpCode(input_opcode, _Log)
239

    
240
            queue.acquire()
241
            try:
242
              op.status = constants.OP_STATUS_SUCCESS
243
              op.result = result
244
              op.end_timestamp = TimeStampNow()
245
              queue.UpdateJobUnlocked(job)
246
            finally:
247
              queue.release()
248

    
249
            logging.debug("Op %s/%s: Successfully finished %s",
250
                          idx + 1, count, op)
251
          except Exception, err:
252
            queue.acquire()
253
            try:
254
              try:
255
                op.status = constants.OP_STATUS_ERROR
256
                op.result = str(err)
257
                op.end_timestamp = TimeStampNow()
258
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
259
              finally:
260
                queue.UpdateJobUnlocked(job)
261
            finally:
262
              queue.release()
263
            raise
264

    
265
      except errors.GenericError, err:
266
        logging.exception("Ganeti exception")
267
      except:
268
        logging.exception("Unhandled exception")
269
    finally:
270
      queue.acquire()
271
      try:
272
        try:
273
          job.run_op_idx = -1
274
          queue.UpdateJobUnlocked(job)
275
        finally:
276
          job_id = job.id
277
          status = job.CalcStatus()
278
      finally:
279
        queue.release()
280
      logging.debug("Worker %s finished job %s, status = %s",
281
                    self.worker_id, job_id, status)
282

    
283

    
284
class _JobQueueWorkerPool(workerpool.WorkerPool):
285
  def __init__(self, queue):
286
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
287
                                              _JobQueueWorker)
288
    self.queue = queue
289

    
290

    
291
class JobQueue(object):
292
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
293

    
294
  def _RequireOpenQueue(fn):
295
    """Decorator for "public" functions.
296

297
    This function should be used for all "public" functions. That is, functions
298
    usually called from other classes.
299

300
    Important: Use this decorator only after utils.LockedMethod!
301

302
    Example:
303
      @utils.LockedMethod
304
      @_RequireOpenQueue
305
      def Example(self):
306
        pass
307

308
    """
309
    def wrapper(self, *args, **kwargs):
310
      assert self._queue_lock is not None, "Queue should be open"
311
      return fn(self, *args, **kwargs)
312
    return wrapper
313

    
314
  def __init__(self, context):
315
    self.context = context
316
    self._memcache = weakref.WeakValueDictionary()
317
    self._my_hostname = utils.HostInfo().name
318

    
319
    # Locking
320
    self._lock = threading.Lock()
321
    self.acquire = self._lock.acquire
322
    self.release = self._lock.release
323

    
324
    # Initialize
325
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
326

    
327
    # Read serial file
328
    self._last_serial = jstore.ReadSerial()
329
    assert self._last_serial is not None, ("Serial file was modified between"
330
                                           " check in jstore and here")
331

    
332
    # Get initial list of nodes
333
    self._nodes = set(self.context.cfg.GetNodeList())
334

    
335
    # Remove master node
336
    try:
337
      self._nodes.remove(self._my_hostname)
338
    except ValueError:
339
      pass
340

    
341
    # TODO: Check consistency across nodes
342

    
343
    # Setup worker pool
344
    self._wpool = _JobQueueWorkerPool(self)
345

    
346
    # We need to lock here because WorkerPool.AddTask() may start a job while
347
    # we're still doing our work.
348
    self.acquire()
349
    try:
350
      for job in self._GetJobsUnlocked(None):
351
        status = job.CalcStatus()
352

    
353
        if status in (constants.JOB_STATUS_QUEUED, ):
354
          self._wpool.AddTask(job)
355

    
356
        elif status in (constants.JOB_STATUS_RUNNING, ):
357
          logging.warning("Unfinished job %s found: %s", job.id, job)
358
          try:
359
            for op in job.ops:
360
              op.status = constants.OP_STATUS_ERROR
361
              op.result = "Unclean master daemon shutdown"
362
          finally:
363
            self.UpdateJobUnlocked(job)
364
    finally:
365
      self.release()
366

    
367
  @utils.LockedMethod
368
  @_RequireOpenQueue
369
  def AddNode(self, node_name):
370
    assert node_name != self._my_hostname
371

    
372
    # Clean queue directory on added node
373
    rpc.call_jobqueue_purge(node_name)
374

    
375
    # Upload the whole queue excluding archived jobs
376
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
377

    
378
    # Upload current serial file
379
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
380

    
381
    for file_name in files:
382
      # Read file content
383
      fd = open(file_name, "r")
384
      try:
385
        content = fd.read()
386
      finally:
387
        fd.close()
388

    
389
      result = rpc.call_jobqueue_update([node_name], file_name, content)
390
      if not result[node_name]:
391
        logging.error("Failed to upload %s to %s", file_name, node_name)
392

    
393
    self._nodes.add(node_name)
394

    
395
  @utils.LockedMethod
396
  @_RequireOpenQueue
397
  def RemoveNode(self, node_name):
398
    try:
399
      # The queue is removed by the "leave node" RPC call.
400
      self._nodes.remove(node_name)
401
    except KeyError:
402
      pass
403

    
404
  def _CheckRpcResult(self, result, nodes, failmsg):
405
    failed = []
406
    success = []
407

    
408
    for node in nodes:
409
      if result[node]:
410
        success.append(node)
411
      else:
412
        failed.append(node)
413

    
414
    if failed:
415
      logging.error("%s failed on %s", failmsg, ", ".join(failed))
416

    
417
    # +1 for the master node
418
    if (len(success) + 1) < len(failed):
419
      # TODO: Handle failing nodes
420
      logging.error("More than half of the nodes failed")
421

    
422
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
423
    """Writes a file locally and then replicates it to all nodes.
424

425
    """
426
    utils.WriteFile(file_name, data=data)
427

    
428
    result = rpc.call_jobqueue_update(self._nodes, file_name, data)
429
    self._CheckRpcResult(result, self._nodes,
430
                         "Updating %s" % file_name)
431

    
432
  def _RenameFileUnlocked(self, old, new):
433
    os.rename(old, new)
434

    
435
    result = rpc.call_jobqueue_rename(self._nodes, old, new)
436
    self._CheckRpcResult(result, self._nodes,
437
                         "Moving %s to %s" % (old, new))
438

    
439
  def _FormatJobID(self, job_id):
440
    if not isinstance(job_id, (int, long)):
441
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
442
    if job_id < 0:
443
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
444

    
445
    return str(job_id)
446

    
447
  def _NewSerialUnlocked(self):
448
    """Generates a new job identifier.
449

450
    Job identifiers are unique during the lifetime of a cluster.
451

452
    Returns: A string representing the job identifier.
453

454
    """
455
    # New number
456
    serial = self._last_serial + 1
457

    
458
    # Write to file
459
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
460
                                        "%s\n" % serial)
461

    
462
    # Keep it only if we were able to write the file
463
    self._last_serial = serial
464

    
465
    return self._FormatJobID(serial)
466

    
467
  @staticmethod
468
  def _GetJobPath(job_id):
469
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
470

    
471
  @staticmethod
472
  def _GetArchivedJobPath(job_id):
473
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
474

    
475
  @classmethod
476
  def _ExtractJobID(cls, name):
477
    m = cls._RE_JOB_FILE.match(name)
478
    if m:
479
      return m.group(1)
480
    else:
481
      return None
482

    
483
  def _GetJobIDsUnlocked(self, archived=False):
484
    """Return all known job IDs.
485

486
    If the parameter archived is True, archived jobs IDs will be
487
    included. Currently this argument is unused.
488

489
    The method only looks at disk because it's a requirement that all
490
    jobs are present on disk (so in the _memcache we don't have any
491
    extra IDs).
492

493
    """
494
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
495
    jlist = utils.NiceSort(jlist)
496
    return jlist
497

    
498
  def _ListJobFiles(self):
499
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
500
            if self._RE_JOB_FILE.match(name)]
501

    
502
  def _LoadJobUnlocked(self, job_id):
503
    job = self._memcache.get(job_id, None)
504
    if job:
505
      logging.debug("Found job %s in memcache", job_id)
506
      return job
507

    
508
    filepath = self._GetJobPath(job_id)
509
    logging.debug("Loading job from %s", filepath)
510
    try:
511
      fd = open(filepath, "r")
512
    except IOError, err:
513
      if err.errno in (errno.ENOENT, ):
514
        return None
515
      raise
516
    try:
517
      data = serializer.LoadJson(fd.read())
518
    finally:
519
      fd.close()
520

    
521
    job = _QueuedJob.Restore(self, data)
522
    self._memcache[job_id] = job
523
    logging.debug("Added job %s to the cache", job_id)
524
    return job
525

    
526
  def _GetJobsUnlocked(self, job_ids):
527
    if not job_ids:
528
      job_ids = self._GetJobIDsUnlocked()
529

    
530
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
531

    
532
  @utils.LockedMethod
533
  @_RequireOpenQueue
534
  def SubmitJob(self, ops):
535
    """Create and store a new job.
536

537
    This enters the job into our job queue and also puts it on the new
538
    queue, in order for it to be picked up by the queue processors.
539

540
    @type ops: list
541
    @param ops: The list of OpCodes that will become the new job.
542

543
    """
544
    # Get job identifier
545
    job_id = self._NewSerialUnlocked()
546
    job = _QueuedJob(self, job_id, ops)
547

    
548
    # Write to disk
549
    self.UpdateJobUnlocked(job)
550

    
551
    logging.debug("Adding new job %s to the cache", job_id)
552
    self._memcache[job_id] = job
553

    
554
    # Add to worker pool
555
    self._wpool.AddTask(job)
556

    
557
    return job.id
558

    
559
  @_RequireOpenQueue
560
  def UpdateJobUnlocked(self, job):
561
    filename = self._GetJobPath(job.id)
562
    data = serializer.DumpJson(job.Serialize(), indent=False)
563
    logging.debug("Writing job %s to %s", job.id, filename)
564
    self._WriteAndReplicateFileUnlocked(filename, data)
565

    
566
    # Notify waiters about potential changes
567
    job.change.notifyAll()
568

    
569
  @utils.LockedMethod
570
  @_RequireOpenQueue
571
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
572
                        timeout):
573
    """Waits for changes in a job.
574

575
    @type job_id: string
576
    @param job_id: Job identifier
577
    @type fields: list of strings
578
    @param fields: Which fields to check for changes
579
    @type prev_job_info: list or None
580
    @param prev_job_info: Last job information returned
581
    @type prev_log_serial: int
582
    @param prev_log_serial: Last job message serial number
583
    @type timeout: float
584
    @param timeout: maximum time to wait
585

586
    """
587
    logging.debug("Waiting for changes in job %s", job_id)
588
    end_time = time.time() + timeout
589
    while True:
590
      delta_time = end_time - time.time()
591
      if delta_time < 0:
592
        return constants.JOB_NOTCHANGED
593

    
594
      job = self._LoadJobUnlocked(job_id)
595
      if not job:
596
        logging.debug("Job %s not found", job_id)
597
        break
598

    
599
      status = job.CalcStatus()
600
      job_info = self._GetJobInfoUnlocked(job, fields)
601
      log_entries = job.GetLogEntries(prev_log_serial)
602

    
603
      # Serializing and deserializing data can cause type changes (e.g. from
604
      # tuple to list) or precision loss. We're doing it here so that we get
605
      # the same modifications as the data received from the client. Without
606
      # this, the comparison afterwards might fail without the data being
607
      # significantly different.
608
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
609
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
610

    
611
      if status not in (constants.JOB_STATUS_QUEUED,
612
                        constants.JOB_STATUS_RUNNING):
613
        # Don't even try to wait if the job is no longer running, there will be
614
        # no changes.
615
        break
616

    
617
      if (prev_job_info != job_info or
618
          (log_entries and prev_log_serial != log_entries[0][0])):
619
        break
620

    
621
      logging.debug("Waiting again")
622

    
623
      # Release the queue lock while waiting
624
      job.change.wait(delta_time)
625

    
626
    logging.debug("Job %s changed", job_id)
627

    
628
    return (job_info, log_entries)
629

    
630
  @utils.LockedMethod
631
  @_RequireOpenQueue
632
  def CancelJob(self, job_id):
633
    """Cancels a job.
634

635
    @type job_id: string
636
    @param job_id: Job ID of job to be cancelled.
637

638
    """
639
    logging.debug("Cancelling job %s", job_id)
640

    
641
    job = self._LoadJobUnlocked(job_id)
642
    if not job:
643
      logging.debug("Job %s not found", job_id)
644
      return
645

    
646
    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
647
      logging.debug("Job %s is no longer in the queue", job.id)
648
      return
649

    
650
    try:
651
      for op in job.ops:
652
        op.status = constants.OP_STATUS_ERROR
653
        op.result = "Job cancelled by request"
654
    finally:
655
      self.UpdateJobUnlocked(job)
656

    
657
  @utils.LockedMethod
658
  @_RequireOpenQueue
659
  def ArchiveJob(self, job_id):
660
    """Archives a job.
661

662
    @type job_id: string
663
    @param job_id: Job ID of job to be archived.
664

665
    """
666
    logging.debug("Archiving job %s", job_id)
667

    
668
    job = self._LoadJobUnlocked(job_id)
669
    if not job:
670
      logging.debug("Job %s not found", job_id)
671
      return
672

    
673
    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
674
                                constants.JOB_STATUS_SUCCESS,
675
                                constants.JOB_STATUS_ERROR):
676
      logging.debug("Job %s is not yet done", job.id)
677
      return
678

    
679
    old = self._GetJobPath(job.id)
680
    new = self._GetArchivedJobPath(job.id)
681

    
682
    self._RenameFileUnlocked(old, new)
683

    
684
    logging.debug("Successfully archived job %s", job.id)
685

    
686
  def _GetJobInfoUnlocked(self, job, fields):
687
    row = []
688
    for fname in fields:
689
      if fname == "id":
690
        row.append(job.id)
691
      elif fname == "status":
692
        row.append(job.CalcStatus())
693
      elif fname == "ops":
694
        row.append([op.input.__getstate__() for op in job.ops])
695
      elif fname == "opresult":
696
        row.append([op.result for op in job.ops])
697
      elif fname == "opstatus":
698
        row.append([op.status for op in job.ops])
699
      elif fname == "summary":
700
        row.append([op.input.Summary() for op in job.ops])
701
      else:
702
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
703
    return row
704

    
705
  @utils.LockedMethod
706
  @_RequireOpenQueue
707
  def QueryJobs(self, job_ids, fields):
708
    """Returns a list of jobs in queue.
709

710
    Args:
711
    - job_ids: Sequence of job identifiers or None for all
712
    - fields: Names of fields to return
713

714
    """
715
    jobs = []
716

    
717
    for job in self._GetJobsUnlocked(job_ids):
718
      if job is None:
719
        jobs.append(None)
720
      else:
721
        jobs.append(self._GetJobInfoUnlocked(job, fields))
722

    
723
    return jobs
724

    
725
  @utils.LockedMethod
726
  @_RequireOpenQueue
727
  def Shutdown(self):
728
    """Stops the job queue.
729

730
    """
731
    self._wpool.TerminateWorkers()
732

    
733
    self._queue_lock.Close()
734
    self._queue_lock = None