#
#

# Copyright (C) 2006, 2007, 2008 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Module implementing the job queue handling.

Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

@var JOBQUEUE_THREADS: the number of worker threads we start for
    processing jobs

"""

import os
import logging
import threading
import errno
import re
import time
import weakref

from ganeti import constants
from ganeti import serializer
from ganeti import workerpool
from ganeti import opcodes
from ganeti import errors
from ganeti import mcpu
from ganeti import utils
from ganeti import jstore
from ganeti import rpc


JOBQUEUE_THREADS = 25
JOBS_PER_ARCHIVE_DIRECTORY = 10000


class CancelJob(Exception):
  """Special exception to cancel a job.

  """


def TimeStampNow():
  """Returns the current timestamp.

  @rtype: tuple
  @return: the current time in the (seconds, microseconds) format

  """
  return utils.SplitTime(time.time())
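
# Illustrative sketch (not part of the original module): TimeStampNow()
# simply splits the current Unix time into whole seconds and microseconds
# via utils.SplitTime, so callers can expect a tuple such as:
#
#   >>> TimeStampNow()        # actual values depend on the current time
#   (1234567890, 123456)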


class _QueuedOpCode(object):
  """Encapsulates an opcode object.

  @ivar log: holds the execution log and consists of tuples
  of the form C{(log_serial, timestamp, level, message)}
  @ivar input: the OpCode we encapsulate
  @ivar status: the current status
  @ivar result: the result of the LU execution
  @ivar start_timestamp: timestamp for the start of the execution
  @ivar end_timestamp: timestamp for the end of the execution

  """
  __slots__ = ["input", "status", "result", "log",
               "start_timestamp", "end_timestamp",
               "__weakref__"]

  def __init__(self, op):
    """Constructor for the _QueuedOpCode.

    @type op: L{opcodes.OpCode}
    @param op: the opcode we encapsulate

    """
    self.input = op
    self.status = constants.OP_STATUS_QUEUED
    self.result = None
    self.log = []
    self.start_timestamp = None
    self.end_timestamp = None

  @classmethod
  def Restore(cls, state):
    """Restore the _QueuedOpCode from the serialized form.

    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedOpCode
    @return: a new _QueuedOpCode instance

    """
    obj = _QueuedOpCode.__new__(cls)
    obj.input = opcodes.OpCode.LoadOpCode(state["input"])
    obj.status = state["status"]
    obj.result = state["result"]
    obj.log = state["log"]
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)
    return obj

  def Serialize(self):
    """Serializes this _QueuedOpCode.

    @rtype: dict
    @return: the dictionary holding the serialized state

    """
    return {
      "input": self.input.__getstate__(),
      "status": self.status,
      "result": self.result,
      "log": self.log,
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      }
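
# Illustrative sketch (not part of the original module): a _QueuedOpCode
# survives a serialization round-trip, which is what the job queue relies
# on when writing job files to disk.  OpTestDelay is used here only as a
# convenient example opcode:
#
#   op = _QueuedOpCode(opcodes.OpTestDelay(duration=0))
#   state = op.Serialize()
#   restored = _QueuedOpCode.Restore(state)
#   assert restored.status == constants.OP_STATUS_QUEUED
#   assert restored.input.Summary() == op.input.Summary()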


class _QueuedJob(object):
  """In-memory job representation.

  This is what we use to track the user-submitted jobs. Locking must
  be taken care of by users of this class.

  @type queue: L{JobQueue}
  @ivar queue: the parent queue
  @ivar id: the job ID
  @type ops: list
  @ivar ops: the list of _QueuedOpCode that constitute the job
  @type log_serial: int
  @ivar log_serial: holds the index for the next log entry
  @ivar received_timestamp: the timestamp for when the job was received
  @ivar start_timestamp: the timestamp for start of execution
  @ivar end_timestamp: the timestamp for end of execution
  @ivar lock_status: In-memory locking information for debugging
  @ivar change: a Condition variable we use for waiting for job changes

  """
  __slots__ = ["queue", "id", "ops", "log_serial",
               "received_timestamp", "start_timestamp", "end_timestamp",
               "lock_status", "change",
               "__weakref__"]

  def __init__(self, queue, job_id, ops):
    """Constructor for the _QueuedJob.

    @type queue: L{JobQueue}
    @param queue: our parent queue
    @type job_id: job_id
    @param job_id: our job id
    @type ops: list
    @param ops: the list of opcodes we hold, which will be encapsulated
        in _QueuedOpCodes

    """
    if not ops:
      # TODO: use a better exception
      raise Exception("No opcodes")

    self.queue = queue
    self.id = job_id
    self.ops = [_QueuedOpCode(op) for op in ops]
    self.log_serial = 0
    self.received_timestamp = TimeStampNow()
    self.start_timestamp = None
    self.end_timestamp = None

    # In-memory attributes
    self.lock_status = None

    # Condition to wait for changes
    self.change = threading.Condition(self.queue._lock)

  @classmethod
  def Restore(cls, queue, state):
    """Restore a _QueuedJob from serialized state.

    @type queue: L{JobQueue}
    @param queue: to which queue the restored job belongs
    @type state: dict
    @param state: the serialized state
    @rtype: _QueuedJob
    @return: the restored _QueuedJob instance

    """
    obj = _QueuedJob.__new__(cls)
    obj.queue = queue
    obj.id = state["id"]
    obj.received_timestamp = state.get("received_timestamp", None)
    obj.start_timestamp = state.get("start_timestamp", None)
    obj.end_timestamp = state.get("end_timestamp", None)

    # In-memory attributes
    obj.lock_status = None

    obj.ops = []
    obj.log_serial = 0
    for op_state in state["ops"]:
      op = _QueuedOpCode.Restore(op_state)
      for log_entry in op.log:
        obj.log_serial = max(obj.log_serial, log_entry[0])
      obj.ops.append(op)

    # Condition to wait for changes
    obj.change = threading.Condition(obj.queue._lock)

    return obj

  def Serialize(self):
    """Serialize the _QueuedJob instance.

    @rtype: dict
    @return: the serialized state

    """
    return {
      "id": self.id,
      "ops": [op.Serialize() for op in self.ops],
      "start_timestamp": self.start_timestamp,
      "end_timestamp": self.end_timestamp,
      "received_timestamp": self.received_timestamp,
      }

  def CalcStatus(self):
    """Compute the status of this job.

    This function iterates over all the _QueuedOpCodes in the job and
    based on their status, computes the job status.

    The algorithm is:
      - if we find a cancelled opcode, or one that finished with an
        error, the job status will be the same
      - otherwise, the last opcode with the status one of:
          - waitlock
          - canceling
          - running

        will determine the job status

      - otherwise, it means either all opcodes are queued, or all are
        successful, and the job status will be the same

    @return: the job status

    """
    status = constants.JOB_STATUS_QUEUED

    all_success = True
    for op in self.ops:
      if op.status == constants.OP_STATUS_SUCCESS:
        continue

      all_success = False

      if op.status == constants.OP_STATUS_QUEUED:
        pass
      elif op.status == constants.OP_STATUS_WAITLOCK:
        status = constants.JOB_STATUS_WAITLOCK
      elif op.status == constants.OP_STATUS_RUNNING:
        status = constants.JOB_STATUS_RUNNING
      elif op.status == constants.OP_STATUS_CANCELING:
        status = constants.JOB_STATUS_CANCELING
        break
      elif op.status == constants.OP_STATUS_ERROR:
        status = constants.JOB_STATUS_ERROR
        # The whole job fails if one opcode failed
        break
      elif op.status == constants.OP_STATUS_CANCELED:
        status = constants.JOB_STATUS_CANCELED
        break

    if all_success:
      status = constants.JOB_STATUS_SUCCESS

    return status
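
  # Illustrative sketch (not part of the original module): with the
  # algorithm above, opcode statuses [success, error, queued] yield
  # JOB_STATUS_ERROR, [success, running, queued] yield JOB_STATUS_RUNNING,
  # and [success, success, success] yield JOB_STATUS_SUCCESS, e.g.:
  #
  #   job = _QueuedJob(queue, job_id, [opcodes.OpTestDelay(duration=0)])
  #   job.ops[0].status = constants.OP_STATUS_RUNNING
  #   assert job.CalcStatus() == constants.JOB_STATUS_RUNNING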

  def GetLogEntries(self, newer_than):
    """Selectively returns the log entries.

    @type newer_than: None or int
    @param newer_than: if this is None, return all log entries,
        otherwise return only the log entries with serial higher
        than this value
    @rtype: list
    @return: the list of the log entries selected

    """
    if newer_than is None:
      serial = -1
    else:
      serial = newer_than

    entries = []
    for op in self.ops:
      entries.extend(filter(lambda entry: entry[0] > serial, op.log))

    return entries
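
  # Illustrative sketch (not part of the original module): log entries are
  # the (log_serial, timestamp, level, message) tuples appended by
  # _OpExecCallbacks.Feedback below, so a caller that has already seen
  # serial 3 would poll for newer messages with:
  #
  #   new_entries = job.GetLogEntries(3)   # only entries with serial > 3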

  def MarkUnfinishedOps(self, status, result):
    """Mark unfinished opcodes with a given status and result.

    This is a utility function for marking all running or waiting to
    be run opcodes with a given status. Opcodes which are already
    finalised are not changed.

    @param status: a given opcode status
    @param result: the opcode result

    """
    not_marked = True
    for op in self.ops:
      if op.status in constants.OPS_FINALIZED:
        assert not_marked, "Finalized opcodes found after non-finalized ones"
        continue
      op.status = status
      op.result = result
      not_marked = False


class _OpExecCallbacks(mcpu.OpExecCbBase):
  def __init__(self, queue, job, op):
    """Initializes this class.

    @type queue: L{JobQueue}
    @param queue: Job queue
    @type job: L{_QueuedJob}
    @param job: Job object
    @type op: L{_QueuedOpCode}
    @param op: OpCode

    """
    assert queue, "Queue is missing"
    assert job, "Job is missing"
    assert op, "Opcode is missing"

    self._queue = queue
    self._job = job
    self._op = op

  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

    This is called from the mcpu code as a notifier function, when the LU is
    finally about to start the Exec() method. Of course, to have end-user
    visible results, the opcode must be initially (before calling into
    Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.

    """
    self._queue.acquire()
    try:
      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
                                 constants.OP_STATUS_CANCELING)

      # All locks are acquired by now
      self._job.lock_status = None

      # Cancel here if we were asked to
      if self._op.status == constants.OP_STATUS_CANCELING:
        raise CancelJob()

      self._op.status = constants.OP_STATUS_RUNNING
    finally:
      self._queue.release()

  def Feedback(self, *args):
    """Append a log entry.

    """
    assert len(args) < 3

    if len(args) == 1:
      log_type = constants.ELOG_MESSAGE
      log_msg = args[0]
    else:
      (log_type, log_msg) = args

    # The time is split to make serialization easier and not lose
    # precision.
    timestamp = utils.SplitTime(time.time())

    self._queue.acquire()
    try:
      self._job.log_serial += 1
      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))

      self._job.change.notifyAll()
    finally:
      self._queue.release()

  def ReportLocks(self, msg):
    """Write locking information to the job.

    Called whenever the LU processor is waiting for a lock or has acquired one.

    """
    # Not getting the queue lock because this is a single assignment
    self._job.lock_status = msg
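
# Illustrative sketch (not part of the original module): an LU reports
# progress through the Feedback callback either with a single message
# (which gets the default ELOG_MESSAGE type) or with an explicit type:
#
#   cbs = _OpExecCallbacks(queue, job, op)
#   cbs.Feedback("copying data")                           # default type
#   cbs.Feedback(constants.ELOG_MESSAGE, "copying data")   # explicit type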


class _JobQueueWorker(workerpool.BaseWorker):
  """The actual job workers.

  """
  def RunTask(self, job):
    """Job executor.

    This function processes a job. It is closely tied to the _QueuedJob and
    _QueuedOpCode classes.

    @type job: L{_QueuedJob}
    @param job: the job to be processed

    """
    logging.info("Worker %s processing job %s",
                 self.worker_id, job.id)
    proc = mcpu.Processor(self.pool.queue.context)
    queue = job.queue
    try:
      try:
        count = len(job.ops)
        for idx, op in enumerate(job.ops):
          op_summary = op.input.Summary()
          if op.status == constants.OP_STATUS_SUCCESS:
            # this is a job that was partially completed before master
            # daemon shutdown, so it can be expected that some opcodes
            # are already completed successfully (if any did error
            # out, then the whole job should have been aborted and not
            # resubmitted for processing)
            logging.info("Op %s/%s: opcode %s already processed, skipping",
                         idx + 1, count, op_summary)
            continue
          try:
            logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                         op_summary)

            queue.acquire()
            try:
              if op.status == constants.OP_STATUS_CANCELED:
                raise CancelJob()
              assert op.status == constants.OP_STATUS_QUEUED
              op.status = constants.OP_STATUS_WAITLOCK
              op.result = None
              op.start_timestamp = TimeStampNow()
              if idx == 0: # first opcode
                job.start_timestamp = op.start_timestamp
              queue.UpdateJobUnlocked(job)

              input_opcode = op.input
            finally:
              queue.release()

            # Make sure not to hold queue lock while calling ExecOpCode
            result = proc.ExecOpCode(input_opcode,
                                     _OpExecCallbacks(queue, job, op))

            queue.acquire()
            try:
              op.status = constants.OP_STATUS_SUCCESS
              op.result = result
              op.end_timestamp = TimeStampNow()
              queue.UpdateJobUnlocked(job)
            finally:
              queue.release()

            logging.info("Op %s/%s: Successfully finished opcode %s",
                         idx + 1, count, op_summary)
          except CancelJob:
            # Will be handled further up
            raise
          except Exception, err:
            queue.acquire()
            try:
              try:
                op.status = constants.OP_STATUS_ERROR
                if isinstance(err, errors.GenericError):
                  op.result = errors.EncodeException(err)
                else:
                  op.result = str(err)
                op.end_timestamp = TimeStampNow()
                logging.info("Op %s/%s: Error in opcode %s: %s",
                             idx + 1, count, op_summary, err)
              finally:
                queue.UpdateJobUnlocked(job)
            finally:
              queue.release()
            raise

      except CancelJob:
        queue.acquire()
        try:
          queue.CancelJobUnlocked(job)
        finally:
          queue.release()
      except errors.GenericError, err:
        logging.exception("Ganeti exception")
      except:
        logging.exception("Unhandled exception")
    finally:
      queue.acquire()
      try:
        try:
          job.lock_status = None
          job.end_timestamp = TimeStampNow()
          queue.UpdateJobUnlocked(job)
        finally:
          job_id = job.id
          status = job.CalcStatus()
      finally:
        queue.release()

      logging.info("Worker %s finished job %s, status = %s",
                   self.worker_id, job_id, status)


class _JobQueueWorkerPool(workerpool.WorkerPool):
  """Simple class implementing a job-processing workerpool.

  """
  def __init__(self, queue):
    super(_JobQueueWorkerPool, self).__init__(JOBQUEUE_THREADS,
                                              _JobQueueWorker)
    self.queue = queue


class JobQueue(object):
  """Queue used to manage the jobs.

  @cvar _RE_JOB_FILE: regex matching the valid job file names

  """
  _RE_JOB_FILE = re.compile(r"^job-(%s)$" % constants.JOB_ID_TEMPLATE)

  def _RequireOpenQueue(fn):
    """Decorator for "public" functions.

    This function should be used for all 'public' functions. That is,
    functions usually called from other classes.

    @warning: Use this decorator only after utils.LockedMethod!

    Example::
      @utils.LockedMethod
      @_RequireOpenQueue
      def Example(self):
        pass

    """
    def wrapper(self, *args, **kwargs):
      assert self._queue_lock is not None, "Queue should be open"
      return fn(self, *args, **kwargs)
    return wrapper

  def __init__(self, context):
    """Constructor for JobQueue.

    The constructor will initialize the job queue object and then
    start loading the current jobs from disk, either for starting them
    (if they were queued) or for aborting them (if they were already
    running).

    @type context: GanetiContext
    @param context: the context object for access to the configuration
        data and other ganeti objects

    """
    self.context = context
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = utils.HostInfo().name

    # Locking
    self._lock = threading.Lock()
    self.acquire = self._lock.acquire
    self.release = self._lock.release

    # Initialize
    self._queue_lock = jstore.InitAndVerifyQueue(must_lock=True)

    # Read serial file
    self._last_serial = jstore.ReadSerial()
    assert self._last_serial is not None, ("Serial file was modified between"
                                           " check in jstore and here")

    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in self.context.cfg.GetAllNodesInfo().values()
                       if n.master_candidate)

    # Remove master node
    try:
      del self._nodes[self._my_hostname]
    except KeyError:
      pass

    # TODO: Check consistency across nodes

    # Setup worker pool
    self._wpool = _JobQueueWorkerPool(self)
    try:
      # We need to lock here because WorkerPool.AddTask() may start a job while
      # we're still doing our work.
      self.acquire()
      try:
        logging.info("Inspecting job queue")

        all_job_ids = self._GetJobIDsUnlocked()
        jobs_count = len(all_job_ids)
        lastinfo = time.time()
        for idx, job_id in enumerate(all_job_ids):
          # Give an update every 1000 jobs or 10 seconds
          if (idx % 1000 == 0 or time.time() >= (lastinfo + 10.0) or
              idx == (jobs_count - 1)):
            logging.info("Job queue inspection: %d/%d (%0.1f %%)",
                         idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
            lastinfo = time.time()

          job = self._LoadJobUnlocked(job_id)

          # a failure in loading the job can cause 'None' to be returned
          if job is None:
            continue

          status = job.CalcStatus()

          if status in (constants.JOB_STATUS_QUEUED, ):
            self._wpool.AddTask(job)

          elif status in (constants.JOB_STATUS_RUNNING,
                          constants.JOB_STATUS_WAITLOCK,
                          constants.JOB_STATUS_CANCELING):
            logging.warning("Unfinished job %s found: %s", job.id, job)
            try:
              job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                    "Unclean master daemon shutdown")
            finally:
              self.UpdateJobUnlocked(job)

        logging.info("Job queue inspection finished")
      finally:
        self.release()
    except:
      self._wpool.TerminateWorkers()
      raise

  @utils.LockedMethod
  @_RequireOpenQueue
  def AddNode(self, node):
    """Register a new node with the queue.

    @type node: L{objects.Node}
    @param node: the node object to be added

    """
    node_name = node.name
    assert node_name != self._my_hostname

    # Clean queue directory on added node
    result = rpc.RpcRunner.call_jobqueue_purge(node_name)
    msg = result.fail_msg
    if msg:
      logging.warning("Cannot cleanup queue directory on node %s: %s",
                      node_name, msg)

    if not node.master_candidate:
      # remove if existing, ignoring errors
      self._nodes.pop(node_name, None)
      # and skip the replication of the job ids
      return

    # Upload the whole queue excluding archived jobs
    files = [self._GetJobPath(job_id) for job_id in self._GetJobIDsUnlocked()]

    # Upload current serial file
    files.append(constants.JOB_QUEUE_SERIAL_FILE)

    for file_name in files:
      # Read file content
      content = utils.ReadFile(file_name)

      result = rpc.RpcRunner.call_jobqueue_update([node_name],
                                                  [node.primary_ip],
                                                  file_name, content)
      msg = result[node_name].fail_msg
      if msg:
        logging.error("Failed to upload file %s to node %s: %s",
                      file_name, node_name, msg)

    self._nodes[node_name] = node.primary_ip

  @utils.LockedMethod
  @_RequireOpenQueue
  def RemoveNode(self, node_name):
    """Callback called when removing nodes from the cluster.

    @type node_name: str
    @param node_name: the name of the node to remove

    """
    try:
      # The queue is removed by the "leave node" RPC call.
      del self._nodes[node_name]
    except KeyError:
      pass

  def _CheckRpcResult(self, result, nodes, failmsg):
    """Verifies the status of an RPC call.

    Since we aim to keep consistency should this node (the current
    master) fail, we will log errors if our rpc calls fail, and
    especially log the case when more than half of the nodes fail.

    @param result: the data as returned from the rpc call
    @type nodes: list
    @param nodes: the list of nodes we made the call to
    @type failmsg: str
    @param failmsg: the identifier to be used for logging

    """
    failed = []
    success = []

    for node in nodes:
      msg = result[node].fail_msg
      if msg:
        failed.append(node)
        logging.error("RPC call %s failed on node %s: %s",
                      result[node].call, node, msg)
      else:
        success.append(node)

    # +1 for the master node
    if (len(success) + 1) < len(failed):
      # TODO: Handle failing nodes
      logging.error("More than half of the nodes failed")

  def _GetNodeIp(self):
    """Helper for returning the node name/ip list.

    @rtype: (list, list)
    @return: a tuple of two lists, the first one with the node
        names and the second one with the node addresses

    """
    name_list = self._nodes.keys()
    addr_list = [self._nodes[name] for name in name_list]
    return name_list, addr_list

  def _WriteAndReplicateFileUnlocked(self, file_name, data):
    """Writes a file locally and then replicates it to all nodes.

    This function will replace the contents of a file on the local
    node and then replicate it to all the other nodes we have.

    @type file_name: str
    @param file_name: the path of the file to be replicated
    @type data: str
    @param data: the new contents of the file

    """
    utils.WriteFile(file_name, data=data)

    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_update(names, addrs, file_name, data)
    self._CheckRpcResult(result, self._nodes,
                         "Updating %s" % file_name)

  def _RenameFilesUnlocked(self, rename):
    """Renames files locally and then replicates the changes.

    This function will rename a file in the local queue directory
    and then replicate this rename to all the other nodes we have.

    @type rename: list of (old, new)
    @param rename: List containing tuples mapping old to new names

    """
    # Rename them locally
    for old, new in rename:
      utils.RenameFile(old, new, mkdir=True)

    # ... and on all nodes
    names, addrs = self._GetNodeIp()
    result = rpc.RpcRunner.call_jobqueue_rename(names, addrs, rename)
    self._CheckRpcResult(result, self._nodes, "Renaming files (%r)" % rename)

  def _FormatJobID(self, job_id):
    """Convert a job ID to string format.

    Currently this just does C{str(job_id)} after performing some
    checks, but if we want to change the job id format this will
    abstract this change.

    @type job_id: int or long
    @param job_id: the numeric job id
    @rtype: str
    @return: the formatted job id

    """
    if not isinstance(job_id, (int, long)):
      raise errors.ProgrammerError("Job ID '%s' not numeric" % job_id)
    if job_id < 0:
      raise errors.ProgrammerError("Job ID %s is negative" % job_id)

    return str(job_id)

  @classmethod
  def _GetArchiveDirectory(cls, job_id):
    """Returns the archive directory for a job.

    @type job_id: str
    @param job_id: Job identifier
    @rtype: str
    @return: Directory name

    """
    return str(int(job_id) / JOBS_PER_ARCHIVE_DIRECTORY)
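
  # Illustrative sketch (not part of the original module): with
  # JOBS_PER_ARCHIVE_DIRECTORY = 10000, archived jobs are grouped into
  # directories of ten thousand, e.g.:
  #
  #   JobQueue._GetArchiveDirectory("42")     # -> "0"
  #   JobQueue._GetArchiveDirectory("12345")  # -> "1"
  #   JobQueue._GetArchiveDirectory("987654") # -> "98"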

  def _NewSerialsUnlocked(self, count):
    """Generates new job identifiers.

    Job identifiers are unique during the lifetime of a cluster.

    @type count: integer
    @param count: how many serials to return
    @rtype: list of str
    @return: a list of formatted job identifiers

    """
    assert count > 0
    # New number
    serial = self._last_serial + count

    # Write to file
    self._WriteAndReplicateFileUnlocked(constants.JOB_QUEUE_SERIAL_FILE,
                                        "%s\n" % serial)

    result = [self._FormatJobID(v)
              for v in range(self._last_serial, serial + 1)]
    # Keep it only if we were able to write the file
    self._last_serial = serial

    return result

  @staticmethod
  def _GetJobPath(job_id):
    """Returns the job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the job file

    """
    return os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)

  @classmethod
  def _GetArchivedJobPath(cls, job_id):
    """Returns the archived job file for a given job id.

    @type job_id: str
    @param job_id: the job identifier
    @rtype: str
    @return: the path to the archived job file

    """
    path = "%s/job-%s" % (cls._GetArchiveDirectory(job_id), job_id)
    return os.path.join(constants.JOB_QUEUE_ARCHIVE_DIR, path)

  @classmethod
  def _ExtractJobID(cls, name):
    """Extract the job id from a filename.

    @type name: str
    @param name: the job filename
    @rtype: job id or None
    @return: the job id corresponding to the given filename,
        or None if the filename does not represent a valid
        job file

    """
    m = cls._RE_JOB_FILE.match(name)
    if m:
      return m.group(1)
    else:
      return None
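
  # Illustrative sketch (not part of the original module): job files are
  # named after the formatted job id, and _ExtractJobID is the inverse of
  # that naming scheme, e.g.:
  #
  #   JobQueue._GetJobPath("123")        # -> ".../queue/job-123"
  #   JobQueue._ExtractJobID("job-123")  # -> "123"
  #   JobQueue._ExtractJobID("job-foo")  # -> None (not a valid job file)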

  def _GetJobIDsUnlocked(self, archived=False):
    """Return all known job IDs.

    If the parameter archived is True, archived job IDs will be
    included. Currently this argument is unused.

    The method only looks at disk because it's a requirement that all
    jobs are present on disk (so in the _memcache we don't have any
    extra IDs).

    @rtype: list
    @return: the list of job IDs

    """
    jlist = [self._ExtractJobID(name) for name in self._ListJobFiles()]
    jlist = utils.NiceSort(jlist)
    return jlist

  def _ListJobFiles(self):
    """Returns the list of current job files.

    @rtype: list
    @return: the list of job file names

    """
    return [name for name in utils.ListVisibleFiles(constants.QUEUE_DIR)
            if self._RE_JOB_FILE.match(name)]

  def _LoadJobUnlocked(self, job_id):
    """Loads a job from the disk or memory.

    Given a job id, this will return the cached job object if
    existing, or try to load the job from the disk. If loading from
    disk, it will also add the job to the cache.

    @param job_id: the job id
    @rtype: L{_QueuedJob} or None
    @return: either None or the job object

    """
    job = self._memcache.get(job_id, None)
    if job:
      logging.debug("Found job %s in memcache", job_id)
      return job

    filepath = self._GetJobPath(job_id)
    logging.debug("Loading job from %s", filepath)
    try:
      raw_data = utils.ReadFile(filepath)
    except IOError, err:
      if err.errno in (errno.ENOENT, ):
        return None
      raise

    data = serializer.LoadJson(raw_data)

    try:
      job = _QueuedJob.Restore(self, data)
    except Exception, err:
      new_path = self._GetArchivedJobPath(job_id)
      if filepath == new_path:
        # job already archived (future case)
        logging.exception("Can't parse job %s", job_id)
      else:
        # non-archived case
        logging.exception("Can't parse job %s, will archive.", job_id)
        self._RenameFilesUnlocked([(filepath, new_path)])
      return None

    self._memcache[job_id] = job
    logging.debug("Added job %s to the cache", job_id)
    return job

  def _GetJobsUnlocked(self, job_ids):
    """Return a list of jobs based on their IDs.

    @type job_ids: list
    @param job_ids: either an empty list (meaning all jobs),
        or a list of job IDs
    @rtype: list
    @return: the list of job objects

    """
    if not job_ids:
      job_ids = self._GetJobIDsUnlocked()

    return [self._LoadJobUnlocked(job_id) for job_id in job_ids]

  @staticmethod
  def _IsQueueMarkedDrain():
    """Check if the queue is marked for drain.

    This currently uses the queue drain file, which makes it a
    per-node flag. In the future this can be moved to the config file.

    @rtype: boolean
    @return: True if the job queue is marked for draining

    """
    return os.path.exists(constants.JOB_QUEUE_DRAIN_FILE)

  @staticmethod
  def SetDrainFlag(drain_flag):
    """Sets the drain flag for the queue.

    This is similar to the function L{backend.JobQueueSetDrainFlag},
    and in the future we might merge them.

    @type drain_flag: boolean
    @param drain_flag: Whether to set or unset the drain flag

    """
    if drain_flag:
      utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
    else:
      utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
    return True

  @_RequireOpenQueue
  def _SubmitJobUnlocked(self, job_id, ops):
    """Create and store a new job.

    This enters the job into our job queue and also puts it on the new
    queue, in order for it to be picked up by the queue processors.

    @type job_id: job ID
    @param job_id: the job ID for the new job
    @type ops: list
    @param ops: The list of OpCodes that will become the new job.
    @rtype: job ID
    @return: the job ID of the newly created job
    @raise errors.JobQueueDrainError: if the job queue is marked for draining

    """
    if self._IsQueueMarkedDrain():
      raise errors.JobQueueDrainError("Job queue is drained, refusing job")

    # Check job queue size
    size = len(self._ListJobFiles())
    if size >= constants.JOB_QUEUE_SIZE_SOFT_LIMIT:
      # TODO: Autoarchive jobs. Make sure it's not done on every job
      # submission, though.
      #size = ...
      pass

    if size >= constants.JOB_QUEUE_SIZE_HARD_LIMIT:
      raise errors.JobQueueFull()

    job = _QueuedJob(self, job_id, ops)

    # Write to disk
    self.UpdateJobUnlocked(job)

    logging.debug("Adding new job %s to the cache", job_id)
    self._memcache[job_id] = job

    # Add to worker pool
    self._wpool.AddTask(job)

    return job.id

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitJob(self, ops):
    """Create and store a new job.

    @see: L{_SubmitJobUnlocked}

    """
    job_id = self._NewSerialsUnlocked(1)[0]
    return self._SubmitJobUnlocked(job_id, ops)

  @utils.LockedMethod
  @_RequireOpenQueue
  def SubmitManyJobs(self, jobs):
    """Create and store multiple jobs.

    @see: L{_SubmitJobUnlocked}

    """
    results = []
    all_job_ids = self._NewSerialsUnlocked(len(jobs))
    for job_id, ops in zip(all_job_ids, jobs):
      try:
        data = self._SubmitJobUnlocked(job_id, ops)
        status = True
      except errors.GenericError, err:
        data = str(err)
        status = False
      results.append((status, data))

    return results
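
  # Illustrative sketch (not part of the original module): a JobQueue is
  # normally created by the master daemon, but submission boils down to
  # handing over a list of opcodes per job; OpTestDelay is used here only
  # as a convenient example opcode:
  #
  #   job_id = queue.SubmitJob([opcodes.OpTestDelay(duration=0)])
  #   results = queue.SubmitManyJobs([[opcodes.OpTestDelay(duration=0)],
  #                                   [opcodes.OpTestDelay(duration=1)]])
  #   # results is a list of (success, job_id_or_error_message) tuples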

  @_RequireOpenQueue
  def UpdateJobUnlocked(self, job):
    """Update a job's on disk storage.

    After a job has been modified, this function needs to be called in
    order to write the changes to disk and replicate them to the other
    nodes.

    @type job: L{_QueuedJob}
    @param job: the changed job

    """
    filename = self._GetJobPath(job.id)
    data = serializer.DumpJson(job.Serialize(), indent=False)
    logging.debug("Writing job %s to %s", job.id, filename)
    self._WriteAndReplicateFileUnlocked(filename, data)

    # Notify waiters about potential changes
    job.change.notifyAll()

  @utils.LockedMethod
  @_RequireOpenQueue
  def WaitForJobChanges(self, job_id, fields, prev_job_info, prev_log_serial,
                        timeout):
    """Waits for changes in a job.

    @type job_id: string
    @param job_id: Job identifier
    @type fields: list of strings
    @param fields: Which fields to check for changes
    @type prev_job_info: list or None
    @param prev_job_info: Last job information returned
    @type prev_log_serial: int
    @param prev_log_serial: Last job message serial number
    @type timeout: float
    @param timeout: maximum time to wait
    @rtype: tuple (job info, log entries)
    @return: a tuple of the job information as required via
        the fields parameter, and the log entries as a list

        if the job has not changed and the timeout has expired,
        we instead return a special value,
        L{constants.JOB_NOTCHANGED}, which should be interpreted
        as such by the clients

    """
    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return None

    def _CheckForChanges():
      logging.debug("Waiting for changes in job %s", job_id)

      status = job.CalcStatus()
      job_info = self._GetJobInfoUnlocked(job, fields)
      log_entries = job.GetLogEntries(prev_log_serial)

      # Serializing and deserializing data can cause type changes (e.g. from
      # tuple to list) or precision loss. We're doing it here so that we get
      # the same modifications as the data received from the client. Without
      # this, the comparison afterwards might fail without the data being
      # significantly different.
      job_info = serializer.LoadJson(serializer.DumpJson(job_info))
      log_entries = serializer.LoadJson(serializer.DumpJson(log_entries))

      # Don't even try to wait if the job is no longer running, there will be
      # no changes.
      if (status not in (constants.JOB_STATUS_QUEUED,
                         constants.JOB_STATUS_RUNNING,
                         constants.JOB_STATUS_WAITLOCK) or
          prev_job_info != job_info or
          (log_entries and prev_log_serial != log_entries[0][0])):
        logging.debug("Job %s changed", job_id)
        return (job_info, log_entries)

      raise utils.RetryAgain()

    try:
      # Setting wait function to release the queue lock while waiting
      return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
                         wait_fn=job.change.wait)
    except utils.RetryTimeout:
      return constants.JOB_NOTCHANGED
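
  # Illustrative sketch (not part of the original module): a client
  # typically polls a job by feeding back the previously returned values
  # and stopping once the job reaches a final status:
  #
  #   prev_info, prev_serial = None, -1
  #   result = queue.WaitForJobChanges(job_id, ["status"],
  #                                    prev_info, prev_serial, 60.0)
  #   if result == constants.JOB_NOTCHANGED:
  #     pass  # timed out, ask again with the same previous values
  #   elif result is not None:
  #     prev_info, log_entries = result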

  @utils.LockedMethod
  @_RequireOpenQueue
  def CancelJob(self, job_id):
    """Cancels a job.

    This will only succeed if the job has not started yet.

    @type job_id: string
    @param job_id: job ID of job to be cancelled.

    """
    logging.info("Cancelling job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return (False, "Job %s not found" % job_id)

    job_status = job.CalcStatus()

    if job_status not in (constants.JOB_STATUS_QUEUED,
                          constants.JOB_STATUS_WAITLOCK):
      logging.debug("Job %s is no longer waiting in the queue", job.id)
      return (False, "Job %s is no longer waiting in the queue" % job.id)

    if job_status == constants.JOB_STATUS_QUEUED:
      self.CancelJobUnlocked(job)
      return (True, "Job %s canceled" % job.id)

    elif job_status == constants.JOB_STATUS_WAITLOCK:
      # The worker will notice the new status and cancel the job
      try:
        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
      finally:
        self.UpdateJobUnlocked(job)
      return (True, "Job %s will be canceled" % job.id)

  @_RequireOpenQueue
  def CancelJobUnlocked(self, job):
    """Marks a job as canceled.

    """
    try:
      job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                            "Job canceled by request")
    finally:
      self.UpdateJobUnlocked(job)

  @_RequireOpenQueue
  def _ArchiveJobsUnlocked(self, jobs):
    """Archives jobs.

    @type jobs: list of L{_QueuedJob}
    @param jobs: Job objects
    @rtype: int
    @return: Number of archived jobs

    """
    archive_jobs = []
    rename_files = []
    for job in jobs:
      if job.CalcStatus() not in (constants.JOB_STATUS_CANCELED,
                                  constants.JOB_STATUS_SUCCESS,
                                  constants.JOB_STATUS_ERROR):
        logging.debug("Job %s is not yet done", job.id)
        continue

      archive_jobs.append(job)

      old = self._GetJobPath(job.id)
      new = self._GetArchivedJobPath(job.id)
      rename_files.append((old, new))

    # TODO: What if 1..n files fail to rename?
    self._RenameFilesUnlocked(rename_files)

    logging.debug("Successfully archived job(s) %s",
                  ", ".join(job.id for job in archive_jobs))

    return len(archive_jobs)

  @utils.LockedMethod
  @_RequireOpenQueue
  def ArchiveJob(self, job_id):
    """Archives a job.

    This is just a wrapper over L{_ArchiveJobsUnlocked}.

    @type job_id: string
    @param job_id: Job ID of job to be archived.
    @rtype: bool
    @return: Whether job was archived

    """
    logging.info("Archiving job %s", job_id)

    job = self._LoadJobUnlocked(job_id)
    if not job:
      logging.debug("Job %s not found", job_id)
      return False

    return self._ArchiveJobsUnlocked([job]) == 1

  @utils.LockedMethod
  @_RequireOpenQueue
  def AutoArchiveJobs(self, age, timeout):
    """Archives all jobs based on age.

    The method will archive all jobs which are older than the age
    parameter. For jobs that don't have an end timestamp, the start
    timestamp will be considered. The special '-1' age will cause
    archival of all jobs (that are not running or queued).

    @type age: int
    @param age: the minimum age in seconds
    @type timeout: float
    @param timeout: the maximum amount of time (in seconds) to spend
        archiving jobs in this call
    @rtype: tuple
    @return: a tuple of (number of jobs archived, number of jobs not
        yet examined)

    """
    logging.info("Archiving jobs with age more than %s seconds", age)

    now = time.time()
    end_time = now + timeout
    archived_count = 0
    last_touched = 0

    all_job_ids = self._GetJobIDsUnlocked(archived=False)
    pending = []
    for idx, job_id in enumerate(all_job_ids):
      last_touched = idx

      # Not optimal because jobs could be pending
      # TODO: Measure average duration for job archival and take number of
      # pending jobs into account.
      if time.time() > end_time:
        break

      # Returns None if the job failed to load
      job = self._LoadJobUnlocked(job_id)
      if job:
        if job.end_timestamp is None:
          if job.start_timestamp is None:
            job_age = job.received_timestamp
          else:
            job_age = job.start_timestamp
        else:
          job_age = job.end_timestamp

        if age == -1 or now - job_age[0] > age:
          pending.append(job)

          # Archive 10 jobs at a time
          if len(pending) >= 10:
            archived_count += self._ArchiveJobsUnlocked(pending)
            pending = []

    if pending:
      archived_count += self._ArchiveJobsUnlocked(pending)

    return (archived_count, len(all_job_ids) - last_touched - 1)
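
  # Illustrative sketch (not part of the original module): archiving all
  # finished jobs older than a week, while spending at most roughly one
  # minute on the operation, could look like:
  #
  #   archived, remaining = queue.AutoArchiveJobs(7 * 24 * 3600, 60.0)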

  def _GetJobInfoUnlocked(self, job, fields):
    """Returns information about a job.

    @type job: L{_QueuedJob}
    @param job: the job which we query
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: list with one element for each field
    @raise errors.OpExecError: when an invalid field
        has been passed

    """
    row = []
    for fname in fields:
      if fname == "id":
        row.append(job.id)
      elif fname == "status":
        row.append(job.CalcStatus())
      elif fname == "ops":
        row.append([op.input.__getstate__() for op in job.ops])
      elif fname == "opresult":
        row.append([op.result for op in job.ops])
      elif fname == "opstatus":
        row.append([op.status for op in job.ops])
      elif fname == "oplog":
        row.append([op.log for op in job.ops])
      elif fname == "opstart":
        row.append([op.start_timestamp for op in job.ops])
      elif fname == "opend":
        row.append([op.end_timestamp for op in job.ops])
      elif fname == "received_ts":
        row.append(job.received_timestamp)
      elif fname == "start_ts":
        row.append(job.start_timestamp)
      elif fname == "end_ts":
        row.append(job.end_timestamp)
      elif fname == "lock_status":
        row.append(job.lock_status)
      elif fname == "summary":
        row.append([op.input.Summary() for op in job.ops])
      else:
        raise errors.OpExecError("Invalid job query field '%s'" % fname)
    return row

  @utils.LockedMethod
  @_RequireOpenQueue
  def QueryJobs(self, job_ids, fields):
    """Returns a list of jobs in queue.

    This is a wrapper of L{_GetJobsUnlocked}, which actually does the
    processing for each job.

    @type job_ids: list
    @param job_ids: sequence of job identifiers or None for all
    @type fields: list
    @param fields: names of fields to return
    @rtype: list
    @return: a list with one element per job, each element being a list
        with the requested fields

    """
    jobs = []

    for job in self._GetJobsUnlocked(job_ids):
      if job is None:
        jobs.append(None)
      else:
        jobs.append(self._GetJobInfoUnlocked(job, fields))

    return jobs
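
  # Illustrative sketch (not part of the original module): querying a few
  # fields for all known jobs, where unparseable jobs show up as None:
  #
  #   for info in queue.QueryJobs(None, ["id", "status", "summary"]):
  #     if info is not None:
  #       job_id, status, summary = info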

  @utils.LockedMethod
  @_RequireOpenQueue
  def Shutdown(self):
    """Stops the job queue.

    This shuts down all the worker threads and closes the queue.

    """
    self._wpool.TerminateWorkers()

    self._queue_lock.Close()
    self._queue_lock = None