root / lib / jqueue.py @ 0411c011

#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
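
# A small, hedged illustration of the format documented above: the value is
# what utils.SplitTime is expected to produce for a float timestamp (numbers
# are made up, shown doctest-style and not executed):
#
#   >>> TimeStampNow()
#   (1234567890, 123456)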


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }
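
  # A minimal sketch of the intended serialization round trip, assuming `op`
  # is some opcodes.OpCode instance (illustrative only, not executed):
  #
  #   >>> qop = _QueuedOpCode(op)
  #   >>> state = qop.Serialize()
  #   >>> restored = _QueuedOpCode.Restore(state)
  #   >>> restored.status == qop.status
  #   True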


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar lock_status: In-memory locking information for debugging
  @ivar change: a Condition variable we use for waiting for job changes

  """
  # pylint: disable-msg=W0212
  __slots__ = ["queue", "id", "ops", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "lock_status", "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # In-memory attributes
    self.lock_status = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  def __repr__(self):
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
              "id=%s" % self.id,
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]

    return "<%s at %#x>" % (" ".join(status), id(self))

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    # In-memory attributes
    obj.lock_status = None

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one that finished with an
        error, the job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or all are
        successful, and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
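
  # Illustrative walk-through of the algorithm above, with statuses shown as
  # their underlying string constants (the job itself is hypothetical):
  #
  #   >>> [op.status for op in job.ops]
  #   ['success', 'running', 'queued']
  #   >>> job.CalcStatus()
  #   'running'
  #
  # An error anywhere breaks the loop and the whole job becomes 'error':
  #
  #   >>> [op.status for op in job.ops]
  #   ['success', 'error', 'queued']
  #   >>> job.CalcStatus()
  #   'error'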

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
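
  # Hedged sketch of the serial filtering: log entries are
  # (serial, timestamp, type, message) tuples, so for a hypothetical job
  # holding entries with serials 1..3:
  #
  #   >>> [entry[0] for entry in job.GetLogEntries(None)]
  #   [1, 2, 3]
  #   >>> [entry[0] for entry in job.GetLogEntries(1)]
  #   [2, 3]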

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False
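
  # Illustrative use, mirroring what the queue does when cancelling a job
  # that is still waiting for locks (see JobQueue.CancelJob below):
  #
  #   >>> job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
  #
  # Opcodes that already reached a final status keep their status and result;
  # only queued/waiting/running ones are overwritten.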


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    self._queue.acquire()
    try:
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                                 constants.OP_STATUS_CANCELING)

      # All locks are acquired by now
      self._job.lock_status = None

      # Cancel here if we were asked to
      if self._op.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self._op.status = constants.OP_STATUS_RUNNING
    finally:
      self._queue.release()

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())

    self._queue.acquire()
    try:
      self._job.log_serial += 1
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))

      self._job.change.notifyAll()
    finally:
      self._queue.release()

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    # Not getting the queue lock because this is a single assignment
    self._job.lock_status = msg


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job): # pylint: disable-msg=W0221
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Processing job %s", job.id)
    proc = mcpu.Processor(self.pool.queue.context, job.id)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire()
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            # Make sure not to hold queue lock while calling ExecOpCode
            result = proc.ExecOpCode(input_opcode,
                                     _OpExecCallbacks(queue, job, op))

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                if isinstance(err, errors.GenericError):
                  op.result = errors.EncodeException(err)
                else:
                  op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.lock_status = None
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()

      logging.info("Finished job %s, status = %s", job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
                                              JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_lock' attribute.

  @warning: Use this decorator only after utils.LockedMethod!

  Example::
    @utils.LockedMethod
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    # pylint: disable-msg=W0212
    assert self._queue_lock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                    "Unclean master daemon shutdown")
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  @staticmethod
  def _CheckRpcResult(result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s (%s) failed on node %s: %s",
                      result[node].call, failmsg, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename files in the local queue directory
    and then replicate these renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  @staticmethod
  def _FormatJobID(job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
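
  # Illustrative mapping, given JOBS_PER_ARCHIVE_DIRECTORY = 10000 and
  # Python 2 integer division:
  #
  #   >>> JobQueue._GetArchiveDirectory("9999")
  #   '0'
  #   >>> JobQueue._GetArchiveDirectory("12345")
  #   '1'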

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of job identifiers.

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result
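
  # Sketch of the serial allocation, assuming the last written serial was 42
  # (`queue` is a hypothetical JobQueue instance; exactly `count` fresh IDs
  # are returned and the serial file then holds 45):
  #
  #   >>> queue._NewSerialsUnlocked(3)
  #   ['43', '44', '45']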

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    # pylint: disable-msg=W0613
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      raw_data = utils.ReadFile(filepath)
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise

    data = serializer.LoadJson(raw_data)

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err: # pylint: disable-msg=W0703
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(filepath, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True

    
1037
  @_RequireOpenQueue
1038
  def _SubmitJobUnlocked(self, job_id, ops):
1039
    """Create and store a new job.
1040

1041
    This enters the job into our job queue and also puts it on the new
1042
    queue, in order for it to be picked up by the queue processors.
1043

1044
    @type job_id: job ID
1045
    @param job_id: the job ID for the new job
1046
    @type ops: list
1047
    @param ops: The list of OpCodes that will become the new job.
1048
    @rtype: job ID
1049
    @return: the job ID of the newly created job
1050
    @raise errors.JobQueueDrainError: if the job is marked for draining
1051

1052
    """
1053
    if self._IsQueueMarkedDrain():
1054
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1055

    
1056
    # Check job queue size
1057
    size = len(self._ListJobFiles())
1058
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
1059
      # TODO: Autoarchive jobs. Make sure it's not done on every job
1060
      # submission, though.
1061
      #size = ...
1062
      pass
1063

    
1064
    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1065
      raise errors.JobQueueFull()
1066

    
1067
    job = _QueuedJob(self, job_id, ops)
1068

    
1069
    # Write to disk
1070
    self.UpdateJobUnlocked(job)
1071

    
1072
    logging.debug("Adding new job %s to the cache", job_id)
1073
    self._memcache[job_id] = job
1074

    
1075
    # Add to worker pool
1076
    self._wpool.AddTask(job)
1077

    
1078
    return job.id
1079

    
1080
  @utils.LockedMethod
1081
  @_RequireOpenQueue
1082
  def SubmitJob(self, ops):
1083
    """Create and store a new job.
1084

1085
    @see: L{_SubmitJobUnlocked}
1086

1087
    """
1088
    job_id = self._NewSerialsUnlocked(1)[0]
1089
    return self._SubmitJobUnlocked(job_id, ops)
1090

    
1091
  @utils.LockedMethod
1092
  @_RequireOpenQueue
1093
  def SubmitManyJobs(self, jobs):
1094
    """Create and store multiple jobs.
1095

1096
    @see: L{_SubmitJobUnlocked}
1097

1098
    """
1099
    results = []
1100
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1101
    for job_id, ops in zip(all_job_ids, jobs):
1102
      try:
1103
        data = self._SubmitJobUnlocked(job_id, ops)
1104
        status = True
1105
      except errors.GenericError, err:
1106
        data = str(err)
1107
        status = False
1108
      results.append((status, data))
1109

    
1110
    return results
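
  # Sketch of the result format: one (status, data) tuple per input job,
  # where data is the new job ID on success and an error message otherwise
  # (`queue`, `ops_a` and `ops_b` are hypothetical; values illustrative):
  #
  #   >>> queue.SubmitManyJobs([ops_a, ops_b])
  #   [(True, '44'), (False, 'Job queue is drained, refusing job')]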

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return None

    def _CheckForChanges():
      logging.debug("Waiting for changes in job %s", job_id)

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      # Don't even try to wait if the job is no longer running, there will be
      # no changes.
      if (status not in (constants.JOB_STATUS_QUEUED,
                         constants.JOB_STATUS_RUNNING,
                         constants.JOB_STATUS_WAITLOCK) or
          prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        logging.debug("Job %s changed", job_id)
        return (job_info, log_entries)

      raise utils.RetryAgain()

    try:
      # Setting wait function to release the queue lock while waiting
      return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
                         wait_fn=job.change.wait)
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
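
  # Hedged sketch of a client-side polling loop built on this method
  # (`queue` is a hypothetical JobQueue instance; field list and timeout are
  # illustrative):
  #
  #   >>> result = queue.WaitForJobChanges("42", ["status"], None, 0, 30.0)
  #   >>> if result == constants.JOB_NOTCHANGED:
  #   ...   pass  # timed out; poll again with the same arguments
  #   ... else:
  #   ...   (job_info, log_entries) = result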

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer waiting in the queue", job.id)
      return (False, "Job %s is no longer waiting in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                            "Job canceled by request")
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  utils.CommaJoin(job.id for job in archive_jobs))

    return len(archive_jobs)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum time to spend archiving, in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked(archived=False)
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx + 1

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched)
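
  # Illustrative call, archiving everything that finished more than an hour
  # ago while spending at most 60 seconds on the work (`queue` and the
  # returned numbers are hypothetical):
  #
  #   >>> queue.AutoArchiveJobs(3600, 60)
  #   (12, 0)    # (jobs archived, job files left unexamined)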

  @staticmethod
  def _GetJobInfoUnlocked(job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "lock_status":
        row.append(job.lock_status)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a list
        with the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs
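
  # Hedged query sketch (`queue` and the values are hypothetical); jobs that
  # cannot be loaded come back as None rather than raising:
  #
  #   >>> queue.QueryJobs(["42", "9999"], ["id", "status"])
  #   [['42', 'success'], None]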

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None