#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
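
# Illustrative example (not part of the original module): utils.SplitTime()
# converts a float timestamp into the (seconds, microseconds) tuple used for
# all timestamps in this module, e.g. (assuming a fixed clock value):
#   TimeStampNow()  # -> e.g. (1218448917, 480915)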


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }
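
  # Sketch of the intended Serialize()/Restore() round-trip (illustrative
  # only; "some_op" stands for any opcodes.OpCode instance):
  #   qop = _QueuedOpCode(some_op)
  #   state = qop.Serialize()
  #   assert _QueuedOpCode.Restore(state).Serialize() == state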


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """
  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled, or finished with error, opcode the job
        status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or success,
        and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
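
  # For example (illustrative): opcode statuses [SUCCESS, RUNNING, QUEUED]
  # yield JOB_STATUS_RUNNING, [SUCCESS, SUCCESS, SUCCESS] yields
  # JOB_STATUS_SUCCESS, and hitting an OP_STATUS_ERROR opcode makes the
  # whole job JOB_STATUS_ERROR.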

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
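
  # Illustrative: with op.log == [(1, ts, type, "a"), (2, ts, type, "b")],
  # GetLogEntries(1) returns only the second entry, while GetLogEntries(None)
  # returns both.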


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
                                    constants.OP_STATUS_CANCELING)

      # Cancel here if we were asked to
      if self.opcode.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              assert op.status == constants.OP_STATUS_QUEUED
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all 'public' functions. That is,
    functions usually called from other classes.

    @warning: Use this decorator only after utils.LockedMethod!

    Example::
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              for op in job.ops:
                op.status = constants.OP_STATUS_ERROR
                op.result = "Unclean master daemon shutdown"
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.RpcRunner.call_jobqueue_purge(node_name)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFileUnlocked(self, old, new):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type old: str
    @param old: the current name of the file
    @type new: str
    @param new: the new name of the file

    """
    os.rename(old, new)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new)
    self._CheckRpcResult(result, self._nodes,
                         "Moving %s to %s" % (old, new))

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)
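
  # Illustrative: self._FormatJobID(42) returns "42", while a negative or
  # non-numeric job_id raises errors.ProgrammerError.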

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @rtype: str
    @return: a string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None
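
  # Illustrative round-trip between _GetJobPath and _ExtractJobID (exact
  # paths depend on constants.QUEUE_DIR):
  #   self._GetJobPath("42")        # -> ".../queue/job-42"
  #   self._ExtractJobID("job-42")  # -> "42"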

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFileUnlocked(filepath, new_path)
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True
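
  # Illustrative: while the drain file exists, SubmitJob() below refuses new
  # jobs by raising errors.JobQueueDrainError; SetDrainFlag(False) removes
  # the file and re-enables submissions.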

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError()
    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id
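
  # Typical use from the master daemon (sketch; "queue" is a JobQueue
  # instance and "op" any opcodes.OpCode):
  #   job_id = queue.SubmitJob([op])
  #   queue.QueryJobs([job_id], ["status"])  # -> e.g. [["queued"]]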

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)
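
  # Client-side polling sketch (illustrative): callers invoke this in a loop,
  # feeding back the previously returned job information and the serial of
  # the last log entry seen; a return value of constants.JOB_NOTCHANGED only
  # means the timeout expired and the caller should simply ask again.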

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer in the queue", job.id)
      return (False, "Job %s is no longer in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        for op in job.ops:
          op.status = constants.OP_STATUS_CANCELING
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job canceled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobUnlocked(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: the ID of job to be archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return

    old = self._GetJobPath(job.id)
    new = self._GetArchivedJobPath(job.id)

    self._RenameFileUnlocked(old, new)

    logging.debug("Successfully archived job %s", job.id)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    return self._ArchiveJobUnlocked(job_id)

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    for jid in self._GetJobIDsUnlocked(archived=False):
      job = self._LoadJobUnlocked(jid)
      if job.CalcStatus() not in (constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR,
                                  constants.JOB_STATUS_CANCELED):
        continue
      if job.end_timestamp is None:
        if job.start_timestamp is None:
          job_age = job.received_timestamp
        else:
          job_age = job.start_timestamp
      else:
        job_age = job.end_timestamp

      if age == -1 or now - job_age[0] > age:
        self._ArchiveJobUnlocked(jid)
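
  # Note (illustrative): job_age above is a (seconds, microseconds) tuple as
  # produced by TimeStampNow(), so the comparison uses job_age[0], i.e. only
  # whole seconds are compared against the age threshold.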

  def _GetJobInfoUnlocked(self, job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a
        list with the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None