root / lib / jqueue.py @ 02fc74da


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module implementing the job queue handling.
23

24
Locking: there's a single, large lock in the L{JobQueue} class. It's
25
used by all other classes in this module.
26

27
@var JOBQUEUE_THREADS: the number of worker threads we start for
28
    processing jobs
29

30
"""
31

    
32
import os
33
import logging
34
import threading
35
import errno
36
import re
37
import time
38
import weakref
39

    
40
from ganeti import constants
41
from ganeti import serializer
42
from ganeti import workerpool
43
from ganeti import opcodes
44
from ganeti import errors
45
from ganeti import mcpu
46
from ganeti import utils
47
from ganeti import jstore
48
from ganeti import rpc
49

    
50

    
51
JOBQUEUE_THREADS = 25
52
JOBS_PER_ARCHIVE_DIRECTORY = 10000
53

    
54

    
55
class CancelJob(Exception):
56
  """Special exception to cancel a job.
57

58
  """
59

    
60

    
61
def TimeStampNow():
62
  """Returns the current timestamp.
63

64
  @rtype: tuple
65
  @return: the current time in the (seconds, microseconds) format
66

67
  """
68
  return utils.SplitTime(time.time())
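
# Illustrative sketch only (not used by the queue): the docstring above states
# that timestamps are (seconds, microseconds) tuples.  A minimal stand-in for
# utils.SplitTime, assuming exactly that convention, could look like this:
def _ExampleSplitTime(value):
  """Split a float timestamp into a (seconds, microseconds) tuple."""
  seconds = int(value)
  return (seconds, int((value - seconds) * 1000000))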
69

    
70

    
71
class _QueuedOpCode(object):
72
  """Encapsulates an opcode object.
73

74
  @ivar log: holds the execution log and consists of tuples
75
  of the form C{(log_serial, timestamp, level, message)}
76
  @ivar input: the OpCode we encapsulate
77
  @ivar status: the current status
78
  @ivar result: the result of the LU execution
79
  @ivar start_timestamp: timestamp for the start of the execution
80
  @ivar end_timestamp: timestamp for the end of the execution
81

82
  """
83
  __slots__ = ["input", "status", "result", "log",
84
               "start_timestamp", "end_timestamp",
85
               "__weakref__"]
86

    
87
  def __init__(self, op):
88
    """Constructor for the _QuededOpCode.
89

90
    @type op: L{opcodes.OpCode}
91
    @param op: the opcode we encapsulate
92

93
    """
94
    self.input = op
95
    self.status = constants.OP_STATUS_QUEUED
96
    self.result = None
97
    self.log = []
98
    self.start_timestamp = None
99
    self.end_timestamp = None
100

    
101
  @classmethod
102
  def Restore(cls, state):
103
    """Restore the _QueuedOpCode from the serialized form.
104

105
    @type state: dict
106
    @param state: the serialized state
107
    @rtype: _QueuedOpCode
108
    @return: a new _QueuedOpCode instance
109

110
    """
111
    obj = _QueuedOpCode.__new__(cls)
112
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
113
    obj.status = state["status"]
114
    obj.result = state["result"]
115
    obj.log = state["log"]
116
    obj.start_timestamp = state.get("start_timestamp", None)
117
    obj.end_timestamp = state.get("end_timestamp", None)
118
    return obj
119

    
120
  def Serialize(self):
121
    """Serializes this _QueuedOpCode.
122

123
    @rtype: dict
124
    @return: the dictionary holding the serialized state
125

126
    """
127
    return {
128
      "input": self.input.__getstate__(),
129
      "status": self.status,
130
      "result": self.result,
131
      "log": self.log,
132
      "start_timestamp": self.start_timestamp,
133
      "end_timestamp": self.end_timestamp,
134
      }
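
# Purely illustrative example of the dictionary shape produced by Serialize()
# above (and accepted by Restore()); all values below are hypothetical, and
# the "input" sub-dictionary depends on the specific opcode being wrapped:
_EXAMPLE_SERIALIZED_OPCODE = {
  "input": {"OP_ID": "OP_TEST_DUMMY"},  # hypothetical opcode state
  "status": constants.OP_STATUS_RUNNING,
  "result": None,
  "log": [(1, (1234567890, 0), constants.ELOG_MESSAGE, "step one done")],
  "start_timestamp": (1234567890, 0),
  "end_timestamp": None,
  }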
135

    
136

    
137
class _QueuedJob(object):
138
  """In-memory job representation.
139

140
  This is what we use to track the user-submitted jobs. Locking must
141
  be taken care of by users of this class.
142

143
  @type queue: L{JobQueue}
144
  @ivar queue: the parent queue
145
  @ivar id: the job ID
146
  @type ops: list
147
  @ivar ops: the list of _QueuedOpCode that constitute the job
148
  @type log_serial: int
149
  @ivar log_serial: holds the index for the next log entry
150
  @ivar received_timestamp: the timestamp for when the job was received
151
  @ivar start_timestamp: the timestamp for start of execution
152
  @ivar end_timestamp: the timestamp for end of execution
153
  @ivar lock_status: In-memory locking information for debugging
154
  @ivar change: a Condition variable we use for waiting for job changes
155

156
  """
157
  # pylint: disable-msg=W0212
158
  __slots__ = ["queue", "id", "ops", "log_serial",
159
               "received_timestamp", "start_timestamp", "end_timestamp",
160
               "lock_status", "change",
161
               "__weakref__"]
162

    
163
  def __init__(self, queue, job_id, ops):
164
    """Constructor for the _QueuedJob.
165

166
    @type queue: L{JobQueue}
167
    @param queue: our parent queue
168
    @type job_id: job_id
169
    @param job_id: our job id
170
    @type ops: list
171
    @param ops: the list of opcodes we hold, which will be encapsulated
172
        in _QueuedOpCodes
173

174
    """
175
    if not ops:
176
      # TODO: use a better exception
177
      raise Exception("No opcodes")
178

    
179
    self.queue = queue
180
    self.id = job_id
181
    self.ops = [_QueuedOpCode(op) for op in ops]
182
    self.log_serial = 0
183
    self.received_timestamp = TimeStampNow()
184
    self.start_timestamp = None
185
    self.end_timestamp = None
186

    
187
    # In-memory attributes
188
    self.lock_status = None
189

    
190
    # Condition to wait for changes
191
    self.change = threading.Condition(self.queue._lock)
192

    
193
  @classmethod
194
  def Restore(cls, queue, state):
195
    """Restore a _QueuedJob from serialized state:
196

197
    @type queue: L{JobQueue}
198
    @param queue: to which queue the restored job belongs
199
    @type state: dict
200
    @param state: the serialized state
201
    @rtype: _QueuedJob
202
    @return: the restored _QueuedJob instance
203

204
    """
205
    obj = _QueuedJob.__new__(cls)
206
    obj.queue = queue
207
    obj.id = state["id"]
208
    obj.received_timestamp = state.get("received_timestamp", None)
209
    obj.start_timestamp = state.get("start_timestamp", None)
210
    obj.end_timestamp = state.get("end_timestamp", None)
211

    
212
    # In-memory attributes
213
    obj.lock_status = None
214

    
215
    obj.ops = []
216
    obj.log_serial = 0
217
    for op_state in state["ops"]:
218
      op = _QueuedOpCode.Restore(op_state)
219
      for log_entry in op.log:
220
        obj.log_serial = max(obj.log_serial, log_entry[0])
221
      obj.ops.append(op)
222

    
223
    # Condition to wait for changes
224
    obj.change = threading.Condition(obj.queue._lock)
225

    
226
    return obj
227

    
228
  def Serialize(self):
229
    """Serialize the _JobQueue instance.
230

231
    @rtype: dict
232
    @return: the serialized state
233

234
    """
235
    return {
236
      "id": self.id,
237
      "ops": [op.Serialize() for op in self.ops],
238
      "start_timestamp": self.start_timestamp,
239
      "end_timestamp": self.end_timestamp,
240
      "received_timestamp": self.received_timestamp,
241
      }
242

    
243
  def CalcStatus(self):
244
    """Compute the status of this job.
245

246
    This function iterates over all the _QueuedOpCodes in the job and
247
    based on their status, computes the job status.
248

249
    The algorithm is:
250
      - if we find a cancelled opcode, or one finished with error, the job
251
        status will be the same
252
      - otherwise, the last opcode with the status one of:
253
          - waitlock
254
          - canceling
255
          - running
256

257
        will determine the job status
258

259
      - otherwise, it means either all opcodes are queued, or success,
260
        and the job status will be the same
261

262
    @return: the job status
263

264
    """
265
    status = constants.JOB_STATUS_QUEUED
266

    
267
    all_success = True
268
    for op in self.ops:
269
      if op.status == constants.OP_STATUS_SUCCESS:
270
        continue
271

    
272
      all_success = False
273

    
274
      if op.status == constants.OP_STATUS_QUEUED:
275
        pass
276
      elif op.status == constants.OP_STATUS_WAITLOCK:
277
        status = constants.JOB_STATUS_WAITLOCK
278
      elif op.status == constants.OP_STATUS_RUNNING:
279
        status = constants.JOB_STATUS_RUNNING
280
      elif op.status == constants.OP_STATUS_CANCELING:
281
        status = constants.JOB_STATUS_CANCELING
282
        break
283
      elif op.status == constants.OP_STATUS_ERROR:
284
        status = constants.JOB_STATUS_ERROR
285
        # The whole job fails if one opcode failed
286
        break
287
      elif op.status == constants.OP_STATUS_CANCELED:
288
        status = constants.JOB_STATUS_CANCELED
289
        break
290

    
291
    if all_success:
292
      status = constants.JOB_STATUS_SUCCESS
293

    
294
    return status
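
  # Example of the precedence above (illustrative): a job whose opcodes have
  # statuses [success, running, queued] is "running"; [success, error, queued]
  # is "error" (one failed opcode fails the whole job); and only
  # [success, success, success] yields "success".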
295

    
296
  def GetLogEntries(self, newer_than):
297
    """Selectively returns the log entries.
298

299
    @type newer_than: None or int
300
    @param newer_than: if this is None, return all log entries,
301
        otherwise return only the log entries with serial higher
302
        than this value
303
    @rtype: list
304
    @return: the list of the log entries selected
305

306
    """
307
    if newer_than is None:
308
      serial = -1
309
    else:
310
      serial = newer_than
311

    
312
    entries = []
313
    for op in self.ops:
314
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))
315

    
316
    return entries
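
  # Example (illustrative): with log entries carrying serials 1..5 spread over
  # the opcodes, GetLogEntries(3) returns only the tuples whose serial is 4 or
  # 5, while GetLogEntries(None) returns all of them.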
317

    
318
  def MarkUnfinishedOps(self, status, result):
319
    """Mark unfinished opcodes with a given status and result.
320

321
    This is a utility function for marking all running or waiting to
322
    be run opcodes with a given status. Opcodes which are already
323
    finalised are not changed.
324

325
    @param status: a given opcode status
326
    @param result: the opcode result
327

328
    """
329
    not_marked = True
330
    for op in self.ops:
331
      if op.status in constants.OPS_FINALIZED:
332
        assert not_marked, "Finalized opcodes found after non-finalized ones"
333
        continue
334
      op.status = status
335
      op.result = result
336
      not_marked = False
337

    
338

    
339
class _OpExecCallbacks(mcpu.OpExecCbBase):
340
  def __init__(self, queue, job, op):
341
    """Initializes this class.
342

343
    @type queue: L{JobQueue}
344
    @param queue: Job queue
345
    @type job: L{_QueuedJob}
346
    @param job: Job object
347
    @type op: L{_QueuedOpCode}
348
    @param op: OpCode
349

350
    """
351
    assert queue, "Queue is missing"
352
    assert job, "Job is missing"
353
    assert op, "Opcode is missing"
354

    
355
    self._queue = queue
356
    self._job = job
357
    self._op = op
358

    
359
  def NotifyStart(self):
360
    """Mark the opcode as running, not lock-waiting.
361

362
    This is called from the mcpu code as a notifier function, when the LU is
363
    finally about to start the Exec() method. Of course, to have end-user
364
    visible results, the opcode must be initially (before calling into
365
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
366

367
    """
368
    self._queue.acquire()
369
    try:
370
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
371
                                 constants.OP_STATUS_CANCELING)
372

    
373
      # All locks are acquired by now
374
      self._job.lock_status = None
375

    
376
      # Cancel here if we were asked to
377
      if self._op.status == constants.OP_STATUS_CANCELING:
378
        raise CancelJob()
379

    
380
      self._op.status = constants.OP_STATUS_RUNNING
381
    finally:
382
      self._queue.release()
383

    
384
  def Feedback(self, *args):
385
    """Append a log entry.
386

387
    """
388
    assert len(args) < 3
389

    
390
    if len(args) == 1:
391
      log_type = constants.ELOG_MESSAGE
392
      log_msg = args[0]
393
    else:
394
      (log_type, log_msg) = args
395

    
396
    # The time is split to make serialization easier and not lose
397
    # precision.
398
    timestamp = utils.SplitTime(time.time())
399

    
400
    self._queue.acquire()
401
    try:
402
      self._job.log_serial += 1
403
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
404

    
405
      self._job.change.notifyAll()
406
    finally:
407
      self._queue.release()
408

    
409
  def ReportLocks(self, msg):
410
    """Write locking information to the job.
411

412
    Called whenever the LU processor is waiting for a lock or has acquired one.
413

414
    """
415
    # Not getting the queue lock because this is a single assignment
416
    self._job.lock_status = msg
417

    
418

    
419
class _JobQueueWorker(workerpool.BaseWorker):
420
  """The actual job workers.
421

422
  """
423
  def RunTask(self, job): # pylint: disable-msg=W0221
424
    """Job executor.
425

426
    This function processes a job. It is closely tied to the _QueuedJob and
427
    _QueuedOpCode classes.
428

429
    @type job: L{_QueuedJob}
430
    @param job: the job to be processed
431

432
    """
433
    logging.info("Processing job %s", job.id)
434
    proc = mcpu.Processor(self.pool.queue.context, job.id)
435
    queue = job.queue
436
    try:
437
      try:
438
        count = len(job.ops)
439
        for idx, op in enumerate(job.ops):
440
          op_summary = op.input.Summary()
441
          if op.status == constants.OP_STATUS_SUCCESS:
442
            # this is a job that was partially completed before master
443
            # daemon shutdown, so it can be expected that some opcodes
444
            # are already completed successfully (if any did error
445
            # out, then the whole job should have been aborted and not
446
            # resubmitted for processing)
447
            logging.info("Op %s/%s: opcode %s already processed, skipping",
448
                         idx + 1, count, op_summary)
449
            continue
450
          try:
451
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
452
                         op_summary)
453

    
454
            queue.acquire()
455
            try:
456
              if op.status == constants.OP_STATUS_CANCELED:
457
                raise CancelJob()
458
              assert op.status == constants.OP_STATUS_QUEUED
459
              op.status = constants.OP_STATUS_WAITLOCK
460
              op.result = None
461
              op.start_timestamp = TimeStampNow()
462
              if idx == 0: # first opcode
463
                job.start_timestamp = op.start_timestamp
464
              queue.UpdateJobUnlocked(job)
465

    
466
              input_opcode = op.input
467
            finally:
468
              queue.release()
469

    
470
            # Make sure not to hold queue lock while calling ExecOpCode
471
            result = proc.ExecOpCode(input_opcode,
472
                                     _OpExecCallbacks(queue, job, op))
473

    
474
            queue.acquire()
475
            try:
476
              op.status = constants.OP_STATUS_SUCCESS
477
              op.result = result
478
              op.end_timestamp = TimeStampNow()
479
              queue.UpdateJobUnlocked(job)
480
            finally:
481
              queue.release()
482

    
483
            logging.info("Op %s/%s: Successfully finished opcode %s",
484
                         idx + 1, count, op_summary)
485
          except CancelJob:
486
            # Will be handled further up
487
            raise
488
          except Exception, err:
489
            queue.acquire()
490
            try:
491
              try:
492
                op.status = constants.OP_STATUS_ERROR
493
                if isinstance(err, errors.GenericError):
494
                  op.result = errors.EncodeException(err)
495
                else:
496
                  op.result = str(err)
497
                op.end_timestamp = TimeStampNow()
498
                logging.info("Op %s/%s: Error in opcode %s: %s",
499
                             idx + 1, count, op_summary, err)
500
              finally:
501
                queue.UpdateJobUnlocked(job)
502
            finally:
503
              queue.release()
504
            raise
505

    
506
      except CancelJob:
507
        queue.acquire()
508
        try:
509
          queue.CancelJobUnlocked(job)
510
        finally:
511
          queue.release()
512
      except errors.GenericError, err:
513
        logging.exception("Ganeti exception")
514
      except:
515
        logging.exception("Unhandled exception")
516
    finally:
517
      queue.acquire()
518
      try:
519
        try:
520
          job.lock_status = None
521
          job.end_timestamp = TimeStampNow()
522
          queue.UpdateJobUnlocked(job)
523
        finally:
524
          job_id = job.id
525
          status = job.CalcStatus()
526
      finally:
527
        queue.release()
528

    
529
      logging.info("Finished job %s, status = %s", job_id, status)
530

    
531

    
532
class _JobQueueWorkerPool(workerpool.WorkerPool):
533
  """Simple class implementing a job-processing workerpool.
534

535
  """
536
  def __init__(self, queue):
537
    super(_JobQueueWorkerPool, self).__init__("JobQueue",
538
                                              JOBQUEUE_THREADS,
539
                                              _JobQueueWorker)
540
    self.queue = queue
541

    
542

    
543
def _RequireOpenQueue(fn):
544
  """Decorator for "public" functions.
545

546
  This function should be used for all 'public' functions. That is,
547
  functions usually called from other classes. Note that this should
548
  be applied only to methods (not plain functions), since it expects
549
  that the decorated function is called with a first argument that has
550
  a '_queue_lock' attribute.
551

552
  @warning: Use this decorator only after utils.LockedMethod!
553

554
  Example::
555
    @utils.LockedMethod
556
    @_RequireOpenQueue
557
    def Example(self):
558
      pass
559

560
  """
561
  def wrapper(self, *args, **kwargs):
562
    # pylint: disable-msg=W0212
563
    assert self._queue_lock is not None, "Queue should be open"
564
    return fn(self, *args, **kwargs)
565
  return wrapper
566

    
567

    
568
class JobQueue(object):
569
  """Queue used to manage the jobs.
570

571
  @cvar _RE_JOB_FILE: regex matching the valid job file names
572

573
  """
574
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)
575

    
576
  def __init__(self, context):
577
    """Constructor for JobQueue.
578

579
    The constructor will initialize the job queue object and then
580
    start loading the current jobs from disk, either for starting them
581
    (if they were queued) or for aborting them (if they were already
582
    running).
583

584
    @type context: GanetiContext
585
    @param context: the context object for access to the configuration
586
        data and other ganeti objects
587

588
    """
589
    self.context = context
590
    self._memcache = weakref.WeakValueDictionary()
591
    self._my_hostname = utils.HostInfo().name
592

    
593
    # Locking
594
    self._lock = threading.Lock()
595
    self.acquire = self._lock.acquire
596
    self.release = self._lock.release
597

    
598
    # Initialize
599
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)
600

    
601
    # Read serial file
602
    self._last_serial = jstore.ReadSerial()
603
    assert self._last_serial is not None, ("Serial file was modified between"
604
                                           " check in jstore and here")
605

    
606
    # Get initial list of nodes
607
    self._nodes = dict((n.name, n.primary_ip)
608
                       for n in self.context.cfg.GetAllNodesInfo().values()
609
                       if n.master_candidate)
610

    
611
    # Remove master node
612
    try:
613
      del self._nodes[self._my_hostname]
614
    except KeyError:
615
      pass
616

    
617
    # TODO: Check consistency across nodes
618

    
619
    # Setup worker pool
620
    self._wpool = _JobQueueWorkerPool(self)
621
    try:
622
      # We need to lock here because WorkerPool.AddTask() may start a job while
623
      # we're still doing our work.
624
      self.acquire()
625
      try:
626
        logging.info("Inspecting job queue")
627

    
628
        all_job_ids = self._GetJobIDsUnlocked()
629
        jobs_count = len(all_job_ids)
630
        lastinfo = time.time()
631
        for idx, job_id in enumerate(all_job_ids):
632
          # Give an update every 1000 jobs or 10 seconds
633
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
634
              idx == (jobs_count - 1)):
635
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
636
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
637
            lastinfo = time.time()
638

    
639
          job = self._LoadJobUnlocked(job_id)
640

    
641
          # a failure in loading the job can cause 'None' to be returned
642
          if job is None:
643
            continue
644

    
645
          status = job.CalcStatus()
646

    
647
          if status in (constants.JOB_STATUS_QUEUED, ):
648
            self._wpool.AddTask(job)
649

    
650
          elif status in (constants.JOB_STATUS_RUNNING,
651
                          constants.JOB_STATUS_WAITLOCK,
652
                          constants.JOB_STATUS_CANCELING):
653
            logging.warning("Unfinished job %s found: %s", job.id, job)
654
            try:
655
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
656
                                    "Unclean master daemon shutdown")
657
            finally:
658
              self.UpdateJobUnlocked(job)
659

    
660
        logging.info("Job queue inspection finished")
661
      finally:
662
        self.release()
663
    except:
664
      self._wpool.TerminateWorkers()
665
      raise
666

    
667
  @utils.LockedMethod
668
  @_RequireOpenQueue
669
  def AddNode(self, node):
670
    """Register a new node with the queue.
671

672
    @type node: L{objects.Node}
673
    @param node: the node object to be added
674

675
    """
676
    node_name = node.name
677
    assert node_name != self._my_hostname
678

    
679
    # Clean queue directory on added node
680
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
681
    msg = result.fail_msg
682
    if msg:
683
      logging.warning("Cannot cleanup queue directory on node %s: %s",
684
                      node_name, msg)
685

    
686
    if not node.master_candidate:
687
      # remove if existing, ignoring errors
688
      self._nodes.pop(node_name, None)
689
      # and skip the replication of the job ids
690
      return
691

    
692
    # Upload the whole queue excluding archived jobs
693
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]
694

    
695
    # Upload current serial file
696
    files.append(constants.JOB_QUEUE_SERIAL_FILE)
697

    
698
    for file_name in files:
699
      # Read file content
700
      content = utils.ReadFile(file_name)
701

    
702
      result = rpc.RpcRunner.call_jobqueue_update([node_name],
703
                                                  [node.primary_ip],
704
                                                  file_name, content)
705
      msg = result[node_name].fail_msg
706
      if msg:
707
        logging.error("Failed to upload file %s to node %s: %s",
708
                      file_name, node_name, msg)
709

    
710
    self._nodes[node_name] = node.primary_ip
711

    
712
  @utils.LockedMethod
713
  @_RequireOpenQueue
714
  def RemoveNode(self, node_name):
715
    """Callback called when removing nodes from the cluster.
716

717
    @type node_name: str
718
    @param node_name: the name of the node to remove
719

720
    """
721
    try:
722
      # The queue is removed by the "leave node" RPC call.
723
      del self._nodes[node_name]
724
    except KeyError:
725
      pass
726

    
727
  @staticmethod
728
  def _CheckRpcResult(result, nodes, failmsg):
729
    """Verifies the status of an RPC call.
730

731
    Since we aim to keep consistency should this node (the current
732
    master) fail, we will log errors if our rpc calls fail, and especially
733
    log the case when more than half of the nodes fail.
734

735
    @param result: the data as returned from the rpc call
736
    @type nodes: list
737
    @param nodes: the list of nodes we made the call to
738
    @type failmsg: str
739
    @param failmsg: the identifier to be used for logging
740

741
    """
742
    failed = []
743
    success = []
744

    
745
    for node in nodes:
746
      msg = result[node].fail_msg
747
      if msg:
748
        failed.append(node)
749
        logging.error("RPC call %s (%s) failed on node %s: %s",
750
                      result[node].call, failmsg, node, msg)
751
      else:
752
        success.append(node)
753

    
754
    # +1 for the master node
755
    if (len(success) + 1) < len(failed):
756
      # TODO: Handle failing nodes
757
      logging.error("More than half of the nodes failed")
758

    
759
  def _GetNodeIp(self):
760
    """Helper for returning the node name/ip list.
761

762
    @rtype: (list, list)
763
    @return: a tuple of two lists, the first one with the node
764
        names and the second one with the node addresses
765

766
    """
767
    name_list = self._nodes.keys()
768
    addr_list = [self._nodes[name] for name in name_list]
769
    return name_list, addr_list
770

    
771
  def _WriteAndReplicateFileUnlocked(self, file_name, data):
772
    """Writes a file locally and then replicates it to all nodes.
773

774
    This function will replace the contents of a file on the local
775
    node and then replicate it to all the other nodes we have.
776

777
    @type file_name: str
778
    @param file_name: the path of the file to be replicated
779
    @type data: str
780
    @param data: the new contents of the file
781

782
    """
783
    utils.WriteFile(file_name, data=data)
784

    
785
    names, addrs = self._GetNodeIp()
786
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
787
    self._CheckRpcResult(result, self._nodes,
788
                         "Updating %s" % file_name)
789

    
790
  def _RenameFilesUnlocked(self, rename):
791
    """Renames a file locally and then replicate the change.
792

793
    This function will rename a file in the local queue directory
794
    and then replicate this rename to all the other nodes we have.
795

796
    @type rename: list of (old, new)
797
    @param rename: List containing tuples mapping old to new names
798

799
    """
800
    # Rename them locally
801
    for old, new in rename:
802
      utils.RenameFile(old, new, mkdir=True)
803

    
804
    # ... and on all nodes
805
    names, addrs = self._GetNodeIp()
806
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
807
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)
808

    
809
  @staticmethod
810
  def _FormatJobID(job_id):
811
    """Convert a job ID to string format.
812

813
    Currently this just does C{str(job_id)} after performing some
814
    checks, but if we want to change the job id format this will
815
    abstract this change.
816

817
    @type job_id: int or long
818
    @param job_id: the numeric job id
819
    @rtype: str
820
    @return: the formatted job id
821

822
    """
823
    if not isinstance(job_id, (int, long)):
824
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
825
    if job_id < 0:
826
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)
827

    
828
    return str(job_id)
829

    
830
  @classmethod
831
  def _GetArchiveDirectory(cls, job_id):
832
    """Returns the archive directory for a job.
833

834
    @type job_id: str
835
    @param job_id: Job identifier
836
    @rtype: str
837
    @return: Directory name
838

839
    """
840
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
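
    # Example (illustrative): with JOBS_PER_ARCHIVE_DIRECTORY = 10000, job id
    # "12345" maps to archive directory "1", so _GetArchivedJobPath() below
    # builds a path ending in "1/job-12345".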
841

    
842
  def _NewSerialsUnlocked(self, count):
843
    """Generates a new job identifier.
844

845
    Job identifiers are unique during the lifetime of a cluster.
846

847
    @type count: integer
848
    @param count: how many serials to return
849
    @rtype: list of str
850
    @return: a list of strings representing the job identifiers.
851

852
    """
853
    assert count > 0
854
    # New number
855
    serial = self._last_serial + count
856

    
857
    # Write to file
858
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
859
                                        "%s\n" % serial)
860

    
861
    result = [self._FormatJobID(v)
862
              for v in range(self._last_serial + 1, serial + 1)]
863
    # Keep it only if we were able to write the file
864
    self._last_serial = serial
865

    
866
    return result
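
    # Example (illustrative): with self._last_serial == 3 and count == 2, the
    # serial file is rewritten to contain "5" and ["4", "5"] is returned.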
867

    
868
  @staticmethod
869
  def _GetJobPath(job_id):
870
    """Returns the job file for a given job id.
871

872
    @type job_id: str
873
    @param job_id: the job identifier
874
    @rtype: str
875
    @return: the path to the job file
876

877
    """
878
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
879

    
880
  @classmethod
881
  def _GetArchivedJobPath(cls, job_id):
882
    """Returns the archived job file for a give job id.
883

884
    @type job_id: str
885
    @param job_id: the job identifier
886
    @rtype: str
887
    @return: the path to the archived job file
888

889
    """
890
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
891
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
892

    
893
  @classmethod
894
  def _ExtractJobID(cls, name):
895
    """Extract the job id from a filename.
896

897
    @type name: str
898
    @param name: the job filename
899
    @rtype: job id or None
900
    @return: the job id corresponding to the given filename,
901
        or None if the filename does not represent a valid
902
        job file
903

904
    """
905
    m = cls._RE_JOB_FILE.match(name)
906
    if m:
907
      return m.group(1)
908
    else:
909
      return None
910

    
911
  def _GetJobIDsUnlocked(self, archived=False):
912
    """Return all known job IDs.
913

914
    If the parameter archived is True, archived job IDs will be
915
    included. Currently this argument is unused.
916

917
    The method only looks at disk because it's a requirement that all
918
    jobs are present on disk (so in the _memcache we don't have any
919
    extra IDs).
920

921
    @rtype: list
922
    @return: the list of job IDs
923

924
    """
925
    # pylint: disable-msg=W0613
926
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
927
    jlist = utils.NiceSort(jlist)
928
    return jlist
929

    
930
  def _ListJobFiles(self):
931
    """Returns the list of current job files.
932

933
    @rtype: list
934
    @return: the list of job file names
935

936
    """
937
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
938
            if self._RE_JOB_FILE.match(name)]
939

    
940
  def _LoadJobUnlocked(self, job_id):
941
    """Loads a job from the disk or memory.
942

943
    Given a job id, this will return the cached job object if
944
    existing, or try to load the job from the disk. If loading from
945
    disk, it will also add the job to the cache.
946

947
    @param job_id: the job id
948
    @rtype: L{_QueuedJob} or None
949
    @return: either None or the job object
950

951
    """
952
    job = self._memcache.get(job_id, None)
953
    if job:
954
      logging.debug("Found job %s in memcache", job_id)
955
      return job
956

    
957
    filepath = self._GetJobPath(job_id)
958
    logging.debug("Loading job from %s", filepath)
959
    try:
960
      raw_data = utils.ReadFile(filepath)
961
    except IOError, err:
962
      if err.errno in (errno.ENOENT, ):
963
        return None
964
      raise
965

    
966
    data = serializer.LoadJson(raw_data)
967

    
968
    try:
969
      job = _QueuedJob.Restore(self, data)
970
    except Exception, err: # pylint: disable-msg=W0703
971
      new_path = self._GetArchivedJobPath(job_id)
972
      if filepath == new_path:
973
        # job already archived (future case)
974
        logging.exception("Can't parse job %s", job_id)
975
      else:
976
        # non-archived case
977
        logging.exception("Can't parse job %s, will archive.", job_id)
978
        self._RenameFilesUnlocked([(filepath, new_path)])
979
      return None
980

    
981
    self._memcache[job_id] = job
982
    logging.debug("Added job %s to the cache", job_id)
983
    return job
984

    
985
  def _GetJobsUnlocked(self, job_ids):
986
    """Return a list of jobs based on their IDs.
987

988
    @type job_ids: list
989
    @param job_ids: either an empty list (meaning all jobs),
990
        or a list of job IDs
991
    @rtype: list
992
    @return: the list of job objects
993

994
    """
995
    if not job_ids:
996
      job_ids = self._GetJobIDsUnlocked()
997

    
998
    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]
999

    
1000
  @staticmethod
1001
  def _IsQueueMarkedDrain():
1002
    """Check if the queue is marked from drain.
1003

1004
    This currently uses the queue drain file, which makes it a
1005
    per-node flag. In the future this can be moved to the config file.
1006

1007
    @rtype: boolean
1008
    @return: True if the job queue is marked for draining
1009

1010
    """
1011
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)
1012

    
1013
  @staticmethod
1014
  def SetDrainFlag(drain_flag):
1015
    """Sets the drain flag for the queue.
1016

1017
    This is similar to the function L{backend.JobQueueSetDrainFlag},
1018
    and in the future we might merge them.
1019

1020
    @type drain_flag: boolean
1021
    @param drain_flag: Whether to set or unset the drain flag
1022

1023
    """
1024
    if drain_flag:
1025
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
1026
    else:
1027
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
1028
    return True
1029

    
1030
  @_RequireOpenQueue
1031
  def _SubmitJobUnlocked(self, job_id, ops):
1032
    """Create and store a new job.
1033

1034
    This enters the job into our job queue and also puts it on the new
1035
    queue, in order for it to be picked up by the queue processors.
1036

1037
    @type job_id: job ID
1038
    @param job_id: the job ID for the new job
1039
    @type ops: list
1040
    @param ops: The list of OpCodes that will become the new job.
1041
    @rtype: job ID
1042
    @return: the job ID of the newly created job
1043
    @raise errors.JobQueueDrainError: if the job queue is marked for draining
1044

1045
    """
1046
    if self._IsQueueMarkedDrain():
1047
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")
1048

    
1049
    # Check job queue size
1050
    size = len(self._ListJobFiles())
1051
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
1052
      # TODO: Autoarchive jobs. Make sure it's not done on every job
1053
      # submission, though.
1054
      #size = ...
1055
      pass
1056

    
1057
    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
1058
      raise errors.JobQueueFull()
1059

    
1060
    job = _QueuedJob(self, job_id, ops)
1061

    
1062
    # Write to disk
1063
    self.UpdateJobUnlocked(job)
1064

    
1065
    logging.debug("Adding new job %s to the cache", job_id)
1066
    self._memcache[job_id] = job
1067

    
1068
    # Add to worker pool
1069
    self._wpool.AddTask(job)
1070

    
1071
    return job.id
1072

    
1073
  @utils.LockedMethod
1074
  @_RequireOpenQueue
1075
  def SubmitJob(self, ops):
1076
    """Create and store a new job.
1077

1078
    @see: L{_SubmitJobUnlocked}
1079

1080
    """
1081
    job_id = self._NewSerialsUnlocked(1)[0]
1082
    return self._SubmitJobUnlocked(job_id, ops)
1083

    
1084
  @utils.LockedMethod
1085
  @_RequireOpenQueue
1086
  def SubmitManyJobs(self, jobs):
1087
    """Create and store multiple jobs.
1088

1089
    @see: L{_SubmitJobUnlocked}
1090

1091
    """
1092
    results = []
1093
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
1094
    for job_id, ops in zip(all_job_ids, jobs):
1095
      try:
1096
        data = self._SubmitJobUnlocked(job_id, ops)
1097
        status = True
1098
      except errors.GenericError, err:
1099
        data = str(err)
1100
        status = False
1101
      results.append((status, data))
1102

    
1103
    return results
1104

    
1105
  @_RequireOpenQueue
1106
  def UpdateJobUnlocked(self, job):
1107
    """Update a job's on disk storage.
1108

1109
    After a job has been modified, this function needs to be called in
1110
    order to write the changes to disk and replicate them to the other
1111
    nodes.
1112

1113
    @type job: L{_QueuedJob}
1114
    @param job: the changed job
1115

1116
    """
1117
    filename = self._GetJobPath(job.id)
1118
    data = serializer.DumpJson(job.Serialize(), indent=False)
1119
    logging.debug("Writing job %s to %s", job.id, filename)
1120
    self._WriteAndReplicateFileUnlocked(filename, data)
1121

    
1122
    # Notify waiters about potential changes
1123
    job.change.notifyAll()
1124

    
1125
  @utils.LockedMethod
1126
  @_RequireOpenQueue
1127
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
1128
                        timeout):
1129
    """Waits for changes in a job.
1130

1131
    @type job_id: string
1132
    @param job_id: Job identifier
1133
    @type fields: list of strings
1134
    @param fields: Which fields to check for changes
1135
    @type prev_job_info: list or None
1136
    @param prev_job_info: Last job information returned
1137
    @type prev_log_serial: int
1138
    @param prev_log_serial: Last job message serial number
1139
    @type timeout: float
1140
    @param timeout: maximum time to wait
1141
    @rtype: tuple (job info, log entries)
1142
    @return: a tuple of the job information as required via
1143
        the fields parameter, and the log entries as a list
1144

1145
        if the job has not changed and the timeout has expired,
1146
        we instead return a special value,
1147
        L{constants.JOB_NOTCHANGED}, which should be interpreted
1148
        as such by the clients
1149

1150
    """
1151
    job = self._LoadJobUnlocked(job_id)
1152
    if not job:
1153
      logging.debug("Job %s not found", job_id)
1154
      return None
1155

    
1156
    def _CheckForChanges():
1157
      logging.debug("Waiting for changes in job %s", job_id)
1158

    
1159
      status = job.CalcStatus()
1160
      job_info = self._GetJobInfoUnlocked(job, fields)
1161
      log_entries = job.GetLogEntries(prev_log_serial)
1162

    
1163
      # Serializing and deserializing data can cause type changes (e.g. from
1164
      # tuple to list) or precision loss. We're doing it here so that we get
1165
      # the same modifications as the data received from the client. Without
1166
      # this, the comparison afterwards might fail without the data being
1167
      # significantly different.
1168
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
1169
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))
1170

    
1171
      # Don't even try to wait if the job is no longer running, there will be
1172
      # no changes.
1173
      if (status not in (constants.JOB_STATUS_QUEUED,
1174
                         constants.JOB_STATUS_RUNNING,
1175
                         constants.JOB_STATUS_WAITLOCK) or
1176
          prev_job_info != job_info or
1177
          (log_entries and prev_log_serial != log_entries[0][0])):
1178
        logging.debug("Job %s changed", job_id)
1179
        return (job_info, log_entries)
1180

    
1181
      raise utils.RetryAgain()
1182

    
1183
    try:
1184
      # Setting wait function to release the queue lock while waiting
1185
      return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
1186
                         wait_fn=job.change.wait)
1187
    except utils.RetryTimeout:
1188
      return constants.JOB_NOTCHANGED
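
  # The waiting scheme above, in general form (illustrative sketch, not part
  # of the module): poll a check function and, between attempts, block on a
  # condition variable so that the queue lock is released while sleeping:
  #
  #   while True:
  #     try:
  #       return check_fn()          # raises RetryAgain if nothing changed
  #     except utils.RetryAgain:
  #       if time_is_up():           # hypothetical helper
  #         raise utils.RetryTimeout()
  #       condition.wait(remaining)  # releases the lock until notified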
1189

    
1190
  @utils.LockedMethod
1191
  @_RequireOpenQueue
1192
  def CancelJob(self, job_id):
1193
    """Cancels a job.
1194

1195
    This will only succeed if the job has not started yet, i.e. it is still
    queued or waiting for locks.
1196

1197
    @type job_id: string
1198
    @param job_id: job ID of job to be cancelled.
1199

1200
    """
1201
    logging.info("Cancelling job %s", job_id)
1202

    
1203
    job = self._LoadJobUnlocked(job_id)
1204
    if not job:
1205
      logging.debug("Job %s not found", job_id)
1206
      return (False, "Job %s not found" % job_id)
1207

    
1208
    job_status = job.CalcStatus()
1209

    
1210
    if job_status not in (constants.JOB_STATUS_QUEUED,
1211
                          constants.JOB_STATUS_WAITLOCK):
1212
      logging.debug("Job %s is no longer waiting in the queue", job.id)
1213
      return (False, "Job %s is no longer waiting in the queue" % job.id)
1214

    
1215
    if job_status == constants.JOB_STATUS_QUEUED:
1216
      self.CancelJobUnlocked(job)
1217
      return (True, "Job %s canceled" % job.id)
1218

    
1219
    elif job_status == constants.JOB_STATUS_WAITLOCK:
1220
      # The worker will notice the new status and cancel the job
1221
      try:
1222
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
1223
      finally:
1224
        self.UpdateJobUnlocked(job)
1225
      return (True, "Job %s will be canceled" % job.id)
1226

    
1227
  @_RequireOpenQueue
1228
  def CancelJobUnlocked(self, job):
1229
    """Marks a job as canceled.
1230

1231
    """
1232
    try:
1233
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
1234
                            "Job canceled by request")
1235
    finally:
1236
      self.UpdateJobUnlocked(job)
1237

    
1238
  @_RequireOpenQueue
1239
  def _ArchiveJobsUnlocked(self, jobs):
1240
    """Archives jobs.
1241

1242
    @type jobs: list of L{_QueuedJob}
1243
    @param jobs: Job objects
1244
    @rtype: int
1245
    @return: Number of archived jobs
1246

1247
    """
1248
    archive_jobs = []
1249
    rename_files = []
1250
    for job in jobs:
1251
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
1252
                                  constants.JOB_STATUS_SUCCESS,
1253
                                  constants.JOB_STATUS_ERROR):
1254
        logging.debug("Job %s is not yet done", job.id)
1255
        continue
1256

    
1257
      archive_jobs.append(job)
1258

    
1259
      old = self._GetJobPath(job.id)
1260
      new = self._GetArchivedJobPath(job.id)
1261
      rename_files.append((old, new))
1262

    
1263
    # TODO: What if 1..n files fail to rename?
1264
    self._RenameFilesUnlocked(rename_files)
1265

    
1266
    logging.debug("Successfully archived job(s) %s",
1267
                  utils.CommaJoin(job.id for job in archive_jobs))
1268

    
1269
    return len(archive_jobs)
1270

    
1271
  @utils.LockedMethod
1272
  @_RequireOpenQueue
1273
  def ArchiveJob(self, job_id):
1274
    """Archives a job.
1275

1276
    This is just a wrapper over L{_ArchiveJobsUnlocked}.
1277

1278
    @type job_id: string
1279
    @param job_id: Job ID of job to be archived.
1280
    @rtype: bool
1281
    @return: Whether job was archived
1282

1283
    """
1284
    logging.info("Archiving job %s", job_id)
1285

    
1286
    job = self._LoadJobUnlocked(job_id)
1287
    if not job:
1288
      logging.debug("Job %s not found", job_id)
1289
      return False
1290

    
1291
    return self._ArchiveJobsUnlocked([job]) == 1
1292

    
1293
  @utils.LockedMethod
1294
  @_RequireOpenQueue
1295
  def AutoArchiveJobs(self, age, timeout):
1296
    """Archives all jobs based on age.
1297

1298
    The method will archive all jobs which are older than the age
1299
    parameter. For jobs that don't have an end timestamp, the start
1300
    timestamp will be considered. The special '-1' age will cause
1301
    archival of all jobs (that are not running or queued).
1302

1303
    @type age: int
1304
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: the maximum amount of time (in seconds) to spend archiving
1305

1306
    """
1307
    logging.info("Archiving jobs with age more than %s seconds", age)
1308

    
1309
    now = time.time()
1310
    end_time = now + timeout
1311
    archived_count = 0
1312
    last_touched = 0
1313

    
1314
    all_job_ids = self._GetJobIDsUnlocked(archived=False)
1315
    pending = []
1316
    for idx, job_id in enumerate(all_job_ids):
1317
      last_touched = idx
1318

    
1319
      # Not optimal because jobs could be pending
1320
      # TODO: Measure average duration for job archival and take number of
1321
      # pending jobs into account.
1322
      if time.time() > end_time:
1323
        break
1324

    
1325
      # Returns None if the job failed to load
1326
      job = self._LoadJobUnlocked(job_id)
1327
      if job:
1328
        if job.end_timestamp is None:
1329
          if job.start_timestamp is None:
1330
            job_age = job.received_timestamp
1331
          else:
1332
            job_age = job.start_timestamp
1333
        else:
1334
          job_age = job.end_timestamp
1335

    
1336
        if age == -1 or now - job_age[0] > age:
1337
          pending.append(job)
1338

    
1339
          # Archive 10 jobs at a time
1340
          if len(pending) >= 10:
1341
            archived_count += self._ArchiveJobsUnlocked(pending)
1342
            pending = []
1343

    
1344
    if pending:
1345
      archived_count += self._ArchiveJobsUnlocked(pending)
1346

    
1347
    return (archived_count, len(all_job_ids) - last_touched - 1)
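
  # Example (illustrative): AutoArchiveJobs(3600, 60) archives every finished
  # job older than one hour, spending at most roughly a minute on it, and
  # returns (number of jobs archived, number of job ids not processed before
  # the timeout); an age of -1 archives all finished jobs regardless of their
  # timestamps.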
1348

    
1349
  @staticmethod
1350
  def _GetJobInfoUnlocked(job, fields):
1351
    """Returns information about a job.
1352

1353
    @type job: L{_QueuedJob}
1354
    @param job: the job which we query
1355
    @type fields: list
1356
    @param fields: names of fields to return
1357
    @rtype: list
1358
    @return: list with one element for each field
1359
    @raise errors.OpExecError: when an invalid field
1360
        has been passed
1361

1362
    """
1363
    row = []
1364
    for fname in fields:
1365
      if fname == "id":
1366
        row.append(job.id)
1367
      elif fname == "status":
1368
        row.append(job.CalcStatus())
1369
      elif fname == "ops":
1370
        row.append([op.input.__getstate__() for op in job.ops])
1371
      elif fname == "opresult":
1372
        row.append([op.result for op in job.ops])
1373
      elif fname == "opstatus":
1374
        row.append([op.status for op in job.ops])
1375
      elif fname == "oplog":
1376
        row.append([op.log for op in job.ops])
1377
      elif fname == "opstart":
1378
        row.append([op.start_timestamp for op in job.ops])
1379
      elif fname == "opend":
1380
        row.append([op.end_timestamp for op in job.ops])
1381
      elif fname == "received_ts":
1382
        row.append(job.received_timestamp)
1383
      elif fname == "start_ts":
1384
        row.append(job.start_timestamp)
1385
      elif fname == "end_ts":
1386
        row.append(job.end_timestamp)
1387
      elif fname == "lock_status":
1388
        row.append(job.lock_status)
1389
      elif fname == "summary":
1390
        row.append([op.input.Summary() for op in job.ops])
1391
      else:
1392
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
1393
    return row
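
    # Example (illustrative, with made-up values): fields ["id", "status",
    # "summary"] would produce a row such as
    # ["1234", "running", ["INSTANCE_STARTUP(instance1.example.com)"]].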
1394

    
1395
  @utils.LockedMethod
1396
  @_RequireOpenQueue
1397
  def QueryJobs(self, job_ids, fields):
1398
    """Returns a list of jobs in queue.
1399

1400
    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
1401
    processing for each job.
1402

1403
    @type job_ids: list
1404
    @param job_ids: sequence of job identifiers or None for all
1405
    @type fields: list
1406
    @param fields: names of fields to return
1407
    @rtype: list
1408
    @return: list one element per job, each element being list with
1409
        the requested fields
1410

1411
    """
1412
    jobs = []
1413

    
1414
    for job in self._GetJobsUnlocked(job_ids):
1415
      if job is None:
1416
        jobs.append(None)
1417
      else:
1418
        jobs.append(self._GetJobInfoUnlocked(job, fields))
1419

    
1420
    return jobs
1421

    
1422
  @utils.LockedMethod
1423
  @_RequireOpenQueue
1424
  def Shutdown(self):
1425
    """Stops the job queue.
1426

1427
    This shuts down all the worker threads and closes the queue.
1428

1429
    """
1430
    self._wpool.TerminateWorkers()
1431

    
1432
    self._queue_lock.Close()
1433
    self._queue_lock = None
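
# Illustrative usage sketch (not part of the module): the master daemon is the
# expected user of this API.  Assuming a GanetiContext instance ``context``
# and a list of opcodes ``ops``, a typical sequence would be:
#
#   queue = JobQueue(context)
#   job_id = queue.SubmitJob(ops)
#   print queue.QueryJobs([job_id], ["id", "status"])
#   queue.ArchiveJob(job_id)   # only succeeds once the job has finished
#   queue.Shutdown()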