#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc

from ganeti.rpc import RpcRunner

JOBQUEUE_THREADS = 25


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
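
# Example (hypothetical values; assumes utils.SplitTime splits a float
# timestamp into whole seconds and microseconds, as documented above):
#   TimeStampNow()            # -> e.g. (1199145600, 250000)
#   utils.SplitTime(1234.25)  # -> (1234, 250000)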


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }
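
  # Illustrative round-trip sketch ("op" stands for any opcodes.OpCode
  # instance); Serialize and Restore are inverses, so dumping to a dict
  # and restoring yields an equivalent object:
  #   qop = _QueuedOpCode(op)
  #   state = qop.Serialize()
  #   clone = _QueuedOpCode.Restore(state)
  #   # clone.status == qop.status, clone.log == qop.log, ...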


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """
  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one that finished with an
        error, the job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or all
        succeeded, and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
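
  # Worked example for CalcStatus (hypothetical opcode status lists):
  #   [SUCCESS, RUNNING, QUEUED]  -> JOB_STATUS_RUNNING
  #   [SUCCESS, ERROR,   QUEUED]  -> JOB_STATUS_ERROR (first error wins)
  #   [QUEUED,  QUEUED,  QUEUED]  -> JOB_STATUS_QUEUED
  #   [SUCCESS, SUCCESS, SUCCESS] -> JOB_STATUS_SUCCESS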

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
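
  # Example (hypothetical log contents): if an opcode's log holds
  # [(1, ts, type, "a"), (2, ts, type, "b")], then GetLogEntries(1)
  # returns only [(2, ts, type, "b")], while GetLogEntries(None)
  # returns both entries.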


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all 'public' functions. That is,
    functions usually called from other classes.

    @warning: Use this decorator only after utils.LockedMethod!

    Example::
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values())

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)

    # We need to lock here because WorkerPool.AddTask() may start a job while
    # we're still doing our work.
    self.acquire()
    try:
      for job in self._GetJobsUnlocked(None):
        # a failure in loading the job can cause 'None' to be returned
        if job is None:
          continue

        status = job.CalcStatus()

        if status in (constants.JOB_STATUS_QUEUED, ):
          self._wpool.AddTask(job)

        elif status in (constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
          logging.warning("Unfinished job %s found: %s", job.id, job)
          try:
            for op in job.ops:
              op.status = constants.OP_STATUS_ERROR
              op.result = "Unclean master daemon shutdown"
          finally:
            self.UpdateJobUnlocked(job)
    finally:
      self.release()

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    RpcRunner.call_jobqueue_purge(node_name)

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = RpcRunner.call_jobqueue_update([node_name], [node.primary_ip],
                                              file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFileUnlocked(self, old, new):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type old: str
    @param old: the current name of the file
    @type new: str
    @param new: the new name of the file

    """
    os.rename(old, new)

    names, addrs = self._GetNodeIp()
    result = RpcRunner.call_jobqueue_rename(names, addrs, old, new)
    self._CheckRpcResult(result, self._nodes,
                         "Moving %s to %s" % (old, new))

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @rtype: str
    @return: a string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None
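
  # Example (illustrative; assumes constants.JOB_ID_TEMPLATE matches
  # numeric ids): _ExtractJobID("job-4") returns "4", while
  # _ExtractJobID("job-4.bak") and _ExtractJobID("serial") return None.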

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFileUnlocked(filepath, new_path)
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for draining.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job queue is marked for
        draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError()
    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id
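
  # Usage sketch (hypothetical caller code; "queue" is a JobQueue
  # instance and "op" any opcodes.OpCode):
  #   job_id = queue.SubmitJob([op])
  #   # the job is now on disk, in the memcache and handed to the
  #   # worker pool; progress can be followed via WaitForJobChanges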

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)
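
  # Client-side sketch (hypothetical; shows the intended polling loop):
  #   prev_info = prev_serial = None
  #   while True:
  #     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                      prev_serial, 10.0)
  #     if result == constants.JOB_NOTCHANGED:
  #       continue  # timeout expired without changes, poll again
  #     (prev_info, log_entries) = result
  #     if log_entries:
  #       prev_serial = log_entries[-1][0]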

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.debug("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_QUEUED,):
      logging.debug("Job %s is no longer in the queue", job.id)
      return

    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job cancelled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobUnlocked(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return

    old = self._GetJobPath(job.id)
    new = self._GetArchivedJobPath(job.id)

    self._RenameFileUnlocked(old, new)

    logging.debug("Successfully archived job %s", job.id)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    return self._ArchiveJobUnlocked(job_id)

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp (or, failing that, the received timestamp) will be
    considered. The special '-1' age will cause archival of all jobs
    (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    for jid in self._GetJobIDsUnlocked(archived=False):
      job = self._LoadJobUnlocked(jid)
      if job.CalcStatus() not in (constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR,
                                  constants.JOB_STATUS_CANCELED):
        continue
      if job.end_timestamp is None:
        if job.start_timestamp is None:
          job_age = job.received_timestamp
        else:
          job_age = job.start_timestamp
      else:
        job_age = job.end_timestamp

      if age == -1 or now - job_age[0] > age:
        self._ArchiveJobUnlocked(jid)
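
  # Example (hypothetical timestamps): a finished job with
  # end_timestamp == (1199145600, 0) is archived by AutoArchiveJobs(3600)
  # once time.time() exceeds 1199145600 + 3600, while AutoArchiveJobs(-1)
  # archives every finished job regardless of age.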

  def _GetJobInfoUnlocked(self, job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a
        list with the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None