#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())

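# Illustrative note (not part of the original module): timestamps throughout
# this module are (seconds, microseconds) tuples, so a float time such as
# 1234567890.25 is split by utils.SplitTime into (1234567890, 250000).
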

class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      }

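# Illustrative sketch (not part of the original module): Serialize() produces
# a JSON-serializable dict that Restore() inverts, e.g.:
#   state = op.Serialize()
#   clone = _QueuedOpCode.Restore(state)
#   assert clone.status == op.status and clone.log == op.log
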

class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar lock_status: In-memory locking information for debugging
  @ivar change: a Condition variable we use for waiting for job changes

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "lock_status", "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # In-memory attributes
    self.lock_status = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    # In-memory attributes
    obj.lock_status = None

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one that finished with an
        error, the job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or all are
        successful, and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

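  # Illustrative example (not part of the original module): for a job whose
  # opcode statuses are [SUCCESS, RUNNING, QUEUED], CalcStatus() returns
  # JOB_STATUS_RUNNING; a single ERROR or CANCELED opcode makes the whole
  # job JOB_STATUS_ERROR or JOB_STATUS_CANCELED respectively.
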
  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

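  # Illustrative example (not part of the original module): with entry
  # (1, ts, ELOG_MESSAGE, "a") on the first opcode and
  # (2, ts, ELOG_MESSAGE, "b") on the second, GetLogEntries(1) returns
  # only the entry with serial 2, while GetLogEntries(None) returns both.
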
  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    self._queue.acquire()
    try:
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                                 constants.OP_STATUS_CANCELING)

      # All locks are acquired by now
      self._job.lock_status = None

      # Cancel here if we were asked to
      if self._op.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self._op.status = constants.OP_STATUS_RUNNING
      self._op.exec_timestamp = TimeStampNow()
    finally:
      self._queue.release()

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) in (1, 2)

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())

    self._queue.acquire()
    try:
      self._job.log_serial += 1
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))

      self._job.change.notifyAll()
    finally:
      self._queue.release()

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    # Not getting the queue lock because this is a single assignment
    self._job.lock_status = msg


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Processing job %s", job.id)
    proc = mcpu.Processor(self.pool.queue.context, job.id)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire()
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            # Make sure not to hold queue lock while calling ExecOpCode
            result = proc.ExecOpCode(input_opcode,
                                     _OpExecCallbacks(queue, job, op))

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                if isinstance(err, errors.GenericError):
                  op.result = errors.EncodeException(err)
                else:
                  op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.lock_status = None
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()

      logging.info("Finished job %s, status = %s", job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_lock' attribute.

  @warning: Use this decorator only after utils.LockedMethod!

  Example::
    @utils.LockedMethod
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable-msg=W0212
    assert self._queue_lock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                    "Unclean master daemon shutdown")
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename files in the local queue directory
    and then replicate these renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)

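  # Illustrative example (not part of the original module): with
  # JOBS_PER_ARCHIVE_DIRECTORY = 10000, job "12345" lands in archive
  # directory "1", since int("12345") / 10000 == 1 under Python 2
  # integer division.
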
  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of job identifiers

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result

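  # Illustrative example (not part of the original module): with
  # _last_serial == 42, _NewSerialsUnlocked(2) writes "44\n" to the serial
  # file, returns ["43", "44"] and leaves _last_serial at 44.
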
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    # pylint: disable-msg=W0613
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      raw_data = utils.ReadFile(filepath)
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise

    data = serializer.LoadJson(raw_data)

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err: # pylint: disable-msg=W0703
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(filepath, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for draining.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    # Check job queue size
    size = len(self._ListJobFiles())
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      #size = ...
      pass

    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    return self._SubmitJobUnlocked(job_id, ops)

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        data = self._SubmitJobUnlocked(job_id, ops)
        status = True
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))

    return results

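  # Illustrative example (not part of the original module): SubmitManyJobs
  # returns one (status, data) tuple per requested job, e.g.
  # [(True, "43"), (False, "Job queue is drained, refusing job")].
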
  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on-disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return the special value
        L{constants.JOB_NOTCHANGED}, which clients should interpret
        as such

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return None

    def _CheckForChanges():
      logging.debug("Waiting for changes in job %s", job_id)

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      # Don't even try to wait if the job is no longer running, there will be
      # no changes.
      if (status not in (constants.JOB_STATUS_QUEUED,
                         constants.JOB_STATUS_RUNNING,
                         constants.JOB_STATUS_WAITLOCK) or
          prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        logging.debug("Job %s changed", job_id)
        return (job_info, log_entries)

      raise utils.RetryAgain()

    try:
      # Setting wait function to release the queue lock while waiting
      return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
                         wait_fn=job.change.wait)
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED

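  # Illustrative client-side sketch (not part of the original module): a
  # caller can follow a job by feeding back the previously returned values:
  #
  #   prev_info, prev_serial = None, -1
  #   while True:
  #     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                      prev_serial, 10.0)
  #     if result == constants.JOB_NOTCHANGED:
  #       continue                        # timed out, poll again
  #     (prev_info, log_entries) = result
  #     ...
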
  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet, i.e. it is
    still queued or waiting for locks.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer waiting in the queue", job.id)
      return (False, "Job %s is no longer waiting in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                            "Job canceled by request")
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    return len(archive_jobs)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: the maximum time in seconds to spend archiving
    @rtype: tuple
    @return: a tuple of (number of jobs archived, number of jobs
        not examined)

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked(archived=False)
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)

  @staticmethod
  def _GetJobInfoUnlocked(job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "lock_status":
        row.append(job.lock_status)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a
        list with the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs

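  # Illustrative example (not part of the original module):
  #   queue.QueryJobs(None, ["id", "status"])
  # might return [["1", "success"], ["2", "running"]]; entries are None for
  # jobs that could not be loaded from disk.
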
  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None