#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
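
# Example (illustrative): TimeStampNow() wraps utils.SplitTime(time.time())
# and returns an integer (seconds, microseconds) pair, which is the format
# used by all the *_timestamp fields in this module, e.g.:
#
#   now = TimeStampNow()       # e.g. (1234567890, 123456)
#   (seconds, micros) = now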


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }
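
  # Example (illustrative): Serialize() and Restore() are inverses, so a
  # round-trip through the JSON layer preserves an opcode's state:
  #
  #   op = _QueuedOpCode(opcodes.OpQueryClusterInfo())
  #   state = serializer.LoadJson(serializer.DumpJson(op.Serialize()))
  #   copy = _QueuedOpCode.Restore(state)
  #   assert copy.status == constants.OP_STATUS_QUEUED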


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type run_op_index: int
  @ivar run_op_index: the currently executing opcode, or -1 if
      we didn't yet start executing
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar lock_status: In-memory locking information for debugging
  @ivar change: a Condition variable we use for waiting for job changes

  """
  __slots__ = ["queue", "id", "ops", "run_op_index", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "lock_status", "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.run_op_index = -1
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # In-memory attributes
    self.lock_status = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.run_op_index = state["run_op_index"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    # In-memory attributes
    obj.lock_status = None

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "run_op_index": self.run_op_index,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled or errored opcode, the job status will
        be the same
      - otherwise, the last opcode with one of the following statuses:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, all opcodes are either queued or successful, and the
        job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
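
  # Example (illustrative): given the per-opcode statuses, CalcStatus()
  # resolves the job status roughly as follows:
  #
  #   [success, success, success]   -> JOB_STATUS_SUCCESS
  #   [success, running, queued]    -> JOB_STATUS_RUNNING
  #   [success, error, queued]      -> JOB_STATUS_ERROR (first error wins)
  #   [queued, queued]              -> JOB_STATUS_QUEUED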

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    self._queue.acquire()
    try:
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                                 constants.OP_STATUS_CANCELING)

      # All locks are acquired by now
      self._job.lock_status = None

      # Cancel here if we were asked to
      if self._op.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self._op.status = constants.OP_STATUS_RUNNING
    finally:
      self._queue.release()

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())

    self._queue.acquire()
    try:
      self._job.log_serial += 1
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))

      self._job.change.notifyAll()
    finally:
      self._queue.release()

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    # Not getting the queue lock because this is a single assignment
    self._job.lock_status = msg


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Worker %s processing job %s",
                  self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire()
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              job.run_op_index = idx
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            # Make sure not to hold queue lock while calling ExecOpCode
            result = proc.ExecOpCode(input_opcode,
                                     _OpExecCallbacks(queue, job, op))

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                if isinstance(err, errors.GenericError):
                  op.result = errors.EncodeException(err)
                else:
                  op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.lock_status = None
          job.run_op_index = -1
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()

      logging.info("Worker %s finished job %s, status = %s",
                   self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all 'public' functions. That is,
    functions usually called from other classes.

    @warning: Use this decorator only after utils.LockedMethod!

    Example::
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                    "Unclean master daemon shutdown")
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.RemoteFailMsg()
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].RemoteFailMsg()
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].RemoteFailMsg()
      if msg:
        failed.append(node)
        logging.error("RPC call %s failed on node %s: %s",
                      result[node].call, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename files in the local queue directory
    and then replicate these renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of strings representing the job identifiers

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
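
  # Example (illustrative): with JOBS_PER_ARCHIVE_DIRECTORY = 10000, archived
  # jobs are bucketed by their serial number, e.g.:
  #
  #   _GetArchiveDirectory("24")     -> "0"
  #   _GetArchiveDirectory("31415")  -> "3"
  #   _GetArchivedJobPath("31415")   -> <JOB_QUEUE_ARCHIVE_DIR>/3/job-31415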

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      raw_data = utils.ReadFile(filepath)
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise

    data = serializer.LoadJson(raw_data)

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(filepath, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    # Check job queue size
    size = len(self._ListJobFiles())
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      #size = ...
      pass

    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    return self._SubmitJobUnlocked(job_id, ops)

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        data = self._SubmitJobUnlocked(job_id, ops)
        status = True
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))

    return results
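
  # Example (illustrative): SubmitManyJobs() returns one (status, data) pair
  # per requested job, where data is the new job ID on success or the error
  # message on failure, e.g.:
  #
  #   [(True, "1234"), (True, "1235"),
  #    (False, "Job queue is drained, refusing job")]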

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    logging.debug("Waiting for changes in job %s", job_id)

    job_info = None
    log_entries = None

    end_time = time.time() + timeout
    while True:
      delta_time = end_time - time.time()
      if delta_time < 0:
        return constants.JOB_NOTCHANGED

      job = self._LoadJobUnlocked(job_id)
      if not job:
        logging.debug("Job %s not found", job_id)
        break

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      if status not in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_RUNNING,
                        constants.JOB_STATUS_WAITLOCK):
        # Don't even try to wait if the job is no longer running, there will be
        # no changes.
        break

      if (prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        break

      logging.debug("Waiting again")

      # Release the queue lock while waiting
      job.change.wait(delta_time)

    logging.debug("Job %s changed", job_id)

    if job_info is None and log_entries is None:
      return None
    else:
      return (job_info, log_entries)
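
  # Example (illustrative): a client polling loop would call this as
  #
  #   result = queue.WaitForJobChanges("1234", ["status"], prev_info, 0, 10.0)
  #   if result == constants.JOB_NOTCHANGED:
  #     ...retry...
  #   elif result is None:
  #     ...job disappeared...
  #   else:
  #     (job_info, log_entries) = result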

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer waiting in the queue", job.id)
      return (False, "Job %s is no longer waiting in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                            "Job canceled by request")
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  ", ".join(job.id for job in archive_jobs))

    return len(archive_jobs)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked(archived=False)
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched - 1)

  def _GetJobInfoUnlocked(self, job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "lock_status":
        row.append(job.lock_status)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a
        list with the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs
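
  # Example (illustrative): querying two jobs for a couple of fields returns
  # one row per job, in the order the fields were requested:
  #
  #   queue.QueryJobs(["1234", "1235"], ["id", "status", "summary"])
  #   -> [["1234", "success", ["CLUSTER_QUERY"]],
  #       ["1235", "running", ["INSTANCE_QUERY"]]]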

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None