lib/jqueue.py @ 6c881c52

#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())

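# Illustrative sketch, not part of the original module: the
# (seconds, microseconds) tuple keeps timestamps JSON-serializable
# without losing sub-second precision. Assuming utils.SplitTime and its
# counterpart utils.MergeTime behave as their names suggest:
#
#   >>> stamp = utils.SplitTime(1234567890.25)
#   >>> stamp
#   (1234567890, 250000)
#   >>> utils.MergeTime(stamp)
#   1234567890.25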

    
class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }

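# Illustrative sketch, not part of the original module: Serialize() and
# Restore() are inverses, which is what lets the queue dump every job to
# a JSON file and reload it after a master daemon restart. For any
# opcodes.OpCode instance op:
#
#   >>> qop = _QueuedOpCode(op)
#   >>> state = qop.Serialize()
#   >>> clone = _QueuedOpCode.Restore(state)
#   >>> clone.status == qop.status
#   True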

    
class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar lock_status: In-memory locking information for debugging
  @ivar change: a Condition variable we use for waiting for job changes

  """
  __slots__ = ["queue", "id", "ops", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "lock_status", "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: str
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # In-memory attributes
    self.lock_status = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    # In-memory attributes
    obj.lock_status = None

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a canceled or failed opcode, the job status will
        be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued or all
        succeeded, and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status

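# Illustrative sketch, not part of the original module: a worked example
# of CalcStatus(). For a three-opcode job the first (successful) opcode
# is skipped, the running second one sets the status, and the queued
# third one leaves it unchanged; an error or canceled opcode would
# instead short-circuit the loop:
#
#   >>> job.ops[0].status = constants.OP_STATUS_SUCCESS
#   >>> job.ops[1].status = constants.OP_STATUS_RUNNING
#   >>> job.ops[2].status = constants.OP_STATUS_QUEUED
#   >>> job.CalcStatus() == constants.JOB_STATUS_RUNNING
#   True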

    
  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries

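# Illustrative sketch, not part of the original module: log serials grow
# monotonically across all opcodes of a job (see _OpExecCallbacks.Feedback
# below), so filtering on entry[0] > serial returns exactly the entries a
# client has not yet seen. With hypothetical entries and a timestamp ts:
#
#   >>> job.ops[0].log = [(1, ts, constants.ELOG_MESSAGE, "step 1")]
#   >>> job.ops[1].log = [(2, ts, constants.ELOG_MESSAGE, "step 2")]
#   >>> [msg for (serial, tstamp, kind, msg) in job.GetLogEntries(1)]
#   ['step 2']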

    
  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    self._queue.acquire()
    try:
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                                 constants.OP_STATUS_CANCELING)

      # All locks are acquired by now
      self._job.lock_status = None

      # Cancel here if we were asked to
      if self._op.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self._op.status = constants.OP_STATUS_RUNNING
    finally:
      self._queue.release()

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())

    self._queue.acquire()
    try:
      self._job.log_serial += 1
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))

      self._job.change.notifyAll()
    finally:
      self._queue.release()

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    # Not getting the queue lock because this is a single assignment
    self._job.lock_status = msg

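# Illustrative sketch, not part of the original module: each Feedback()
# call appends a (log_serial, (sec, usec), log_type, message) tuple to
# the current opcode's log and wakes any WaitForJobChanges() waiters via
# the job's condition variable. The two accepted call forms:
#
#   cb = _OpExecCallbacks(queue, job, op)
#   cb.Feedback("just a message")                     # type defaults to
#                                                     # ELOG_MESSAGE
#   cb.Feedback(constants.ELOG_MESSAGE, "a message")  # explicit log type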

    
class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Worker %s processing job %s",
                 self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire()
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            # Make sure not to hold queue lock while calling ExecOpCode
            result = proc.ExecOpCode(input_opcode,
                                     _OpExecCallbacks(queue, job, op))

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                if isinstance(err, errors.GenericError):
                  op.result = errors.EncodeException(err)
                else:
                  op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.lock_status = None
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()

      logging.info("Worker %s finished job %s, status = %s",
                   self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


def _RequireOpenQueue(fn):
  """Decorator for "public" functions.

  This function should be used for all 'public' functions. That is,
  functions usually called from other classes. Note that this should
  be applied only to methods (not plain functions), since it expects
  that the decorated function is called with a first argument that has
  a '_queue_lock' attribute.

  @warning: Use this decorator only after utils.LockedMethod!

  Example::
    @utils.LockedMethod
    @_RequireOpenQueue
    def Example(self):
      pass

  """
  def wrapper(self, *args, **kwargs):
    assert self._queue_lock is not None, "Queue should be open"
    return fn(self, *args, **kwargs)
  return wrapper

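# Illustrative note, not part of the original module: decorators apply
# bottom-up, so the example above expands to roughly
#
#   Example = utils.LockedMethod(_RequireOpenQueue(Example))
#
# i.e. self._lock is already held when the _queue_lock assertion runs.
# That is why the ordering warning matters: checked under the lock, the
# open/closed test cannot race with Shutdown(), which closes the queue
# while holding the same lock.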

    
class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                    "Unclean master daemon shutdown")
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

    
  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s failed on node %s: %s",
                      result[node].call, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename files in the local queue directory
    and then replicate these renames to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of job identifiers

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial + 1, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result
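# Illustrative sketch, not part of the original module: if
# self._last_serial is 41 and count is 2, the serial file is rewritten
# to contain "43" and the method returns ["42", "43"]. _last_serial is
# only bumped after the replicated write succeeds, so a failed write
# does not consume job IDs.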

    
  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)
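# Illustrative sketch, not part of the original module: combining the
# path helpers, and with JOBS_PER_ARCHIVE_DIRECTORY = 10000, job 12345
# is archived into subdirectory "1" (12345 // 10000). With "..." standing
# in for the configured queue and archive directories:
#
#   >>> JobQueue._GetJobPath("12345")
#   '.../job-12345'
#   >>> JobQueue._GetArchivedJobPath("12345")
#   '.../1/job-12345'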

    
  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      raw_data = utils.ReadFile(filepath)
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise

    data = serializer.LoadJson(raw_data)

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(filepath, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True
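# Illustrative sketch, not part of the original module: since the drain
# flag is just a file on the master, it survives a masterd restart and
# affects only new submissions (see _SubmitJobUnlocked below); running
# jobs are left alone.
#
#   queue.SetDrainFlag(True)    # new SubmitJob() calls now raise
#                               # errors.JobQueueDrainError
#   queue.SetDrainFlag(False)   # submissions are accepted again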

    
  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    # Check job queue size
    size = len(self._ListJobFiles())
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      #size = ...
      pass

    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    return self._SubmitJobUnlocked(job_id, ops)

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        data = self._SubmitJobUnlocked(job_id, ops)
        status = True
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))

    return results

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return None

    def _CheckForChanges():
      logging.debug("Waiting for changes in job %s", job_id)

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      # Don't even try to wait if the job is no longer running, there will be
      # no changes.
      if (status not in (constants.JOB_STATUS_QUEUED,
                         constants.JOB_STATUS_RUNNING,
                         constants.JOB_STATUS_WAITLOCK) or
          prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        logging.debug("Job %s changed", job_id)
        return (job_info, log_entries)

      raise utils.RetryAgain()

    try:
      # Setting wait function to release the queue lock while waiting
      return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
                         wait_fn=job.change.wait)
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
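# Illustrative sketch, not part of the original module: a hypothetical
# client-side loop built on WaitForJobChanges(). Feeding back the last
# job info and log serial makes the call block until something actually
# changed:
#
#   prev_info = None
#   prev_serial = -1
#   while True:
#     result = queue.WaitForJobChanges(job_id, ["status"], prev_info,
#                                      prev_serial, 60.0)
#     if result is None:
#       break                                  # job not found
#     if result == constants.JOB_NOTCHANGED:
#       continue                               # timeout, ask again
#     (prev_info, log_entries) = result
#     if log_entries:
#       prev_serial = log_entries[-1][0]
#     if prev_info[0] not in (constants.JOB_STATUS_QUEUED,
#                             constants.JOB_STATUS_WAITLOCK,
#                             constants.JOB_STATUS_RUNNING):
#       break                                  # job is finalized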

    
  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer waiting in the queue", job.id)
      return (False, "Job %s is no longer waiting in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                            "Job canceled by request")
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  ", ".join(job.id for job in archive_jobs))

    return len(archive_jobs)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: int
    @param timeout: the maximum time (in seconds) to spend archiving
    @rtype: tuple
    @return: a tuple of (number of jobs archived, number of jobs
        not yet inspected)

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked(archived=False)
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched - 1)
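# Illustrative note, not part of the original module: the age test above
# compares only the seconds part of the (seconds, microseconds)
# timestamp. With age=3600, a finalized job whose end_timestamp is two
# hours old is archived; with age=-1 every finalized job is archived
# regardless of its timestamps.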

    
  def _GetJobInfoUnlocked(self, job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "lock_status":
        row.append(job.lock_status)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element per job, each element being a list
        with the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs
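# Illustrative sketch, not part of the original module: typical use of
# QueryJobs(), with field names taken from _GetJobInfoUnlocked() above.
# Passing None for job_ids means "all jobs":
#
#   for row in queue.QueryJobs(None, ["id", "status", "summary"]):
#     if row is None:
#       continue                   # job could not be loaded
#     (job_id, status, summary) = row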

    
  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None