#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25


class CancelJob:
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
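
# Illustration (assumption about utils.SplitTime, which is not defined in
# this module): a float timestamp such as 1230768000.25 would be returned
# by TimeStampNow() as the tuple (1230768000, 250000).  Keeping seconds and
# microseconds as two integers is what allows the timestamps to be
# serialized without precision loss.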


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
      of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }
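
# For reference, _QueuedOpCode.Serialize() above produces a plain dict along
# these lines (values are illustrative only):
#   {"input": <opcode state dict>, "status": "queued", "result": None,
#    "log": [], "start_timestamp": None, "end_timestamp": None}
# _QueuedJob.Serialize() below nests one such dict per opcode under its
# "ops" key.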


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar change: a Condition variable we use for waiting for job changes

  """
  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled or failed opcode, the job status is
        the same (canceled or error)
      - otherwise, the last opcode with a status of:
          - waitlock
          - canceling
          - running

        determines the job status

      - otherwise, all opcodes are either queued or successful, and
        the job status is the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
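
# A worked example of the precedence implemented by _QueuedJob.CalcStatus()
# above (opcode statuses on the left, resulting job status on the right):
#   success, success,  success -> success
#   success, running,  queued  -> running
#   success, waitlock, queued  -> waitlock
#   success, error,    queued  -> error
#   queued,  queued,   queued  -> queued
# Error, canceled and canceling opcodes stop the scan early and decide the
# job status on their own.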


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def _NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the
    LU is finally about to start the Exec() method. Of course, to have
    end-user visible results, the opcode must be initially (before
    calling into Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    assert self.queue, "Queue attribute is missing"
    assert self.opcode, "Opcode attribute is missing"

    self.queue.acquire()
    try:
      assert self.opcode.status in (constants.OP_STATUS_WAITLOCK,
                                    constants.OP_STATUS_CANCELING)

      # Cancel here if we were asked to
      if self.opcode.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self.opcode.status = constants.OP_STATUS_RUNNING
    finally:
      self.queue.release()

  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.debug("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    self.queue = queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          try:
            logging.debug("Op %s/%s: Starting %s", idx + 1, count, op)

            queue.acquire()
            try:
              assert op.status == constants.OP_STATUS_QUEUED
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            def _Log(*args):
              """Append a log entry.

              """
              assert len(args) < 3

              if len(args) == 1:
                log_type = constants.ELOG_MESSAGE
                log_msg = args[0]
              else:
                log_type, log_msg = args

              # The time is split to make serialization easier and not lose
              # precision.
              timestamp = utils.SplitTime(time.time())

              queue.acquire()
              try:
                job.log_serial += 1
                op.log.append((job.log_serial, timestamp, log_type, log_msg))

                job.change.notifyAll()
              finally:
                queue.release()

            # Make sure not to hold lock while _Log is called
            self.opcode = op
            result = proc.ExecOpCode(input_opcode, _Log, self._NotifyStart)

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.debug("Op %s/%s: Successfully finished %s",
                          idx + 1, count, op)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.debug("Op %s/%s: Error in %s", idx + 1, count, op)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()
      logging.debug("Worker %s finished job %s, status = %s",
                    self.worker_id, job_id, status)
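
# Note on the locking discipline in _JobQueueWorker.RunTask() above: the
# queue lock is held only while opcode/job bookkeeping is updated and
# UpdateJobUnlocked() is called; it is released again around the potentially
# long-running proc.ExecOpCode() call, so that other threads (including the
# _Log feedback function) can acquire it in the meantime.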


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all 'public' functions. That is,
    functions usually called from other classes.

    @warning: Use this decorator only after utils.LockedMethod!

    Example::
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values())

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              for op in job.ops:
                op.status = constants.OP_STATUS_ERROR
                op.result = "Unclean master daemon shutdown"
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    rpc.RpcRunner.call_jobqueue_purge(node_name)

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      fd = open(file_name, "r")
      try:
        content = fd.read()
      finally:
        fd.close()

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      if not result[node_name]:
        logging.error("Failed to upload %s to %s", file_name, node_name)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      if result[node]:
        success.append(node)
      else:
        failed.append(node)

    if failed:
      logging.error("%s failed on %s", failmsg, ", ".join(failed))

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFileUnlocked(self, old, new):
    """Renames a file locally and then replicates the change.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type old: str
    @param old: the current name of the file
    @type new: str
    @param new: the new name of the file

    """
    os.rename(old, new)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, old, new)
    self._CheckRpcResult(result, self._nodes,
                         "Moving %s to %s" % (old, new))

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  def _NewSerialUnlocked(self):
    """Generates a new job identifier.

    Job identifiers are unique during the lifetime of a cluster.

    @rtype: str
    @return: a string representing the job identifier.

    """
    # New number
    serial = self._last_serial + 1

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    # Keep it only if we were able to write the file
    self._last_serial = serial

    return self._FormatJobID(serial)

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @staticmethod
  def _GetArchivedJobPath(job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, "job-%s" % job_id)

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      fd = open(filepath, "r")
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise
    try:
      data = serializer.LoadJson(fd.read())
    finally:
      fd.close()

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFileUnlocked(filepath, new_path)
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True
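
  # The two methods above implement the drain flag as a simple marker file.
  # SubmitJob() below checks it and refuses new submissions with
  # errors.JobQueueDrainError while the flag is set; jobs that are already
  # queued or running are not affected here.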

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the queue is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError()
    # Get job identifier
    job_id = self._NewSerialUnlocked()
    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    logging.debug("Waiting for changes in job %s", job_id)
    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    return (job_info, log_entries)
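
  # Hypothetical client-side sketch (not part of this module) of how
  # WaitForJobChanges() is meant to be driven: the caller feeds back the
  # previously returned job info and the serial of the last log entry it
  # has seen, retrying while JOB_NOTCHANGED is returned, e.g.:
  #
  #   prev_info, prev_serial = None, 0
  #   while True:
  #     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
  #                                      prev_serial, timeout=30.0)
  #     if result == constants.JOB_NOTCHANGED:
  #       continue                      # timeout expired, poll again
  #     prev_info, log_entries = result
  #     if log_entries:
  #       prev_serial = log_entries[-1][0]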

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer in the queue", job.id)
      return (False, "Job %s is no longer in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        for op in job.ops:
          op.status = constants.OP_STATUS_CANCELING
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      for op in job.ops:
        op.status = constants.OP_STATUS_ERROR
        op.result = "Job canceled by request"
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobUnlocked(self, job_id):
    """Archives a job.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return

    if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                constants.JOB_STATUS_SUCCESS,
                                constants.JOB_STATUS_ERROR):
      logging.debug("Job %s is not yet done", job.id)
      return

    old = self._GetJobPath(job.id)
    new = self._GetArchivedJobPath(job.id)

    self._RenameFileUnlocked(old, new)

    logging.debug("Successfully archived job %s", job.id)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.

    """
    return self._ArchiveJobUnlocked(job_id)

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    for jid in self._GetJobIDsUnlocked(archived=False):
      job = self._LoadJobUnlocked(jid)
      if job.CalcStatus() not in (constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR,
                                  constants.JOB_STATUS_CANCELED):
        continue
      if job.end_timestamp is None:
        if job.start_timestamp is None:
          job_age = job.received_timestamp
        else:
          job_age = job.start_timestamp
      else:
        job_age = job.end_timestamp

      if age == -1 or now - job_age[0] > age:
        self._ArchiveJobUnlocked(jid)

  def _GetJobInfoUnlocked(self, job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list one element per job, each element being list with
        the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None