lib/jqueue.py @ 162c8636


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import os
33
import logging
34
import threading
35
import errno
36
import re
37
import time
38
import weakref
39

    
40
from ganeti import constants
41
from ganeti import serializer
42
from ganeti import workerpool
43
from ganeti import opcodes
44
from ganeti import errors
45
from ganeti import mcpu
46
from ganeti import utils
47
from ganeti import jstore
48
from ganeti import rpc
49

    
50

    
51
JOBQUEUE_THREADS = 25
52
JOBS_PER_ARCHIVE_DIRECTORY = 10000
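# Archived jobs are grouped into numbered subdirectories, each holding up to
# JOBS_PER_ARCHIVE_DIRECTORY jobs (see JobQueue._GetArchiveDirectory).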
53

    
54

    
55
class CancelJob(Exception):
56
  """Special exception to cancel a job.
57

58
  """
59

    
60

    
61
def TimeStampNow():
62
  """Returns the current timestamp.
63

64
  @rtype: tuple
65
  @return: the current time in the (seconds, microseconds) format
66

67
  """
68
  return utils.SplitTime(time.time())
69

    
70

    
71
class _QueuedOpCode(object):
72
  """Encapsulates an opcode object.
73

74
  @ivar log: holds the execution log and consists of tuples
75
  of the form C{(log_serial, timestamp, level, message)}
76
  @ivar input: the OpCode we encapsulate
77
  @ivar status: the current status
78
  @ivar result: the result of the LU execution
79
  @ivar start_timestamp: timestamp for the start of the execution
80
  @ivar exec_timestamp: timestamp for the actual LU Exec() function invocation
81
  @ivar end_timestamp: timestamp for the end of the execution
82

83
  """
84
  __slots__ = ["input", "status", "result", "log",
85
               "start_timestamp", "exec_timestamp", "end_timestamp",
86
               "__weakref__"]
87

    
88
  def __init__(self, op):
89
    """Constructor for the _QuededOpCode.
90

91
    @type op: L{opcodes.OpCode}
92
    @param op: the opcode we encapsulate
93

94
    """
95
    self.input = op
96
    self.status = constants.OP_STATUS_QUEUED
97
    self.result = None
98
    self.log = []
99
    self.start_timestamp = None
100
    self.exec_timestamp = None
101
    self.end_timestamp = None
102

    
103
  @classmethod
104
  def Restore(cls, state):
105
    """Restore the _QueuedOpCode from the serialized form.
106

107
    @type state: dict
108
    @param state: the serialized state
109
    @rtype: _QueuedOpCode
110
    @return: a new _QueuedOpCode instance
111

112
    """
113
    obj = _QueuedOpCode.__new__(cls)
114
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
115
    obj.status = state["status"]
116
    obj.result = state["result"]
117
    obj.log = state["log"]
118
    obj.start_timestamp = state.get("start_timestamp", None)
119
    obj.exec_timestamp = state.get("exec_timestamp", None)
120
    obj.end_timestamp = state.get("end_timestamp", None)
121
    return obj
122

    
123
  def Serialize(self):
124
    """Serializes this _QueuedOpCode.
125

126
    @rtype: dict
127
    @return: the dictionary holding the serialized state
128

129
    """
130
    return {
131
      "input": self.input.__getstate__(),
132
      "status": self.status,
133
      "result": self.result,
134
      "log": self.log,
135
      "start_timestamp": self.start_timestamp,
136
      "exec_timestamp": self.exec_timestamp,
137
      "end_timestamp": self.end_timestamp,
138
      }
139

    
140

    
141
class _QueuedJob(object):
142
  """In-memory job representation.
143

144
  This is what we use to track the user-submitted jobs. Locking must
145
  be taken care of by users of this class.
146

147
  @type queue: L{JobQueue}
148
  @ivar queue: the parent queue
149
  @ivar id: the job ID
150
  @type ops: list
151
  @ivar ops: the list of _QueuedOpCode that constitute the job
152
  @type log_serial: int
153
  @ivar log_serial: holds the index for the next log entry
154
  @ivar received_timestamp: the timestamp for when the job was received
155
  @ivar start_timestamp: the timestamp for start of execution
156
  @ivar end_timestamp: the timestamp for end of execution
157
  @ivar lock_status: In-memory locking information for debugging
158
  @ivar change: a Condition variable we use for waiting for job changes
159

160
  """
161
  # pylint: disable-msg=W0212
162
  __slots__ = ["queue", "id", "ops", "log_serial",
163
               "received_timestamp", "start_timestamp", "end_timestamp",
164
               "lock_status", "change",
165
               "__weakref__"]
166

    
167
  def __init__(self, queue, job_id, ops):
168
    """Constructor for the _QueuedJob.
169

170
    @type queue: L{JobQueue}
171
    @param queue: our parent queue
172
    @type job_id: job_id
173
    @param job_id: our job id
174
    @type ops: list
175
    @param ops: the list of opcodes we hold, which will be encapsulated
176
        in _QueuedOpCodes
177

178
    """
179
    if not ops:
180
      raise errors.GenericError("A job needs at least one opcode")
181

    
182
    self.queue = queue
183
    self.id = job_id
184
    self.ops = [_QueuedOpCode(op) for op in ops]
185
    self.log_serial = 0
186
    self.received_timestamp = TimeStampNow()
187
    self.start_timestamp = None
188
    self.end_timestamp = None
189

    
190
    # In-memory attributes
191
    self.lock_status = None
192

    
193
    # Condition to wait for changes
194
    self.change = threading.Condition(self.queue._lock)
195

    
196
  def __repr__(self):
197
    status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
198
              "id=%s" % self.id,
199
              "ops=%s" % ",".join([op.input.Summary() for op in self.ops])]
200

    
201
    return "<%s at %#x>" % (" ".join(status), id(self))
202

    
203
  @classmethod
204
  def Restore(cls, queue, state):
205
    """Restore a _QueuedJob from serialized state:
206

207
    @type queue: L{JobQueue}
208
    @param queue: to which queue the restored job belongs
209
    @type state: dict
210
    @param state: the serialized state
211
    @rtype: _QueuedJob
212
    @return: the restored _QueuedJob instance
213

214
    """
215
    obj = _QueuedJob.__new__(cls)
216
    obj.queue = queue
217
    obj.id = state["id"]
218
    obj.received_timestamp = state.get("received_timestamp", None)
219
    obj.start_timestamp = state.get("start_timestamp", None)
220
    obj.end_timestamp = state.get("end_timestamp", None)
221

    
222
    # In-memory attributes
223
    obj.lock_status = None
224

    
225
    obj.ops = []
226
    obj.log_serial = 0
227
    for op_state in state["ops"]:
228
      op = _QueuedOpCode.Restore(op_state)
229
      for log_entry in op.log:
230
        obj.log_serial = max(obj.log_serial, log_entry[0])
231
      obj.ops.append(op)
232

    
233
    # Condition to wait for changes
234
    obj.change = threading.Condition(obj.queue._lock)
235

    
236
    return obj
237

    
238
  def Serialize(self):
239
    """Serialize the _JobQueue instance.
240

241
    @rtype: dict
242
    @return: the serialized state
243

244
    """
245
    return {
246
      "id": self.id,
247
      "ops": [op.Serialize() for op in self.ops],
248
      "start_timestamp": self.start_timestamp,
249
      "end_timestamp": self.end_timestamp,
250
      "received_timestamp": self.received_timestamp,
251
      }
252

    
253
  def CalcStatus(self):
254
    """Compute the status of this job.
255

256
    This function iterates over all the _QueuedOpCodes in the job and
257
    based on their status, computes the job status.
258

259
    The algorithm is:
260
      - if we find a cancelled opcode, or one that finished with an error,
261
        the job status will be the same
262
      - otherwise, the last opcode with the status one of:
263
          - waitlock
264
          - canceling
265
          - running
266

267
        will determine the job status
268

269
      - otherwise, it means either all opcodes are queued, or all succeeded,
270
        and the job status will be the same
271

272
    @return: the job status
273

274
    """
275
    status = constants.JOB_STATUS_QUEUED
276

    
277
    all_success = True
278
    for op in self.ops:
279
      if op.status == constants.OP_STATUS_SUCCESS:
280
        continue
281

    
282
      all_success = False
283

    
284
      if op.status == constants.OP_STATUS_QUEUED:
285
        pass
286
      elif op.status == constants.OP_STATUS_WAITLOCK:
287
        status = constants.JOB_STATUS_WAITLOCK
288
      elif op.status == constants.OP_STATUS_RUNNING:
289
        status = constants.JOB_STATUS_RUNNING
290
      elif op.status == constants.OP_STATUS_CANCELING:
291
        status = constants.JOB_STATUS_CANCELING
292
        break
293
      elif op.status == constants.OP_STATUS_ERROR:
294
        status = constants.JOB_STATUS_ERROR
295
        # The whole job fails if one opcode failed
296
        break
297
      elif op.status == constants.OP_STATUS_CANCELED:
298
        status = constants.JOB_STATUS_CANCELED
299
        break
300

    
301
    if all_success:
302
      status = constants.JOB_STATUS_SUCCESS
303

    
304
    return status
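  # Illustrative examples for CalcStatus(): opcode statuses
  # [success, running, queued] yield a "running" job, [success, error, queued]
  # yields an "error" job, and [success, success, success] yields "success".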
305

    
306
  def GetLogEntries(self, newer_than):
307
    """Selectively returns the log entries.
308

309
    @type newer_than: None or int
310
    @param newer_than: if this is None, return all log entries,
311
        otherwise return only the log entries with serial higher
312
        than this value
313
    @rtype: list
314
    @return: the list of the log entries selected
315

316
    """
317
    if newer_than is None:
318
      serial = -1
319
    else:
320
      serial = newer_than
321

    
322
    entries = []
323
    for op in self.ops:
324
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
325

    
326
    return entries
327

    
328
  def MarkUnfinishedOps(self, status, result):
329
    """Mark unfinished opcodes with a given status and result.
330

331
    This is a utility function for marking all running or waiting to
332
    be run opcodes with a given status. Opcodes which are already
333
    finalised are not changed.
334

335
    @param status: a given opcode status
336
    @param result: the opcode result
337

338
    """
339
    not_marked = True
340
    for op in self.ops:
341
      if op.status in constants.OPS_FINALIZED:
342
        assert not_marked, "Finalized opcodes found after non-finalized ones"
343
        continue
344
      op.status = status
345
      op.result = result
346
      not_marked = False
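  # MarkUnfinishedOps is used, for example, with OP_STATUS_CANCELED when a
  # queued job is canceled and with OP_STATUS_ERROR after an unclean master
  # daemon shutdown (see JobQueue.__init__ and JobQueue.CancelJobUnlocked).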
347

    
348

    
349
class _OpExecCallbacks(mcpu.OpExecCbBase):
350
  def __init__(self, queue, job, op):
351
    """Initializes this class.
352

353
    @type queue: L{JobQueue}
354
    @param queue: Job queue
355
    @type job: L{_QueuedJob}
356
    @param job: Job object
357
    @type op: L{_QueuedOpCode}
358
    @param op: OpCode
359

360
    """
361
    assert queue, "Queue is missing"
362
    assert job, "Job is missing"
363
    assert op, "Opcode is missing"
364

    
365
    self._queue = queue
366
    self._job = job
367
    self._op = op
368

    
369
  def NotifyStart(self):
370
    """Mark the opcode as running, not lock-waiting.
371

372
    This is called from the mcpu code as a notifier function, when the LU is
373
    finally about to start the Exec() method. Of course, to have end-user
374
    visible results, the opcode must be initially (before calling into
375
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
376

377
    """
378
    self._queue.acquire()
379
    try:
380
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
381
                                 constants.OP_STATUS_CANCELING)
382

    
383
      # All locks are acquired by now
384
      self._job.lock_status = None
385

    
386
      # Cancel here if we were asked to
387
      if self._op.status == constants.OP_STATUS_CANCELING:
388
        raise CancelJob()
389

    
390
      self._op.status = constants.OP_STATUS_RUNNING
391
      self._op.exec_timestamp = TimeStampNow()
392
    finally:
393
      self._queue.release()
394

    
395
  def Feedback(self, *args):
396
    """Append a log entry.
397

398
    """
399
    assert len(args) < 3
400

    
401
    if len(args) == 1:
402
      log_type = constants.ELOG_MESSAGE
403
      log_msg = args[0]
404
    else:
405
      (log_type, log_msg) = args
406

    
407
    # The time is split to make serialization easier and not lose
408
    # precision.
409
    timestamp = utils.SplitTime(time.time())
410

    
411
    self._queue.acquire()
412
    try:
413
      self._job.log_serial += 1
414
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
415

    
416
      self._job.change.notifyAll()
417
    finally:
418
      self._queue.release()
419

    
420
  def ReportLocks(self, msg):
421
    """Write locking information to the job.
422

423
    Called whenever the LU processor is waiting for a lock or has acquired one.
424

425
    """
426
    # Not getting the queue lock because this is a single assignment
427
    self._job.lock_status = msg
428

    
429

    
430
class _JobQueueWorker(workerpool.BaseWorker):
431
  """The actual job workers.
432

433
  """
434
  def RunTask(self, job): # pylint: disable-msg=W0221
435
    """Job executor.
436

437
    This function processes a job. It is closely tied to the _QueuedJob and
438
    _QueuedOpCode classes.
439

440
    @type job: L{_QueuedJob}
441
    @param job: the job to be processed
442

443
    """
444
    logging.info("Processing job %s", job.id)
445
    proc = mcpu.Processor(self.pool.queue.context, job.id)
446
    queue = job.queue
447
    try:
448
      try:
449
        count = len(job.ops)
450
        for idx, op in enumerate(job.ops):
451
          op_summary = op.input.Summary()
452
          if op.status == constants.OP_STATUS_SUCCESS:
453
            # this is a job that was partially completed before master
454
            # daemon shutdown, so it can be expected that some opcodes
455
            # are already completed successfully (if any did error
456
            # out, then the whole job should have been aborted and not
457
            # resubmitted for processing)
458
            logging.info("Op %s/%s: opcode %s already processed, skipping",
459
                         idx + 1, count, op_summary)
460
            continue
461
          try:
462
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
463
                         op_summary)
464

    
465
            queue.acquire()
466
            try:
467
              if op.status == constants.OP_STATUS_CANCELED:
468
                raise CancelJob()
469
              assert op.status == constants.OP_STATUS_QUEUED
470
              op.status = constants.OP_STATUS_WAITLOCK
471
              op.result = None
472
              op.start_timestamp = TimeStampNow()
473
              if idx == 0: # first opcode
474
                job.start_timestamp = op.start_timestamp
475
              queue.UpdateJobUnlocked(job)
476

    
477
              input_opcode = op.input
478
            finally:
479
              queue.release()
480

    
481
            # Make sure not to hold queue lock while calling ExecOpCode
482
            result = proc.ExecOpCode(input_opcode,
483
                                     _OpExecCallbacks(queue, job, op))
484

    
485
            queue.acquire()
486
            try:
487
              op.status = constants.OP_STATUS_SUCCESS
488
              op.result = result
489
              op.end_timestamp = TimeStampNow()
490
              queue.UpdateJobUnlocked(job)
491
            finally:
492
              queue.release()
493

    
494
            logging.info("Op %s/%s: Successfully finished opcode %s",
495
                         idx + 1, count, op_summary)
496
          except CancelJob:
497
            # Will be handled further up
498
            raise
499
          except Exception, err:
500
            queue.acquire()
501
            try:
502
              try:
503
                op.status = constants.OP_STATUS_ERROR
504
                if isinstance(err, errors.GenericError):
505
                  op.result = errors.EncodeException(err)
506
                else:
507
                  op.result = str(err)
508
                op.end_timestamp = TimeStampNow()
509
                logging.info("Op %s/%s: Error in opcode %s: %s",
510
                             idx + 1, count, op_summary, err)
511
              finally:
512
                queue.UpdateJobUnlocked(job)
513
            finally:
514
              queue.release()
515
            raise
516

    
517
      except CancelJob:
518
        queue.acquire()
519
        try:
520
          queue.CancelJobUnlocked(job)
521
        finally:
522
          queue.release()
523
      except errors.GenericError, err:
524
        logging.exception("Ganeti exception")
525
      except:
526
        logging.exception("Unhandled exception")
527
    finally:
528
      queue.acquire()
529
      try:
530
        try:
531
          job.lock_status = None
532
          job.end_timestamp = TimeStampNow()
533
          queue.UpdateJobUnlocked(job)
534
        finally:
535
          job_id = job.id
536
          status = job.CalcStatus()
537
      finally:
538
        queue.release()
539

    
540
      logging.info("Finished job %s, status = %s", job_id, status)
541

    
542

    
543
class _JobQueueWorkerPool(workerpool.WorkerPool):
544
  """Simple class implementing a job-processing workerpool.
545

546
  """
547
  def __init__(self, queue):
548
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
549
                                              JOBQUEUE_THREADS,
550
                                              _JobQueueWorker)
551
    self.queue = queue
552

    
553

    
554
def _RequireOpenQueue(fn):
555
  """Decorator for "public" functions.
556

557
  This function should be used for all 'public' functions. That is,
558
  functions usually called from other classes. Note that this should
559
  be applied only to methods (not plain functions), since it expects
560
  that the decorated function is called with a first argument that has
561
  a '_queue_filelock' attribute.
562

563
  @warning: Use this decorator only after utils.LockedMethod!
564

565
  Example::
566
    @utils.LockedMethod
567
    @_RequireOpenQueue
568
    def Example(self):
569
      pass
570

571
  """
572
  def wrapper(self, *args, **kwargs):
573
    # pylint: disable-msg=W0212
574
    assert self._queue_filelock is not None, "Queue should be open"
575
    return fn(self, *args, **kwargs)
576
  return wrapper
577

    
578

    
579
class JobQueue(object):
580
  """Queue used to manage the jobs.
581

582
  @cvar _RE_JOB_FILE: regex matching the valid job file names
583

584
  """
585
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
586

    
587
  def __init__(self, context):
588
    """Constructor for JobQueue.
589

590
    The constructor will initialize the job queue object and then
591
    start loading the current jobs from disk, either for starting them
592
    (if they were queued) or for aborting them (if they were already
593
    running).
594

595
    @type context: GanetiContext
596
    @param context: the context object for access to the configuration
597
        data and other ganeti objects
598

599
    """
600
    self.context = context
601
    self._memcache = weakref.WeakValueDictionary()
602
    self._my_hostname = utils.HostInfo().name
603

    
604
    # Locking
605
    self._lock = threading.Lock()
606
    self.acquire = self._lock.acquire
607
    self.release = self._lock.release
608

    
609
    # Initialize the queue, and acquire the filelock.
610
    # This ensures no other process is working on the job queue.
611
    self._queue_filelock = jstore.InitAndVerifyQueue(must_lock=True)
612

    
613
    # Read serial file
614
    self._last_serial = jstore.ReadSerial()
615
    assert self._last_serial is not None, ("Serial file was modified between"
616
                                           " check in jstore and here")
617

    
618
    # Get initial list of nodes
619
    self._nodes = dict((n.name, n.primary_ip)
620
                       for n in self.context.cfg.GetAllNodesInfo().values()
621
                       if n.master_candidate)
622

    
623
    # Remove master node
624
    self._nodes.pop(self._my_hostname, None)
625

    
626
    # TODO: Check consistency across nodes
627

    
628
    self._queue_size = 0
629
    self._UpdateQueueSizeUnlocked()
630
    self._drained = self._IsQueueMarkedDrain()
631

    
632
    # Setup worker pool
633
    self._wpool = _JobQueueWorkerPool(self)
634
    try:
635
      # We need to lock here because WorkerPool.AddTask() may start a job while
636
      # we're still doing our work.
637
      self.acquire()
638
      try:
639
        logging.info("Inspecting job queue")
640

    
641
        all_job_ids = self._GetJobIDsUnlocked()
642
        jobs_count = len(all_job_ids)
643
        lastinfo = time.time()
644
        for idx, job_id in enumerate(all_job_ids):
645
          # Give an update every 1000 jobs or 10 seconds
646
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
647
              idx == (jobs_count - 1)):
648
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
649
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
650
            lastinfo = time.time()
651

    
652
          job = self._LoadJobUnlocked(job_id)
653

    
654
          # a failure in loading the job can cause 'None' to be returned
655
          if job is None:
656
            continue
657

    
658
          status = job.CalcStatus()
659

    
660
          if status in (constants.JOB_STATUS_QUEUED, ):
661
            self._wpool.AddTask(job)
662

    
663
          elif status in (constants.JOB_STATUS_RUNNING,
664
                          constants.JOB_STATUS_WAITLOCK,
665
                          constants.JOB_STATUS_CANCELING):
666
            logging.warning("Unfinished job %s found: %s", job.id, job)
667
            try:
668
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
669
                                    "Unclean master daemon shutdown")
670
            finally:
671
              self.UpdateJobUnlocked(job)
672

    
673
        logging.info("Job queue inspection finished")
674
      finally:
675
        self.release()
676
    except:
677
      self._wpool.TerminateWorkers()
678
      raise
679

    
680
  @utils.LockedMethod
681
  @_RequireOpenQueue
682
  def AddNode(self, node):
683
    """Register a new node with the queue.
684

685
    @type node: L{objects.Node}
686
    @param node: the node object to be added
687

688
    """
689
    node_name = node.name
690
    assert node_name != self._my_hostname
691

    
692
    # Clean queue directory on added node
693
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
694
    msg = result.fail_msg
695
    if msg:
696
      logging.warning("Cannot cleanup queue directory on node %s: %s",
697
                      node_name, msg)
698

    
699
    if not node.master_candidate:
700
      # remove if existing, ignoring errors
701
      self._nodes.pop(node_name, None)
702
      # and skip the replication of the job ids
703
      return
704

    
705
    # Upload the whole queue excluding archived jobs
706
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
707

    
708
    # Upload current serial file
709
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
710

    
711
    for file_name in files:
712
      # Read file content
713
      content = utils.ReadFile(file_name)
714

    
715
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
716
                                                  [node.primary_ip],
717
                                                  file_name, content)
718
      msg = result[node_name].fail_msg
719
      if msg:
720
        logging.error("Failed to upload file %s to node %s: %s",
721
                      file_name, node_name, msg)
722

    
723
    self._nodes[node_name] = node.primary_ip
724

    
725
  @utils.LockedMethod
726
  @_RequireOpenQueue
727
  def RemoveNode(self, node_name):
728
    """Callback called when removing nodes from the cluster.
729

730
    @type node_name: str
731
    @param node_name: the name of the node to remove
732

733
    """
734
    self._nodes.pop(node_name, None)
735

    
736
  @staticmethod
737
  def _CheckRpcResult(result, nodes, failmsg):
738
    """Verifies the status of an RPC call.
739

740
    Since we aim to keep consistency should this node (the current
741
    master) fail, we will log errors if our rpc calls fail, and especially
742
    log the case when more than half of the nodes fail.
743

744
    @param result: the data as returned from the rpc call
745
    @type nodes: list
746
    @param nodes: the list of nodes we made the call to
747
    @type failmsg: str
748
    @param failmsg: the identifier to be used for logging
749

750
    """
751
    failed = []
752
    success = []
753

    
754
    for node in nodes:
755
      msg = result[node].fail_msg
756
      if msg:
757
        failed.append(node)
758
        logging.error("RPC call %s (%s) failed on node %s: %s",
759
                      result[node].call, failmsg, node, msg)
760
      else:
761
        success.append(node)
762

    
763
    # +1 for the master node
764
    if (len(success) + 1) < len(failed):
765
      # TODO: Handle failing nodes
766
      logging.error("More than half of the nodes failed")
767

    
768
  def _GetNodeIp(self):
769
    """Helper for returning the node name/ip list.
770

771
    @rtype: (list, list)
772
    @return: a tuple of two lists, the first one with the node
773
        names and the second one with the node addresses
774

775
    """
776
    name_list = self._nodes.keys()
777
    addr_list = [self._nodes[name] for name in name_list]
778
    return name_list, addr_list
779

    
780
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
781
    """Writes a file locally and then replicates it to all nodes.
782

783
    This function will replace the contents of a file on the local
784
    node and then replicate it to all the other nodes we have.
785

786
    @type file_name: str
787
    @param file_name: the path of the file to be replicated
788
    @type data: str
789
    @param data: the new contents of the file
790

791
    """
792
    utils.WriteFile(file_name, data=data)
793

    
794
    names, addrs = self._GetNodeIp()
795
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
796
    self._CheckRpcResult(result, self._nodes,
797
                         "Updating %s" % file_name)
798

    
799
  def _RenameFilesUnlocked(self, rename):
800
    """Renames a file locally and then replicate the change.
801

802
    This function will rename the given files in the local queue directory
803
    and then replicate these renames to all the other nodes we have.
804

805
    @type rename: list of (old, new)
806
    @param rename: List containing tuples mapping old to new names
807

808
    """
809
    # Rename them locally
810
    for old, new in rename:
811
      utils.RenameFile(old, new, mkdir=True)
812

    
813
    # ... and on all nodes
814
    names, addrs = self._GetNodeIp()
815
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
816
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
817

    
818
  @staticmethod
819
  def _FormatJobID(job_id):
820
    """Convert a job ID to string format.
821

822
    Currently this just does C{str(job_id)} after performing some
823
    checks, but if we want to change the job id format this will
824
    abstract this change.
825

826
    @type job_id: int or long
827
    @param job_id: the numeric job id
828
    @rtype: str
829
    @return: the formatted job id
830

831
    """
832
    if not isinstance(job_id, (int, long)):
833
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
834
    if job_id < 0:
835
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
836

    
837
    return str(job_id)
838

    
839
  @classmethod
840
  def _GetArchiveDirectory(cls, job_id):
841
    """Returns the archive directory for a job.
842

843
    @type job_id: str
844
    @param job_id: Job identifier
845
    @rtype: str
846
    @return: Directory name
847

848
    """
849
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
850

    
851
  def _NewSerialsUnlocked(self, count):
852
    """Generates a new job identifier.
853

854
    Job identifiers are unique during the lifetime of a cluster.
855

856
    @type count: integer
857
    @param count: how many serials to return
858
    @rtype: list of str
859
    @return: a list of strings representing the job identifiers.
860

861
    """
862
    assert count > 0
863
    # New number
864
    serial = self._last_serial + count
865

    
866
    # Write to file
867
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
868
                                        "%s\n" % serial)
869

    
870
    result = [self._FormatJobID(v)
871
              for v in range(self._last_serial + 1, serial + 1)]
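    # The serials handed out are self._last_serial + 1 up to and including
    # the new serial value.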
872
    # Keep it only if we were able to write the file
873
    self._last_serial = serial
874

    
875
    return result
876

    
877
  @staticmethod
878
  def _GetJobPath(job_id):
879
    """Returns the job file for a given job id.
880

881
    @type job_id: str
882
    @param job_id: the job identifier
883
    @rtype: str
884
    @return: the path to the job file
885

886
    """
887
    return utils.PathJoin(constants.QUEUE_DIR, "job-%s" % job_id)
888

    
889
  @classmethod
890
  def _GetArchivedJobPath(cls, job_id):
891
    """Returns the archived job file for a give job id.
892

893
    @type job_id: str
894
    @param job_id: the job identifier
895
    @rtype: str
896
    @return: the path to the archived job file
897

898
    """
899
    return utils.PathJoin(constants.JOB_QUEUE_ARCHIVE_DIR,
900
                          cls._GetArchiveDirectory(job_id), "job-%s" % job_id)
901

    
902
  def _GetJobIDsUnlocked(self, sort=True):
903
    """Return all known job IDs.
904

905
    The method only looks at disk because it's a requirement that all
906
    jobs are present on disk (so in the _memcache we don't have any
907
    extra IDs).
908

909
    @type sort: boolean
910
    @param sort: perform sorting on the returned job ids
911
    @rtype: list
912
    @return: the list of job IDs
913

914
    """
915
    jlist = []
916
    for filename in utils.ListVisibleFiles(constants.QUEUE_DIR):
917
      m = self._RE_JOB_FILE.match(filename)
918
      if m:
919
        jlist.append(m.group(1))
920
    if sort:
921
      jlist = utils.NiceSort(jlist)
922
    return jlist
923

    
924
  def _LoadJobUnlocked(self, job_id):
925
    """Loads a job from the disk or memory.
926

927
    Given a job id, this will return the cached job object if
928
    existing, or try to load the job from the disk. If loading from
929
    disk, it will also add the job to the cache.
930

931
    @param job_id: the job id
932
    @rtype: L{_QueuedJob} or None
933
    @return: either None or the job object
934

935
    """
936
    job = self._memcache.get(job_id, None)
937
    if job:
938
      logging.debug("Found job %s in memcache", job_id)
939
      return job
940

    
941
    job = self._LoadJobFromDisk(job_id)
942

    
943
    self._memcache[job_id] = job
944
    logging.debug("Added job %s to the cache", job_id)
945
    return job
946

    
947
  def _LoadJobFromDisk(self, job_id):
948
    """Load the given job file from disk.
949

950
    Given a job file, read, load and restore it as a _QueuedJob object.
951

952
    @type job_id: string
953
    @param job_id: job identifier
954
    @rtype: L{_QueuedJob} or None
955
    @return: either None or the job object
956

957
    """
958
    filepath = self._GetJobPath(job_id)
959
    logging.debug("Loading job from %s", filepath)
960
    try:
961
      raw_data = utils.ReadFile(filepath)
962
    except EnvironmentError, err:
963
      if err.errno in (errno.ENOENT, ):
964
        return None
965
      raise
966

    
967
    try:
968
      data = serializer.LoadJson(raw_data)
969
      job = _QueuedJob.Restore(self, data)
970
    except Exception, err: # pylint: disable-msg=W0703
971
      new_path = self._GetArchivedJobPath(job_id)
972
      if filepath == new_path:
973
        # job already archived (future case)
974
        logging.exception("Can't parse job %s", job_id)
975
      else:
976
        # non-archived case
977
        logging.exception("Can't parse job %s, will archive.", job_id)
978
        self._RenameFilesUnlocked([(filepath, new_path)])
979
      return None
980

    
981
    return job
982

    
983
  def _GetJobsUnlocked(self, job_ids):
984
    """Return a list of jobs based on their IDs.
985

986
    @type job_ids: list
987
    @param job_ids: either an empty list (meaning all jobs),
988
        or a list of job IDs
989
    @rtype: list
990
    @return: the list of job objects
991

992
    """
993
    if not job_ids:
994
      job_ids = self._GetJobIDsUnlocked()
995

    
996
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
997

    
998
  @staticmethod
999
  def _IsQueueMarkedDrain():
1000
    """Check if the queue is marked from drain.
1001

1002
    This currently uses the queue drain file, which makes it a
1003
    per-node flag. In the future this can be moved to the config file.
1004

1005
    @rtype: boolean
1006
    @return: True if the job queue is marked for draining
1007

1008
    """
1009
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1010

    
1011
  def _UpdateQueueSizeUnlocked(self):
1012
    """Update the queue size.
1013

1014
    """
1015
    self._queue_size = len(self._GetJobIDsUnlocked(sort=False))
1016

    
1017
  @utils.LockedMethod
1018
  @_RequireOpenQueue
1019
  def SetDrainFlag(self, drain_flag):
1020
    """Sets the drain flag for the queue.
1021

1022
    @type drain_flag: boolean
1023
    @param drain_flag: Whether to set or unset the drain flag
1024

1025
    """
1026
    if drain_flag:
1027
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1028
    else:
1029
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1030

    
1031
    self._drained = drain_flag
1032

    
1033
    return True
1034

    
1035
  @_RequireOpenQueue
1036
  def _SubmitJobUnlocked(self, job_id, ops):
1037
    """Create and store a new job.
1038

1039
    This enters the job into our job queue and also puts it on the new
1040
    queue, in order for it to be picked up by the queue processors.
1041

1042
    @type job_id: job ID
1043
    @param job_id: the job ID for the new job
1044
    @type ops: list
1045
    @param ops: The list of OpCodes that will become the new job.
1046
    @rtype: job ID
1047
    @return: the job ID of the newly created job
1048
    @raise errors.JobQueueDrainError: if the job is marked for draining
1049

1050
    """
1051
    # Ok when sharing the big job queue lock, as the drain file is created when
1052
    # the lock is exclusive.
1053
    if self._drained:
1054
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1055

    
1056
    if self._queue_size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1057
      raise errors.JobQueueFull()
1058

    
1059
    job = _QueuedJob(self, job_id, ops)
1060

    
1061
    # Write to disk
1062
    self.UpdateJobUnlocked(job)
1063

    
1064
    self._queue_size += 1
1065

    
1066
    logging.debug("Adding new job %s to the cache", job_id)
1067
    self._memcache[job_id] = job
1068

    
1069
    # Add to worker pool
1070
    self._wpool.AddTask(job)
1071

    
1072
    return job.id
1073

    
1074
  @utils.LockedMethod
1075
  @_RequireOpenQueue
1076
  def SubmitJob(self, ops):
1077
    """Create and store a new job.
1078

1079
    @see: L{_SubmitJobUnlocked}
1080

1081
    """
1082
    job_id = self._NewSerialsUnlocked(1)[0]
1083
    return self._SubmitJobUnlocked(job_id, ops)
1084

    
1085
  @utils.LockedMethod
1086
  @_RequireOpenQueue
1087
  def SubmitManyJobs(self, jobs):
1088
    """Create and store multiple jobs.
1089

1090
    @see: L{_SubmitJobUnlocked}
1091

1092
    """
1093
    results = []
1094
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1095
    for job_id, ops in zip(all_job_ids, jobs):
1096
      try:
1097
        data = self._SubmitJobUnlocked(job_id, ops)
1098
        status = True
1099
      except errors.GenericError, err:
1100
        data = str(err)
1101
        status = False
1102
      results.append((status, data))
1103

    
1104
    return results
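  # Illustrative use (with hypothetical opcode lists): SubmitManyJobs([[op_a],
  # [op_b1, op_b2]]) returns something like [(True, "1234"), (True, "1235")];
  # a False entry carries an error message instead of a job ID.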
1105

    
1106
  @_RequireOpenQueue
1107
  def UpdateJobUnlocked(self, job):
1108
    """Update a job's on disk storage.
1109

1110
    After a job has been modified, this function needs to be called in
1111
    order to write the changes to disk and replicate them to the other
1112
    nodes.
1113

1114
    @type job: L{_QueuedJob}
1115
    @param job: the changed job
1116

1117
    """
1118
    filename = self._GetJobPath(job.id)
1119
    data = serializer.DumpJson(job.Serialize(), indent=False)
1120
    logging.debug("Writing job %s to %s", job.id, filename)
1121
    self._WriteAndReplicateFileUnlocked(filename, data)
1122

    
1123
    # Notify waiters about potential changes
1124
    job.change.notifyAll()
1125

    
1126
  @utils.LockedMethod
1127
  @_RequireOpenQueue
1128
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1129
                        timeout):
1130
    """Waits for changes in a job.
1131

1132
    @type job_id: string
1133
    @param job_id: Job identifier
1134
    @type fields: list of strings
1135
    @param fields: Which fields to check for changes
1136
    @type prev_job_info: list or None
1137
    @param prev_job_info: Last job information returned
1138
    @type prev_log_serial: int
1139
    @param prev_log_serial: Last job message serial number
1140
    @type timeout: float
1141
    @param timeout: maximum time to wait
1142
    @rtype: tuple (job info, log entries)
1143
    @return: a tuple of the job information as required via
1144
        the fields parameter, and the log entries as a list
1145

1146
        if the job has not changed and the timeout has expired,
1147
        we instead return a special value,
1148
        L{constants.JOB_NOTCHANGED}, which should be interpreted
1149
        as such by the clients
1150

1151
    """
1152
    job = self._LoadJobUnlocked(job_id)
1153
    if not job:
1154
      logging.debug("Job %s not found", job_id)
1155
      return None
1156

    
1157
    def _CheckForChanges():
1158
      logging.debug("Waiting for changes in job %s", job_id)
1159

    
1160
      status = job.CalcStatus()
1161
      job_info = self._GetJobInfoUnlocked(job, fields)
1162
      log_entries = job.GetLogEntries(prev_log_serial)
1163

    
1164
      # Serializing and deserializing data can cause type changes (e.g. from
1165
      # tuple to list) or precision loss. We're doing it here so that we get
1166
      # the same modifications as the data received from the client. Without
1167
      # this, the comparison afterwards might fail without the data being
1168
      # significantly different.
1169
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1170
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1171

    
1172
      # Don't even try to wait if the job is no longer running, there will be
1173
      # no changes.
1174
      if (status not in (constants.JOB_STATUS_QUEUED,
1175
                         constants.JOB_STATUS_RUNNING,
1176
                         constants.JOB_STATUS_WAITLOCK) or
1177
          prev_job_info != job_info or
1178
          (log_entries and prev_log_serial != log_entries[0][0])):
1179
        logging.debug("Job %s changed", job_id)
1180
        return (job_info, log_entries)
1181

    
1182
      raise utils.RetryAgain()
1183

    
1184
    try:
1185
      # Setting wait function to release the queue lock while waiting
1186
      return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
1187
                         wait_fn=job.change.wait)
1188
    except utils.RetryTimeout:
1189
      return constants.JOB_NOTCHANGED
1190

    
1191
  @utils.LockedMethod
1192
  @_RequireOpenQueue
1193
  def CancelJob(self, job_id):
1194
    """Cancels a job.
1195

1196
    This will only succeed if the job has not started yet.
1197

1198
    @type job_id: string
1199
    @param job_id: job ID of job to be cancelled.
1200

1201
    """
1202
    logging.info("Cancelling job %s", job_id)
1203

    
1204
    job = self._LoadJobUnlocked(job_id)
1205
    if not job:
1206
      logging.debug("Job %s not found", job_id)
1207
      return (False, "Job %s not found" % job_id)
1208

    
1209
    job_status = job.CalcStatus()
1210

    
1211
    if job_status not in (constants.JOB_STATUS_QUEUED,
1212
                          constants.JOB_STATUS_WAITLOCK):
1213
      logging.debug("Job %s is no longer waiting in the queue", job.id)
1214
      return (False, "Job %s is no longer waiting in the queue" % job.id)
1215

    
1216
    if job_status == constants.JOB_STATUS_QUEUED:
1217
      self.CancelJobUnlocked(job)
1218
      return (True, "Job %s canceled" % job.id)
1219

    
1220
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1221
      # The worker will notice the new status and cancel the job
1222
      try:
1223
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1224
      finally:
1225
        self.UpdateJobUnlocked(job)
1226
      return (True, "Job %s will be canceled" % job.id)
1227

    
1228
  @_RequireOpenQueue
1229
  def CancelJobUnlocked(self, job):
1230
    """Marks a job as canceled.
1231

1232
    """
1233
    try:
1234
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1235
                            "Job canceled by request")
1236
    finally:
1237
      self.UpdateJobUnlocked(job)
1238

    
1239
  @_RequireOpenQueue
1240
  def _ArchiveJobsUnlocked(self, jobs):
1241
    """Archives jobs.
1242

1243
    @type jobs: list of L{_QueuedJob}
1244
    @param jobs: Job objects
1245
    @rtype: int
1246
    @return: Number of archived jobs
1247

1248
    """
1249
    archive_jobs = []
1250
    rename_files = []
1251
    for job in jobs:
1252
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1253
                                  constants.JOB_STATUS_SUCCESS,
1254
                                  constants.JOB_STATUS_ERROR):
1255
        logging.debug("Job %s is not yet done", job.id)
1256
        continue
1257

    
1258
      archive_jobs.append(job)
1259

    
1260
      old = self._GetJobPath(job.id)
1261
      new = self._GetArchivedJobPath(job.id)
1262
      rename_files.append((old, new))
1263

    
1264
    # TODO: What if 1..n files fail to rename?
1265
    self._RenameFilesUnlocked(rename_files)
1266

    
1267
    logging.debug("Successfully archived job(s) %s",
1268
                  utils.CommaJoin(job.id for job in archive_jobs))
1269

    
1270
    # Since we haven't quite checked, above, if we succeeded or failed renaming
1271
    # the files, we update the cached queue size from the filesystem. When we
1272
    # get around to fixing the TODO above, we can use the number of actually
1273
    # archived jobs to fix this.
1274
    self._UpdateQueueSizeUnlocked()
1275
    return len(archive_jobs)
1276

    
1277
  @utils.LockedMethod
1278
  @_RequireOpenQueue
1279
  def ArchiveJob(self, job_id):
1280
    """Archives a job.
1281

1282
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
1283

1284
    @type job_id: string
1285
    @param job_id: Job ID of job to be archived.
1286
    @rtype: bool
1287
    @return: Whether job was archived
1288

1289
    """
1290
    logging.info("Archiving job %s", job_id)
1291

    
1292
    job = self._LoadJobUnlocked(job_id)
1293
    if not job:
1294
      logging.debug("Job %s not found", job_id)
1295
      return False
1296

    
1297
    return self._ArchiveJobsUnlocked([job]) == 1
1298

    
1299
  @utils.LockedMethod
1300
  @_RequireOpenQueue
1301
  def AutoArchiveJobs(self, age, timeout):
1302
    """Archives all jobs based on age.
1303

1304
    The method will archive all jobs which are older than the age
1305
    parameter. For jobs that don't have an end timestamp, the start
1306
    timestamp will be considered. The special '-1' age will cause
1307
    archival of all jobs (that are not running or queued).
1308

1309
    @type age: int
1310
    @param age: the minimum age in seconds
1311

1312
    """
1313
    logging.info("Archiving jobs with age more than %s seconds", age)
1314

    
1315
    now = time.time()
1316
    end_time = now + timeout
1317
    archived_count = 0
1318
    last_touched = 0
1319

    
1320
    all_job_ids = self._GetJobIDsUnlocked()
1321
    pending = []
1322
    for idx, job_id in enumerate(all_job_ids):
1323
      last_touched = idx + 1
1324

    
1325
      # Not optimal because jobs could be pending
1326
      # TODO: Measure average duration for job archival and take number of
1327
      # pending jobs into account.
1328
      if time.time() > end_time:
1329
        break
1330

    
1331
      # Returns None if the job failed to load
1332
      job = self._LoadJobUnlocked(job_id)
1333
      if job:
1334
        if job.end_timestamp is None:
1335
          if job.start_timestamp is None:
1336
            job_age = job.received_timestamp
1337
          else:
1338
            job_age = job.start_timestamp
1339
        else:
1340
          job_age = job.end_timestamp
1341

    
1342
        if age == -1 or now - job_age[0] > age:
1343
          pending.append(job)
1344

    
1345
          # Archive 10 jobs at a time
1346
          if len(pending) >= 10:
1347
            archived_count += self._ArchiveJobsUnlocked(pending)
1348
            pending = []
1349

    
1350
    if pending:
1351
      archived_count += self._ArchiveJobsUnlocked(pending)
1352

    
1353
    return (archived_count, len(all_job_ids) - last_touched)
1354

    
1355
  @staticmethod
1356
  def _GetJobInfoUnlocked(job, fields):
1357
    """Returns information about a job.
1358

1359
    @type job: L{_QueuedJob}
1360
    @param job: the job which we query
1361
    @type fields: list
1362
    @param fields: names of fields to return
1363
    @rtype: list
1364
    @return: list with one element for each field
1365
    @raise errors.OpExecError: when an invalid field
1366
        has been passed
1367

1368
    """
1369
    row = []
1370
    for fname in fields:
1371
      if fname == "id":
1372
        row.append(job.id)
1373
      elif fname == "status":
1374
        row.append(job.CalcStatus())
1375
      elif fname == "ops":
1376
        row.append([op.input.__getstate__() for op in job.ops])
1377
      elif fname == "opresult":
1378
        row.append([op.result for op in job.ops])
1379
      elif fname == "opstatus":
1380
        row.append([op.status for op in job.ops])
1381
      elif fname == "oplog":
1382
        row.append([op.log for op in job.ops])
1383
      elif fname == "opstart":
1384
        row.append([op.start_timestamp for op in job.ops])
1385
      elif fname == "opexec":
1386
        row.append([op.exec_timestamp for op in job.ops])
1387
      elif fname == "opend":
1388
        row.append([op.end_timestamp for op in job.ops])
1389
      elif fname == "received_ts":
1390
        row.append(job.received_timestamp)
1391
      elif fname == "start_ts":
1392
        row.append(job.start_timestamp)
1393
      elif fname == "end_ts":
1394
        row.append(job.end_timestamp)
1395
      elif fname == "lock_status":
1396
        row.append(job.lock_status)
1397
      elif fname == "summary":
1398
        row.append([op.input.Summary() for op in job.ops])
1399
      else:
1400
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
1401
    return row
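  # For example, fields ["id", "status"] produce a row like ["1234", "running"]
  # (values are illustrative).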
1402

    
1403
  @utils.LockedMethod
1404
  @_RequireOpenQueue
1405
  def QueryJobs(self, job_ids, fields):
1406
    """Returns a list of jobs in queue.
1407

1408
    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1409
    processing for each job.
1410

1411
    @type job_ids: list
1412
    @param job_ids: sequence of job identifiers or None for all
1413
    @type fields: list
1414
    @param fields: names of fields to return
1415
    @rtype: list
1416
    @return: a list with one element per job, each element being a list with
1417
        the requested fields
1418

1419
    """
1420
    jobs = []
1421

    
1422
    for job in self._GetJobsUnlocked(job_ids):
1423
      if job is None:
1424
        jobs.append(None)
1425
      else:
1426
        jobs.append(self._GetJobInfoUnlocked(job, fields))
1427

    
1428
    return jobs
1429

    
1430
  @utils.LockedMethod
1431
  @_RequireOpenQueue
1432
  def Shutdown(self):
1433
    """Stops the job queue.
1434

1435
    This shuts down all the worker threads and closes the queue.
1436

1437
    """
1438
    self._wpool.TerminateWorkers()
1439

    
1440
    self._queue_filelock.Close()
1441
    self._queue_filelock = None