lib/jqueue.py @ 6abf7f2c

#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
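
# Editor's illustration (not part of the original module): the split
# representation avoids float precision loss during JSON serialization.
# Assuming utils.SplitTime behaves as documented:
#
#   TimeStampNow()  # e.g. (1230768000, 250000) for time 1230768000.25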


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "exec_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.exec_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.exec_timestamp = state.get("exec_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "exec_timestamp": self.exec_timestamp,
      "end_timestamp": self.end_timestamp,
      }
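
  # Editor's sketch (not in the original source): Serialize() and
  # Restore() round-trip an opcode's state; OpQueryClusterInfo is only
  # a plausible example opcode here:
  #
  #   qop = _QueuedOpCode(opcodes.OpQueryClusterInfo())
  #   state = qop.Serialize()
  #   copy = _QueuedOpCode.Restore(state)
  #   assert copy.status == constants.OP_STATUS_QUEUED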


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar lock_status: In-memory locking information for debugging
  @ivar change: a Condition variable we use for waiting for job changes

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "lock_status", "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # In-memory attributes
    self.lock_status = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    # In-memory attributes
    obj.lock_status = None

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one that finished with an
        error, the job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
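
  # Worked example (editor's note, not in the original source): opcode
  # statuses [SUCCESS, RUNNING, QUEUED] leave all_success == False and
  # status == JOB_STATUS_RUNNING after the loop, so the job reports
  # "running"; only [SUCCESS, SUCCESS, SUCCESS] yields
  # JOB_STATUS_SUCCESS.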

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
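
  # Editor's illustration (not in the original source): with opcode
  # logs [(1, ...), (2, ...)] and [(3, ...)], GetLogEntries(None)
  # returns all three entries, while GetLogEntries(2) returns only the
  # entry with serial 3.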

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    self._queue.acquire()
    try:
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                                 constants.OP_STATUS_CANCELING)

      # All locks are acquired by now
      self._job.lock_status = None

      # Cancel here if we were asked to
      if self._op.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self._op.status = constants.OP_STATUS_RUNNING
      self._op.exec_timestamp = TimeStampNow()
    finally:
      self._queue.release()

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())

    self._queue.acquire()
    try:
      self._job.log_serial += 1
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))

      self._job.change.notifyAll()
    finally:
      self._queue.release()

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    # Not getting the queue lock because this is a single assignment
    self._job.lock_status = msg


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Processing job %s", job.id)
    proc = mcpu.Processor(self.pool.queue.context, job.id)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire()
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            # Make sure not to hold queue lock while calling ExecOpCode
            result = proc.ExecOpCode(input_opcode,
                                     _OpExecCallbacks(queue, job, op))

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                if isinstance(err, errors.GenericError):
                  op.result = errors.EncodeException(err)
                else:
                  op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.lock_status = None
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()

      logging.info("Finished job %s, status = %s", job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_lock' attribute.

  @warning: Use this decorator only after utils.LockedMethod!

  Example::
    @utils.LockedMethod
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable-msg=W0212
    assert self._queue_lock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                    "Unclean master daemon shutdown")
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename files in the local queue directory
    and then replicate these renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)
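
  # Editor's illustration (not in the original source):
  #   JobQueue._FormatJobID(42)   -> "42"
  #   JobQueue._FormatJobID(-1)   -> raises errors.ProgrammerError
  #   JobQueue._FormatJobID("42") -> raises errors.ProgrammerError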

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
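
  # Editor's illustration (not in the original source): with
  # JOBS_PER_ARCHIVE_DIRECTORY = 10000, jobs 0..9999 land in archive
  # directory "0", job 10042 in "1", job 40422 in "4", and so on.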

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of job identifiers

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result
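
  # Editor's illustration (not in the original source): if
  # _last_serial == 5, _NewSerialsUnlocked(2) writes "7" to the serial
  # file, returns ["6", "7"] and sets _last_serial to 7.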

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived jobs IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    # pylint: disable-msg=W0613
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      raw_data = utils.ReadFile(filepath)
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise

    data = serializer.LoadJson(raw_data)

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err: # pylint: disable-msg=W0703
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(filepath, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for draining.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the queue is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    # Check job queue size
    size = len(self._ListJobFiles())
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      #size = ...
      pass

    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    return self._SubmitJobUnlocked(job_id, ops)

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        data = self._SubmitJobUnlocked(job_id, ops)
        status = True
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))

    return results
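
  # Editor's sketch of the caller side (not in the original source);
  # OpQueryClusterInfo is only a plausible example opcode:
  #
  #   results = queue.SubmitManyJobs([[opcodes.OpQueryClusterInfo()],
  #                                   [opcodes.OpQueryClusterInfo()]])
  #   for (status, data) in results:
  #     # status is True and data the new job ID on success,
  #     # status is False and data an error string otherwise
  #     print status, data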

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return None

    def _CheckForChanges():
      logging.debug("Waiting for changes in job %s", job_id)

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      # Don't even try to wait if the job is no longer running, there will be
      # no changes.
      if (status not in (constants.JOB_STATUS_QUEUED,
                         constants.JOB_STATUS_RUNNING,
                         constants.JOB_STATUS_WAITLOCK) or
          prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        logging.debug("Job %s changed", job_id)
        return (job_info, log_entries)

      raise utils.RetryAgain()

    try:
      # Setting wait function to release the queue lock while waiting
      return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
                         wait_fn=job.change.wait)
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
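
  # Editor's note (not in the original source): a typical caller loops
  # on this method, feeding back the previous answer; None is returned
  # if the job does not exist. For example:
  #
  #   prev_info, prev_serial = None, 0
  #   while True:
  #     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                      prev_serial, 10.0)
  #     if result == constants.JOB_NOTCHANGED:
  #       continue
  #     (prev_info, log_entries) = result
  #     if log_entries:
  #       prev_serial = log_entries[-1][0]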

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer waiting in the queue", job.id)
      return (False, "Job %s is no longer waiting in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                            "Job canceled by request")
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    return len(archive_jobs)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum time the archival should run, in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked(archived=False)
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
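
  # Editor's illustration (not in the original source): with
  # age == 3600, a job whose end_timestamp is (now - 7200, 0) is
  # archived, while one that finished ten minutes ago is kept;
  # age == -1 archives every finalized job regardless of age.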

  @staticmethod
  def _GetJobInfoUnlocked(job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opexec":
        row.append([op.exec_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "lock_status":
        row.append(job.lock_status)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element per job, each element being a list
        with the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None