root / lib / jqueue.py @ 5c735209


#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking:
There's a single, large lock in the JobQueue class. It's used by all other
classes in this module.

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 5


def TimeStampNow():
  return utils.SplitTime(time.time())
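# Note on the value returned above: utils.SplitTime is assumed to split a
# float timestamp into a (seconds, microseconds) pair, e.g. 1230000000.123456
# becoming roughly (1230000000, 123456); the split avoids losing sub-second
# precision when the value is serialized (see _JobQueueWorker's _Log helper).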


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  The 'log' attribute holds the execution log and consists of tuples
  of the form (log_serial, timestamp, level, message).

  """
  def __init__(self, op):
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }
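  # Illustrative shape only: Serialize() returns a plain dict along the lines
  # of
  #   {"input": <op.__getstate__() dict>, "status": constants.OP_STATUS_QUEUED,
  #    "result": None, "log": [], "start_timestamp": None,
  #    "end_timestamp": None}
  # and Restore() is expected to rebuild an equivalent _QueuedOpCode from it.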


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must be taken
  care of by users of this class.

  """
  def __init__(self, queue, job_id, ops):
    if not ops:
      # TODO
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      }
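  # Illustrative shape only: on disk a job is stored as the JSON form of this
  # dict, roughly
  #   {"id": "42", "ops": [<serialized opcodes>], "run_op_index": -1}
  # where the job id "42" is a made-up example value (see UpdateJobUnlocked).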

  def CalcStatus(self):
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
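  # Example of the rule above: a job whose opcodes are (success, running,
  # queued) reports JOB_STATUS_RUNNING, while a single failed opcode turns the
  # whole job into JOB_STATUS_ERROR and a canceled one into
  # JOB_STATUS_CANCELED.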

  def GetLogEntries(self, newer_than):
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
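  # Illustrative entry only: each element of op.log is a tuple of the form
  #   (log_serial, (seconds, microseconds), constants.ELOG_MESSAGE, "text")
  # as appended by _JobQueueWorker's _Log helper, so comparing entry[0] with
  # the given serial returns only the newer messages.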


class _JobQueueWorker(workerpool.BaseWorker):
  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              job.run_op_index = idx
              op.status = constants.OP_STATUS_RUNNING
              op.result = None
              op.start_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            result = proc.ExecOpCode(input_opcode, _Log)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all "public" functions. That is, functions
    usually called from other classes.

    Important: Use this decorator only after utils.LockedMethod!

    Example:
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper
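  # Note on the ordering required above: utils.LockedMethod is assumed to
  # serialize access through self._lock, while the wrapper itself only asserts
  # that the queue file lock is still held (i.e. Shutdown() has not yet been
  # called).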

  def __init__(self, context):
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = set(self.context.cfg.GetNodeList())

    # Remove master node
    try:
      self._nodes.remove(self._my_hostname)
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      for job in self._GetJobsUnlocked(None):
        status = job.CalcStatus()

        if status in (constants.JOB_STATUS_QUEUED, ):
          self._wpool.AddTask(job)

        elif status in (constants.JOB_STATUS_RUNNING, ):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)
    finally:
      self.release()

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node_name):
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.call_jobqueue_purge(node_name)

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.call_jobqueue_update([node_name], file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes.add(node_name)

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    try:
      # The queue is removed by the "leave node" RPC call.
      self._nodes.remove(node_name)
    except KeyError:
      pass

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    """
    utils.WriteFile(file_name, data=data)

    failed_nodes = 0
    result = rpc.call_jobqueue_update(self._nodes, file_name, data)
    for node in self._nodes:
      if not result[node]:
        failed_nodes += 1
        logging.error("Copy of job queue file to node %s failed", node)

    # TODO: check failed_nodes

  def _RenameFileUnlocked(self, old, new):
    os.rename(old, new)

    result = rpc.call_jobqueue_rename(self._nodes, old, new)
    for node in self._nodes:
      if not result[node]:
        logging.error("Moving %s to %s failed on %s", old, new, node)

    # TODO: check failed nodes

  def _FormatJobID(self, job_id):
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    Returns: A string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)

  @staticmethod
  def _GetJobPath(job_id):
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)
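  # Illustrative example: job 42 would live in <QUEUE_DIR>/job-42 and, once
  # archived, be moved to <JOB_QUEUE_ARCHIVE_DIR>/job-42 (the id 42 is a
  # made-up value).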

  @classmethod
  def _ExtractJobID(cls, name):
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist.sort()
    return jlist

  def _ListJobFiles(self):
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    job = _QueuedJob.Restore(self, data)
    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.

    """
    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id
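  # Illustrative use by a caller holding a reference to the queue:
  #   job_id = queue.SubmitJob([op1, op2])
  # where each op is an opcodes.OpCode instance; the returned string id can
  # later be passed to QueryJobs, WaitForJobChanges, CancelJob or ArchiveJob.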

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)
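  # Note: on timeout the loop above returns constants.JOB_NOTCHANGED instead
  # of the (job_info, log_entries) pair. Waiting on job.change releases the
  # queue lock because the Condition was built around self._lock (see
  # _QueuedJob.__init__), letting worker threads update the job meanwhile.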

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    @type job_id: string
    @param job_id: Job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job cancelled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.debug("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return

    old = self._GetJobPath(job.id)
    new = self._GetArchivedJobPath(job.id)

    self._RenameFileUnlocked(old, new)

    logging.debug("Successfully archived job %s", job.id)

  def _GetJobInfoUnlocked(self, job, fields):
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row
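  # Illustrative result only: for fields ["id", "status"] this returns
  # something like ["42", constants.JOB_STATUS_RUNNING] (the id "42" is a
  # made-up value); unknown field names raise OpExecError.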

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    Args:
    - job_ids: Sequence of job identifiers or None for all
    - fields: Names of fields to return

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None