Statistics
| Branch: | Tag: | Revision:

root / lib / jqueue.py @ 2d54e29c

History | View | Annotate | Download (41.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import os
33
import logging
34
import threading
35
import errno
36
import re
37
import time
38
import weakref
39

    
40
from ganeti import constants
41
from ganeti import serializer
42
from ganeti import workerpool
43
from ganeti import opcodes
44
from ganeti import errors
45
from ganeti import mcpu
46
from ganeti import utils
47
from ganeti import jstore
48
from ganeti import rpc
49

    
50

    
51
JOBQUEUE_THREADS = 25
52
JOBS_PER_ARCHIVE_DIRECTORY = 10000
53

    
54

    
55
class CancelJob(Exception):
56
  """Special exception to cancel a job.
57

58
  """
59

    
60

    
61
def TimeStampNow():
62
  """Returns the current timestamp.
63

64
  @rtype: tuple
65
  @return: the current time in the (seconds, microseconds) format
66

67
  """
68
  return utils.SplitTime(time.time())
69

    
70

    
71
class _QueuedOpCode(object):
72
  """Encapsulates an opcode object.
73

74
  @ivar log: holds the execution log and consists of tuples
75
  of the form C{(log_serial, timestamp, level, message)}
76
  @ivar input: the OpCode we encapsulate
77
  @ivar status: the current status
78
  @ivar result: the result of the LU execution
79
  @ivar start_timestamp: timestamp for the start of the execution
80
  @ivar stop_timestamp: timestamp for the end of the execution
81

82
  """
83
  __slots__ = ["input", "status", "result", "log",
84
               "start_timestamp", "end_timestamp",
85
               "__weakref__"]
86

    
87
  def __init__(self, op):
88
    """Constructor for the _QuededOpCode.
89

90
    @type op: L{opcodes.OpCode}
91
    @param op: the opcode we encapsulate
92

93
    """
94
    self.input = op
95
    self.status = constants.OP_STATUS_QUEUED
96
    self.result = None
97
    self.log = []
98
    self.start_timestamp = None
99
    self.end_timestamp = None
100

    
101
  @classmethod
102
  def Restore(cls, state):
103
    """Restore the _QueuedOpCode from the serialized form.
104

105
    @type state: dict
106
    @param state: the serialized state
107
    @rtype: _QueuedOpCode
108
    @return: a new _QueuedOpCode instance
109

110
    """
111
    obj = _QueuedOpCode.__new__(cls)
112
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
113
    obj.status = state["status"]
114
    obj.result = state["result"]
115
    obj.log = state["log"]
116
    obj.start_timestamp = state.get("start_timestamp", None)
117
    obj.end_timestamp = state.get("end_timestamp", None)
118
    return obj
119

    
120
  def Serialize(self):
121
    """Serializes this _QueuedOpCode.
122

123
    @rtype: dict
124
    @return: the dictionary holding the serialized state
125

126
    """
127
    return {
128
      "input": self.input.__getstate__(),
129
      "status": self.status,
130
      "result": self.result,
131
      "log": self.log,
132
      "start_timestamp": self.start_timestamp,
133
      "end_timestamp": self.end_timestamp,
134
      }
135

    
136

    
137
class _QueuedJob(object):
138
  """In-memory job representation.
139

140
  This is what we use to track the user-submitted jobs. Locking must
141
  be taken care of by users of this class.
142

143
  @type queue: L{JobQueue}
144
  @ivar queue: the parent queue
145
  @ivar id: the job ID
146
  @type ops: list
147
  @ivar ops: the list of _QueuedOpCode that constitute the job
148
  @type log_serial: int
149
  @ivar log_serial: holds the index for the next log entry
150
  @ivar received_timestamp: the timestamp for when the job was received
151
  @ivar start_timestmap: the timestamp for start of execution
152
  @ivar end_timestamp: the timestamp for end of execution
153
  @ivar lock_status: In-memory locking information for debugging
154
  @ivar change: a Condition variable we use for waiting for job changes
155

156
  """
157
  # pylint: disable-msg=W0212
158
  __slots__ = ["queue", "id", "ops", "log_serial",
159
               "received_timestamp", "start_timestamp", "end_timestamp",
160
               "lock_status", "change",
161
               "__weakref__"]
162

    
163
  def __init__(self, queue, job_id, ops):
164
    """Constructor for the _QueuedJob.
165

166
    @type queue: L{JobQueue}
167
    @param queue: our parent queue
168
    @type job_id: job_id
169
    @param job_id: our job id
170
    @type ops: list
171
    @param ops: the list of opcodes we hold, which will be encapsulated
172
        in _QueuedOpCodes
173

174
    """
175
    if not ops:
176
      # TODO: use a better exception
177
      raise Exception("No opcodes")
178

    
179
    self.queue = queue
180
    self.id = job_id
181
    self.ops = [_QueuedOpCode(op) for op in ops]
182
    self.log_serial = 0
183
    self.received_timestamp = TimeStampNow()
184
    self.start_timestamp = None
185
    self.end_timestamp = None
186

    
187
    # In-memory attributes
188
    self.lock_status = None
189

    
190
    # Condition to wait for changes
191
    self.change = threading.Condition(self.queue._lock)
192

    
193
  @classmethod
194
  def Restore(cls, queue, state):
195
    """Restore a _QueuedJob from serialized state:
196

197
    @type queue: L{JobQueue}
198
    @param queue: to which queue the restored job belongs
199
    @type state: dict
200
    @param state: the serialized state
201
    @rtype: _JobQueue
202
    @return: the restored _JobQueue instance
203

204
    """
205
    obj = _QueuedJob.__new__(cls)
206
    obj.queue = queue
207
    obj.id = state["id"]
208
    obj.received_timestamp = state.get("received_timestamp", None)
209
    obj.start_timestamp = state.get("start_timestamp", None)
210
    obj.end_timestamp = state.get("end_timestamp", None)
211

    
212
    # In-memory attributes
213
    obj.lock_status = None
214

    
215
    obj.ops = []
216
    obj.log_serial = 0
217
    for op_state in state["ops"]:
218
      op = _QueuedOpCode.Restore(op_state)
219
      for log_entry in op.log:
220
        obj.log_serial = max(obj.log_serial, log_entry[0])
221
      obj.ops.append(op)
222

    
223
    # Condition to wait for changes
224
    obj.change = threading.Condition(obj.queue._lock)
225

    
226
    return obj
227

    
228
  def Serialize(self):
229
    """Serialize the _JobQueue instance.
230

231
    @rtype: dict
232
    @return: the serialized state
233

234
    """
235
    return {
236
      "id": self.id,
237
      "ops": [op.Serialize() for op in self.ops],
238
      "start_timestamp": self.start_timestamp,
239
      "end_timestamp": self.end_timestamp,
240
      "received_timestamp": self.received_timestamp,
241
      }
242

    
243
  def CalcStatus(self):
244
    """Compute the status of this job.
245

246
    This function iterates over all the _QueuedOpCodes in the job and
247
    based on their status, computes the job status.
248

249
    The algorithm is:
250
      - if we find a cancelled, or finished with error, the job
251
        status will be the same
252
      - otherwise, the last opcode with the status one of:
253
          - waitlock
254
          - canceling
255
          - running
256

257
        will determine the job status
258

259
      - otherwise, it means either all opcodes are queued, or success,
260
        and the job status will be the same
261

262
    @return: the job status
263

264
    """
265
    status = constants.JOB_STATUS_QUEUED
266

    
267
    all_success = True
268
    for op in self.ops:
269
      if op.status == constants.OP_STATUS_SUCCESS:
270
        continue
271

    
272
      all_success = False
273

    
274
      if op.status == constants.OP_STATUS_QUEUED:
275
        pass
276
      elif op.status == constants.OP_STATUS_WAITLOCK:
277
        status = constants.JOB_STATUS_WAITLOCK
278
      elif op.status == constants.OP_STATUS_RUNNING:
279
        status = constants.JOB_STATUS_RUNNING
280
      elif op.status == constants.OP_STATUS_CANCELING:
281
        status = constants.JOB_STATUS_CANCELING
282
        break
283
      elif op.status == constants.OP_STATUS_ERROR:
284
        status = constants.JOB_STATUS_ERROR
285
        # The whole job fails if one opcode failed
286
        break
287
      elif op.status == constants.OP_STATUS_CANCELED:
288
        status = constants.OP_STATUS_CANCELED
289
        break
290

    
291
    if all_success:
292
      status = constants.JOB_STATUS_SUCCESS
293

    
294
    return status
295

    
296
  def GetLogEntries(self, newer_than):
297
    """Selectively returns the log entries.
298

299
    @type newer_than: None or int
300
    @param newer_than: if this is None, return all log entries,
301
        otherwise return only the log entries with serial higher
302
        than this value
303
    @rtype: list
304
    @return: the list of the log entries selected
305

306
    """
307
    if newer_than is None:
308
      serial = -1
309
    else:
310
      serial = newer_than
311

    
312
    entries = []
313
    for op in self.ops:
314
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
315

    
316
    return entries
317

    
318
  def MarkUnfinishedOps(self, status, result):
319
    """Mark unfinished opcodes with a given status and result.
320

321
    This is an utility function for marking all running or waiting to
322
    be run opcodes with a given status. Opcodes which are already
323
    finalised are not changed.
324

325
    @param status: a given opcode status
326
    @param result: the opcode result
327

328
    """
329
    not_marked = True
330
    for op in self.ops:
331
      if op.status in constants.OPS_FINALIZED:
332
        assert not_marked, "Finalized opcodes found after non-finalized ones"
333
        continue
334
      op.status = status
335
      op.result = result
336
      not_marked = False
337

    
338

    
339
class _OpExecCallbacks(mcpu.OpExecCbBase):
340
  def __init__(self, queue, job, op):
341
    """Initializes this class.
342

343
    @type queue: L{JobQueue}
344
    @param queue: Job queue
345
    @type job: L{_QueuedJob}
346
    @param job: Job object
347
    @type op: L{_QueuedOpCode}
348
    @param op: OpCode
349

350
    """
351
    assert queue, "Queue is missing"
352
    assert job, "Job is missing"
353
    assert op, "Opcode is missing"
354

    
355
    self._queue = queue
356
    self._job = job
357
    self._op = op
358

    
359
  def NotifyStart(self):
360
    """Mark the opcode as running, not lock-waiting.
361

362
    This is called from the mcpu code as a notifier function, when the LU is
363
    finally about to start the Exec() method. Of course, to have end-user
364
    visible results, the opcode must be initially (before calling into
365
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
366

367
    """
368
    self._queue.acquire()
369
    try:
370
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
371
                                 constants.OP_STATUS_CANCELING)
372

    
373
      # All locks are acquired by now
374
      self._job.lock_status = None
375

    
376
      # Cancel here if we were asked to
377
      if self._op.status == constants.OP_STATUS_CANCELING:
378
        raise CancelJob()
379

    
380
      self._op.status = constants.OP_STATUS_RUNNING
381
    finally:
382
      self._queue.release()
383

    
384
  def Feedback(self, *args):
385
    """Append a log entry.
386

387
    """
388
    assert len(args) < 3
389

    
390
    if len(args) == 1:
391
      log_type = constants.ELOG_MESSAGE
392
      log_msg = args[0]
393
    else:
394
      (log_type, log_msg) = args
395

    
396
    # The time is split to make serialization easier and not lose
397
    # precision.
398
    timestamp = utils.SplitTime(time.time())
399

    
400
    self._queue.acquire()
401
    try:
402
      self._job.log_serial += 1
403
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
404

    
405
      self._job.change.notifyAll()
406
    finally:
407
      self._queue.release()
408

    
409
  def ReportLocks(self, msg):
410
    """Write locking information to the job.
411

412
    Called whenever the LU processor is waiting for a lock or has acquired one.
413

414
    """
415
    # Not getting the queue lock because this is a single assignment
416
    self._job.lock_status = msg
417

    
418

    
419
class _JobQueueWorker(workerpool.BaseWorker):
420
  """The actual job workers.
421

422
  """
423
  def RunTask(self, job): # pylint: disable-msg=W0221
424
    """Job executor.
425

426
    This functions processes a job. It is closely tied to the _QueuedJob and
427
    _QueuedOpCode classes.
428

429
    @type job: L{_QueuedJob}
430
    @param job: the job to be processed
431

432
    """
433
    logging.info("Worker %s processing job %s",
434
                  self.worker_id, job.id)
435
    proc = mcpu.Processor(self.pool.queue.context, job.id)
436
    queue = job.queue
437
    try:
438
      try:
439
        count = len(job.ops)
440
        for idx, op in enumerate(job.ops):
441
          op_summary = op.input.Summary()
442
          if op.status == constants.OP_STATUS_SUCCESS:
443
            # this is a job that was partially completed before master
444
            # daemon shutdown, so it can be expected that some opcodes
445
            # are already completed successfully (if any did error
446
            # out, then the whole job should have been aborted and not
447
            # resubmitted for processing)
448
            logging.info("Op %s/%s: opcode %s already processed, skipping",
449
                         idx + 1, count, op_summary)
450
            continue
451
          try:
452
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
453
                         op_summary)
454

    
455
            queue.acquire()
456
            try:
457
              if op.status == constants.OP_STATUS_CANCELED:
458
                raise CancelJob()
459
              assert op.status == constants.OP_STATUS_QUEUED
460
              op.status = constants.OP_STATUS_WAITLOCK
461
              op.result = None
462
              op.start_timestamp = TimeStampNow()
463
              if idx == 0: # first opcode
464
                job.start_timestamp = op.start_timestamp
465
              queue.UpdateJobUnlocked(job)
466

    
467
              input_opcode = op.input
468
            finally:
469
              queue.release()
470

    
471
            # Make sure not to hold queue lock while calling ExecOpCode
472
            result = proc.ExecOpCode(input_opcode,
473
                                     _OpExecCallbacks(queue, job, op))
474

    
475
            queue.acquire()
476
            try:
477
              op.status = constants.OP_STATUS_SUCCESS
478
              op.result = result
479
              op.end_timestamp = TimeStampNow()
480
              queue.UpdateJobUnlocked(job)
481
            finally:
482
              queue.release()
483

    
484
            logging.info("Op %s/%s: Successfully finished opcode %s",
485
                         idx + 1, count, op_summary)
486
          except CancelJob:
487
            # Will be handled further up
488
            raise
489
          except Exception, err:
490
            queue.acquire()
491
            try:
492
              try:
493
                op.status = constants.OP_STATUS_ERROR
494
                if isinstance(err, errors.GenericError):
495
                  op.result = errors.EncodeException(err)
496
                else:
497
                  op.result = str(err)
498
                op.end_timestamp = TimeStampNow()
499
                logging.info("Op %s/%s: Error in opcode %s: %s",
500
                             idx + 1, count, op_summary, err)
501
              finally:
502
                queue.UpdateJobUnlocked(job)
503
            finally:
504
              queue.release()
505
            raise
506

    
507
      except CancelJob:
508
        queue.acquire()
509
        try:
510
          queue.CancelJobUnlocked(job)
511
        finally:
512
          queue.release()
513
      except errors.GenericError, err:
514
        logging.exception("Ganeti exception")
515
      except:
516
        logging.exception("Unhandled exception")
517
    finally:
518
      queue.acquire()
519
      try:
520
        try:
521
          job.lock_status = None
522
          job.end_timestamp = TimeStampNow()
523
          queue.UpdateJobUnlocked(job)
524
        finally:
525
          job_id = job.id
526
          status = job.CalcStatus()
527
      finally:
528
        queue.release()
529

    
530
      logging.info("Worker %s finished job %s, status = %s",
531
                   self.worker_id, job_id, status)
532

    
533

    
534
class _JobQueueWorkerPool(workerpool.WorkerPool):
535
  """Simple class implementing a job-processing workerpool.
536

537
  """
538
  def __init__(self, queue):
539
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
540
                                              _JobQueueWorker)
541
    self.queue = queue
542

    
543

    
544
def _RequireOpenQueue(fn):
545
  """Decorator for "public" functions.
546

547
  This function should be used for all 'public' functions. That is,
548
  functions usually called from other classes. Note that this should
549
  be applied only to methods (not plain functions), since it expects
550
  that the decorated function is called with a first argument that has
551
  a '_queue_lock' argument.
552

553
  @warning: Use this decorator only after utils.LockedMethod!
554

555
  Example::
556
    @utils.LockedMethod
557
    @_RequireOpenQueue
558
    def Example(self):
559
      pass
560

561
  """
562
  def wrapper(self, *args, **kwargs):
563
    # pylint: disable-msg=W0212
564
    assert self._queue_lock is not None, "Queue should be open"
565
    return fn(self, *args, **kwargs)
566
  return wrapper
567

    
568

    
569
class JobQueue(object):
570
  """Queue used to manage the jobs.
571

572
  @cvar _RE_JOB_FILE: regex matching the valid job file names
573

574
  """
575
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
576

    
577
  def __init__(self, context):
578
    """Constructor for JobQueue.
579

580
    The constructor will initialize the job queue object and then
581
    start loading the current jobs from disk, either for starting them
582
    (if they were queue) or for aborting them (if they were already
583
    running).
584

585
    @type context: GanetiContext
586
    @param context: the context object for access to the configuration
587
        data and other ganeti objects
588

589
    """
590
    self.context = context
591
    self._memcache = weakref.WeakValueDictionary()
592
    self._my_hostname = utils.HostInfo().name
593

    
594
    # Locking
595
    self._lock = threading.Lock()
596
    self.acquire = self._lock.acquire
597
    self.release = self._lock.release
598

    
599
    # Initialize
600
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
601

    
602
    # Read serial file
603
    self._last_serial = jstore.ReadSerial()
604
    assert self._last_serial is not None, ("Serial file was modified between"
605
                                           " check in jstore and here")
606

    
607
    # Get initial list of nodes
608
    self._nodes = dict((n.name, n.primary_ip)
609
                       for n in self.context.cfg.GetAllNodesInfo().values()
610
                       if n.master_candidate)
611

    
612
    # Remove master node
613
    try:
614
      del self._nodes[self._my_hostname]
615
    except KeyError:
616
      pass
617

    
618
    # TODO: Check consistency across nodes
619

    
620
    # Setup worker pool
621
    self._wpool = _JobQueueWorkerPool(self)
622
    try:
623
      # We need to lock here because WorkerPool.AddTask() may start a job while
624
      # we're still doing our work.
625
      self.acquire()
626
      try:
627
        logging.info("Inspecting job queue")
628

    
629
        all_job_ids = self._GetJobIDsUnlocked()
630
        jobs_count = len(all_job_ids)
631
        lastinfo = time.time()
632
        for idx, job_id in enumerate(all_job_ids):
633
          # Give an update every 1000 jobs or 10 seconds
634
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
635
              idx == (jobs_count - 1)):
636
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
637
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
638
            lastinfo = time.time()
639

    
640
          job = self._LoadJobUnlocked(job_id)
641

    
642
          # a failure in loading the job can cause 'None' to be returned
643
          if job is None:
644
            continue
645

    
646
          status = job.CalcStatus()
647

    
648
          if status in (constants.JOB_STATUS_QUEUED, ):
649
            self._wpool.AddTask(job)
650

    
651
          elif status in (constants.JOB_STATUS_RUNNING,
652
                          constants.JOB_STATUS_WAITLOCK,
653
                          constants.JOB_STATUS_CANCELING):
654
            logging.warning("Unfinished job %s found: %s", job.id, job)
655
            try:
656
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
657
                                    "Unclean master daemon shutdown")
658
            finally:
659
              self.UpdateJobUnlocked(job)
660

    
661
        logging.info("Job queue inspection finished")
662
      finally:
663
        self.release()
664
    except:
665
      self._wpool.TerminateWorkers()
666
      raise
667

    
668
  @utils.LockedMethod
669
  @_RequireOpenQueue
670
  def AddNode(self, node):
671
    """Register a new node with the queue.
672

673
    @type node: L{objects.Node}
674
    @param node: the node object to be added
675

676
    """
677
    node_name = node.name
678
    assert node_name != self._my_hostname
679

    
680
    # Clean queue directory on added node
681
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
682
    msg = result.fail_msg
683
    if msg:
684
      logging.warning("Cannot cleanup queue directory on node %s: %s",
685
                      node_name, msg)
686

    
687
    if not node.master_candidate:
688
      # remove if existing, ignoring errors
689
      self._nodes.pop(node_name, None)
690
      # and skip the replication of the job ids
691
      return
692

    
693
    # Upload the whole queue excluding archived jobs
694
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
695

    
696
    # Upload current serial file
697
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
698

    
699
    for file_name in files:
700
      # Read file content
701
      content = utils.ReadFile(file_name)
702

    
703
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
704
                                                  [node.primary_ip],
705
                                                  file_name, content)
706
      msg = result[node_name].fail_msg
707
      if msg:
708
        logging.error("Failed to upload file %s to node %s: %s",
709
                      file_name, node_name, msg)
710

    
711
    self._nodes[node_name] = node.primary_ip
712

    
713
  @utils.LockedMethod
714
  @_RequireOpenQueue
715
  def RemoveNode(self, node_name):
716
    """Callback called when removing nodes from the cluster.
717

718
    @type node_name: str
719
    @param node_name: the name of the node to remove
720

721
    """
722
    try:
723
      # The queue is removed by the "leave node" RPC call.
724
      del self._nodes[node_name]
725
    except KeyError:
726
      pass
727

    
728
  @staticmethod
729
  def _CheckRpcResult(result, nodes, failmsg):
730
    """Verifies the status of an RPC call.
731

732
    Since we aim to keep consistency should this node (the current
733
    master) fail, we will log errors if our rpc fail, and especially
734
    log the case when more than half of the nodes fails.
735

736
    @param result: the data as returned from the rpc call
737
    @type nodes: list
738
    @param nodes: the list of nodes we made the call to
739
    @type failmsg: str
740
    @param failmsg: the identifier to be used for logging
741

742
    """
743
    failed = []
744
    success = []
745

    
746
    for node in nodes:
747
      msg = result[node].fail_msg
748
      if msg:
749
        failed.append(node)
750
        logging.error("RPC call %s (%s) failed on node %s: %s",
751
                      result[node].call, failmsg, node, msg)
752
      else:
753
        success.append(node)
754

    
755
    # +1 for the master node
756
    if (len(success) + 1) < len(failed):
757
      # TODO: Handle failing nodes
758
      logging.error("More than half of the nodes failed")
759

    
760
  def _GetNodeIp(self):
761
    """Helper for returning the node name/ip list.
762

763
    @rtype: (list, list)
764
    @return: a tuple of two lists, the first one with the node
765
        names and the second one with the node addresses
766

767
    """
768
    name_list = self._nodes.keys()
769
    addr_list = [self._nodes[name] for name in name_list]
770
    return name_list, addr_list
771

    
772
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
773
    """Writes a file locally and then replicates it to all nodes.
774

775
    This function will replace the contents of a file on the local
776
    node and then replicate it to all the other nodes we have.
777

778
    @type file_name: str
779
    @param file_name: the path of the file to be replicated
780
    @type data: str
781
    @param data: the new contents of the file
782

783
    """
784
    utils.WriteFile(file_name, data=data)
785

    
786
    names, addrs = self._GetNodeIp()
787
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
788
    self._CheckRpcResult(result, self._nodes,
789
                         "Updating %s" % file_name)
790

    
791
  def _RenameFilesUnlocked(self, rename):
792
    """Renames a file locally and then replicate the change.
793

794
    This function will rename a file in the local queue directory
795
    and then replicate this rename to all the other nodes we have.
796

797
    @type rename: list of (old, new)
798
    @param rename: List containing tuples mapping old to new names
799

800
    """
801
    # Rename them locally
802
    for old, new in rename:
803
      utils.RenameFile(old, new, mkdir=True)
804

    
805
    # ... and on all nodes
806
    names, addrs = self._GetNodeIp()
807
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
808
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
809

    
810
  @staticmethod
811
  def _FormatJobID(job_id):
812
    """Convert a job ID to string format.
813

814
    Currently this just does C{str(job_id)} after performing some
815
    checks, but if we want to change the job id format this will
816
    abstract this change.
817

818
    @type job_id: int or long
819
    @param job_id: the numeric job id
820
    @rtype: str
821
    @return: the formatted job id
822

823
    """
824
    if not isinstance(job_id, (int, long)):
825
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
826
    if job_id < 0:
827
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
828

    
829
    return str(job_id)
830

    
831
  @classmethod
832
  def _GetArchiveDirectory(cls, job_id):
833
    """Returns the archive directory for a job.
834

835
    @type job_id: str
836
    @param job_id: Job identifier
837
    @rtype: str
838
    @return: Directory name
839

840
    """
841
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
842

    
843
  def _NewSerialsUnlocked(self, count):
844
    """Generates a new job identifier.
845

846
    Job identifiers are unique during the lifetime of a cluster.
847

848
    @type count: integer
849
    @param count: how many serials to return
850
    @rtype: str
851
    @return: a string representing the job identifier.
852

853
    """
854
    assert count > 0
855
    # New number
856
    serial = self._last_serial + count
857

    
858
    # Write to file
859
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
860
                                        "%s\n" % serial)
861

    
862
    result = [self._FormatJobID(v)
863
              for v in range(self._last_serial, serial + 1)]
864
    # Keep it only if we were able to write the file
865
    self._last_serial = serial
866

    
867
    return result
868

    
869
  @staticmethod
870
  def _GetJobPath(job_id):
871
    """Returns the job file for a given job id.
872

873
    @type job_id: str
874
    @param job_id: the job identifier
875
    @rtype: str
876
    @return: the path to the job file
877

878
    """
879
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
880

    
881
  @classmethod
882
  def _GetArchivedJobPath(cls, job_id):
883
    """Returns the archived job file for a give job id.
884

885
    @type job_id: str
886
    @param job_id: the job identifier
887
    @rtype: str
888
    @return: the path to the archived job file
889

890
    """
891
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
892
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
893

    
894
  @classmethod
895
  def _ExtractJobID(cls, name):
896
    """Extract the job id from a filename.
897

898
    @type name: str
899
    @param name: the job filename
900
    @rtype: job id or None
901
    @return: the job id corresponding to the given filename,
902
        or None if the filename does not represent a valid
903
        job file
904

905
    """
906
    m = cls._RE_JOB_FILE.match(name)
907
    if m:
908
      return m.group(1)
909
    else:
910
      return None
911

    
912
  def _GetJobIDsUnlocked(self, archived=False):
913
    """Return all known job IDs.
914

915
    If the parameter archived is True, archived jobs IDs will be
916
    included. Currently this argument is unused.
917

918
    The method only looks at disk because it's a requirement that all
919
    jobs are present on disk (so in the _memcache we don't have any
920
    extra IDs).
921

922
    @rtype: list
923
    @return: the list of job IDs
924

925
    """
926
    # pylint: disable-msg=W0613
927
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
928
    jlist = utils.NiceSort(jlist)
929
    return jlist
930

    
931
  def _ListJobFiles(self):
932
    """Returns the list of current job files.
933

934
    @rtype: list
935
    @return: the list of job file names
936

937
    """
938
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
939
            if self._RE_JOB_FILE.match(name)]
940

    
941
  def _LoadJobUnlocked(self, job_id):
942
    """Loads a job from the disk or memory.
943

944
    Given a job id, this will return the cached job object if
945
    existing, or try to load the job from the disk. If loading from
946
    disk, it will also add the job to the cache.
947

948
    @param job_id: the job id
949
    @rtype: L{_QueuedJob} or None
950
    @return: either None or the job object
951

952
    """
953
    job = self._memcache.get(job_id, None)
954
    if job:
955
      logging.debug("Found job %s in memcache", job_id)
956
      return job
957

    
958
    filepath = self._GetJobPath(job_id)
959
    logging.debug("Loading job from %s", filepath)
960
    try:
961
      raw_data = utils.ReadFile(filepath)
962
    except IOError, err:
963
      if err.errno in (errno.ENOENT, ):
964
        return None
965
      raise
966

    
967
    data = serializer.LoadJson(raw_data)
968

    
969
    try:
970
      job = _QueuedJob.Restore(self, data)
971
    except Exception, err: # pylint: disable-msg=W0703
972
      new_path = self._GetArchivedJobPath(job_id)
973
      if filepath == new_path:
974
        # job already archived (future case)
975
        logging.exception("Can't parse job %s", job_id)
976
      else:
977
        # non-archived case
978
        logging.exception("Can't parse job %s, will archive.", job_id)
979
        self._RenameFilesUnlocked([(filepath, new_path)])
980
      return None
981

    
982
    self._memcache[job_id] = job
983
    logging.debug("Added job %s to the cache", job_id)
984
    return job
985

    
986
  def _GetJobsUnlocked(self, job_ids):
987
    """Return a list of jobs based on their IDs.
988

989
    @type job_ids: list
990
    @param job_ids: either an empty list (meaning all jobs),
991
        or a list of job IDs
992
    @rtype: list
993
    @return: the list of job objects
994

995
    """
996
    if not job_ids:
997
      job_ids = self._GetJobIDsUnlocked()
998

    
999
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
1000

    
1001
  @staticmethod
1002
  def _IsQueueMarkedDrain():
1003
    """Check if the queue is marked from drain.
1004

1005
    This currently uses the queue drain file, which makes it a
1006
    per-node flag. In the future this can be moved to the config file.
1007

1008
    @rtype: boolean
1009
    @return: True of the job queue is marked for draining
1010

1011
    """
1012
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1013

    
1014
  @staticmethod
1015
  def SetDrainFlag(drain_flag):
1016
    """Sets the drain flag for the queue.
1017

1018
    This is similar to the function L{backend.JobQueueSetDrainFlag},
1019
    and in the future we might merge them.
1020

1021
    @type drain_flag: boolean
1022
    @param drain_flag: Whether to set or unset the drain flag
1023

1024
    """
1025
    if drain_flag:
1026
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1027
    else:
1028
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1029
    return True
1030

    
1031
  @_RequireOpenQueue
1032
  def _SubmitJobUnlocked(self, job_id, ops):
1033
    """Create and store a new job.
1034

1035
    This enters the job into our job queue and also puts it on the new
1036
    queue, in order for it to be picked up by the queue processors.
1037

1038
    @type job_id: job ID
1039
    @param job_id: the job ID for the new job
1040
    @type ops: list
1041
    @param ops: The list of OpCodes that will become the new job.
1042
    @rtype: job ID
1043
    @return: the job ID of the newly created job
1044
    @raise errors.JobQueueDrainError: if the job is marked for draining
1045

1046
    """
1047
    if self._IsQueueMarkedDrain():
1048
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1049

    
1050
    # Check job queue size
1051
    size = len(self._ListJobFiles())
1052
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
1053
      # TODO: Autoarchive jobs. Make sure it's not done on every job
1054
      # submission, though.
1055
      #size = ...
1056
      pass
1057

    
1058
    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1059
      raise errors.JobQueueFull()
1060

    
1061
    job = _QueuedJob(self, job_id, ops)
1062

    
1063
    # Write to disk
1064
    self.UpdateJobUnlocked(job)
1065

    
1066
    logging.debug("Adding new job %s to the cache", job_id)
1067
    self._memcache[job_id] = job
1068

    
1069
    # Add to worker pool
1070
    self._wpool.AddTask(job)
1071

    
1072
    return job.id
1073

    
1074
  @utils.LockedMethod
1075
  @_RequireOpenQueue
1076
  def SubmitJob(self, ops):
1077
    """Create and store a new job.
1078

1079
    @see: L{_SubmitJobUnlocked}
1080

1081
    """
1082
    job_id = self._NewSerialsUnlocked(1)[0]
1083
    return self._SubmitJobUnlocked(job_id, ops)
1084

    
1085
  @utils.LockedMethod
1086
  @_RequireOpenQueue
1087
  def SubmitManyJobs(self, jobs):
1088
    """Create and store multiple jobs.
1089

1090
    @see: L{_SubmitJobUnlocked}
1091

1092
    """
1093
    results = []
1094
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1095
    for job_id, ops in zip(all_job_ids, jobs):
1096
      try:
1097
        data = self._SubmitJobUnlocked(job_id, ops)
1098
        status = True
1099
      except errors.GenericError, err:
1100
        data = str(err)
1101
        status = False
1102
      results.append((status, data))
1103

    
1104
    return results
1105

    
1106
  @_RequireOpenQueue
1107
  def UpdateJobUnlocked(self, job):
1108
    """Update a job's on disk storage.
1109

1110
    After a job has been modified, this function needs to be called in
1111
    order to write the changes to disk and replicate them to the other
1112
    nodes.
1113

1114
    @type job: L{_QueuedJob}
1115
    @param job: the changed job
1116

1117
    """
1118
    filename = self._GetJobPath(job.id)
1119
    data = serializer.DumpJson(job.Serialize(), indent=False)
1120
    logging.debug("Writing job %s to %s", job.id, filename)
1121
    self._WriteAndReplicateFileUnlocked(filename, data)
1122

    
1123
    # Notify waiters about potential changes
1124
    job.change.notifyAll()
1125

    
1126
  @utils.LockedMethod
1127
  @_RequireOpenQueue
1128
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1129
                        timeout):
1130
    """Waits for changes in a job.
1131

1132
    @type job_id: string
1133
    @param job_id: Job identifier
1134
    @type fields: list of strings
1135
    @param fields: Which fields to check for changes
1136
    @type prev_job_info: list or None
1137
    @param prev_job_info: Last job information returned
1138
    @type prev_log_serial: int
1139
    @param prev_log_serial: Last job message serial number
1140
    @type timeout: float
1141
    @param timeout: maximum time to wait
1142
    @rtype: tuple (job info, log entries)
1143
    @return: a tuple of the job information as required via
1144
        the fields parameter, and the log entries as a list
1145

1146
        if the job has not changed and the timeout has expired,
1147
        we instead return a special value,
1148
        L{constants.JOB_NOTCHANGED}, which should be interpreted
1149
        as such by the clients
1150

1151
    """
1152
    job = self._LoadJobUnlocked(job_id)
1153
    if not job:
1154
      logging.debug("Job %s not found", job_id)
1155
      return None
1156

    
1157
    def _CheckForChanges():
1158
      logging.debug("Waiting for changes in job %s", job_id)
1159

    
1160
      status = job.CalcStatus()
1161
      job_info = self._GetJobInfoUnlocked(job, fields)
1162
      log_entries = job.GetLogEntries(prev_log_serial)
1163

    
1164
      # Serializing and deserializing data can cause type changes (e.g. from
1165
      # tuple to list) or precision loss. We're doing it here so that we get
1166
      # the same modifications as the data received from the client. Without
1167
      # this, the comparison afterwards might fail without the data being
1168
      # significantly different.
1169
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1170
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1171

    
1172
      # Don't even try to wait if the job is no longer running, there will be
1173
      # no changes.
1174
      if (status not in (constants.JOB_STATUS_QUEUED,
1175
                         constants.JOB_STATUS_RUNNING,
1176
                         constants.JOB_STATUS_WAITLOCK) or
1177
          prev_job_info != job_info or
1178
          (log_entries and prev_log_serial != log_entries[0][0])):
1179
        logging.debug("Job %s changed", job_id)
1180
        return (job_info, log_entries)
1181

    
1182
      raise utils.RetryAgain()
1183

    
1184
    try:
1185
      # Setting wait function to release the queue lock while waiting
1186
      return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
1187
                         wait_fn=job.change.wait)
1188
    except utils.RetryTimeout:
1189
      return constants.JOB_NOTCHANGED
1190

    
1191
  @utils.LockedMethod
1192
  @_RequireOpenQueue
1193
  def CancelJob(self, job_id):
1194
    """Cancels a job.
1195

1196
    This will only succeed if the job has not started yet.
1197

1198
    @type job_id: string
1199
    @param job_id: job ID of job to be cancelled.
1200

1201
    """
1202
    logging.info("Cancelling job %s", job_id)
1203

    
1204
    job = self._LoadJobUnlocked(job_id)
1205
    if not job:
1206
      logging.debug("Job %s not found", job_id)
1207
      return (False, "Job %s not found" % job_id)
1208

    
1209
    job_status = job.CalcStatus()
1210

    
1211
    if job_status not in (constants.JOB_STATUS_QUEUED,
1212
                          constants.JOB_STATUS_WAITLOCK):
1213
      logging.debug("Job %s is no longer waiting in the queue", job.id)
1214
      return (False, "Job %s is no longer waiting in the queue" % job.id)
1215

    
1216
    if job_status == constants.JOB_STATUS_QUEUED:
1217
      self.CancelJobUnlocked(job)
1218
      return (True, "Job %s canceled" % job.id)
1219

    
1220
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1221
      # The worker will notice the new status and cancel the job
1222
      try:
1223
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1224
      finally:
1225
        self.UpdateJobUnlocked(job)
1226
      return (True, "Job %s will be canceled" % job.id)
1227

    
1228
  @_RequireOpenQueue
1229
  def CancelJobUnlocked(self, job):
1230
    """Marks a job as canceled.
1231

1232
    """
1233
    try:
1234
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1235
                            "Job canceled by request")
1236
    finally:
1237
      self.UpdateJobUnlocked(job)
1238

    
1239
  @_RequireOpenQueue
1240
  def _ArchiveJobsUnlocked(self, jobs):
1241
    """Archives jobs.
1242

1243
    @type jobs: list of L{_QueuedJob}
1244
    @param jobs: Job objects
1245
    @rtype: int
1246
    @return: Number of archived jobs
1247

1248
    """
1249
    archive_jobs = []
1250
    rename_files = []
1251
    for job in jobs:
1252
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1253
                                  constants.JOB_STATUS_SUCCESS,
1254
                                  constants.JOB_STATUS_ERROR):
1255
        logging.debug("Job %s is not yet done", job.id)
1256
        continue
1257

    
1258
      archive_jobs.append(job)
1259

    
1260
      old = self._GetJobPath(job.id)
1261
      new = self._GetArchivedJobPath(job.id)
1262
      rename_files.append((old, new))
1263

    
1264
    # TODO: What if 1..n files fail to rename?
1265
    self._RenameFilesUnlocked(rename_files)
1266

    
1267
    logging.debug("Successfully archived job(s) %s",
1268
                  utils.CommaJoin(job.id for job in archive_jobs))
1269

    
1270
    return len(archive_jobs)
1271

    
1272
  @utils.LockedMethod
1273
  @_RequireOpenQueue
1274
  def ArchiveJob(self, job_id):
1275
    """Archives a job.
1276

1277
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
1278

1279
    @type job_id: string
1280
    @param job_id: Job ID of job to be archived.
1281
    @rtype: bool
1282
    @return: Whether job was archived
1283

1284
    """
1285
    logging.info("Archiving job %s", job_id)
1286

    
1287
    job = self._LoadJobUnlocked(job_id)
1288
    if not job:
1289
      logging.debug("Job %s not found", job_id)
1290
      return False
1291

    
1292
    return self._ArchiveJobsUnlocked([job]) == 1
1293

    
1294
  @utils.LockedMethod
1295
  @_RequireOpenQueue
1296
  def AutoArchiveJobs(self, age, timeout):
1297
    """Archives all jobs based on age.
1298

1299
    The method will archive all jobs which are older than the age
1300
    parameter. For jobs that don't have an end timestamp, the start
1301
    timestamp will be considered. The special '-1' age will cause
1302
    archival of all jobs (that are not running or queued).
1303

1304
    @type age: int
1305
    @param age: the minimum age in seconds
1306

1307
    """
1308
    logging.info("Archiving jobs with age more than %s seconds", age)
1309

    
1310
    now = time.time()
1311
    end_time = now + timeout
1312
    archived_count = 0
1313
    last_touched = 0
1314

    
1315
    all_job_ids = self._GetJobIDsUnlocked(archived=False)
1316
    pending = []
1317
    for idx, job_id in enumerate(all_job_ids):
1318
      last_touched = idx
1319

    
1320
      # Not optimal because jobs could be pending
1321
      # TODO: Measure average duration for job archival and take number of
1322
      # pending jobs into account.
1323
      if time.time() > end_time:
1324
        break
1325

    
1326
      # Returns None if the job failed to load
1327
      job = self._LoadJobUnlocked(job_id)
1328
      if job:
1329
        if job.end_timestamp is None:
1330
          if job.start_timestamp is None:
1331
            job_age = job.received_timestamp
1332
          else:
1333
            job_age = job.start_timestamp
1334
        else:
1335
          job_age = job.end_timestamp
1336

    
1337
        if age == -1 or now - job_age[0] > age:
1338
          pending.append(job)
1339

    
1340
          # Archive 10 jobs at a time
1341
          if len(pending) >= 10:
1342
            archived_count += self._ArchiveJobsUnlocked(pending)
1343
            pending = []
1344

    
1345
    if pending:
1346
      archived_count += self._ArchiveJobsUnlocked(pending)
1347

    
1348
    return (archived_count, len(all_job_ids) - last_touched - 1)
1349

    
1350
  @staticmethod
1351
  def _GetJobInfoUnlocked(job, fields):
1352
    """Returns information about a job.
1353

1354
    @type job: L{_QueuedJob}
1355
    @param job: the job which we query
1356
    @type fields: list
1357
    @param fields: names of fields to return
1358
    @rtype: list
1359
    @return: list with one element for each field
1360
    @raise errors.OpExecError: when an invalid field
1361
        has been passed
1362

1363
    """
1364
    row = []
1365
    for fname in fields:
1366
      if fname == "id":
1367
        row.append(job.id)
1368
      elif fname == "status":
1369
        row.append(job.CalcStatus())
1370
      elif fname == "ops":
1371
        row.append([op.input.__getstate__() for op in job.ops])
1372
      elif fname == "opresult":
1373
        row.append([op.result for op in job.ops])
1374
      elif fname == "opstatus":
1375
        row.append([op.status for op in job.ops])
1376
      elif fname == "oplog":
1377
        row.append([op.log for op in job.ops])
1378
      elif fname == "opstart":
1379
        row.append([op.start_timestamp for op in job.ops])
1380
      elif fname == "opend":
1381
        row.append([op.end_timestamp for op in job.ops])
1382
      elif fname == "received_ts":
1383
        row.append(job.received_timestamp)
1384
      elif fname == "start_ts":
1385
        row.append(job.start_timestamp)
1386
      elif fname == "end_ts":
1387
        row.append(job.end_timestamp)
1388
      elif fname == "lock_status":
1389
        row.append(job.lock_status)
1390
      elif fname == "summary":
1391
        row.append([op.input.Summary() for op in job.ops])
1392
      else:
1393
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
1394
    return row
1395

    
1396
  @utils.LockedMethod
1397
  @_RequireOpenQueue
1398
  def QueryJobs(self, job_ids, fields):
1399
    """Returns a list of jobs in queue.
1400

1401
    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1402
    processing for each job.
1403

1404
    @type job_ids: list
1405
    @param job_ids: sequence of job identifiers or None for all
1406
    @type fields: list
1407
    @param fields: names of fields to return
1408
    @rtype: list
1409
    @return: list one element per job, each element being list with
1410
        the requested fields
1411

1412
    """
1413
    jobs = []
1414

    
1415
    for job in self._GetJobsUnlocked(job_ids):
1416
      if job is None:
1417
        jobs.append(None)
1418
      else:
1419
        jobs.append(self._GetJobInfoUnlocked(job, fields))
1420

    
1421
    return jobs
1422

    
1423
  @utils.LockedMethod
1424
  @_RequireOpenQueue
1425
  def Shutdown(self):
1426
    """Stops the job queue.
1427

1428
    This shutdowns all the worker threads an closes the queue.
1429

1430
    """
1431
    self._wpool.TerminateWorkers()
1432

    
1433
    self._queue_lock.Close()
1434
    self._queue_lock = None